In [0]:
from pyspark.sql import functions as F
# Landing location of the raw shipment CSV files.
bronze_path = "/Volumes/logistics/bronze/shipments_vol/"

# List the CSV files currently present in the landing volume.
# The rows are built in Python and given an explicit schema so that an empty
# directory (or one without any .csv file) yields an empty DataFrame instead
# of a schema-inference error from createDataFrame.
csv_file_rows = [
    (entry.name,)
    for entry in dbutils.fs.ls(bronze_path)
    if entry.name.endswith(".csv")
]
all_files_df = spark.createDataFrame(csv_file_rows, "file_name string")
all_files_df.display()

In [0]:
# Control table listing the files that have already been ingested.
processed_df = spark.table("logistics.control.processed_files")

# Keep only the listed files that have no matching file_name in the control
# table (left anti join), i.e. the files still to be processed.
new_files_df = all_files_df.join(processed_df, "file_name", "left_anti")
new_files_df.display()

In [0]:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Names of the CSV files that are not yet recorded in the control table.
new_files = [row.file_name for row in new_files_df.collect()]

# Statuses accepted by the dq_invalid_status check.
VALID_DELIVERY_STATUSES = ("DELIVERED", "IN_TRANSIT", "CANCELLED")

# Boolean flags that are summed into dq_score. Each of them is forced to a
# non-null value below: a single NULL flag would make the whole sum NULL, and
# a NULL score falls through to the "MINOR" (clean) severity branch.
DQ_FLAG_COLUMNS = [
    "dq_cost_null",
    "dq_cost_negative",
    "dq_cost_zero",
    "dq_cost_outlier",
    "dq_date_anomaly",
    "dq_invalid_status",
]

for file_name in new_files:

    # bronze_path may end with "/"; strip it to avoid a doubled separator.
    file_path = f"{bronze_path.rstrip('/')}/{file_name}"

    # Read CSV (all columns arrive as strings; no schema inference)
    df = (
        spark.read
            .format("csv")
            .option("header", "true")
            .load(file_path)
    )

    # Ingest metadata
    df = (
        df
        .withColumn("ingest_ts", F.current_timestamp())
        .withColumn("ingest_date", F.current_date())
        .withColumn("source_file", F.lit(file_name))
    )

    # Normalize columns: lower-case names, spaces replaced by underscores
    for c in df.columns:
        df = df.withColumnRenamed(c, c.lower().replace(" ", "_"))

    # Trim + case
    for c in ["carrier_id", "delivery_status"]:
        df = df.withColumn(c, F.upper(F.trim(F.col(c))))

    # Cast types
    df = (
        df
        .withColumn("shipment_cost", F.col("shipment_cost").cast("double"))
        .withColumn("shipment_date", F.to_date("shipment_date"))
        .withColumn("delivery_date", F.to_date("delivery_date"))
    )

    # Empty -> null. Only string columns can hold an empty string, so the
    # typed columns (double / date / timestamp) are passed through untouched.
    string_columns = {name for name, dtype in df.dtypes if dtype == "string"}
    df = df.select([
        F.when(F.trim(F.col(c)) == "", None).otherwise(F.col(c)).alias(c)
        if c in string_columns
        else F.col(c)
        for c in df.columns
    ])

    # Dedup within file: keep one row per shipment_id.
    # NOTE(review): ingest_ts is current_timestamp() and is therefore the same
    # for every row of this file, so which duplicate survives is arbitrary.
    # Confirm whether a business column should be used as the tie-breaker.
    w = Window.partitionBy("shipment_id").orderBy(F.col("ingest_ts").desc())
    df = df.withColumn("rn", F.row_number().over(w)).filter("rn=1").drop("rn")

    # UNKNOWN carrier
    df = df.withColumn("carrier_id", F.coalesce(F.col("carrier_id"), F.lit("UNKNOWN")))

    # Cost rounding
    df = df.withColumn("shipment_cost", F.round(F.col("shipment_cost"), 2))

    # Outlier detection: a cost more than 10x the (approximate) file median.
    # approxQuantile returns an empty list when the file has no non-null
    # cost, so guard the lookup instead of indexing blindly.
    cost = F.col("shipment_cost")
    quantiles = df.approxQuantile("shipment_cost", [0.5], 0.01)
    median_cost = quantiles[0] if quantiles else None
    if median_cost is None:
        outlier_flag = F.lit(False)
    else:
        outlier_flag = F.coalesce(cost > F.lit(median_cost * 10), F.lit(False))
    df = df.withColumn("dq_cost_outlier", outlier_flag)

    # DQ flags (all null-safe).
    # A comparison against a NULL input yields NULL; it is mapped to False
    # because the missing cost is already reported by dq_cost_null. A missing
    # delivery_status is not one of the accepted values, so it counts as
    # invalid (True).
    df = (
        df
        .withColumn("dq_cost_null", cost.isNull())
        .withColumn("dq_cost_negative", F.coalesce(cost < 0, F.lit(False)))
        .withColumn("dq_cost_zero", F.coalesce(cost == 0, F.lit(False)))
        .withColumn(
            "dq_date_anomaly",
            F.coalesce(F.col("shipment_date") > F.col("delivery_date"), F.lit(False))
        )
        .withColumn(
            "dq_invalid_status",
            F.coalesce(
                ~F.col("delivery_status").isin(*VALID_DELIVERY_STATUSES),
                F.lit(True)
            )
        )
    )

    # Score = number of raised flags (never NULL now that flags are non-null)
    score_expr = " + ".join(f"int({flag})" for flag in DQ_FLAG_COLUMNS)
    df = df.withColumn("dq_score", F.expr(score_expr))

    # Severity: 0 flags -> MINOR, exactly 1 -> MAJOR, 2 or more -> CRITICAL
    df = df.withColumn(
        "dq_severity",
        F.when(F.col("dq_score") >= 2, "CRITICAL")
             .when(F.col("dq_score") == 1, "MAJOR")
             .otherwise("MINOR")
    )

    df = df.withColumn("is_quarantine", F.col("dq_severity") == "CRITICAL")

    # Append to Silver
    df.write \
      .format("delta") \
      .mode("append") \
      .saveAsTable("logistics.silver.shipments")

    # Mark file as processed.
    # NOTE(review): this is a separate write from the Silver append above; a
    # failure between the two would leave the file unmarked and it would be
    # appended again on the next run.
    spark.createDataFrame(
        [(file_name,)],
        ["file_name"]
    ).withColumn(
        "processed_ts", F.current_timestamp()
    ).write.mode("append").saveAsTable("logistics.control.processed_files")


In [0]:
%sql
-- Total number of rows currently stored in the silver shipments table.
SELECT COUNT(1) AS total_records
FROM logistics.silver.shipments AS s;

In [0]:
%sql
-- Number of distinct, non-null shipment ids in the silver shipments table.
SELECT COUNT(*) AS distinct_shipments
FROM (
    SELECT DISTINCT shipment_id
    FROM logistics.silver.shipments
    WHERE shipment_id IS NOT NULL
) AS ids;


In [0]:
%sql
-- Show every entry of the processed-files control table.
SELECT pf.*
FROM logistics.control.processed_files AS pf

In [0]:
%sql
-- Rows loaded into silver per source file, ordered by file name.
SELECT
    s.source_file,
    COUNT(1) AS records_loaded
FROM logistics.silver.shipments AS s
GROUP BY s.source_file
ORDER BY s.source_file;


In [0]:
%sql
-- Number of entries in the processed-files control table.
SELECT COUNT(1) AS files_processed
FROM logistics.control.processed_files AS pf;


In [0]:
%sql
-- Silver rows whose dq_cost_null flag is set.
SELECT COUNT(1) AS null_cost_records
FROM logistics.silver.shipments AS s
WHERE s.dq_cost_null;


In [0]:
%sql
-- Silver rows whose dq_date_anomaly flag is set.
SELECT COUNT(1) AS date_anomaly_records
FROM logistics.silver.shipments AS s
WHERE s.dq_date_anomaly;


In [0]:
%sql
-- Sample of up to 20 shipments flagged with a date anomaly.
SELECT
    s.shipment_id,
    s.shipment_date,
    s.delivery_date
FROM logistics.silver.shipments AS s
WHERE s.dq_date_anomaly
LIMIT 20;


In [0]:
%sql
-- Row count per data-quality severity, ordered by severity label.
SELECT
    s.dq_severity,
    COUNT(1) AS record_count
FROM logistics.silver.shipments AS s
GROUP BY s.dq_severity
ORDER BY s.dq_severity;


In [0]:
%sql
-- One-row summary: total rows plus a count for each severity label.
SELECT
    COUNT(1)                                AS total_records,
    SUM(IF(s.dq_severity = 'CRITICAL', 1, 0)) AS critical_records,
    SUM(IF(s.dq_severity = 'MAJOR', 1, 0))    AS major_records,
    SUM(IF(s.dq_severity = 'MINOR', 1, 0))    AS minor_records
FROM logistics.silver.shipments AS s;


In [0]:
from pyspark.sql.functions import col
# Split the silver shipments by severity label: "MINOR" rows form the cleaned
# set, "MAJOR" and "CRITICAL" rows form the quarantine set.
shipments_df = spark.table("logistics.silver.shipments")
severity = col("dq_severity")
silver_cleaned_df = shipments_df.where(severity == "MINOR")
quarantine_df = shipments_df.where(severity.isin("MAJOR", "CRITICAL"))


In [0]:

# Persist the cleaned / quarantine split as Delta tables.
# Both DataFrames are derived from the complete silver shipments table, so the
# targets are replaced on every run. Appending would re-insert all previously
# loaded rows each time the notebook is executed.
silver_cleaned_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("logistics.silver.silver_cleaned")
quarantine_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("logistics.silver.quarantine")


In [0]:
%sql
-- Row counts of both split tables in a single result set. With two separate
-- statements in one cell only the last statement's result is shown.
SELECT 'silver_cleaned' AS table_name, COUNT(*) AS record_count
FROM logistics.silver.silver_cleaned
UNION ALL
SELECT 'quarantine' AS table_name, COUNT(*) AS record_count
FROM logistics.silver.quarantine;


In [0]:
def _export_table_as_single_csv(table_name, destination):
    """Read a table and overwrite `destination` with it as one CSV part file.

    The DataFrame is coalesced to a single partition before writing and a
    header row is included. Returns the DataFrame that was read.
    """
    table_df = spark.table(table_name)
    writer = table_df.coalesce(1).write.mode("overwrite").option("header", "true")
    writer.csv(destination)
    return table_df


# Export the cleaned set first, then the quarantine set.
silver_cleaned_df = _export_table_as_single_csv(
    "logistics.silver.silver_cleaned",
    "abfss://silver@ltimc1sacc.dfs.core.windows.net/silver_cleaned_csv/",
)

quarantine_df = _export_table_as_single_csv(
    "logistics.silver.quarantine",
    "abfss://silver@ltimc1sacc.dfs.core.windows.net/quarantine_csv/",
)


**Gold Layer**

dim_date

In [0]:
%sql
-- Date dimension: one row per distinct shipment_date found in silver_cleaned.
-- date_sk is the date rendered as yyyyMMdd and cast to INT.
-- Rows with a NULL shipment_date are excluded; otherwise DISTINCT would emit
-- a NULL date and the dimension would contain a row with a NULL key.
CREATE OR REPLACE TABLE logistics.gold.dim_date AS
SELECT
  CAST(date_format(shipment_date, 'yyyyMMdd') AS INT) AS date_sk,
  shipment_date AS full_date,
  year(shipment_date) AS year,
  month(shipment_date) AS month,
  day(shipment_date) AS day
FROM (
  SELECT DISTINCT shipment_date
  FROM logistics.silver.silver_cleaned
  WHERE shipment_date IS NOT NULL
);


In [0]:
%sql
-- Preview the date dimension.
SELECT d.*
FROM logistics.gold.dim_date AS d

In [0]:
# Load the regions reference CSV (header row, inferred column types) and
# expose it to SQL cells as the temp view `regions_stg`.
regions_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("/Volumes/logistics/gold/dimtables/regions.csv")
)

regions_df.createOrReplaceTempView("regions_stg")

In [0]:
%sql
-- Rebuild the region dimension from the staged view; the surrogate key is a
-- row number assigned in region_id order.
CREATE OR REPLACE TABLE logistics.gold.dim_region AS
SELECT
    ROW_NUMBER() OVER (ORDER BY stg.region_id) AS region_sk,
    stg.region_id,
    stg.region_name
FROM regions_stg AS stg;


In [0]:
%sql
-- Preview the region dimension.
SELECT r.*
FROM logistics.gold.dim_region AS r

In [0]:
# Load the warehouses reference CSV (header row, inferred column types) and
# expose it to SQL cells as the temp view `warehouses_stg`.
warehouses_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("/Volumes/logistics/gold/dimtables/warehouses.csv")
)

warehouses_df.createOrReplaceTempView("warehouses_stg")

In [0]:
%sql
-- Rebuild the warehouse dimension from the staged view; the surrogate key is
-- a row number assigned in warehouse_id order.
CREATE OR REPLACE TABLE logistics.gold.dim_warehouse AS
SELECT
    ROW_NUMBER() OVER (ORDER BY stg.warehouse_id) AS warehouse_sk,
    stg.warehouse_id,
    stg.warehouse_name,
    stg.city,
    stg.capacity_tpd
FROM warehouses_stg AS stg;

In [0]:
%sql
-- Preview the warehouse dimension.
SELECT w.*
FROM logistics.gold.dim_warehouse AS w

In [0]:
# Load the carriers reference CSV (header row, inferred column types) and
# expose it to SQL cells as the temp view `carriers_stg`.
carriers_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("/Volumes/logistics/gold/dimtables/carriers.csv")
)

carriers_df.createOrReplaceTempView("carriers_stg")


In [0]:
%sql
-- Rebuild the carrier dimension from the staged view; the surrogate key is a
-- row number assigned in carrier_id order.
CREATE OR REPLACE TABLE logistics.gold.dim_carrier AS
SELECT
    ROW_NUMBER() OVER (ORDER BY stg.carrier_id) AS carrier_sk,
    stg.carrier_id,
    stg.carrier_name,
    stg.mode
FROM carriers_stg AS stg;

In [0]:
%sql
-- Preview the carrier dimension.
SELECT c.*
FROM logistics.gold.dim_carrier AS c

In [0]:
%sql
-- Rebuild the shipment fact table from silver_cleaned.
-- Each dimension is attached with a LEFT JOIN on its natural key, so a
-- shipment without a match keeps a NULL surrogate key; the carrier key alone
-- is defaulted to -1. Rows without a shipment_cost are left out.
CREATE OR REPLACE TABLE logistics.gold.fact_shipments AS
SELECT
    shp.shipment_id,

    -- surrogate keys
    COALESCE(car.carrier_sk, -1) AS carrier_sk,
    wh.warehouse_sk,
    reg.region_sk,
    dt.date_sk,

    -- measures and attributes
    shp.shipment_cost,
    shp.delivery_days,
    shp.priority_level,
    shp.is_fragile,
    shp.delivery_status,
    shp.payment_type
FROM logistics.silver.silver_cleaned AS shp
LEFT JOIN logistics.gold.dim_carrier AS car
    ON shp.carrier_id = car.carrier_id
LEFT JOIN logistics.gold.dim_warehouse AS wh
    ON shp.warehouse_id = wh.warehouse_id
LEFT JOIN logistics.gold.dim_region AS reg
    ON shp.region_id = reg.region_id
LEFT JOIN logistics.gold.dim_date AS dt
    ON shp.shipment_date = dt.full_date
WHERE shp.shipment_cost IS NOT NULL;


In [0]:
%sql
-- Preview the shipment fact table.
SELECT f.*
FROM logistics.gold.fact_shipments AS f

In [0]:
%sql
-- Count fact rows that failed a dimension lookup.
-- The fact build stores -1 in carrier_sk when no carrier matched, so an
-- IS NULL test alone can never fire for that column; -1 is counted as well.
-- date_sk comes from a LEFT JOIN too and is therefore checked the same way.
SELECT
    COUNT(*) AS total_rows,
    SUM(CASE WHEN carrier_sk IS NULL OR carrier_sk = -1 THEN 1 ELSE 0 END) AS null_carrier,
    SUM(CASE WHEN warehouse_sk IS NULL THEN 1 ELSE 0 END) AS null_warehouse,
    SUM(CASE WHEN region_sk IS NULL THEN 1 ELSE 0 END) AS null_region,
    SUM(CASE WHEN date_sk IS NULL THEN 1 ELSE 0 END) AS null_date
FROM logistics.gold.fact_shipments;
