In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window 

# Storage locations of the bronze and silver Delta tables.
BRONZE_PATH = "dbfs:/delta/bronze"
SILVER_PATH = "dbfs:/delta/silver"
# Path-based SQL identifier of the silver table, derived from SILVER_PATH.
SILVER_TABLE = f"delta.`{SILVER_PATH}`"

# Business configuration: record key and partitioning column.
BUSINESS_KEY = "ID"
PARTITION_COL = "accident_date"

In [0]:
# Load the bronze Delta table and report how many rows it holds.
bronze_reader = spark.read.format("delta")
df_bronze = bronze_reader.load(BRONZE_PATH)
bronze_row_count = df_bronze.count()
print("Count bronze", bronze_row_count)

def simple_profile(df, n_unique_sample=50):
    """Profile every column of ``df``: type, null count and distinct count.

    Each column triggers one null-count job and one distinct-count job, so
    the cost grows with the number of columns.

    Args:
        df: DataFrame to profile.
        n_unique_sample: Accepted for interface compatibility; the
            implementation does not read it.

    Returns:
        A list of ``(column, type_as_string, null_count, distinct_count)``
        tuples, in ``df.columns`` order.
    """
    names = df.columns
    type_names = [str(df.schema[name].dataType) for name in names]
    null_counts = [df.filter(df[name].isNull()).count() for name in names]
    distinct_counts = [df.select(name).distinct().count() for name in names]

    rows = []
    for idx, name in enumerate(names):
        rows.append((name, type_names[idx], null_counts[idx], distinct_counts[idx]))
    return rows
# Profile the bronze table and print one line per column:
# name, type, null count, distinct count.
profile = simple_profile(df_bronze)
for col_name, col_type, null_count, distinct_count in profile:
    print(f"{col_name} {col_type} {null_count} {distinct_count}")

In [0]:
def parse_time(col, fmt=None):
    """Build a timestamp Column from the column named ``col``.

    Args:
        col: Name of the source column.
        fmt: Optional datetime pattern forwarded to ``F.to_timestamp``.
            When falsy, ``F.to_timestamp`` is called without a format.

    Returns:
        The Column expression produced by ``F.to_timestamp``.
    """
    source = F.col(col)
    if fmt:
        # The format branch must wrap the column reference, not the bare
        # ``F.col`` function object.
        return F.to_timestamp(source, fmt)
    return F.to_timestamp(source)

def safe_cast(col, dtype):
    """Cast column ``col`` to ``dtype`` only when it looks like a plain number.

    A value qualifies when it matches an optional minus sign, one or more
    digits and an optional fractional part. Anything else yields null
    instead of being cast.

    Args:
        col: Name of the source column.
        dtype: Target Spark data type.

    Returns:
        A Column expression holding the cast value or null.
    """
    source = F.col(col)
    looks_numeric = source.rlike(r'^-?\d+(\.\d+)?$')
    casted = source.cast(dtype)
    return F.when(looks_numeric, casted).otherwise(F.lit(None))

In [0]:
# Reload bronze from the configured location (rather than a duplicated
# literal path) so this cell can be re-run on its own.
df_bronze = spark.read.format("delta").load(BRONZE_PATH)

df = (
    df_bronze
    # Partition column: calendar date taken from Start_Time.
    .withColumn(PARTITION_COL, F.to_date(F.col("Start_Time")))
    .withColumn("start_time_ts", parse_time("Start_Time"))
    .withColumn("end_time_ts", parse_time("End_Time"))
)

# Coordinates go through safe_cast so non-numeric values become null
# instead of being cast.
coordinate_columns = [
    ("start_lat", "Start_Lat"),
    ("start_lng", "Start_Lng"),
    ("end_lat", "End_Lat"),
    ("end_lng", "End_Lng"),
]
for silver_name, bronze_name in coordinate_columns:
    df = df.withColumn(silver_name, safe_cast(bronze_name, DoubleType()))

# Severity is only cast when bronze actually provides it.
if "Severity" in df.columns:
    df = df.withColumn("severity", safe_cast("Severity", IntegerType()))

# When bronze has no business key, synthesize a deterministic "ID" by hashing
# the start timestamp and start coordinates.
# NOTE(review): the gate tests BUSINESS_KEY but the generated column is always
# named "ID" (later cells also key on "ID") — keep both in sync if
# BUSINESS_KEY ever changes.
if BUSINESS_KEY not in df.columns:
    key_parts = [
        F.col("start_time_ts").cast("string"),
        F.col("start_lat").cast("string"),
        F.col("start_lng").cast("string"),
    ]
    df = df.withColumn("ID", F.sha2(F.concat_ws("||", *key_parts), 256))

# Silver projection; columns that are absent from `df` are skipped.
# NOTE(review): end_lat / end_lng are cast earlier in this cell but are not
# part of this projection — confirm that dropping them is intentional.
final_cols = [
    "ID",
    "start_time_ts", "end_time_ts", "accident_date",
    "start_lat", "start_lng",
    "severity", "City", "State", "Zipcode", "Country", "Weather_Condition",
]
final_cols = [c for c in final_cols if c in df.columns]
df_silver_prep = df.select(*final_cols)

display(df_silver_prep.limit(50))
print("Count rows candidate silver:", df_silver_prep.count())




In [0]:
# Keep one row per ID, preferring the most recent one.
# When bronze carries _ingest_timestamp, ranking must run on `df`:
# df_silver_prep was projected without that column, so ordering by it there
# cannot be resolved. Otherwise fall back to the latest start time.
if "_ingest_timestamp" in df.columns:
    dedup_source = df
    recency = F.col("_ingest_timestamp").desc()
else:
    dedup_source = df_silver_prep
    recency = F.col("start_time_ts").desc_nulls_last()

w = Window.partitionBy("ID").orderBy(recency)

df_dedup = (
    dedup_source
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    # Project back to the silver columns; this also drops the helper `rn`.
    .select(*df_silver_prep.columns)
)

print("Count after dedup:", df_dedup.count())
display(df_dedup.limit(20))

In [0]:
# Rule 1: a row needs both start coordinates.
has_start_coords = F.col("start_lat").isNotNull() & F.col("start_lng").isNotNull()
df_q = df_dedup.filter(has_start_coords)

# Rule 2: when an end time exists, compute the duration in seconds and keep
# rows whose duration is unknown or falls in [0, 86400).
if "end_time_ts" in df_q.columns:
    duration_expr = F.unix_timestamp("end_time_ts") - F.unix_timestamp("start_time_ts")
    df_q = df_q.withColumn("trip_duration_sec", duration_expr)
    duration_col = F.col("trip_duration_sec")
    duration_ok = duration_col.isNull() | ((duration_col >= 0) & (duration_col < 86400))
    df_q = df_q.filter(duration_ok)

# Rule 3: severity outside 1..5 is nulled out rather than dropping the row.
if "severity" in df_q.columns:
    severity_col = F.col("severity")
    df_q = df_q.withColumn(
        "severity",
        F.when(severity_col.between(1, 5), severity_col).otherwise(None),
    )

# Rule 4: missing location attributes are replaced by the literal "Unknown".
location_cols = [name for name in ("City", "State", "Zipcode", "Country") if name in df_q.columns]
for location_col in location_cols:
    df_q = df_q.withColumn(location_col, F.coalesce(F.col(location_col), F.lit("Unknown")))

print("Count after Q rules:", df_q.count())
display(df_q.limit(20))


In [0]:
# Full refresh of the silver table: replace data and schema, partitioned by
# the configured partition column instead of a duplicated literal.
(
    df_q.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy(PARTITION_COL)
    .save(SILVER_PATH)
)

print("Silver written to:", SILVER_PATH)

In [0]:
# Read the silver table back from the configured location (not a duplicated
# literal path) to inspect what was written.
df_silver = spark.read.format("delta").load(SILVER_PATH)
display(df_silver)

In [0]:
from delta.tables import DeltaTable

source_df = df_q

# Stage the batch as its own Delta table and merge from the staged copy.
TEMP_BATCH = "dbfs:/delta/batches/temp_batch"
(
    source_df.write
    .format("delta")
    .mode("overwrite")
    # The staging table is rewritten on every run. Let its schema follow the
    # batch, as the silver write does; otherwise a changed batch schema makes
    # this overwrite fail on schema enforcement.
    .option("overwriteSchema", "true")
    .save(TEMP_BATCH)
)

delta_silver = DeltaTable.forPath(spark, SILVER_PATH)
source = spark.read.format("delta").load(TEMP_BATCH)

# Upsert on ID: matched rows are fully updated, unmatched rows are inserted.
(
    delta_silver.alias("t")
    .merge(source.alias("s"), "t.ID = s.ID")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

print("Merge completed")

In [0]:
def reprocess_partition(partition_date_str, batch_df):
    """Upsert ``batch_df`` into the silver table through a per-date staging table.

    The batch is written to ``dbfs:/delta/batches/reprocess_<partition_date_str>``,
    read back, and merged into the table at ``SILVER_PATH`` on ``ID``: matched
    rows are fully updated and unmatched rows are inserted.

    Note: ``partition_date_str`` only names the staging location and the log
    message. The merge condition is ``t.ID = s.ID`` alone, so the merge is not
    restricted to that date and silver rows absent from the batch are left
    untouched.

    Args:
        partition_date_str: Label used in the staging path and the final message.
        batch_df: DataFrame holding the rows to upsert.
    """
    temp = f"dbfs:/delta/batches/reprocess_{partition_date_str}"
    (
        batch_df.write
        .format("delta")
        .mode("overwrite")
        # The staging location is reused per date. Allow the schema to be
        # replaced so a batch with a changed schema does not fail on schema
        # enforcement.
        .option("overwriteSchema", "true")
        .save(temp)
    )

    delta_silver = DeltaTable.forPath(spark, SILVER_PATH)
    src = spark.read.format("delta").load(temp)
    (
        delta_silver.alias("t")
        .merge(src.alias("s"), "t.ID = s.ID")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

    print(f"Reprocess completed for {partition_date_str}")

    

In [0]:
from datetime import datetime

# Data-quality checks on the silver table. Each entry of `Checks` is a
# (check_name, passed) pair; the overall verdict is appended to a Delta log.
df_s = spark.read.format("delta").load(SILVER_PATH)

# The table must not be empty.
Checks = [("row_count", df_s.count() > 0)]

# Critical columns must not contain nulls.
crit_cols = ["ID", "start_time_ts", "start_lat", "start_lng"]
null_checks = {}
for crit_col in crit_cols:
    null_checks[crit_col] = df_s.filter(F.col(crit_col).isNull()).count()
Checks.extend((f"null_{crit_col}", null_count == 0) for crit_col, null_count in null_checks.items())

# Non-null severities must lie within 1..5.
if "severity" in df_s.columns:
    severity_col = F.col("severity")
    out_of_range = severity_col.isNotNull() & ~severity_col.between(1, 5)
    invalid_sev = df_s.filter(out_of_range).count()
    Checks.append(("severity_valid", invalid_sev == 0))

for check_name, passed in Checks:
    verdict = "OK" if passed else "FAILED"
    print(f"{check_name} -> {verdict}")

# Persist one summary row: timestamp, check name, and whether every check passed.
all_passed = all(passed for _, passed in Checks)
dq = spark.createDataFrame(
    [(datetime.now().isoformat(), "silver_quality", all_passed)],
    ["check_time", "check", "status"],
)
dq.write.format("delta").mode("append").save("dbfs:/delta/quality_checks")