# 1. Importing libraries and loading config

In [23]:
#libraries
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, lit, concat_ws, col
import pyspark.sql.functions as F
import os
import json
from pathlib import Path
from datetime import datetime

# Load the pipeline configuration from config.json in the current working directory.
# The encoding is given explicitly so that parsing does not depend on the
# platform's default text encoding.
with open(os.getcwd()+"/config.json", encoding="utf-8") as f:
    config = json.load(f)

# 2. Creating Parquet files

## 2.1. Creating station files

### 2.1.1. Defining schema

In [24]:
# Schema for the DMI station JSON documents (field list originally generated with Copilot).
# It is assembled bottom-up from the nested structs; every field is nullable,
# which is the default of StructType.add.

# "geometry" object of a single feature
_dmi_station_geometry = (
    StructType()
    .add("coordinates", ArrayType(DoubleType()))
    .add("type", StringType())
)

# "properties" object of a single feature
_dmi_station_properties = (
    StructType()
    .add("anemometerHeight", StringType())
    .add("barometerHeight", StringType())
    .add("country", StringType())
    .add("created", StringType())
    .add("name", StringType())
    .add("operationFrom", StringType())
    .add("operationTo", StringType())
    .add("owner", StringType())
    .add("parameterId", ArrayType(StringType()))
    .add("regionId", StringType())
    .add("stationHeight", DoubleType())
    .add("stationId", StringType())
    .add("status", StringType())
    .add("type", StringType())
    .add("updated", StringType())
    .add("validFrom", StringType())
    .add("validTo", StringType())
    .add("wmoCountryCode", StringType())
    .add("wmoStationId", StringType())
)

# one element of the top-level "features" array
_dmi_station_feature = (
    StructType()
    .add("geometry", _dmi_station_geometry)
    .add("id", StringType())
    .add("type", StringType())
    .add("properties", _dmi_station_properties)
)

# one element of the top-level "links" array
_dmi_station_link = (
    StructType()
    .add("href", StringType())
    .add("rel", StringType())
    .add("type", StringType())
    .add("title", StringType())
)

# top-level document
stationSchemaDmi = (
    StructType()
    .add("type", StringType())
    .add("features", ArrayType(_dmi_station_feature))
    .add("timeStamp", StringType())
    .add("numberReturned", IntegerType())
    .add("links", ArrayType(_dmi_station_link))
)

# Schema for the Frost station JSON documents (field list originally generated with Copilot).
# It is assembled bottom-up from the nested structs; every field is nullable,
# which is the default of StructType.add.

# "geometry" object of a single station entry
_frost_station_geometry = (
    StructType()
    .add("@type", StringType())
    .add("coordinates", ArrayType(DoubleType()))
    .add("nearest", BooleanType())
)

# one element of the top-level "data" array
_frost_station_item = (
    StructType()
    .add("@type", StringType())
    .add("id", StringType())
    .add("name", StringType())
    .add("shortName", StringType())
    .add("country", StringType())
    .add("countryCode", StringType())
    .add("geometry", _frost_station_geometry)
    .add("masl", DoubleType())
    .add("validFrom", StringType())
    .add("county", StringType())
    .add("countyId", IntegerType())
    .add("municipality", StringType())
    .add("municipalityId", IntegerType())
    .add("ontologyId", IntegerType())
    .add("stationHolders", ArrayType(StringType()))
    .add("externalIds", ArrayType(StringType()))
    .add("wigosId", StringType())
    .add("wmoId", IntegerType())
    .add("shipCodes", ArrayType(StringType()))
    .add("icaoCodes", ArrayType(StringType()))
)

# top-level response envelope
stationSchemaFrost = (
    StructType()
    .add("@context", StringType())
    .add("@type", StringType())
    .add("apiVersion", StringType())
    .add("license", StringType())
    .add("createdAt", StringType())
    .add("queryTime", DoubleType())
    .add("currentItemCount", IntegerType())
    .add("itemsPerPage", IntegerType())
    .add("offset", IntegerType())
    .add("totalItemCount", IntegerType())
    .add("currentLink", StringType())
    .add("data", ArrayType(_frost_station_item))
)


### 2.1.2. Data cleaning and filtering

In [None]:
# Build one cleaned station DataFrame per source (DMI and Frost) with the columns
# id, name, country and source. Input locations are taken from the config file.
curr_path = os.getcwd()
path_dmi = curr_path+config['import']['dmiExportPath']+'/dmiStations*'
path_frost_met = curr_path+config['import']['frostExportPath']+'/frostStations*'

spark = (
    SparkSession.builder
    .appName("weatherSilverLayerApp")
    .getOrCreate()
)

# --- DMI stations -----------------------------------------------------------
# multiLine lets Spark parse JSON documents that span several lines;
# the glob in path_dmi picks up every file whose name starts with dmiStations.
dmi_station_df = (
    spark.read
    .option("multiLine", True)
    .schema(stationSchemaDmi)
    .json(path_dmi)
)
# one row per element of the "features" array
dmi_station_df_features = dmi_station_df.select(explode("features").alias("feature"))

# Convert DMI's country codes so they match the codes in the Frost data.
# Codes without a mapping are kept unchanged; without the otherwise() branch
# they would silently be replaced by NULL.
dmi_country = (
    F.when(col("country") == "DNK", "DK")
    .when(col("country") == "GRL", "GL")
    .when(col("country") == "FRO", "FO")
    .otherwise(col("country"))
)

# flattening structure, please inspect json files for detailed levels
dmi_df = (
    dmi_station_df_features
    .select("feature.id", "feature.properties.name", "feature.properties.country")
    .withColumn("source", lit('DMI'))           # adding source column
    .withColumn('country', dmi_country)
    .withColumn('name', F.upper(col("name")))   # station names are stored in upper case
    .drop_duplicates(["id"])                    # keep a single row per station id
)

# --- Frost stations ---------------------------------------------------------
frost_station_df = (
    spark.read
    .option("multiLine", True)
    .schema(stationSchemaFrost)
    .json(path_frost_met)
)
# one row per element of the "data" array
frost_station_df_features = frost_station_df.select(explode("data").alias("station"))

# flattening structure, please inspect json files for detailed levels
frost_df = (
    frost_station_df_features
    .select("station.id", "station.name", "station.countryCode")
    .withColumn("source", lit('FROST_MET'))       # adding source column
    .withColumnRenamed("countryCode", "country")  # same column name as in dmi_df
    .withColumn('name', F.upper(col("name")))     # station names are stored in upper case
    .drop_duplicates(["id"])                      # keep a single row per station id
)

# Preview only: printing hundreds of rows bloats the saved notebook.
frost_df.show(20)


25/07/03 13:00:53 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: /Users/nicklas.stiborg/Documents/medallion_architecture_project/bronze/dmi/dmiStations*.
java.io.FileNotFoundException: File /Users/nicklas.stiborg/Documents/medallion_architecture_project/bronze/dmi/dmiStations* does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:917)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1238)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:907)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:56)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:381)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$cata

+---------+--------------------+-----------+---------+
|       id|                name|countryCode|   source|
+---------+--------------------+-----------+---------+
|      GF1|       ØSTFOLD FYLKE|       NULL|FROST_MET|
|     GF10|    VEST-AGDER FYLKE|       NULL|FROST_MET|
|     GF11|      ROGALAND FYLKE|       NULL|FROST_MET|
|     GF12|     HORDALAND FYLKE|       NULL|FROST_MET|
|     GF14|SOGN OG FJORDANE ...|       NULL|FROST_MET|
|     GF15|MØRE OG ROMSDAL F...|       NULL|FROST_MET|
|     GF16| SØR-TRØNDELAG FYLKE|       NULL|FROST_MET|
|     GF17|NORD-TRØNDELAG FYLKE|       NULL|FROST_MET|
|     GF18|      NORDLAND FYLKE|       NULL|FROST_MET|
|     GF19|         TROMS FYLKE|       NULL|FROST_MET|
|      GF2|      AKERSHUS FYLKE|       NULL|FROST_MET|
|     GF20|      FINNMARK FYLKE|       NULL|FROST_MET|
|      GF3|          OSLO FYLKE|       NULL|FROST_MET|
|      GF4|       HEDMARK FYLKE|       NULL|FROST_MET|
|      GF5|       OPPLAND FYLKE|       NULL|FROST_MET|
|      GF6

### 2.1.3. Exporting to Parquet

In [26]:
# Write both station DataFrames as Parquet into timestamped directories.
# The timestamp is taken once so that both outputs of the same run share one suffix;
# calling datetime.today() separately per write could yield suffixes that differ by a second.
export_stamp = datetime.today().strftime("%Y-%m-%d_%H-%M-%S")

dmi_export_location = os.getcwd() + config['export']['dmiDeltaPathStation']
frost_export_location = os.getcwd() + config['export']['frostDeltaPathStation']

dmi_df.write.format('parquet').save(dmi_export_location + export_stamp, mode="overwrite")
frost_df.write.format('parquet').save(frost_export_location + export_stamp, mode="overwrite")

## 2.2. Creating readings files

### 2.2.1. Defining schema

In [27]:
# Schema for the DMI readings JSON documents (field list originally generated with Copilot).
# It is assembled bottom-up from the nested structs; every field is nullable,
# which is the default of StructType.add.

# "geometry" object of a single feature
_dmi_reading_geometry = (
    StructType()
    .add("type", StringType())
    .add("coordinates", ArrayType(DoubleType()))
)

# "properties" object of a single feature; "created" and "observed" are kept as
# strings here (TimestampType() would be the alternative if they were parsed)
_dmi_reading_properties = (
    StructType()
    .add("created", StringType())
    .add("observed", StringType())
    .add("parameterId", StringType())
    .add("stationId", StringType())
    .add("value", DoubleType())
)

# one element of the top-level "features" array
_dmi_reading_feature = (
    StructType()
    .add("type", StringType())
    .add("id", StringType())
    .add("geometry", _dmi_reading_geometry)
    .add("properties", _dmi_reading_properties)
)

# top-level document
readingsDmiSchema = (
    StructType()
    .add("type", StringType())
    .add("features", ArrayType(_dmi_reading_feature))
)


# Schema for the Frost readings JSON documents (field list originally generated with Copilot).
# It is assembled bottom-up from the nested structs; every field is nullable,
# which is the default of StructType.add.

# "level" object of a single observation
_frost_observation_level = (
    StructType()
    .add("levelType", StringType())
    .add("unit", StringType())
    .add("value", DoubleType())
)

# one element of an entry's "observations" array
_frost_observation = (
    StructType()
    .add("elementId", StringType())
    .add("value", DoubleType())
    .add("unit", StringType())
    .add("level", _frost_observation_level)
    .add("timeOffset", StringType())
    .add("timeResolution", StringType())
    .add("timeSeriesId", IntegerType())
    .add("performanceCategory", StringType())
    .add("exposureCategory", StringType())
    .add("qualityCode", IntegerType())
)

# one element of the top-level "data" array
_frost_reading_item = (
    StructType()
    .add("sourceId", StringType())
    .add("referenceTime", StringType())
    .add("observations", ArrayType(_frost_observation))
)

# top-level response envelope
readingsFrostSchema = (
    StructType()
    .add("@context", StringType())
    .add("@type", StringType())
    .add("apiVersion", StringType())
    .add("license", StringType())
    .add("createdAt", StringType())
    .add("queryTime", DoubleType())
    .add("currentItemCount", IntegerType())
    .add("itemsPerPage", IntegerType())
    .add("offset", IntegerType())
    .add("totalItemCount", IntegerType())
    .add("currentLink", StringType())
    .add("data", ArrayType(_frost_reading_item))
)


### 2.2.2. Data cleaning and filtering

In [28]:
def _list_reading_files(root, name_marker):
    """Return the paths (as strings) of all files below ``root`` whose name contains ``name_marker``.

    The directory tree is searched recursively; only entries matching the glob
    pattern ``*.*`` are considered.

    NOTE(review): the previous inline version also had ``p.parent != <root string>``.
    That compared a ``Path`` with a ``str`` and was therefore always true, so it
    filtered nothing; it is dropped here without changing the result. If files
    located directly in ``root`` are meant to be skipped, compare against
    ``Path(root)`` instead.
    """
    return [
        str(p) for p in Path(root).rglob("*.*")
        if p.is_file() and name_marker in p.name
    ]


# base directories of the raw readings files, taken from the config file
curr_path = os.getcwd()
path_dmi = curr_path+config['import']['dmiExportPath']
path_frost_met = curr_path+config['import']['frostExportPath']

spark = (
    SparkSession.builder
    .appName("weatherSilverLayerApp")
    .getOrCreate()
)

# --- DMI readings -----------------------------------------------------------
# all files whose name contains dmiAirTemperature
filesList = _list_reading_files(path_dmi, "dmiAirTemperature")

dmi_weather_readings_df = (
    spark.read
    .option("multiLine", True)   # documents may span several lines
    .schema(readingsDmiSchema)
    .json(filesList)
)
# one row per element of the "features" array
dmi_weather_readings_df_features = dmi_weather_readings_df.select(explode("features").alias("feature"))
# flattening structure, please inspect json files for detailed levels
dmi_weather_readings_df = (
    dmi_weather_readings_df_features
    .select(
        "feature.id",
        "feature.properties.observed",
        "feature.properties.stationId",
        "feature.properties.value",
    )
    .withColumn("source", lit('DMI'))  # adding source column
)

# --- Frost readings ---------------------------------------------------------
# all files whose name contains frostMetAirTemperature
filesList = _list_reading_files(path_frost_met, "frostMetAirTemperature")

frost_weather_readings_df = (
    spark.read
    .option("multiLine", True)   # documents may span several lines
    .schema(readingsFrostSchema)
    .json(filesList)
)
# one row per element of the "data" array
frost_weather_readings_df_features = frost_weather_readings_df.select(explode("data").alias("data"))
# flattening structure, please inspect json files for detailed levels;
# "data.observations.value" yields an array with the value of every observation
frost_weather_readings_df = (
    frost_weather_readings_df_features
    .select("data.sourceId", "data.referenceTime", "data.observations.value")
    .withColumn("source", lit('FROST_MET'))  # adding source column
)

# renaming columns to match DMI's column names
rename_map = {
    "sourceId": "stationId",
    "referenceTime": "observed",
}
for old_col, new_col in rename_map.items():
    frost_weather_readings_df = frost_weather_readings_df.withColumnRenamed(old_col, new_col)

# Concatenate station id and timestamp into a unique id; this makes the delta
# update easier when loading into SQL.
# NOTE(review): the id column is called "sourceId" here but "id" in the DMI
# frame - confirm which name the downstream load expects.
frost_weather_readings_df = frost_weather_readings_df.withColumn(
    "sourceId", concat_ws("_", col("stationId"), col("observed"))
)

# Reduce the value array to its first element and cast it to float.
# NOTE(review): the DMI "value" column is a double while this one becomes a
# float - confirm the difference is intended.
frost_weather_readings_df = frost_weather_readings_df.withColumn("value", col("value")[0].cast("float"))

frost_weather_readings_df.show()
dmi_weather_readings_df.show()

+---------+--------------------+-----+---------+--------------------+
|stationId|            observed|value|   source|            sourceId|
+---------+--------------------+-----+---------+--------------------+
| SN1135:0|2025-06-29T00:00:...| 14.2|FROST_MET|SN1135:0_2025-06-...|
| SN1135:0|2025-06-29T00:10:...| 14.2|FROST_MET|SN1135:0_2025-06-...|
| SN1135:0|2025-06-29T00:20:...| 14.2|FROST_MET|SN1135:0_2025-06-...|
| SN1135:0|2025-06-29T00:30:...| 14.1|FROST_MET|SN1135:0_2025-06-...|
| SN1135:0|2025-06-29T00:40:...| 14.0|FROST_MET|SN1135:0_2025-06-...|
| SN1135:0|2025-06-29T00:50:...| 14.0|FROST_MET|SN1135:0_2025-06-...|
| SN1135:0|2025-06-29T01:00:...| 13.8|FROST_MET|SN1135:0_2025-06-...|
| SN1135:0|2025-06-29T01:10:...| 13.7|FROST_MET|SN1135:0_2025-06-...|
| SN1135:0|2025-06-29T01:20:...| 13.6|FROST_MET|SN1135:0_2025-06-...|
| SN1135:0|2025-06-29T01:30:...| 13.6|FROST_MET|SN1135:0_2025-06-...|
| SN1135:0|2025-06-29T01:40:...| 13.5|FROST_MET|SN1135:0_2025-06-...|
| SN1135:0|2025-06-2

### 2.2.3. Exporting to Parquet

In [29]:
# Write both readings DataFrames as Parquet into timestamped directories.
# The timestamp is taken once so that both outputs of the same run share one suffix;
# calling datetime.today() separately per write could yield suffixes that differ by a second.
export_stamp = datetime.today().strftime("%Y-%m-%d_%H-%M-%S")

dmi_export_location = os.getcwd() + config['export']['dmiDeltaPathWr']
frost_export_location = os.getcwd() + config['export']['frostDeltaPathWr']

dmi_weather_readings_df.write.format('parquet').save(dmi_export_location + export_stamp, mode="overwrite")
frost_weather_readings_df.write.format('parquet').save(frost_export_location + export_stamp, mode="overwrite")

25/07/03 12:55:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/07/03 12:55:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/07/03 12:55:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/07/03 12:55:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/07/03 12:55:52 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
