In [0]:
import requests
import pandas as pd

import json

from pyspark.sql.functions import col
from pyspark.sql.types import StringType, DoubleType
from pyspark.ml.feature import VectorAssembler, StandardScaler

from pyspark.sql.utils import AnalysisException

## Selected features from Open-Meteo (from EDA)

In [0]:
# Hourly variables requested from the API; the same list feeds the
# VectorAssembler and is persisted alongside the scaled data.
feature_cols = [
    "temperature_2m",
    "precipitation",
    "wind_speed_10m",
    "wind_speed_80m",
    "wind_speed_120m",
    "wind_speed_180m",
    "cloud_cover",
]

## Open-Meteo request for historical data

In [0]:
# Open-Meteo endpoint serving archived hourly forecast data.
url = "https://historical-forecast-api.open-meteo.com/v1/forecast"
params = {
    # Lisbon coordinates
    "latitude": 38.716885,
    "longitude": -9.140233,
    # 1 year of historical data
    "start_date": "2024-05-01",
    "end_date": "2025-05-01",
    # requests encodes the list as repeated "hourly" query parameters
    "hourly": feature_cols,
}

# Bound the request so a stalled connection cannot hang the notebook, and fail
# loudly on an HTTP error instead of hitting a KeyError on 'hourly' further down.
response = requests.get(url, params=params, timeout=60)
response.raise_for_status()

# The 'hourly' section of the payload becomes the columns of the pandas frame.
data = response.json()
df = pd.DataFrame(data['hourly'])

In [0]:
# Check the dtypes pandas inferred from the API payload before the frame is converted to Spark.
df.dtypes

Out[24]: time                object
temperature_2m     float64
precipitation      float64
wind_speed_10m     float64
wind_speed_80m     float64
wind_speed_120m    float64
wind_speed_180m    float64
cloud_cover          int64
dtype: object

In [0]:
# Convert the pandas frame into a Spark DataFrame.
sdf = spark.createDataFrame(df)

# Normalise column types: keep the timestamp as a string and cast every feature
# to double (pandas inferred cloud_cover as an integer column).
# The feature casts are generated from feature_cols so this select cannot drift
# from the list used for the API request and for the VectorAssembler.
sdf = sdf.select(
    col("time").cast(StringType()),
    *[col(name).cast(DoubleType()) for name in feature_cols],
)

sdf.printSchema()
display(sdf)

root
 |-- time: string (nullable = true)
 |-- temperature_2m: double (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- wind_speed_10m: double (nullable = true)
 |-- wind_speed_80m: double (nullable = true)
 |-- wind_speed_120m: double (nullable = true)
 |-- wind_speed_180m: double (nullable = true)
 |-- cloud_cover: double (nullable = true)



time,temperature_2m,precipitation,wind_speed_10m,wind_speed_80m,wind_speed_120m,wind_speed_180m,cloud_cover
2024-05-01T00:00,14.0,0.0,8.7,20.1,23.2,26.9,96.0
2024-05-01T01:00,13.4,0.0,11.9,26.3,29.1,31.1,73.0
2024-05-01T02:00,12.4,0.0,8.6,19.8,22.8,26.4,47.0
2024-05-01T03:00,12.2,0.0,7.4,22.3,26.6,29.4,39.0
2024-05-01T04:00,11.5,0.0,6.6,17.8,21.3,26.6,38.0
2024-05-01T05:00,11.3,0.0,7.5,20.1,23.4,26.0,49.0
2024-05-01T06:00,11.1,0.0,6.2,17.4,21.3,29.0,29.0
2024-05-01T07:00,12.1,0.0,6.9,18.6,22.1,30.0,26.0
2024-05-01T08:00,13.7,0.0,14.5,28.2,30.8,32.1,49.0
2024-05-01T09:00,14.6,0.0,14.2,26.5,27.7,28.9,46.0


## Assembly and Scaling
Assembly and scaling are done once here, so that every model training uses the same scaled features and this step does not have to be re-executed.

In [0]:
# Stage definitions.
# The assembler packs the individual feature columns into one vector column "features";
# the scaler standardizes that vector (mean 0, std dev 1) into "features_scaled".
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol="features",
    outputCol="features_scaled",
)

# Assemble first, then fit the scaler on the assembled data and apply it to the same data.
assembled_sdf = assembler.transform(sdf)
scaler_model = scaler.fit(assembled_sdf)
scaled_sdf = scaler_model.transform(assembled_sdf)

# Persist the fitted scaler so the streaming job can apply the identical scaling.
# NOTE(review): this goes through the Spark writer, so the leading "/dbfs" is presumably a
# literal directory on DBFS rather than the local FUSE mount - confirm the streaming job
# loads the model from this exact path before changing it.
scaler_model.write().overwrite().save("/dbfs/tmp/scaler_model")

# Sanity check: raw vs. standardized vectors side by side.
scaled_sdf.select("features", "features_scaled").show(truncate=False)

+-----------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+
|features                           |features_scaled                                                                                                                                  |
+-----------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+
|[14.0,0.0,8.7,20.1,23.2,26.9,96.0] |[-0.6590445687508486,-0.19775002273369255,-0.10560424671887478,0.180274562944951,0.16579368256782015,0.20510626564808207,1.087019313747202]      |
|[13.4,0.0,11.9,26.3,29.1,31.1,73.0]|[-0.7750025477703008,-0.19775002273369255,0.5002305289146354,0.8329430943676615,0.7169921349709903,0.5525325483524708,0.49537596572697845]       |
|[12.4,0.0,8.6,19.8,22.8,26.4,47.0] |[-0.9682658461360546,-0.19775002273369255,-

## Saving to DBFS as a parquet file
Because the data has a fixed schema and we do not need ACID transactions, we save it as Parquet rather than as a Delta Lake table.

In [0]:
# Permanent table name
permanent_table_name = "historical_weather_hourly_lisbon"
# Warehouse location of the managed table, derived from the table name so the
# cleanup below always targets the table that is written afterwards.
table_path = "dbfs:/user/hive/warehouse/" + permanent_table_name

# Best-effort cleanup of files left behind by a previous run of the managed table.
# dbutils.fs.rm reports via its boolean result whether anything was removed, so only
# claim success when it actually did; failures are reported but must not stop the save.
try:
    if dbutils.fs.rm(table_path, recurse=True):
        print("Deleted successfully.")
    else:
        print("Nothing to delete at", table_path)
except Exception as e:
    print("Path may not exist or could not be deleted:", str(e))

# Save as a managed table as parquet
scaled_sdf.write.mode("overwrite").format("parquet").saveAsTable(permanent_table_name)

# Save used feature cols
# TODO change tmp folder?
dbutils.fs.put("dbfs:/tmp/feature_cols.json", json.dumps(feature_cols), overwrite=True)

Deleted successfully.
Wrote 124 bytes.
Out[27]: True