**_Purpose: Read Bronze → clean/transform → save as Silver._**

In [0]:
# Step 1: Read Bronze table
bronze_df = spark.table("weather_bronze")

# Step 2: Transform and clean
from pyspark.sql.functions import col, to_timestamp, lit

silver_df = bronze_df.select(
    col("id"),
    col("time").alias("hour"),
    col("temperature_2m").cast("float"),
    col("relative_humidity_2m").cast("float"),
    col("precipitation").cast("float"),
    to_timestamp(col("extracted_at")).alias("extracted_at")  # Ensure timestamp type
)


# Convert hour to timestamp
silver_df = silver_df.withColumn("hour", to_timestamp(col("hour")))

# Remove duplicates by 'id'
silver_df = silver_df.dropDuplicates(["id"])

# Add source metadata
silver_df = silver_df.withColumn("source", lit("OpenWeatherMap"))

# Preview
silver_df.show(5)


+---+-------------------+--------------+--------------------+-------------+-------------------+--------------+
| id|               hour|temperature_2m|relative_humidity_2m|precipitation|       extracted_at|        source|
+---+-------------------+--------------+--------------------+-------------+-------------------+--------------+
|117|2025-09-26 21:00:00|          21.9|                62.0|          0.0|2025-09-22 19:53:36|OpenWeatherMap|
| 62|2025-09-24 14:00:00|          22.8|                81.0|          0.2|2025-09-22 19:53:36|OpenWeatherMap|
|  8|2025-09-22 08:00:00|          16.4|                91.0|          0.0|2025-09-22 19:53:36|OpenWeatherMap|
| 38|2025-09-23 14:00:00|          30.5|                41.0|          0.0|2025-09-22 19:53:36|OpenWeatherMap|
| 39|2025-09-23 15:00:00|          30.9|                40.0|          0.0|2025-09-22 19:53:36|OpenWeatherMap|
+---+-------------------+--------------+--------------------+-------------+-------------------+--------------+
o

In [0]:
# Step 3: Save as Silver Delta table with incremental append
# Use mergeSchema or overwriteSchema if columns might change in the future


silver_df.write.format("delta") \
    .mode("append") \
    .option("overwriteSchema", "true") \
    .saveAsTable("weather_silver")

# Quick check
spark.sql("SELECT * FROM weather_silver LIMIT 5").show()

+---+-------------------+--------------+--------------------+-------------+-------------------+--------------+
| id|               hour|temperature_2m|relative_humidity_2m|precipitation|       extracted_at|        source|
+---+-------------------+--------------+--------------------+-------------+-------------------+--------------+
|117|2025-09-26 21:00:00|          21.9|                62.0|          0.0|2025-09-22 19:53:36|OpenWeatherMap|
| 62|2025-09-24 14:00:00|          22.8|                81.0|          0.2|2025-09-22 19:53:36|OpenWeatherMap|
|  8|2025-09-22 08:00:00|          16.4|                91.0|          0.0|2025-09-22 19:53:36|OpenWeatherMap|
| 38|2025-09-23 14:00:00|          30.5|                41.0|          0.0|2025-09-22 19:53:36|OpenWeatherMap|
| 39|2025-09-23 15:00:00|          30.9|                40.0|          0.0|2025-09-22 19:53:36|OpenWeatherMap|
+---+-------------------+--------------+--------------------+-------------+-------------------+--------------+

