In [0]:
from pyspark.sql.types import StructType, StringType
from pyspark.sql import functions as F

storage_account = "sbbapistorageaccount"
container = "data-container"
# NOTE(review): the key is an empty string here — presumably redacted before
# sharing; confirm how the real value is meant to be supplied.
account_key = ""

# Hand the storage account key to Spark under the account-specific config name
account_key_conf_name = f"fs.azure.account.key.{storage_account}.dfs.core.windows.net"
spark.conf.set(account_key_conf_name, account_key)

bronze_weather_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/bronze/weather"

# ===============================
# 1️⃣ Load Bronze Data
# ===============================
# Both bronze inputs are Delta tables stored under the same weather folder
df_weather_raw = spark.read.load(f"{bronze_weather_path}/data", format="delta")
df_params_raw = spark.read.load(f"{bronze_weather_path}/params", format="delta")

print("✅ Bronze Weather Data and Parameters loaded")

In [0]:
# ===============================
# 2️⃣ Clean Parameters Table
# ===============================

# Field names of one `;`-separated parameter line, in file order.
# Every field is read as a plain string.
param_field_names = [
    "shortname",
    "description_de",
    "description_fr",
    "description_it",
    "description_en",
    "group_de",
    "group_fr",
    "group_it",
    "group_en",
    "granularity",
    "decimals",
    "datatype",
    "unit",
]
schema = StructType()
for field_name in param_field_names:
    schema.add(field_name, StringType())

# The bronze frame keeps each raw line in its first column; expose those lines as an RDD
first_column = df_params_raw.columns[0]
rdd = df_params_raw.select(first_column).rdd.map(lambda row: row[0])

# Let Spark's CSV reader split the raw lines according to the schema above
csv_reader = spark.read.schema(schema).option("delimiter", ";").option("quote", '"')
df_parsed = csv_reader.csv(rdd)

# Keep only the English-facing columns and discard rows with a null in any of them
# NOTE(review): rows dropped here also disappear from the positional
# shortname list built later from df_params — confirm that is intended.
english_columns = [
    "shortname",
    "description_en",
    "group_en",
    "granularity",
    "decimals",
    "datatype",
    "unit",
]
df_params = df_parsed.select(*english_columns).dropna()

print("✅ Parameters table cleaned")
display(df_params)

In [0]:
# Preview the raw bronze weather frame (loaded in step 1) before it is parsed
display(df_weather_raw)

In [0]:
# ===============================
# 3️⃣ Rename Weather Data
# ===============================

from pyspark.sql.functions import col

# Rebuild an RDD of raw text lines from the first column of the bronze weather frame
rdd_weather = df_weather_raw.select(df_weather_raw.columns[0]).rdd.map(lambda row: row[0])

# Parse those lines as CSV with `;` as the separator. With header=False the
# parsed columns carry Spark's positional names (_c0, _c1, ...).
df_weather_parsed = spark.read.csv(
    rdd_weather,
    sep=";",
    header=False,
    inferSchema=False
)

# The previous intermediate renaming step read an undefined `columns_raw`
# variable (NameError on a fresh kernel) and its result was fully overwritten
# below, so the positional names from the parser are used directly instead.
parsed_names = df_weather_parsed.columns

# Fixed columns at the start of every row
fixed_cols = ["station_abbr", "reference_timestamp"]

# Collect the parameter rows once; the ordered shortname list and the
# shortname -> description_en mapping both come from the same snapshot.
param_rows = df_params.select("shortname", "description_en").collect()
shortnames = [row["shortname"] for row in param_rows]
param_mapping = {row["shortname"]: row["description_en"] for row in param_rows}

# Positional match between parsed columns and shortnames:
# _c0 -> station_abbr, _c1 -> reference_timestamp, _c2, _c3, ... -> shortnames
dynamic_cols = shortnames[:len(parsed_names) - len(fixed_cols)]

# Final list of shortname-level column names. When there are fewer shortnames
# than remaining columns, the unmatched trailing columns keep their positional
# `_cN` names instead of making toDF fail on a length mismatch.
column_names = fixed_cols + dynamic_cols
column_names = column_names + parsed_names[len(column_names):]

# Replace each shortname by its English description where one is known, then
# rename every column in a single positional pass.
described_names = [param_mapping.get(name, name) for name in column_names]
df_weather_cleaned = df_weather_parsed.toDF(*described_names)

print("✅ Weather Data columns renamed with descriptions")
display(df_weather_cleaned)


In [0]:
# ===============================
# 5️⃣ Data Cleaning
# ===============================
# - Parse timestamp
# - Cast numeric columns
# - Focus on rain and snow parameters later (Silver should keep them ready)

# The typed frame is derived from `df_weather_cleaned`, the renamed frame
# built in step 3. `df_weather` was previously read here before ever being
# assigned, which raised NameError on a fresh kernel.
# NOTE(review): the later cells clean and persist `df_weather_cleaned`, not
# `df_weather`, so the typed columns built here are not what is written to
# Silver — confirm which frame is meant to be saved.
typed_columns = []
for col_name in df_weather_cleaned.columns:
    # Back-quote the name so that dots or spaces in description-based column
    # names are not interpreted as nested-field access.
    column = F.col("`" + col_name.replace("`", "``") + "`")
    if col_name == "reference_timestamp":
        # Timestamps are parsed from the "dd.MM.yyyy HH:mm" text format
        column = F.to_timestamp(column, "dd.MM.yyyy HH:mm")
    elif col_name != "station_abbr":
        # Everything except the station code and the timestamp is cast to double
        column = column.cast("double")
    typed_columns.append(column.alias(col_name))

# One select instead of a withColumn call per column keeps the query plan flat
df_weather = df_weather_cleaned.select(*typed_columns)

print("✅ Weather Data cleaned and typed")

In [0]:
# ===============================
# 6️⃣ Clean column names
# ===============================
import re

# Helper that turns an arbitrary label into a Delta-safe column name
def clean_column_name(name):
    """Return `name` with characters that are unsafe in column names replaced.

    Spaces, commas, semicolons, braces, parentheses, slashes, equals signs,
    tabs and newlines each become '_'. Runs of '_' are then collapsed to a
    single '_' and any leading or trailing '_' is removed.

    Parameters:
        name: the original column name (str).

    Returns:
        The cleaned column name (str); may be empty if `name` contained only
        replaced characters.
    """
    # Replace invalid characters with '_' (now also covers {, }, =, tab and
    # newline, which Delta rejects alongside space , ; ( ) )
    name = re.sub(r"[ ,;{}()\n\t=/]", "_", name)
    # Collapse repeated '_'
    name = re.sub(r"_+", "_", name)
    # Strip '_' at the start and end
    name = name.strip("_")
    return name

# Work out the cleaned name for every current column, then rename them one by one
rename_pairs = [(name, clean_column_name(name)) for name in df_weather_cleaned.columns]
for old_name, new_name in rename_pairs:
    df_weather_cleaned = df_weather_cleaned.withColumnRenamed(old_name, new_name)

# Check the result
print("✅ Colonnes nettoyées pour Delta :")
display(df_weather_cleaned)


In [0]:
# ===============================
# 7 Save Silver data
# ===============================
silver_weather_path_data = f"abfss://{container}@{storage_account}.dfs.core.windows.net/silver/weather/data"

# Configure a Delta writer that overwrites the target with schema merging
# enabled, then write the cleaned weather frame to the Silver data location
silver_data_writer = (
    df_weather_cleaned.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
)
silver_data_writer.save(silver_weather_path_data)

print("✅ Silver Weather created successfully")

In [0]:
# ===============================
# 7 Save Silver params
# ===============================
silver_weather_path_params = f"abfss://{container}@{storage_account}.dfs.core.windows.net/silver/weather/params"

# Write the cleaned parameters frame as Delta, overwriting the Silver params
# location with schema merging enabled
params_writer = df_params.write.format("delta")
params_writer = params_writer.mode("overwrite")
params_writer = params_writer.option("mergeSchema", "true")
params_writer.save(silver_weather_path_params)

print("✅ Silver Weather created successfully")

In [0]:
# -------------------------------
# 8 Register SQL tables
# -------------------------------
# Each statement registers a table name over one of the Silver Delta
# locations; IF NOT EXISTS leaves an already-registered table untouched.
register_data_table_sql = f"""
CREATE TABLE IF NOT EXISTS silver_weather_data
USING DELTA
LOCATION '{silver_weather_path_data}'
"""

register_params_table_sql = f"""
CREATE TABLE IF NOT EXISTS silver_weather_parameters
USING DELTA
LOCATION '{silver_weather_path_params}'
"""

spark.sql(register_data_table_sql)

spark.sql(register_params_table_sql)


In [0]:
# Quick check: print the first three rows of the registered parameters table
# without truncating long cell values
preview_query = """
SELECT *
FROM silver_weather_parameters
LIMIT 3
"""
preview_df = spark.sql(preview_query)
preview_df.show(truncate=False)