In [None]:
from pyspark.sql.functions import col, explode, arrays_zip
from pyspark.sql.types import StructType
from datetime import datetime, timedelta

# Bronze -> Silver: turn the column-oriented `daily` arrays of each city record
# in the Bronze JSON into one flat row per (city, day).

# Field under `daily` in the Bronze JSON -> column name in the Silver table.
# Insertion order fixes both the arrays_zip argument order and the output
# column order, so the two steps below can never drift apart.
DAILY_FIELD_ALIASES = {
    "time": "date",
    "temperature_2m_max": "temp_max",
    "temperature_2m_min": "temp_min",
    "temperature_2m_mean": "temp_mean",
    "precipitation_probability_mean": "rain_prob",
    "weathercode": "weather_code",
}
# Per-city columns carried through unchanged.
LOCATION_COLUMNS = ["city", "latitude", "longitude"]

# `start_date` is not defined in this cell; presumably it comes from an earlier
# parameter cell (TODO confirm). "multiline" lets Spark parse JSON documents
# that span several lines instead of expecting one record per line.
df_raw = spark.read.option("multiline", "true").json(f"Files/Bronze_Layer_{start_date}.json")

# Step 1: zip the parallel daily arrays element-wise into one array of structs.
df_zipped = df_raw.select(
    *LOCATION_COLUMNS,
    arrays_zip(*(f"daily.{field}" for field in DAILY_FIELD_ALIASES)).alias("daily"),
)

# Step 2: one row per zipped element, i.e. per day. `explode` (not
# `explode_outer`) drops cities whose zipped array is null or empty.
df_exploded = df_zipped.withColumn("daily", explode("daily"))

# Step 3: lift each struct field to a top-level column under its Silver name.
df_silver = df_exploded.select(
    *LOCATION_COLUMNS,
    *(col(f"daily.{field}").alias(alias) for field, alias in DAILY_FIELD_ALIASES.items()),
)

# Overwrite overlapping forecast data.
# `start_date` / `end_date` are not defined in this cell; presumably an earlier
# parameter cell sets them as "YYYY-MM-DD" strings (TODO confirm). strptime
# raises ValueError on any other format, and every entry of `date_range` is
# re-formatted by strftime, so only "YYYY-MM-DD" text is interpolated into SQL.
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
end_dt = datetime.strptime(end_date, "%Y-%m-%d")

# Every calendar day from start_date through end_date inclusive (empty if end < start).
date_range = [
    (start_dt + timedelta(days=i)).strftime("%Y-%m-%d")
    for i in range((end_dt - start_dt).days + 1)
]

# Skip the delete when the new batch has no rows. Otherwise an empty Bronze file
# would remove the existing forecasts for the window and put nothing back.
# head(1) only needs to produce a single row to answer this.
if date_range and spark.catalog.tableExists("Silver_Layer") and df_silver.head(1):
    # One DELETE for the whole window (a single Delta commit) rather than a
    # separate statement and commit per day.
    date_list = ", ".join(f"'{d}'" for d in date_range)
    spark.sql(f"DELETE FROM Silver_Layer WHERE date IN ({date_list})")

# NOTE(review): the DELETE above and the append below are separate Delta commits,
# so a failed append leaves the window empty. mode("overwrite") together with
# option("replaceWhere", ...) would make this a single atomic commit, provided
# every batch row is guaranteed to fall inside the window. Worth considering.

# Write Silver Layer table (created on first run, appended to afterwards).
df_silver.write.mode("append").format("delta").saveAsTable("Silver_Layer")

# Display result
display(df_silver)

In [4]:
# Preview up to 1000 rows of the persisted Silver table.
df = spark.table("weather_lakehouse.silver_layer").limit(1000)
display(df)

StatementMeta(, d9e59f41-b16d-4646-a13c-7554285743f2, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e95c8b11-a53d-435e-a55c-8cf76940f6ce)