In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Schema for the nested WeatherAPI JSON payload: a `location` struct and a
# `current` struct (which itself nests `condition`). Every field is nullable,
# which is the default for StructType.add.
weather_schema = (
    StructType()
    .add(
        "location",
        StructType()
        .add("name", StringType())
        .add("country", StringType())
        .add("localtime", StringType()),
    )
    .add(
        "current",
        StructType()
        .add("temp_c", DoubleType())
        .add("humidity", IntegerType())
        .add("condition", StructType().add("text", StringType())),
    )
)



In [0]:
from pyspark.sql.functions import from_json, col, current_timestamp
# 1. Read the Bronze table as a stream.
df_bronze = spark.readStream.table("weather_bronze_raw")

# 2. Parse the raw JSON payload with weather_schema and flatten the nested
#    structs into top-level Silver columns.
# The earlier `{"allowCaseInsensitive": "true"}` argument to from_json was
# removed: it is not one of Spark's documented JSON options, so it gave no
# case-insensitive matching despite what the old comment claimed.
# NOTE(review): if payload keys can vary in case, normalise them upstream or
# verify how your Spark version matches JSON keys against the schema.
df_silver = (df_bronze
    .withColumn("parsed", from_json(col("raw_payload"), weather_schema))
    .select(
        col("parsed.location.name").alias("city"),
        col("parsed.location.country").alias("country"),
        col("parsed.current.temp_c").alias("temp_celsius"),
        col("parsed.current.humidity").alias("humidity_pct"),
        col("parsed.current.condition.text").alias("condition"),
        # Bronze metadata columns are passed through unchanged.
        col("event_hub_time"),
        col("ingested_at")
    )
)
# 3. Append the flattened rows to the Silver Delta table.
# `overwriteSchema` only takes effect for batch writes in overwrite mode; it
# does not change a table's schema from a streaming append, so it was replaced
# with `mergeSchema`, which lets the append add columns the target table lacks.
# To actually replace an old, mismatched schema, drop or batch-overwrite the
# table before starting this stream.
(df_silver.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/weather/checkpoints/silver_v2")  # new checkpoint path for a clean start
    .option("mergeSchema", "true")
    .toTable("weather_silver_cleaned"))

In [0]:
# Batch-read the Silver table and render it in the notebook.
# NOTE(review): the streaming write above runs asynchronously; presumably the
# table has no rows (or does not exist yet) until a micro-batch commits — confirm.
silver_snapshot = spark.read.table("weather_silver_cleaned")
display(silver_snapshot)