In [None]:
from pyspark.sql.functions import when, col, lit

# Step 1: Load the raw operations CSV from the Unity Catalog volume.
# The first row supplies the column names, and column types are inferred
# from the data rather than declared up front.
raw_csv_path = "dbfs:/Volumes/etl-pipeline/default/ops_data/sample_ops_data.csv"
df_raw = spark.read.csv(raw_csv_path, header=True, inferSchema=True)

# Step 2: Transform data
# - incident_type: null or empty-string values are replaced by the literal
#   string "None"; every other value is kept unchanged.
# - delay_flag: "Delayed" when delay_minutes > 0, otherwise "On time".
#   A null delay_minutes does not satisfy the comparison, so such rows are
#   also labelled "On time".
incident_type = col("incident_type")
has_incident = incident_type.isNotNull() & (incident_type != "")
incident_filled = when(has_incident, incident_type).otherwise(lit("None"))

is_delayed = col("delay_minutes") > 0
delay_label = when(is_delayed, lit("Delayed")).otherwise(lit("On time"))

df_clean = df_raw.withColumn("incident_type", incident_filled).withColumn(
    "delay_flag", delay_label
)

# Step 3: Persist the cleaned data as a managed Unity Catalog table,
# replacing whatever the table currently holds.
# The catalog name contains a hyphen, hence the backtick quoting.
cleaned_table = "`etl-pipeline`.default.sample_ops_data_cleaned"
df_clean.write.saveAsTable(cleaned_table, mode="overwrite")

# Step 4: Validation summary
# Print the total row count of the cleaned DataFrame, then show the
# per-value row counts for each derived column.
row_total = df_clean.count()
print("✅ Total Rows:", row_total)
for _group_key in ("incident_type", "delay_flag"):
    df_clean.groupBy(_group_key).count().show()