In [0]:
# Quick look at the raw weather table: load it and print the first five rows.
df = spark.read.table("workspace.default.weather_raw")
df.show(n=5)


+---+---------+-----------+------------+---------+
| id|     city|temperature|  event_date|is_active|
+---+---------+-----------+------------+---------+
|  1|  Chennai|       34.7|  2024-01-01|     true|
|  2|    Delhi|       33.6|  2024-01-01|     true|
|  3|Bangalore|       NULL|INVALID_DATE|    false|
|  4|Bangalore|       38.9|INVALID_DATE|     true|
|  5|    Delhi|       27.5|INVALID_DATE|     NULL|
+---+---------+-----------+------------+---------+
only showing top 5 rows


In [0]:
# Load the raw weather table; it is the input of the Silver cleaning step.
bronze_df = spark.read.table("workspace.default.weather_raw")


In [0]:
from pyspark.sql.functions import col, when, try_to_date

# Cleaning rules, one expression per column:
#   temperature -> keep the reading when present, otherwise fall back to 25.0
#   event_date  -> parse as yyyy-MM-dd (unparseable values such as
#                  "INVALID_DATE" end up NULL and are dropped below)
#   is_active   -> keep the flag when present, otherwise treat it as False
temperature_filled = (
    when(col("temperature").isNotNull(), col("temperature")).otherwise(25.0)
)
event_date_parsed = try_to_date(col("event_date"), "yyyy-MM-dd")
is_active_filled = (
    when(col("is_active").isNotNull(), col("is_active")).otherwise(False)
)

# Replace the three columns in place (same names, same positions), then keep
# only the rows whose date could be parsed.
silver_df = bronze_df.withColumns(
    {
        "temperature": temperature_filled,
        "event_date": event_date_parsed,
        "is_active": is_active_filled,
    }
).where(col("event_date").isNotNull())

silver_df.show(n=5)


+---+---------+-----------+----------+---------+
| id|     city|temperature|event_date|is_active|
+---+---------+-----------+----------+---------+
|  1|  Chennai|       34.7|2024-01-01|     true|
|  2|    Delhi|       33.6|2024-01-01|     true|
|  6|Bangalore|       31.9|2024-01-02|    false|
|  7|Hyderabad|       25.0|2024-01-01|    false|
|  8|Bangalore|       31.1|2024-01-02|    false|
+---+---------+-----------+----------+---------+
only showing top 5 rows


In [0]:
# Persist the cleaned rows as the Silver table, overwriting what is there.
silver_df.write.saveAsTable(
    "workspace.default.weather_silver",
    mode="overwrite",
)


In [0]:
# Sanity checks on the persisted Silver table: it must contain rows, and the
# cleaned columns must not contain NULLs.
df = spark.table("workspace.default.weather_silver")

silver_row_total = df.count()
assert silver_row_total > 0, " No data in silver table"

# (column to check, message raised when that column still holds NULLs)
for checked_column, failure_message in (
    ("temperature", " Null temperatures"),
    ("event_date", " Invalid dates"),
):
    null_rows = df.where(col(checked_column).isNull()).count()
    assert null_rows == 0, failure_message

print(" All unit tests passed")


 All unit tests passed


In [0]:
import logging
import time
from pyspark.sql.functions import col, count, lit, try_to_date, when

# Table names and the imputation default are defined once so that the log
# messages and the actual reads/writes cannot drift apart.
BRONZE_TABLE = "workspace.default.weather_raw"
SILVER_TABLE = "workspace.default.weather_silver"
DEFAULT_TEMPERATURE = 25.0

logger = logging.getLogger("weather_pipeline")
logger.setLevel(logging.INFO)

# Attach the handler only when the logger has none yet, so re-running this
# cell does not add a second handler and duplicate every log line.
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        "%(asctime)s | %(levelname)s | %(name)s | %(message)s"
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)

# perf_counter is a monotonic clock, so the reported duration is not affected
# by wall-clock adjustments that happen while the pipeline runs.
start_time = time.perf_counter()
logger.info("Pipeline execution started")

try:
    logger.info("Reading Bronze table: %s", BRONZE_TABLE)
    bronze_df = spark.table(BRONZE_TABLE)

    bronze_count = bronze_df.count()
    logger.info("Bronze row count: %s", bronze_count)

    # An empty Bronze table is only reported; the pipeline still continues.
    if bronze_count == 0:
        logger.warning("Bronze table is empty")

    logger.info("Starting Silver transformations")

    # Fill missing temperatures and flags, parse the date, and drop rows whose
    # date could not be parsed.
    silver_df = (
        bronze_df
        .withColumn(
            "temperature",
            when(col("temperature").isNull(), DEFAULT_TEMPERATURE)
            .otherwise(col("temperature"))
        )
        .withColumn(
            "event_date",
            try_to_date(col("event_date"), "yyyy-MM-dd")
        )
        .withColumn(
            "is_active",
            when(col("is_active").isNull(), False).otherwise(col("is_active"))
        )
        .filter(col("event_date").isNotNull())
    )

    # Compute both quality metrics in a single aggregation instead of two
    # separate count() jobs. count() skips NULLs, so counting a when() that
    # has no otherwise() gives the number of rows matching the condition.
    silver_stats = silver_df.agg(
        count(lit(1)).alias("row_count"),
        count(when(col("temperature").isNull(), 1)).alias("null_temp_count"),
    ).first()
    silver_count = silver_stats["row_count"]
    null_temp_count = silver_stats["null_temp_count"]

    logger.info("Silver row count after cleaning: %s", silver_count)
    logger.info("Null temperature count (should be 0): %s", null_temp_count)

    if null_temp_count > 0:
        logger.warning("Unexpected null temperatures detected")

    logger.info("Writing Silver table: %s", SILVER_TABLE)

    silver_df.write.mode("overwrite").saveAsTable(SILVER_TABLE)

    logger.info("Silver table written successfully")

except Exception:
    # Log the full traceback, then re-raise so the cell still fails visibly.
    logger.exception("Pipeline execution failed")
    raise

finally:
    # Runs on success and on failure, so the duration is always reported.
    end_time = time.perf_counter()
    duration = round(end_time - start_time, 2)
    logger.info("Pipeline execution completed in %s seconds", duration)


2026-01-27 10:57:47,491 | INFO | weather_pipeline | Pipeline execution started
2026-01-27 10:57:47,492 | INFO | weather_pipeline | Reading Bronze table: workspace.default.weather_raw
2026-01-27 10:57:47,974 | INFO | weather_pipeline | Bronze row count: 100
2026-01-27 10:57:47,975 | INFO | weather_pipeline | Starting Silver transformations
2026-01-27 10:57:48,732 | INFO | weather_pipeline | Silver row count after cleaning: 67
2026-01-27 10:57:49,328 | INFO | weather_pipeline | Null temperature count (should be 0): 0
2026-01-27 10:57:49,329 | INFO | weather_pipeline | Writing Silver table: workspace.default.weather_silver
2026-01-27 10:57:53,632 | INFO | weather_pipeline | Silver table written successfully
2026-01-27 10:57:53,632 | INFO | weather_pipeline | Pipeline execution completed in 6.14 seconds
