## Extract JSONS to SparkPy Dataframes

In this script, the JSON is exploded into a dataframe. We provide a schema beforehand such that it knows how we want to structure our data. Very important is that we have a JSON with multiple lines. So in the spark.read.json function, we provide the schema, and the multiline=true parameters.

In [18]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType, LongType
from pyspark.sql.functions import explode, col
import os

jars_dir = "C:\sqljdbc_12.8\enu\jars"
jdbc_driver_path = [os.path.join(jars_dir, i) for i in os.listdir(jars_dir)]
jdbc_driver_path = ",".join(jdbc_driver_path)

# Update Spark configuration
conf = SparkConf().setMaster("local[*]") \
                  .set("spark.sql.debug.maxToStringFields", 1000) \
                  .set("spark.executor.heartbeatInterval", "200000") \
                  .set("spark.network.timeout", "300000") \
                  .set("spark.sql.execution.arrow.pyspark.enabled", "true") \
                  .set("spark.jars", jdbc_driver_path) \
                  .set("spark.ui.port", "4040") \
                  .set("spark.driver.cores", "8") \
                  .set("spark.executor.cores", "8") \
                  .set("spark.driver.memory", "16g")  \
                  .set("spark.executor.memory", "16g")  \
                  .set("spark.executor.instances", "2") \
                  .setAppName("PYSPARK_MSSQL_TUTORIAL")

spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")

# Define the schema
schema = StructType([
    StructField("WaarnemingenLijst", ArrayType(StructType([
        StructField("Locatie", StructType([
            StructField("Locatie_MessageID", LongType(), True),
            StructField("Coordinatenstelsel", StringType(), True),
            StructField("X", DoubleType(), True),
            StructField("Y", DoubleType(), True),
            StructField("Naam", StringType(), True),
            StructField("Code", StringType(), True)
        ]), True),
        StructField("MetingenLijst", ArrayType(StructType([
            StructField("Tijdstip", StringType(), True),
            StructField("Meetwaarde", StructType([
                StructField("Waarde_Numeriek", DoubleType(), True)
            ]), True)
        ]), True), True),
        StructField("AquoMetadata", StructType([
            StructField("Parameter_Wat_Omschrijving", StringType(), True),
            StructField("Compartiment", StructType([
                StructField("Code", StringType(), True)
            ]), True),
            StructField("Grootheid", StructType([
                StructField("Code", StringType(), True)
            ]), True),
            StructField("Eenheid", StructType([
                StructField("Code", StringType(), True)
            ]), True)
        ]), True)
    ]), True), True)
])
# Read the JSON file into a DataFrame using the schema
df = spark.read.json(    \
    path="c:/Users/Admin/Documents/Programming projects/GolfconditiesApp/Data/Raw/RWS_J6_Hm0.json",  \
    schema=schema,  \
    multiLine=True \
    )

# Select and explode the nested fields to get the desired columns
df_exploded = df.select(
    explode("WaarnemingenLijst").alias("Waarnemingen")
).select(
    col("Waarnemingen.Locatie.X").alias("X_coordinate"),
    col("Waarnemingen.Locatie.Y").alias("Y_coordinate"),
    col("Waarnemingen.Locatie.Naam").alias("Name"),
    col("Waarnemingen.Locatie.Code").alias("Location_Code"),
    col("Waarnemingen.AquoMetadata.Parameter_Wat_Omschrijving").alias("description"),
    col("Waarnemingen.AquoMetadata.Compartiment.Code").alias("compartiment_code"),
    col("Waarnemingen.AquoMetadata.Grootheid.Code").alias("parameter_code"),
    col("Waarnemingen.AquoMetadata.Eenheid.Code").alias("unit_code"),
    explode("Waarnemingen.MetingenLijst").alias("Metingen")
).select(
    col("Location_Code"),
    col("X_coordinate"),
    col("Y_coordinate"),
    col("Name"),
    col("description"),
    col("compartiment_code"),
    col("parameter_code"),
    col("unit_code"),
    col("Metingen.Tijdstip").alias("date_time"),
    col("Metingen.Meetwaarde.Waarde_Numeriek").alias("measurement_value")
)

# Show the resulting DataFrame
df_exploded.show()

+-------------+----------------+----------------+-----------+--------------------+-----------------+--------------+---------+--------------------+-----------------+
|Location_Code|    X_coordinate|    Y_coordinate|       Name|         description|compartiment_code|parameter_code|unit_code|           date_time|measurement_value|
+-------------+----------------+----------------+-----------+--------------------+-----------------+--------------+---------+--------------------+-----------------+
|           J6|496708.794769268|5963121.52220118|J6 platform|Significante golf...|               OW|           Hm0|       cm|2025-01-22T06:00:...|             77.0|
|           J6|496708.794769268|5963121.52220118|J6 platform|Significante golf...|               OW|           Hm0|       cm|2025-01-22T06:10:...|             74.0|
|           J6|496708.794769268|5963121.52220118|J6 platform|Significante golf...|               OW|           Hm0|       cm|2025-01-22T06:20:...|             66.0|
|         

## Loop through all json files available

Now that we know how to handle one json, we can try to get all the available jsons in one go.

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType, LongType, TimestampType
from pyspark.sql.functions import explode, col

# Initialize Spark session
spark = SparkSession.builder.appName("Read JSON").getOrCreate()
# Define the schema
schema = StructType([
    StructField("WaarnemingenLijst", ArrayType(StructType([
        StructField("Locatie", StructType([
            StructField("Locatie_MessageID", LongType(), True),
            StructField("Coordinatenstelsel", StringType(), True),
            StructField("X", DoubleType(), True),
            StructField("Y", DoubleType(), True),
            StructField("Naam", StringType(), True),
            StructField("Code", StringType(), True)
        ]), True),
        StructField("MetingenLijst", ArrayType(StructType([
            StructField("Tijdstip", TimestampType(), True),
            StructField("Meetwaarde", StructType([
                StructField("Waarde_Numeriek", DoubleType(), True)
            ]), True)
        ]), True), True),
        StructField("AquoMetadata", StructType([
            StructField("Parameter_Wat_Omschrijving", StringType(), True),
            StructField("Compartiment", StructType([
                StructField("Code", StringType(), True)
            ]), True),
            StructField("Grootheid", StructType([
                StructField("Code", StringType(), True)
            ]), True),
            StructField("Eenheid", StructType([
                StructField("Code", StringType(), True)
            ]), True)
        ]), True)
    ]), True), True)
])

#Provide the list of jsons to be read
files = [
'./../Data/Raw/RWS_J6_WINDRTG.json',
'./../Data/Raw/RWS_J6_WINDSHD.json',
'./../Data/Raw/RWS_J6_Hm0.json',
'./../Data/Raw/RWS_J6_Tm02.json',
'./../Data/Raw/RWS_SPY_WINDRTG.json',
'./../Data/Raw/RWS_SPY_WINDSHD.json',
'./../Data/Raw/RWS_SPY_Hm0.json',
'./../Data/Raw/RWS_SPY_Tm02.json',
'./../Data/Raw/RWS_K14_WINDRTG.json',
'./../Data/Raw/RWS_K14_WINDSHD.json',
'./../Data/Raw/RWS_K14_Hm0.json',
'./../Data/Raw/RWS_K14_Tm02.json',
'./../Data/Raw/RWS_BG2_WINDRTG.json',
'./../Data/Raw/RWS_BG2_WINDSHD.json',
'./../Data/Raw/RWS_BG2_Hm0.json',
'./../Data/Raw/RWS_BG2_Tm02.json', ]

# Read the JSON file into a DataFrame using the schema
df = spark.read.json(    \
    path=files,  \
    schema=schema,  \
    multiLine=True \
    )

# Select and explode the nested fields to get the desired columns
df_exploded = df.select(
    explode("WaarnemingenLijst").alias("Waarnemingen")
).select(
    col("Waarnemingen.Locatie.X").alias("X_coordinate"),
    col("Waarnemingen.Locatie.Y").alias("Y_coordinate"),
    col("Waarnemingen.Locatie.Naam").alias("name"),
    col("Waarnemingen.Locatie.Code").alias("location_code"),
    col("Waarnemingen.AquoMetadata.Parameter_Wat_Omschrijving").alias("description"),
    col("Waarnemingen.AquoMetadata.Compartiment.Code").alias("compartiment_code"),
    col("Waarnemingen.AquoMetadata.Grootheid.Code").alias("parameter_code"),
    col("Waarnemingen.AquoMetadata.Eenheid.Code").alias("unit_code"),
    explode("Waarnemingen.MetingenLijst").alias("Metingen")
).select(
    col("location_code"),
    col("X_coordinate"),
    col("Y_coordinate"),
    col("name"),
    col("description"),
    col("compartiment_code"),
    col("parameter_code"),
    col("unit_code"),
    col("Metingen.Tijdstip").alias("date_time"),
    col("Metingen.Meetwaarde.Waarde_Numeriek").alias("measurement_value")
)

# Show the resulting DataFrame
df_exploded.show()

+-------------+----------------+----------------+-----------+--------------------+-----------------+--------------+---------+-------------------+-----------------+
|location_code|    X_coordinate|    Y_coordinate|       name|         description|compartiment_code|parameter_code|unit_code|          date_time|measurement_value|
+-------------+----------------+----------------+-----------+--------------------+-----------------+--------------+---------+-------------------+-----------------+
|           J6|496708.794769268|5963121.52220118|J6 platform|Significante golf...|               OW|           Hm0|       cm|2025-01-22 06:00:00|             77.0|
|           J6|496708.794769268|5963121.52220118|J6 platform|Significante golf...|               OW|           Hm0|       cm|2025-01-22 06:10:00|             74.0|
|           J6|496708.794769268|5963121.52220118|J6 platform|Significante golf...|               OW|           Hm0|       cm|2025-01-22 06:20:00|             66.0|
|           J6|4

## Check if each date has a measurement for all 

In [20]:
from pyspark.sql.functions import count

df_count = df_exploded.groupBy("date_time").agg(count("*").alias("count"))
df_count.select("count").distinct().show()
df_count.show()


+-----+
|count|
+-----+
|   16|
+-----+

+-------------------+-----+
|          date_time|count|
+-------------------+-----+
|2025-01-26 08:00:00|   16|
|2025-01-16 07:10:00|   16|
|2025-01-16 07:30:00|   16|
|2025-01-18 01:40:00|   16|
|2025-01-19 23:30:00|   16|
|2025-01-21 03:00:00|   16|
|2025-01-21 14:20:00|   16|
|2025-01-21 19:50:00|   16|
|2025-01-08 09:30:00|   16|
|2025-01-08 20:10:00|   16|
|2025-01-11 13:00:00|   16|
|2025-01-13 04:10:00|   16|
|2025-01-14 19:00:00|   16|
|2025-01-02 02:20:00|   16|
|2025-01-02 21:20:00|   16|
|2025-01-02 21:30:00|   16|
|2025-01-04 03:20:00|   16|
|2025-01-07 10:50:00|   16|
|2025-01-08 04:40:00|   16|
|2025-01-08 07:30:00|   16|
+-------------------+-----+
only showing top 20 rows



### Convert to longitude and latitue

In [21]:
from pyproj import Transformer
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Create coordinate transformer
transformer = Transformer.from_crs("EPSG:25831", "EPSG:4326")

# Define UDFs to convert coordinates
def convert_x_to_lon(x, y):
    return transformer.transform(x, y)[0]

def convert_y_to_lat(x, y):
    return transformer.transform(x, y)[1]

convert_x_to_lon_udf = udf(lambda x, y: convert_x_to_lon(x, y), DoubleType())
convert_y_to_lat_udf = udf(lambda x, y: convert_y_to_lat(x, y), DoubleType())

# Create new columns for converted coordinates
df_converted = df_exploded.withColumn("Longitude", convert_x_to_lon_udf(df_exploded["X_coordinate"], df_exploded["Y_coordinate"])) \
                          .withColumn("Latitude", convert_y_to_lat_udf(df_exploded["X_coordinate"], df_exploded["Y_coordinate"]))

# Drop old columns
df_converted = df_converted.drop("X_coordinate", "Y_coordinate")

# Rename new columns to original names
df_converted = df_converted.withColumnRenamed("Longitude", "Y_coordinate") \
                           .withColumnRenamed("Latitude", "X_coordinate")

# Show the result
df_converted.show()

+-------------+-----------+--------------------+-----------------+--------------+---------+-------------------+-----------------+-----------------+-----------------+
|location_code|       name|         description|compartiment_code|parameter_code|unit_code|          date_time|measurement_value|     Y_coordinate|     X_coordinate|
+-------------+-----------+--------------------+-----------------+--------------+---------+-------------------+-----------------+-----------------+-----------------+
|           J6|J6 platform|Significante golf...|               OW|           Hm0|       cm|2025-01-22 06:00:00|             77.0|53.81663199999994|2.950009999999986|
|           J6|J6 platform|Significante golf...|               OW|           Hm0|       cm|2025-01-22 06:10:00|             74.0|53.81663199999994|2.950009999999986|
|           J6|J6 platform|Significante golf...|               OW|           Hm0|       cm|2025-01-22 06:20:00|             66.0|53.81663199999994|2.950009999999986|
|   

## Delete all values higher than 900 from the measurements (errors)

In [22]:
from pyspark.sql.functions import when

df_converted = df_converted.withColumn(
    "measurement_value",
    when(col("measurement_value") > 900, 0).otherwise(col("measurement_value"))
)

df_converted.show()

+-------------+-----------+--------------------+-----------------+--------------+---------+-------------------+-----------------+-----------------+-----------------+
|location_code|       name|         description|compartiment_code|parameter_code|unit_code|          date_time|measurement_value|     Y_coordinate|     X_coordinate|
+-------------+-----------+--------------------+-----------------+--------------+---------+-------------------+-----------------+-----------------+-----------------+
|           J6|J6 platform|Significante golf...|               OW|           Hm0|       cm|2025-01-22 06:00:00|             77.0|53.81663199999994|2.950009999999986|
|           J6|J6 platform|Significante golf...|               OW|           Hm0|       cm|2025-01-22 06:10:00|             74.0|53.81663199999994|2.950009999999986|
|           J6|J6 platform|Significante golf...|               OW|           Hm0|       cm|2025-01-22 06:20:00|             66.0|53.81663199999994|2.950009999999986|
|   

## Store dataset in SQL database

First we will store the total dataset in a sql database table as exploded_data

In [23]:
# For simplicity, convert the latest dataframe variable name to df_final before saving to the database

df_final = df_converted.sort("date_time",'location_code','parameter_code')
df_final.show()

+-------------+--------------------+--------------------+-----------------+--------------+---------+-------------------+-----------------+------------------+------------------+
|location_code|                name|         description|compartiment_code|parameter_code|unit_code|          date_time|measurement_value|      Y_coordinate|      X_coordinate|
+-------------+--------------------+--------------------+-----------------+--------------+---------+-------------------+-----------------+------------------+------------------+
|          BG2|Brouwershavense G...|Significante golf...|               OW|           Hm0|       cm|2025-01-01 10:00:00|            178.0| 51.76652700000006| 3.621746999999992|
|          BG2|Brouwershavense G...|Golfperiode bepaa...|               OW|          Tm02|        s|2025-01-01 10:00:00|              4.9| 51.76652700000006| 3.621746999999992|
|          BG2|Brouwershavense G...|Windrichting Luch...|               LT|       WINDRTG|    graad|2025-01-01 10:0

### Storing the exploded dataframe into the SQL database

In [24]:
import configparser

# Define the connection properties
config = configparser.ConfigParser()
config.read('./../credentials')

db_ip = "localhost:1433"
db_name = "data_structure_v3"
db_username = config['database']['username']
db_password = config['database']['password']


driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_url = f"jdbc:sqlserver://{db_ip};databaseName={db_name};username={db_username};password={db_password};TrustServerCertificate=True;"

# Write the DataFrame to the database
table_name = "DBO.exploded_data"

df_final.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", table_name) \
    .option("driver", driver) \
    .mode("overwrite") \
    .save()



### Create dimension tables
Than we will create dimension tables such that the data is distributed over different tables in the star schema, and linked to eachother with primary and foreign keys.

In [25]:
from pyspark.sql.functions import col, monotonically_increasing_id

# Create dimension tables with unique IDs
timestamp_dim = df_final.select(
    col("date_time")
).distinct().withColumn("timestamp_id", monotonically_increasing_id() + 1)

location_dim = df_final.select(
    col("Location_Code").alias("location_code"),
    col("X_coordinate"),
    col("Y_coordinate"),
    col("Name").alias("location_name")
).distinct()

parameter_dim = df_final.select(
    col("compartiment_code"),
    col("parameter_code"),
    col("description"),
    col("unit_code"),
).distinct().withColumn("parameter_id", monotonically_increasing_id() + 1)

location_dim.show()
timestamp_dim.show()
parameter_dim.show()


+-------------+------------------+------------------+--------------------+
|location_code|      X_coordinate|      Y_coordinate|       location_name|
+-------------+------------------+------------------+--------------------+
|           J6| 2.950009999999986| 53.81663199999994|         J6 platform|
|          SPY| 4.517361000000503| 52.46373600000007|IJgeul stroommeet...|
|          BG2| 3.621746999999992| 51.76652700000006|Brouwershavense G...|
|          K14|3.6333299999999977|53.266670000000005|        K14 platform|
+-------------+------------------+------------------+--------------------+

+-------------------+------------+
|          date_time|timestamp_id|
+-------------------+------------+
|2025-01-26 08:00:00|           1|
|2025-01-16 07:10:00|           2|
|2025-01-16 07:30:00|           3|
|2025-01-18 01:40:00|           4|
|2025-01-19 23:30:00|           5|
|2025-01-21 03:00:00|           6|
|2025-01-21 14:20:00|           7|
|2025-01-21 19:50:00|           8|
|2025-01-08 09

### Create measurement tables

The measurement tables are created from the final dataframe. The parameter ID's are added from the parameter dimension table.

In [26]:

windspeed_dim = df_final.filter(col("parameter_code") == "WINDSHD").select(
    col("date_time"),
    col("Location_Code").alias("location_code"),
    col("parameter_code"),
    col("compartiment_code"),
    col("unit_code"),
    col("description"),
    col("measurement_value").alias("windspeed_measurement")
).distinct().withColumn("windspeed_id", monotonically_increasing_id() + 1
).join(
    parameter_dim.select(
        col("parameter_id"),
        col("compartiment_code"),
        col("unit_code"),
        col("description")
    ), 
    on=["compartiment_code", "unit_code","description"], 
    how="left"
).select(
    col("date_time"),
    col("location_code"),
    col("parameter_code"),
    col("windspeed_id"),
    col("parameter_id"),
    col("windspeed_measurement")
)

winddirection_dim = df_final.filter(col("parameter_code") == "WINDRTG").select(
    col("date_time"),
    col("Location_Code").alias("location_code"),
    col("parameter_code"),
    col("compartiment_code"),
    col("unit_code"),
    col("description"),
    col("measurement_value").alias("winddirection_measurement")
).distinct().withColumn("winddirection_id", monotonically_increasing_id() + 1
).join(
    parameter_dim.select(
        col("parameter_id"),
        col("compartiment_code"),
        col("unit_code"),
        col("description")
    ), 
    on=["compartiment_code", "unit_code","description"], 
    how="left"
).select(
    col("date_time"),
    col("location_code"),
    col("parameter_code"),
    col("winddirection_id"),
    col("parameter_id"),
    col("winddirection_measurement")
)

waveheight_dim = df_final.filter(col("parameter_code") == "Hm0").select(
    col("date_time"),
    col("Location_Code").alias("location_code"),
    col("parameter_code"),
    col("compartiment_code"),
    col("unit_code"),
    col("description"),
    col("measurement_value").alias("waveheight_measurement")
).distinct().withColumn("waveheight_id", monotonically_increasing_id() + 1
).join(
    parameter_dim.select(
        col("parameter_id"),
        col("compartiment_code"),
        col("unit_code"),
        col("description")
    ), 
    on=["compartiment_code", "unit_code","description"], 
    how="left"
).select(
    col("date_time"),
    col("location_code"),
    col("parameter_code"),
    col("waveheight_id"),
    col("parameter_id"),
    col("waveheight_measurement")
)

waveperiod_dim = df_final.filter(col("parameter_code") == "Tm02").select(
    col("date_time"),
    col("Location_Code").alias("location_code"),
    col("parameter_code"),
    col("compartiment_code"),
    col("unit_code"),
    col("description"),
    col("measurement_value").alias("waveperiod_measurement")
).distinct().withColumn("waveperiod_id", monotonically_increasing_id() + 1
).join(
    parameter_dim.select(
        col("parameter_id"),
        col("compartiment_code"),
        col("unit_code"),
        col("description")
    ), 
    on=["compartiment_code", "unit_code","description"], 
    how="left"
).select(
    col("date_time"),
    col("location_code"),
    col("parameter_code"),
    col("waveperiod_id"),
    col("parameter_id"),
    col("waveperiod_measurement")
)

# Show the resulting DataFrames
windspeed_dim.show()
winddirection_dim.show()
waveheight_dim.show()
waveperiod_dim.show()


+-------------------+-------------+--------------+------------+------------+---------------------+
|          date_time|location_code|parameter_code|windspeed_id|parameter_id|windspeed_measurement|
+-------------------+-------------+--------------+------------+------------+---------------------+
|2025-01-22 16:20:00|           J6|       WINDSHD|           1|           4|                 0.78|
|2025-01-24 18:20:00|           J6|       WINDSHD|           2|           4|                13.66|
|2025-01-24 21:10:00|           J6|       WINDSHD|           3|           4|                11.78|
|2025-01-15 08:10:00|           J6|       WINDSHD|           4|           4|                 2.23|
|2025-01-17 00:20:00|           J6|       WINDSHD|           5|           4|                 8.78|
|2025-01-18 05:00:00|           J6|       WINDSHD|           6|           4|                 4.64|
|2025-01-18 06:20:00|           J6|       WINDSHD|           7|           4|                 4.07|
|2025-01-1

### Create the fact table

In [27]:

# Create fact table with distinct date_time and location_code
location_measurement_fact = df_final.select(
    col("date_time"),
    col("Location_Code").alias("location_code"),
).distinct().withColumn("measurement_id", monotonically_increasing_id() + 1)

# Add measurements for specific measurement types
location_measurement_fact = location_measurement_fact.join(
    df_final.filter(col("parameter_code") == "WINDSHD").select(
        col("date_time"),
        col("Location_Code").alias("location_code"),
        col("measurement_value").alias("windspeed_measurement")
    ),
    on=["date_time", "location_code"],
    how="left"
).join(
    df_final.filter(col("parameter_code") == "WINDRTG").select(
        col("date_time"),
        col("Location_Code").alias("location_code"),
        col("measurement_value").alias("winddirection_measurement")
    ),
    on=["date_time", "location_code"],
    how="left"
).join(
    df_final.filter(col("parameter_code") == "Hm0").select(
        col("date_time"),
        col("Location_Code").alias("location_code"),
        col("measurement_value").alias("waveheight_measurement")
    ),
    on=["date_time", "location_code"],
    how="left"
).join(
    df_final.filter(col("parameter_code") == "Tm02").select(
        col("date_time"),
        col("Location_Code").alias("location_code"),
        col("measurement_value").alias("waveperiod_measurement")
    ),
    on=["date_time", "location_code"],
    how="left"
).join(
    location_dim,
    on="location_code",
    how="left"
).join(
    timestamp_dim,
    on="date_time",
    how="left"
).join(
    windspeed_dim,
    on=["date_time", "location_code", "windspeed_measurement"],
    how="left"
).join(
    winddirection_dim,
    on=["date_time", "location_code","winddirection_measurement"],
    how="left"
).join(
    waveheight_dim,
    on=["date_time", "location_code","waveheight_measurement"],
    how="left"
).join(
    waveperiod_dim,
    on=["date_time", "location_code", "waveperiod_measurement"],
    how="left"
)

location_measurement_fact.show()

+-------------------+-------------+----------------------+----------------------+-------------------------+---------------------+--------------+-----------------+-----------------+-------------+------------+--------------+------------+------------+--------------+----------------+------------+--------------+-------------+------------+--------------+-------------+------------+
|          date_time|location_code|waveperiod_measurement|waveheight_measurement|winddirection_measurement|windspeed_measurement|measurement_id|     X_coordinate|     Y_coordinate|location_name|timestamp_id|parameter_code|windspeed_id|parameter_id|parameter_code|winddirection_id|parameter_id|parameter_code|waveheight_id|parameter_id|parameter_code|waveperiod_id|parameter_id|
+-------------------+-------------+----------------------+----------------------+-------------------------+---------------------+--------------+-----------------+-----------------+-------------+------------+--------------+------------+---------

## Add surf level categories to the fact table

In [28]:

from pyspark.sql.functions import when, col

location_measurement_fact = location_measurement_fact.withColumn(
    "category",
    when(col("waveperiod_measurement") < 4, "Choppy")
    .when((col("winddirection_measurement") >= 90) & (col("winddirection_measurement") <= 140) & (col("windspeed_measurement") > 28), "Danger")
    .when((col("waveheight_measurement") <= 50) & (col("location_code").isin("SPY")), "Flat")
    .when((col("waveheight_measurement") <= 100) & (col("location_code").isin("SPY")), "Beginner")
    .when((col("waveheight_measurement") <= 150) & (col("location_code").isin("SPY")), "Intermediate")
    .when((col("waveheight_measurement") <= 250) & (col("location_code").isin("SPY")), "Advanced")
    .when((col("waveheight_measurement") > 250) & (col("location_code").isin("SPY")), "Danger")
    .when((col("waveheight_measurement") <= 50) & (col("location_code").isin("BG2")), "Flat")
    .when((col("waveheight_measurement") <= 100) & (col("location_code").isin("BG2")), "Beginner")
    .when((col("waveheight_measurement") <= 150) & (col("location_code").isin("BG2")), "Intermediate")
    .when((col("waveheight_measurement") <= 250) & (col("location_code").isin("BG2")), "Advanced")
    .when((col("waveheight_measurement") > 250) & (col("location_code").isin("BG2")), "Danger")
    .when(col("location_code").isin("J6"), "Too far out")
    .when(col("location_code").isin("K14"), "Too far out")
    .otherwise("Undefined")
)
location_measurement_fact.show()

+-------------------+-------------+----------------------+----------------------+-------------------------+---------------------+--------------+-----------------+-----------------+-------------+------------+--------------+------------+------------+--------------+----------------+------------+--------------+-------------+------------+--------------+-------------+------------+-----------+
|          date_time|location_code|waveperiod_measurement|waveheight_measurement|winddirection_measurement|windspeed_measurement|measurement_id|     X_coordinate|     Y_coordinate|location_name|timestamp_id|parameter_code|windspeed_id|parameter_id|parameter_code|winddirection_id|parameter_id|parameter_code|waveheight_id|parameter_id|parameter_code|waveperiod_id|parameter_id|   category|
+-------------------+-------------+----------------------+----------------------+-------------------------+---------------------+--------------+-----------------+-----------------+-------------+------------+-------------

Filter only the nessecary columns

In [29]:

location_measurement_fact = location_measurement_fact.select(
    col("measurement_id"),
    col("timestamp_id"),
    col("location_code"),
    col("windspeed_id"),
    col("winddirection_id"),
    col("waveheight_id"),
    col("waveperiod_id"),
    col("category")
).sort("date_time",'location_code')

location_measurement_fact.show()

+--------------+------------+-------------+------------+----------------+-------------+-------------+-----------+
|measurement_id|timestamp_id|location_code|windspeed_id|winddirection_id|waveheight_id|waveperiod_id|   category|
+--------------+------------+-------------+------------+----------------+-------------+-------------+-----------+
|         12310|         546|          BG2|        9955|            9899|         8926|         7232|   Advanced|
|          9093|         546|           J6|        1853|            2132|         1565|        10490|Too far out|
|          6632|         546|          K14|       13215|           12405|        14922|        13878|Too far out|
|         10435|         546|          SPY|        6896|            4262|         5451|         2548|     Danger|
|         12883|        3748|          BG2|        8374|            8962|        10423|         5419|   Advanced|
|           976|        3748|           J6|        2844|            2044|         3352| 

## Add wind direction strings to the winddirection dimension table

In [30]:
from pyspark.sql.functions import when

winddirection_dim = winddirection_dim.withColumn(
    "direction",
    when((col("winddirection_measurement") >= 348.75) | (col("winddirection_measurement") < 11.25), "N")
    .when((col("winddirection_measurement") >= 11.25) & (col("winddirection_measurement") < 33.75), "NNE")
    .when((col("winddirection_measurement") >= 33.75) & (col("winddirection_measurement") < 56.25), "NE")
    .when((col("winddirection_measurement") >= 56.25) & (col("winddirection_measurement") < 78.75), "ENE")
    .when((col("winddirection_measurement") >= 78.75) & (col("winddirection_measurement") < 101.25), "E")
    .when((col("winddirection_measurement") >= 101.25) & (col("winddirection_measurement") < 123.75), "ESE")
    .when((col("winddirection_measurement") >= 123.75) & (col("winddirection_measurement") < 146.25), "SE")
    .when((col("winddirection_measurement") >= 146.25) & (col("winddirection_measurement") < 168.75), "SSE")
    .when((col("winddirection_measurement") >= 168.75) & (col("winddirection_measurement") < 191.25), "S")
    .when((col("winddirection_measurement") >= 191.25) & (col("winddirection_measurement") < 213.75), "SSW")
    .when((col("winddirection_measurement") >= 213.75) & (col("winddirection_measurement") < 236.25), "SW")
    .when((col("winddirection_measurement") >= 236.25) & (col("winddirection_measurement") < 258.75), "WSW")
    .when((col("winddirection_measurement") >= 258.75) & (col("winddirection_measurement") < 281.25), "W")
    .when((col("winddirection_measurement") >= 281.25) & (col("winddirection_measurement") < 303.75), "WNW")
    .when((col("winddirection_measurement") >= 303.75) & (col("winddirection_measurement") < 326.25), "NW")
    .when((col("winddirection_measurement") >= 326.25) & (col("winddirection_measurement") < 348.75), "NNW")
    .otherwise("Unknown")
)

winddirection_dim.show()

+-------------------+-------------+--------------+----------------+------------+-------------------------+---------+
|          date_time|location_code|parameter_code|winddirection_id|parameter_id|winddirection_measurement|direction|
+-------------------+-------------+--------------+----------------+------------+-------------------------+---------+
|2025-01-22 17:50:00|           J6|       WINDRTG|               1|           3|                    316.2|       NW|
|2025-01-23 21:20:00|           J6|       WINDRTG|               2|           3|                    252.3|      WSW|
|2025-01-24 00:30:00|           J6|       WINDRTG|               3|           3|                    257.9|      WSW|
|2025-01-25 00:20:00|           J6|       WINDRTG|               4|           3|                    228.5|       SW|
|2025-01-25 15:20:00|           J6|       WINDRTG|               5|           3|                    274.7|        W|
|2025-01-16 20:20:00|           J6|       WINDRTG|              

Clean up the measurement values inside the dimension tables

## Clean up the columns of the measurement tables 

In [31]:
windspeed_dim = windspeed_dim.select(
    col("windspeed_id"),
    col("windspeed_measurement").alias("measurement_value"),
    col("parameter_id")
)
winddirection_dim = winddirection_dim.select(
    col("winddirection_id"),
    col("winddirection_measurement").alias("measurement_value"),
    col("parameter_id")
)
waveheight_dim = waveheight_dim.select(
    col("waveheight_id"),
    col("waveheight_measurement").alias("measurement_value"),
    col("parameter_id")
)
waveperiod_dim = waveperiod_dim.select(
    col("waveperiod_id"),
    col("waveperiod_measurement").alias("measurement_value"),
    col("parameter_id")
)
windspeed_dim.show()
winddirection_dim.show()
waveheight_dim.show()
waveperiod_dim.show()

+------------+-----------------+------------+
|windspeed_id|measurement_value|parameter_id|
+------------+-----------------+------------+
|           1|             0.78|           4|
|           2|            13.66|           4|
|           3|            11.78|           4|
|           4|             2.23|           4|
|           5|             8.78|           4|
|           6|             4.64|           4|
|           7|             4.07|           4|
|           8|             3.59|           4|
|           9|              4.8|           4|
|          10|             6.41|           4|
|          11|             7.72|           4|
|          12|            10.11|           4|
|          13|            12.22|           4|
|          14|             7.26|           4|
|          15|             2.83|           4|
|          16|             7.64|           4|
|          17|              6.5|           4|
|          18|            10.63|           4|
|          19|            11.75|  

## Save in database

In [32]:
import configparser

# Define the connection properties
config = configparser.ConfigParser()
config.read('./../credentials')

db_ip = "localhost:1433"
db_name = "data_structure_v3"
db_username = config['database']['username']
db_password = config['database']['password']

driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_url = f"jdbc:sqlserver://{db_ip};databaseName={db_name};username={db_username};password={db_password};TrustServerCertificate=True;"

# Write the dimension tables to the database
location_dim.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "location_dim") \
    .option("driver", driver) \
    .mode("overwrite") \
    .save()

timestamp_dim.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "timestamp_dim") \
    .option("driver", driver) \
    .mode("overwrite") \
    .save()

parameter_dim.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "parameter_dim") \
    .option("driver", driver) \
    .mode("overwrite") \
    .save()

# write the measurements to the database
windspeed_dim.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "windspeed_dim") \
    .option("driver", driver) \
    .mode("overwrite") \
    .save()
winddirection_dim.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "winddirection_dim") \
    .option("driver", driver) \
    .mode("overwrite") \
    .save()
waveheight_dim.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "waveheight_dim") \
    .option("driver", driver) \
    .mode("overwrite") \
    .save()
waveperiod_dim.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "waveperiod_dim") \
    .option("driver", driver) \
    .mode("overwrite") \
    .save()
# Write the fact table to the database
location_measurement_fact.write \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "location_measurement_fact") \
    .option("driver", driver) \
    .mode("overwrite") \
    .save()

# Stop the SparkSession
spark.stop()