In [0]:
%run ../config/init_config


In [0]:
# Static country reference data (JSON); it is joined onto the customer and
# vehicle streams further down via its `code` column.
# NOTE(review): absolute, user-specific workspace path -- consider moving it
# into the shared config notebook so the notebook runs for other users.
country_path = "file:/Workspace/Users/joelledidanera@gmail.com/03_Databricks_Medallion/includes/country.json"

# Plain batch read (not a stream); no explicit schema is supplied here.
country_df = spark.read.format("json").load(country_path)
display(country_df)

In [0]:
from pyspark.sql.functions import from_json, to_date, lit, col, row_number, broadcast
from pyspark.sql.window import Window

# Silver "customers": parse the raw JSON payloads stored in bronze_raw.value
# and enrich each customer with the matching country reference row.
customer_schema = "customer_id STRING, name STRING, age INT, country_code STRING"

query = (
    spark.readStream
    .format("delta")
    .table("bronze_raw")
    .filter("source_type = 'customers'")
    .select(from_json("value", customer_schema).alias("v"), "processed_time")
    .select("v.*", "processed_time")
    # Explicit inner equi-join against the broadcast lookup table. This
    # replaces an unconditioned join() followed by where(), which spelled
    # the same match as a cartesian product plus filter.
    # NOTE(review): being an inner join, customers whose country_code has
    # no match in country_df are dropped -- confirm that is intended.
    .join(broadcast(country_df), col("country_code") == country_df["code"], "inner")
    .writeStream
    .format("delta")
    .option("checkpointLocation", silver_checkpoint_path + "/customers")
    .trigger(availableNow=True)
    .table("customer_silver")
)

# Wait for the run to finish before reading the table back for inspection.
query.awaitTermination()
customer_silver = spark.table("customer_silver")
display(customer_silver)

In [0]:
# Silver "sales": parse the raw JSON payloads stored in bronze_raw.value and
# convert sale_date from a string to a DATE.
# NOTE(review): amount is parsed as FLOAT (single precision) -- confirm that
# is precise enough for monetary values.
sales_schema = "sale_id STRING, order_id STRING, vehicle_id STRING, sale_date STRING, amount FLOAT"

query = (
    spark.readStream
    .format("delta")
    .table("bronze_raw")
    .filter("source_type = 'sales'")
    .select(from_json("value", sales_schema).alias("v"), "processed_time")
    .select("v.*", "processed_time")
    # 'MM' is month-of-year. The previous pattern used lowercase 'mm',
    # which is minute-of-hour, so the month was never read from the input.
    .withColumn("sale_date", to_date("sale_date", "yyyy-MM-dd"))
    .writeStream
    .format("delta")
    .option("checkpointLocation", silver_checkpoint_path + "/sales")
    .trigger(availableNow=True)
    .table("sales_silver")
)

# Wait for the run to finish before reading the table back for inspection.
query.awaitTermination()
sales_silver = spark.table("sales_silver")
display(sales_silver)

In [0]:
# Silver "orders": parse the raw JSON payloads stored in bronze_raw.value and
# convert order_date from a string to a DATE.
orders_schema = "order_id STRING, customer_id STRING, order_date STRING"

query = (
    spark.readStream
    .format("delta")
    .table("bronze_raw")
    .filter("source_type = 'orders'")
    .select(from_json("value", orders_schema).alias("v"), "processed_time")
    .select("v.*", "processed_time")
    # 'MM' is month-of-year. The previous pattern used lowercase 'mm',
    # which is minute-of-hour, so the month was never read from the input.
    .withColumn("order_date", to_date("order_date", "yyyy-MM-dd"))
    .writeStream
    .format("delta")
    .option("checkpointLocation", silver_checkpoint_path + "/orders")
    .trigger(availableNow=True)
    .table("order_silver")
)

# Wait for the run to finish before reading the table back for inspection.
query.awaitTermination()
order_silver = spark.table("order_silver")
display(order_silver)

In [0]:

# Silver "vehicles": parse the raw JSON payloads stored in bronze_raw.value
# and enrich each vehicle with the matching country reference row.
vehicle_schema = "vehicle_id STRING, brand STRING, price FLOAT, model STRING, country_code STRING"

query = (
    spark.readStream
    .format("delta")
    .table("bronze_raw")
    .filter("source_type = 'vehicles'")
    .select(from_json("value", vehicle_schema).alias("v"), "processed_time")
    .select("v.*", "processed_time")
    # Explicit inner equi-join against the broadcast lookup table. This
    # replaces an unconditioned join() followed by where(), which spelled
    # the same match as a cartesian product plus filter.
    # NOTE(review): being an inner join, vehicles whose country_code has
    # no match in country_df are dropped -- confirm that is intended.
    .join(broadcast(country_df), col("country_code") == country_df["code"], "inner")
    .writeStream
    .format("delta")
    .option("checkpointLocation", silver_checkpoint_path + "/vehicles")
    .trigger(availableNow=True)
    .table("vehicle_silver")
)

# Wait for the run to finish before reading the table back for inspection.
query.awaitTermination()
vehicle_silver = spark.table("vehicle_silver")
display(vehicle_silver)

In [0]:
%sql
-- Target table for the vehicle history MERGE performed later in this notebook.
-- Each row is one version of a vehicle: is_current marks the live version and
-- expired_at is filled in by the MERGE when a version is superseded.
-- NOTE(review): price is DOUBLE here while the vehicle stream parses it as
-- FLOAT; confirm the widening on insert is intended.
CREATE TABLE IF NOT EXISTS current_vehicle_silver (
  vehicle_id STRING,
  brand STRING,
  model STRING,
  price DOUBLE,
  country_code STRING,
  -- populated from the country lookup's `label` column (renamed before the MERGE)
  country_label STRING,
  -- NULL while the row is the current version
  expired_at TIMESTAMP,
  is_current BOOLEAN
) USING delta;


In [0]:
# MERGE statement that maintains version history in current_vehicle_silver
# from the temp view `update_vehicle`.
#
# The USING subquery stages every incoming row up to twice:
#   1. once with merge_id = vehicle_id: if a current target row exists for
#      that vehicle it is MATCHED and closed (is_current = false, expired_at
#      set); if none exists the row is NOT MATCHED and inserted as a new
#      vehicle;
#   2. once more with merge_id = NULL, only for vehicles that already have a
#      current target row: a NULL merge_id cannot satisfy the ON clause, so
#      the row is always inserted as the new current version.
#
# NOTE(review): there is no comparison of attribute values, so a re-delivered
# but unchanged vehicle still closes the old version and inserts a new one.
# NOTE(review): INSERT * is used although the staged rows carry columns the
# target does not declare (e.g. merge_id, processed_time, code) -- confirm
# how the table is expected to handle those.
merge_script = """
MERGE INTO current_vehicle_silver t
USING (
    SELECT *, s.vehicle_id as merge_id FROM update_vehicle s
    UNION ALL
    SELECT s.*, NULL as merge_id FROM update_vehicle s
    JOIN current_vehicle_silver t ON s.vehicle_id = t.vehicle_id
    WHERE t.is_current = true
    ) as vehicle_updates
ON t.vehicle_id = vehicle_updates.merge_id AND t.is_current = true
WHEN MATCHED THEN
  UPDATE SET t.is_current = false, t.expired_at = current_timestamp()
WHEN NOT MATCHED THEN
  INSERT *
""" 
def process_each_vehicle_batch(batch_df, batch_id):
    """Merge one micro-batch of vehicle rows into current_vehicle_silver.

    Keeps only the most recent row per vehicle_id (by processed_time), adds
    the versioning columns the target table expects, exposes the result as
    the temp view ``update_vehicle`` and runs ``merge_script`` against it.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Micro-batch identifier passed by foreachBatch (unused).
    """
    newest_first = Window.partitionBy("vehicle_id").orderBy(col("processed_time").desc())

    latest_df = (
        batch_df
        .withColumn("row_num", row_number().over(newest_first))
        # Keep only the most recent row per vehicle. row_number() yields
        # exactly one row with value 1 per partition, so vehicle_id is
        # already unique afterwards and no extra dropDuplicates is needed.
        .filter(col("row_num") == 1)
        .drop("row_num")
        # Incoming rows always start out as the open, current version.
        .withColumn("expired_at", lit(None).cast("timestamp"))
        .withColumn("is_current", lit(True))
        .withColumnRenamed("label", "country_label")
    )

    # Register the view and run the MERGE on the batch's own session so the
    # SQL statement can resolve `update_vehicle`.
    latest_df.createOrReplaceTempView("update_vehicle")
    latest_df.sparkSession.sql(merge_script)


# Feed the rows of vehicle_silver through the history MERGE, one micro-batch
# at a time.
vehicle_changes = spark.readStream.table("vehicle_silver")

query = (
    vehicle_changes.writeStream
    .foreachBatch(process_each_vehicle_batch)
    # NOTE(review): this option is set on a foreachBatch writer; confirm it
    # actually influences the MERGE executed inside the batch function.
    .option("mergeSchema", "true")
    .option("checkpointLocation", silver_checkpoint_path + "/current_vehicle")
    .trigger(availableNow=True)
    .start()
)

# Wait for the run to finish before reading the table back for inspection.
query.awaitTermination()
current_vehicle_silver = spark.table("current_vehicle_silver")
display(current_vehicle_silver)

# Remove the streaming checkpoints written by this notebook (plus the bronze
# checkpoint). "orders" is now included so every silver stream is reset
# consistently, and "vehicles" is removed only once.
# NOTE(review): presumably this is a demo reset so the next run reprocesses
# from scratch; unless the silver tables are also reset elsewhere, re-running
# will append the same rows again.
for stream_name in ("customers", "sales", "orders", "vehicles", "current_vehicle"):
    dbutils.fs.rm(silver_checkpoint_path + "/" + stream_name, True)
dbutils.fs.rm(bronze_checkpoint_path, True)