In [0]:
"""# 03 Silver Clean Transformations

This notebook transforms Bronze pharmacy data into the Silver layer.

Responsibilities:
- Data quality checks
- Business-rule filtering
- Type normalization
- Feature enrichment

Source: bronze_pharmacy_events  
Target: silver_pharmacy_events
"""

In [0]:
from pyspark.sql import functions as F


In [0]:
bronze_df = spark.table("bronze_pharmacy_events")

print("Bronze count:", bronze_df.count())
bronze_df.show(5)


In [0]:
silver_df = (
    bronze_df
    .filter(F.col("price_including_gst") > 0)
    .filter(F.col("category").isNotNull())
    .filter(F.col("mfg_date").isNotNull())
    .filter(F.col("expiry_date").isNotNull())
)


In [0]:
silver_df = silver_df.dropDuplicates(
    ["medicine_brand_name", "batch_number", "mfg_date"]
)


In [0]:
silver_df = (
    silver_df
    .withColumn("mfg_date_dt", F.to_date(F.concat(F.col("mfg_date"), F.lit("-01"))))
    .withColumn("expiry_date_dt", F.to_date(F.concat(F.col("expiry_date"), F.lit("-01"))))
)


In [0]:
silver_df = (
    silver_df
    .withColumn(
        "shelf_life_months",
        F.months_between(F.col("expiry_date_dt"), F.col("mfg_date_dt"))
    )
    .withColumn(
        "near_expiry_flag",
        F.when(
            F.months_between(F.col("expiry_date_dt"), F.current_date()) <= 6,
            F.lit(1)
        ).otherwise(F.lit(0))
    )
)


In [0]:
silver_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("silver_pharmacy_events")


In [0]:
%sql
SELECT COUNT(*) AS total_records
FROM silver_pharmacy_events;

SELECT
  category,
  COUNT(*) AS records,
  SUM(near_expiry_flag) AS near_expiry_count
FROM silver_pharmacy_events
GROUP BY category
ORDER BY records DESC;


In [0]:
spark.table("silver_pharmacy_events").printSchema()


In [0]:
"""## Silver Layer Contract

✔ Cleaned & validated data  
✔ Business rules enforced  
✔ Date fields normalized  
✔ Expiry risk identified  
✔ Ready for aggregations & ML  

This table is the trusted source for Gold analytics.
"""