# Create a machine learning model to classify surf conditions by surfer skill level

## Prepare the data
For the machine learning model, we use data from only one location, with four parameters: wind direction, wind speed, wave height and wave period.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType, LongType, TimestampType
from pyspark.sql.functions import explode, col

# Start (or reuse) the Spark session used by the rest of the notebook
spark = SparkSession.builder.appName("Read JSON").getOrCreate()

# Explicit schema covering only the JSON fields that are read below.
# It is assembled bottom-up from small named pieces so the nesting stays readable.

# Location of an observation series: only its code is kept
_location_type = StructType([
    StructField("Code", StringType(), True),
])

# One measurement: a timestamp plus its numeric value
_measurement_type = StructType([
    StructField("Tijdstip", TimestampType(), True),
    StructField("Meetwaarde", StructType([
        StructField("Waarde_Numeriek", DoubleType(), True),
    ]), True),
])

# Metadata describing which quantity was measured
_metadata_type = StructType([
    StructField("Parameter_Wat_Omschrijving", StringType(), True),
    StructField("Grootheid", StructType([
        StructField("Code", StringType(), True),
    ]), True),
])

# One observation series: location, list of measurements, and metadata
_observation_type = StructType([
    StructField("Locatie", _location_type, True),
    StructField("MetingenLijst", ArrayType(_measurement_type, True), True),
    StructField("AquoMetadata", _metadata_type, True),
])

# Top level: a list of observation series
schema = StructType([
    StructField("WaarnemingenLijst", ArrayType(_observation_type, True), True),
])

# JSON files to load, one per measured parameter
files = [
    './../Data/Raw/RWS_SPY_WINDRTG.json',
    './../Data/Raw/RWS_SPY_WINDSHD.json',
    './../Data/Raw/RWS_SPY_Hm0.json',
    './../Data/Raw/RWS_SPY_Tm02.json',
]

# Load all files into a single DataFrame with the explicit schema.
# multiLine=True lets each JSON document span several lines.
df = spark.read.json(path=files, schema=schema, multiLine=True)

# Step 1: one row per entry of the top-level observation list
observations = df.select(explode("WaarnemingenLijst").alias("Waarnemingen"))

# Step 2: keep the location and parameter codes, and emit one row per measurement
measurements = observations.select(
    col("Waarnemingen.Locatie.Code").alias("location_code"),
    col("Waarnemingen.AquoMetadata.Grootheid.Code").alias("parameter_code"),
    explode("Waarnemingen.MetingenLijst").alias("Metingen"),
)

# Step 3: flatten each measurement struct into timestamp and numeric value columns
df_exploded = measurements.select(
    "location_code",
    "parameter_code",
    col("Metingen.Tijdstip").alias("date_time"),
    col("Metingen.Meetwaarde.Waarde_Numeriek").alias("measurement_value"),
)

# Preview the flattened result
df_exploded.show()

### Clean the data

Replace all measurement values higher than 900 (error readings) with 0.

In [None]:
from pyspark.sql.functions import when

# Values above 900 are treated as error readings and overwritten with 0;
# every other measurement passes through unchanged.
# NOTE(review): the 0 placeholder cannot be told apart from a genuine zero
# reading in later steps -- confirm this is intended.
is_error_reading = col("measurement_value") > 900
cleaned_value = when(is_error_reading, 0).otherwise(col("measurement_value"))
df_final = df_exploded.withColumn("measurement_value", cleaned_value)

df_final.show()

### Create the base dataset for machine learning

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

# Base table: one row per distinct (date_time, location_code) with a generated id.
# monotonically_increasing_id() produces unique, increasing ids that are not
# necessarily consecutive.
dataset_0 = df_final.select(
    col("date_time"),
    col("location_code"),
).distinct().withColumn("measurement_id", monotonically_increasing_id() + 1)

# (parameter_code, output column) pairs, joined in this order.
measurement_columns = [
    ("WINDSHD", "windspeed_measurement"),
    ("WINDRTG", "winddirection_measurement"),
    ("Hm0", "waveheight_measurement"),
    ("Tm02", "waveperiod_measurement"),
]

# Left-join one value column per parameter onto the base table, so rows
# without a reading for that parameter are kept with a null value.
# The column is referenced as "location_code" (its real name) rather than
# "Location_Code", so the lookup no longer relies on Spark's case-insensitive
# column resolution.
for parameter_code, measurement_column in measurement_columns:
    parameter_values = df_final.filter(col("parameter_code") == parameter_code).select(
        col("date_time"),
        col("location_code"),
        col("measurement_value").alias(measurement_column),
    )
    dataset_0 = dataset_0.join(
        parameter_values,
        on=["date_time", "location_code"],
        how="left",
    )

dataset_0.show()
print(f"Number of rows in dataset_0: {dataset_0.count()}")

### Label the data
Add the surf level category labels to the dataset

In [None]:
from pyspark.sql.functions import when, col
import math
import random

# Rule-based labelling. Rules are evaluated top to bottom and the first match
# wins, so within each wind regime the wave-height thresholds are tested from
# smallest to largest; otherwise the larger threshold would swallow every row
# meant for a smaller class.
#
# The jitter on each threshold is drawn once, in Python, when this cell runs:
# every row is compared against the same shifted threshold. A dedicated seeded
# generator keeps the labels reproducible between runs.
jitter = random.Random(42).uniform

wave_height = col("waveheight_measurement")
wave_period = col("waveperiod_measurement")
wind_speed = col("windspeed_measurement")
wind_direction = col("winddirection_measurement")

light_wind = wind_speed <= 20
# NOTE(review): the original onshore rules combined `<= 90`, `>= 45` and
# `<= 180`; the effective 45-90 range is kept here. Confirm whether 45-180
# was intended.
onshore = wind_direction.between(45, 90)
# The original offshore rules also demanded `<= 90`, which can never hold
# together with `>= 225`, so they never matched. That stray bound is removed.
offshore = wind_direction.between(225, 360)

dataset_classified = dataset_0.withColumn(
    "category",
    when(wave_period < 4, "Choppy")
    # Strong wind within the (jittered) 90-140 degree sector
    .when((wind_direction >= 90 + jitter(-20, 20)) & (wind_direction <= 140) & (wind_speed > 90), "Danger")
    # Onshore winds
    .when((wave_height <= 50 + jitter(-20, 0)) & light_wind & onshore, "Beginner")
    .when((wave_height <= 100 + jitter(-20, 20)) & light_wind & onshore, "Intermediate")
    .when((wave_height <= 150 + jitter(-20, 20)) & light_wind & onshore, "Advanced")
    # Offshore winds
    .when((wave_height <= 150 + jitter(-20, 0)) & light_wind & offshore, "Beginner")
    .when((wave_height <= 200 + jitter(-20, 20)) & light_wind & offshore, "Intermediate")
    .when((wave_height <= 250 + jitter(-20, 20)) & light_wind & offshore, "Advanced")
    .when(wave_height <= 50 + jitter(-20, 0), "Flat")
    # Side shore winds (everything not matched above)
    .when(wave_height <= 100 + jitter(-20, 20), "Beginner")
    .when(wave_height <= 150 + jitter(-20, 20), "Intermediate")
    .when(wave_height <= 250 + jitter(-20, 20), "Advanced")
    .when(wave_height > 250 + jitter(-20, 0), "Danger")
    # Rows matching no rule (e.g. missing measurements) end up here
    .otherwise("Undefined")
)

dataset_classified.show()

### Visualize the classified dataset 

In [None]:
import matplotlib.pyplot as plt

# Collect only the plotted columns into a pandas DataFrame
pandas_df = dataset_classified.select(
    "winddirection_measurement",
    "waveheight_measurement",
    "category",
).toPandas()

# Marker colour per category; categories not listed here are not drawn
colors = {
    'Danger': 'red',
    'Advanced': 'blue',
    'Intermediate': 'green',
    'Flat': 'purple',
    'Choppy': 'orange',
    'Beginner': 'brown'
}

# One scatter series per category: wind direction on x, wave height on y
fig, ax = plt.subplots(figsize=(10, 6))
for category, color in colors.items():
    subset = pandas_df.loc[pandas_df['category'] == category]
    ax.scatter(
        subset['winddirection_measurement'],
        subset['waveheight_measurement'],
        alpha=0.5,
        label=category,
        color=color,
    )

ax.set_xlabel('Wind Direction Measurement')
ax.set_ylabel('Wave Height Measurement')
ax.set_title('Wave Height Measurement by Wind Direction and Category')
ax.legend()
plt.show()

### Clean columns
Keep only the columns relevant for training and validating the machine learning model.

In [None]:

# Columns kept for modelling: the four measurements plus the label
model_columns = [
    "windspeed_measurement",
    "winddirection_measurement",
    "waveheight_measurement",
    "waveperiod_measurement",
    "category",
]
dataset_classified = dataset_classified.select(*model_columns)

dataset_classified.show()

### Split the dataset

The dataset is first split into a train and a test set.

In [None]:
from sklearn.model_selection import train_test_split

feature_columns = [
    "windspeed_measurement",
    "winddirection_measurement",
    "waveheight_measurement",
    "waveperiod_measurement",
]

# Convert to pandas once and slice features and labels from the same frame.
# Two separate toPandas() calls are two separate Spark jobs, and Spark does not
# guarantee the same row order across them, so X and y could become misaligned.
model_data = dataset_classified.select(*feature_columns, "category").toPandas()

X = model_data[feature_columns]
# 1-D label vector (Series), the shape scikit-learn estimators expect for y
y = model_data["category"]

# NOTE(review): rows with missing measurements are not removed here -- confirm
# the data contains none before it reaches the resampling/training steps.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

# len() gives the row count; DataFrame.count() would give per-column non-null counts
print(f"Number of rows in X_train: {len(X_train)}")
print(f"Number of rows in X_test: {len(X_test)}")
print(f"Number of rows in y_train: {len(y_train)}")
print(f"Number of rows in y_test: {len(y_test)}")


### Balance the category occurrence in the dataset

In [None]:
import matplotlib.pyplot as plt

# Count rows per category in Spark, then bring the result to pandas,
# ordered by count from largest to smallest
category_counts = dataset_classified.groupBy('category').count()
pandas_df = category_counts.toPandas().sort_values(by='count', ascending=False)

# Bar chart of the number of rows in each category
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(pandas_df['category'], pandas_df['count'])
ax.set_xlabel('Category')
ax.set_ylabel('Count')
ax.set_title('Category Distribution')
plt.xticks(rotation=45)
plt.show()

Here we balance the categories in the training set by oversampling with SMOTE.

In [None]:
from imblearn.over_sampling import SMOTE

# Oversample the training split with SMOTE (fixed seed for repeatability);
# the test split is left untouched.
smote = SMOTE(random_state=15)
_resampled = smote.fit_resample(X_train, y_train)
X_train_balanced, y_train_balanced = _resampled

## Train the model


In [None]:
import time
from memory_profiler import memory_usage
from sklearn.svm import SVC
from sklearn.metrics import classification_report, confusion_matrix

# Linear-kernel SVC with a one-vs-one decision function
svc_model = SVC(kernel='linear', decision_function_shape='ovo')

# Time the fit on the balanced training data.
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# durations; time.time() can jump when the system clock is adjusted.
start_time = time.perf_counter()
svc_model.fit(X_train_balanced, y_train_balanced)
training_time = time.perf_counter() - start_time

# Time the prediction on the untouched test split
start_time = time.perf_counter()
y_pred_svc_0v0 = svc_model.predict(X_test)
prediction_time = time.perf_counter() - start_time

# Per-class precision / recall / F1
print("y_pred_svc_0v0 classification_report:")
print(classification_report(y_test, y_pred_svc_0v0))
# Confusion matrix of true vs predicted categories
print("y_pred_svc_0v0 confusion matrix:")
print(confusion_matrix(y_test, y_pred_svc_0v0))


### Performance metrics

In [None]:
# Measure memory while fitting a fresh, unfitted estimator with the same
# hyper-parameters on the same balanced data that was timed above.
# Calling svc_model.fit here would overwrite the model that was just evaluated,
# and fitting on the unbalanced split would profile a different workload.
profiling_model = SVC(**svc_model.get_params())
mem_usage = memory_usage((profiling_model.fit, (X_train_balanced, y_train_balanced)))

print(f"Training time: {training_time} seconds")
print(f"Prediction time: {prediction_time} seconds")
# Spread between the highest and lowest memory samples taken during the fit
print(f"Memory usage: {max(mem_usage) - min(mem_usage)} MiB")