
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://raw.githubusercontent.com/derar-alhussein/Databricks-Certified-Data-Engineer-Professional/main/Includes/images/customers_orders.png" alt="customers_orders" width="60%">
</div>

In [0]:
%run ../Includes/Copy-Datasets

Data catalog: workspace
Schema: bookstore_eng_pro


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def batch_upsert(microBatchDF, batchId):
    """Upsert one micro-batch of joined order/customer-change rows into ``customers_orders``.

    Used as the ``foreachBatch`` sink. Keeps only ``insert`` and ``update_postimage``
    change rows, reduces them to the single latest change per (order_id, customer_id),
    and MERGEs the result into ``customers_orders``. A matched target row is updated
    only when the incoming change is newer than the one already stored.

    Args:
        microBatchDF: micro-batch DataFrame; must carry the change-feed columns
            ``_change_type``, ``_commit_version`` and ``_commit_timestamp``.
        batchId: micro-batch id supplied by Structured Streaming (not used here).
    """
    # row_number (not rank) guarantees exactly one row per key even when two changes
    # share a _commit_timestamp; _commit_version breaks such ties deterministically.
    # With rank, tied rows would all survive the filter and the MERGE below would see
    # several source rows for the same target row, which Delta rejects.
    latest_first = (Window.partitionBy("order_id", "customer_id")
                          .orderBy(F.col("_commit_timestamp").desc(),
                                   F.col("_commit_version").desc()))

    (microBatchDF.filter(F.col("_change_type").isin(["insert", "update_postimage"]))
                 .withColumn("row_num", F.row_number().over(latest_first))
                 .filter("row_num = 1")
                 .drop("row_num", "_change_type", "_commit_version")
                 # the commit time of the winning change becomes processed_timestamp
                 .withColumnRenamed("_commit_timestamp", "processed_timestamp")
                 .createOrReplaceTempView("ranked_updates"))

    # Only overwrite an existing row with a strictly newer change, so replays of
    # older changes cannot regress the table.
    query = """
        MERGE INTO customers_orders c
        USING ranked_updates r
        ON c.order_id=r.order_id AND c.customer_id=r.customer_id
            WHEN MATCHED AND c.processed_timestamp < r.processed_timestamp
              THEN UPDATE SET *
            WHEN NOT MATCHED
              THEN INSERT *
    """

    # Use the micro-batch's own session so the temp view created above is visible.
    microBatchDF.sparkSession.sql(query)

In [0]:
%sql
-- Target of the foreachBatch MERGE: its columns must line up with the
-- ranked_updates view, because the MERGE uses UPDATE SET * / INSERT *.
-- processed_timestamp holds the change-feed commit timestamp of the applied change.
CREATE TABLE IF NOT EXISTS customers_orders (
  order_id            STRING,
  order_timestamp     TIMESTAMP,
  customer_id         STRING,
  quantity            BIGINT,
  total               BIGINT,
  books               ARRAY<STRUCT<book_id STRING, quantity BIGINT, subtotal BIGINT>>,
  email               STRING,
  first_name          STRING,
  last_name           STRING,
  gender              STRING,
  street              STRING,
  city                STRING,
  country             STRING,
  row_time            TIMESTAMP,
  processed_timestamp TIMESTAMP
)

In [0]:
def process_customers_orders(starting_version=2):
    """Join streaming orders with customer change data and upsert into ``customers_orders``.

    Reads ``orders_silver`` as a stream and the change feed of ``customers_silver``
    beginning at ``starting_version``, inner-joins them on ``customer_id``, and passes
    every micro-batch to ``batch_upsert``. The query uses an ``availableNow`` trigger,
    and this function waits for it to terminate.

    Args:
        starting_version: first ``customers_silver`` table version whose changes are
            read. The default of 2 preserves the value previously hard-coded here. Per
            the Delta Lake streaming docs, starting-position options only apply when
            the query starts without checkpoint progress. Once the checkpoint under
            ``{checkpoint_path}/customers_orders`` has progress, the stream resumes
            from the checkpoint instead.
    """
    orders_df = spark.readStream.table("orders_silver")

    cdf_customers_df = (spark.readStream
                             .option("readChangeData", True)
                             .option("startingVersion", starting_version)
                             .table("customers_silver")
                       )

    # NOTE(review): stream-stream inner join with no watermark on either side, so join
    # state is not bounded. Confirm this is acceptable for the expected data volume.
    query = (orders_df
                .join(cdf_customers_df, ["customer_id"], "inner")
                .writeStream
                    .foreachBatch(batch_upsert)
                    .option("checkpointLocation", f"{bookstore.checkpoint_path}/customers_orders")
                    .trigger(availableNow=True)
                    .start()
            )

    query.awaitTermination()

process_customers_orders()

25/12/29 09:57:30 Spark Server has not sent updates for Streaming Query 54e51c09-a4fc-44a3-9466-ee3f8c7d3f1b in60 seconds, but the query is still active. Marking query as in-progress. Spark Session ID is c84ebf7e-21dd-4a60-bdc1-f28583a996f4. This is typically not a problem.


In [0]:
%sql
-- Inspect the rows upserted into customers_orders so far
SELECT *
FROM customers_orders

order_id,order_timestamp,customer_id,quantity,total,books,email,first_name,last_name,gender,street,city,country,row_time,processed_timestamp
4989,2022-03-19T12:09:00.000Z,C00522,1,20,"List(List(B04, 1, 20))",cflatleygc@simplemachines.org,Casie,Flatley,Female,59 Quincy Way,Frederico Westphalen,Brazil,2022-04-04T20:51:00.000Z,2025-12-28T10:49:41.000Z
5041,2022-03-22T05:09:00.000Z,C00522,1,20,"List(List(B04, 1, 20))",cflatleygc@simplemachines.org,Casie,Flatley,Female,59 Quincy Way,Frederico Westphalen,Brazil,2022-04-04T20:51:00.000Z,2025-12-28T10:49:41.000Z
5043,2022-03-22T09:09:00.000Z,C00522,1,20,"List(List(B04, 1, 20))",cflatleygc@simplemachines.org,Casie,Flatley,Female,59 Quincy Way,Frederico Westphalen,Brazil,2022-04-04T20:51:00.000Z,2025-12-28T10:49:41.000Z
5066,2022-03-23T12:09:00.000Z,C00522,2,88,"List(List(B08, 1, 41), List(B05, 1, 47))",cflatleygc@simplemachines.org,Casie,Flatley,Female,59 Quincy Way,Frederico Westphalen,Brazil,2022-04-04T20:51:00.000Z,2025-12-28T10:49:41.000Z
5067,2022-03-23T13:09:00.000Z,C00522,1,20,"List(List(B04, 1, 20))",cflatleygc@simplemachines.org,Casie,Flatley,Female,59 Quincy Way,Frederico Westphalen,Brazil,2022-04-04T20:51:00.000Z,2025-12-28T10:49:41.000Z
5071,2022-03-23T17:09:00.000Z,C00522,1,20,"List(List(B04, 1, 20))",cflatleygc@simplemachines.org,Casie,Flatley,Female,59 Quincy Way,Frederico Westphalen,Brazil,2022-04-04T20:51:00.000Z,2025-12-28T10:49:41.000Z
5147,2022-03-27T17:09:00.000Z,C00522,3,92,"List(List(B09, 2, 48), List(B10, 1, 44))",cflatleygc@simplemachines.org,Casie,Flatley,Female,59 Quincy Way,Frederico Westphalen,Brazil,2022-04-04T20:51:00.000Z,2025-12-28T10:49:41.000Z
5361,2022-04-06T12:02:00.000Z,C00591,2,87,"List(List(B01, 1, 49), List(B11, 1, 38))",wgiveenmr@studiopress.com,Willabella,Giveen,Female,458 Ryan Hill,Vitry-le-François,France,2022-04-07T15:26:00.000Z,2025-12-28T10:49:41.000Z
5363,2022-04-06T15:02:00.000Z,C00591,1,47,"List(List(B05, 1, 47))",wgiveenmr@studiopress.com,Willabella,Giveen,Female,458 Ryan Hill,Vitry-le-François,France,2022-04-07T15:26:00.000Z,2025-12-28T10:49:41.000Z
5488,2022-04-12T09:02:00.000Z,C00591,1,47,"List(List(B05, 1, 47))",wgiveenmr@studiopress.com,Willabella,Giveen,Female,458 Ryan Hill,Vitry-le-François,France,2022-04-07T15:26:00.000Z,2025-12-28T10:49:41.000Z


In [0]:
# Land the next source file, push it through bronze and both silver tables,
# then refresh the customers_orders join from the new silver data.
for pipeline_step in ("load_new_data", "process_bronze",
                      "process_orders_silver", "process_customers_silver"):
    getattr(bookstore, pipeline_step)()

process_customers_orders()

Loading kafka-streaming-06.json file to the bookstore dataset


25/12/29 10:00:32 Spark Server has not sent updates for Streaming Query b03b8d16-7e0c-468f-839a-96444f465b24 in60 seconds, but the query is still active. Marking query as in-progress. Spark Session ID is c84ebf7e-21dd-4a60-bdc1-f28583a996f4. This is typically not a problem.
25/12/29 10:00:32 Spark Server has not sent updates for Streaming Query b03b8d16-7e0c-468f-839a-96444f465b24 in60 seconds, but the query is still active. Marking query as in-progress. Spark Session ID is c84ebf7e-21dd-4a60-bdc1-f28583a996f4. This is typically not a problem.
25/12/29 10:00:33 Spark Server has not sent updates for Streaming Query b03b8d16-7e0c-468f-839a-96444f465b24 in60 seconds, but the query is still active. Marking query as in-progress. Spark Session ID is c84ebf7e-21dd-4a60-bdc1-f28583a996f4. This is typically not a problem.
25/12/29 10:00:34 Spark Server has not sent updates for Streaming Query b03b8d16-7e0c-468f-839a-96444f465b24 in60 seconds, but the query is still active. Marking query as in-p

In [0]:
%sql
-- Total number of rows currently in customers_orders
SELECT count(*)
FROM customers_orders

count(*)
571
