In [0]:
# Read the bronze-layer supply-chain table; every later cell derives from it.
bronze_df = spark.table("scf_bronze.raw_supply.bronze_food_supply_chain")

# Trailing expression: the notebook displays the number of bronze rows.
bronze_df.count()

In [0]:
# Print the bronze table's column names and data types before any casting.
bronze_df.printSchema()

In [0]:
from pyspark.sql.functions import col, to_date

# Give the columns used downstream explicit types: integer identifiers and
# quantities, double-precision costs, and real dates for the two date columns.
# NOTE(review): whether an unparseable value becomes NULL or raises depends on
# the cluster's ANSI setting — confirm for the target runtime.
typed_df = (
    bronze_df
    # numeric columns (astype is an alias of cast)
    .withColumn("product_id", col("product_id").astype("int"))
    .withColumn("quantity_shipped", col("quantity_shipped").astype("int"))
    .withColumn("unit_cost", col("unit_cost").astype("double"))
    .withColumn("total_cost", col("total_cost").astype("double"))
    # date columns, parsed with to_date's default format handling
    .withColumn("shipment_date", to_date("shipment_date"))
    .withColumn("delivery_date", to_date("delivery_date"))
)

In [0]:
# Discard any row that is missing one of the mandatory shipment fields.
# delivery_date and total_cost are deliberately not part of this list.
non_null_df = typed_df.na.drop(
    subset=[
        "product_id",
        "product_name",
        "quantity_shipped",
        "unit_cost",
        "shipment_date",
    ],
)

In [0]:
# Business-rule validation. A row survives only if every rule evaluates to
# true, so a rule that compares against NULL (e.g. a missing delivery_date or
# total_cost) removes the row as well.
validated_df = (
    non_null_df
    # quantities and prices must be strictly positive
    .where(col("quantity_shipped") > 0)
    .where(col("unit_cost") > 0)
    # a shipment cannot be delivered before it was shipped
    .where(col("delivery_date") >= col("shipment_date"))
    # the reported total can never be less than the price of a single unit
    .where(col("total_cost") >= col("unit_cost"))
)

In [0]:
from pyspark.sql.functions import round

# Replace the source total_cost with quantity x unit cost, rounded to two
# decimals. `round` is the pyspark.sql.functions version imported in this
# cell, not the Python builtin.
recomputed_total = round(col("quantity_shipped") * col("unit_cost"), 2)
silver_df = validated_df.withColumn("total_cost", recomputed_total)

In [0]:
# Keep only shipments whose quality check has a recognised outcome.
# The comparison is an exact string match against the two listed values, and
# rows with a NULL status are dropped because the predicate is not true for
# them. The column itself is left untouched (no trimming or case folding).
silver_df = silver_df.filter(
    col("quality_check_status").isin("Passed", "Failed")
)

In [0]:
# Collapse repeated shipments: rows sharing the same product, supplier, route
# and shipment date are treated as one shipment.
# NOTE(review): no ordering is applied first, so which of the duplicate rows
# survives is presumably arbitrary — confirm that is acceptable.
silver_df = silver_df.drop_duplicates(
    [
        "product_id",
        "supplier_name",
        "origin_city",
        "destination_warehouse",
        "shipment_date",
    ]
)

In [0]:
from pyspark.sql.functions import current_date
# Reject rows dated in the future: both the shipment and the delivery must
# have happened on or before the day the notebook runs.
silver_df = (
    silver_df
    .where(col("shipment_date") <= current_date())
    .where(col("delivery_date") <= current_date())
)

In [0]:
# Product dimension: one row per distinct (product_id, product_name) pair.
dim_product = silver_df.select("product_id", "product_name").distinct()

# Replace the Delta table with the freshly computed dimension.
(
    dim_product.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("scf_silver.dim.product")
)

In [0]:
# Supplier dimension: the distinct supplier names, exposed as column `supplier`.
# NOTE(review): the fact table written at the end of this notebook keeps the
# column name `supplier_name`; confirm consumers join on the right name.
dim_supplier = silver_df.select(col("supplier_name").alias("supplier")).distinct()

# Replace the Delta table with the freshly computed dimension.
(
    dim_supplier.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("scf_silver.dim.supplier")
)

In [0]:
# Location dimension: one row per distinct (origin_city, destination_warehouse)
# combination observed in the cleaned shipments.
dim_location = silver_df.select("origin_city", "destination_warehouse").distinct()

# Replace the Delta table with the freshly computed dimension.
(
    dim_location.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("scf_silver.dim.location")
)

In [0]:
from pyspark.sql.functions import year, month, dayofmonth

# Date dimension built from the distinct shipment dates only (delivery dates
# are not included), with the calendar parts split into their own columns.
dim_date = (
    silver_df
    .select("shipment_date")
    .distinct()
    .select(
        "shipment_date",
        year("shipment_date").alias("year"),
        month("shipment_date").alias("month"),
        dayofmonth("shipment_date").alias("day"),
    )
)

# Replace the Delta table with the freshly computed dimension.
(
    dim_date.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("scf_silver.dim.date")
)

In [0]:
# Shipment fact table: the dimension attributes, the dates and shipment
# descriptors, then the measures — in exactly this column order.
fact_shipments = silver_df.select(
    [
        # dimension attributes
        "product_id",
        "supplier_name",
        "origin_city",
        "destination_warehouse",
        # dates and shipment descriptors
        "shipment_date",
        "delivery_date",
        "shipping_mode",
        "quality_check_status",
        # measures
        "quantity_shipped",
        "unit_cost",
        "total_cost",
    ]
)

# Replace the Delta table with the freshly computed fact rows.
(
    fact_shipments.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("scf_silver.fact.shipments")
)