From daabe976671795e2fedf58a156f880cf259024fe Mon Sep 17 00:00:00 2001 From: Alexander Vanadio Date: Mon, 25 Mar 2024 15:43:03 -0400 Subject: [PATCH 01/12] adding initial UC configuration and code --- .../01-CDC-CDF-simple-pipeline.py | 24 ++++++++++++------- .../cdc-pipeline/_resources/00-setup.py | 1 + .../cdc-pipeline/_resources/bundle_config.py | 17 ++++++++++++- 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py b/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py index 5c63845a..302cd5e0 100644 --- a/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py +++ b/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py @@ -84,7 +84,7 @@ (bronzeDF.withColumn("file_name", input_file_name()).writeStream .option("checkpointLocation", cloud_storage_path+"/checkpoint_cdc_raw") .trigger(processingTime='10 seconds') - .table("clients_cdc")) + .table(f"{catalog}.{db}.clients_cdc")) time.sleep(20) @@ -109,11 +109,17 @@ # COMMAND ---------- -# DBTITLE 1,We can now create our client table using standard SQL command +# DBTITLE 1,We can now create our client table using a standard SQL command # MAGIC %sql # MAGIC -- we can add NOT NULL in our ID field (or even more advanced constraint) -# MAGIC CREATE TABLE IF NOT EXISTS retail_client_silver (id BIGINT NOT NULL, name STRING, address STRING, email STRING, operation STRING) -# MAGIC TBLPROPERTIES (delta.enableChangeDataFeed = true, delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true); +# MAGIC CREATE TABLE IF NOT EXISTS retail_client_silver ( +# MAGIC id BIGINT NOT NULL, +# MAGIC name STRING, +# MAGIC address STRING, +# MAGIC email STRING, +# MAGIC operation STRING, +# MAGIC CONSTRAINT id_pk PRIMARY KEY(id)) +# MAGIC TBLPROPERTIES (delta.enableChangeDataFeed = true, delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true); # COMMAND ---------- @@ -135,7 +141,7 @@ def merge_stream(df, i): WHEN NOT MATCHED AND source.operation != 'DELETE' THEN INSERT *""") spark.readStream \ - .table("clients_cdc") \ + .table(f"{catalog}.{db}.clients_cdc") \ .writeStream \ .foreachBatch(merge_stream) \ .option("checkpointLocation", cloud_storage_path+"/checkpoint_clients_cdc") \ @@ -153,11 +159,11 @@ def merge_stream(df, i): # MAGIC %md # MAGIC ### Testing the first CDC layer -# MAGIC Let's send a new CDC entry to simulate an update and a DELETE for the ID 1 and 2 +# MAGIC Let's send a new CDC entry to simulate an update and a DELETE for the ID 1000 and 2000 # COMMAND ---------- -# DBTITLE 1,Let's UPDATE id=1 and DELETE the row with id=2 +# DBTITLE 1,Let's UPDATE id=1000 and DELETE the row with id=2000 # MAGIC %sql # MAGIC insert into clients_cdc (id, name, address, email, operation_date, operation, _rescued_data, file_name) values # MAGIC (1000, "Quentin", "Paris 75020", "quentin.ambard@databricks.com", now(), "UPDATE", null, null), @@ -231,7 +237,7 @@ def merge_stream(df, i): changes = spark.read.format("delta") \ .option("readChangeData", "true") \ .option("startingVersion", int(last_version) -1) \ - .table("retail_client_silver") + .table(f"{catalog}.{db}.retail_client_silver") display(changes) # COMMAND ---------- @@ -280,7 +286,7 @@ def upsertToDelta(data, batchId): (spark.readStream .option("readChangeData", "true") .option("startingVersion", 1) - .table("retail_client_silver") + .table(f"{catalog}.{db}.retail_client_silver") .withColumn("gold_data", lit("Delta CDF is Awesome")) .writeStream .foreachBatch(upsertToDelta) diff --git 
a/product_demos/cdc-pipeline/_resources/00-setup.py b/product_demos/cdc-pipeline/_resources/00-setup.py index cf5ca476..1c66e80a 100644 --- a/product_demos/cdc-pipeline/_resources/00-setup.py +++ b/product_demos/cdc-pipeline/_resources/00-setup.py @@ -16,6 +16,7 @@ raw_data_location = cloud_storage_path+"/delta_cdf" if reset_all_data or is_folder_empty(raw_data_location+"/user_csv"): + spark.sql(f"USE {catalog}.{db}") spark.sql("""DROP TABLE if exists clients_cdc""") spark.sql("""DROP TABLE if exists retail_client_silver""") #data generation on another notebook to avoid installing libraries (takes a few seconds to setup pip env) diff --git a/product_demos/cdc-pipeline/_resources/bundle_config.py b/product_demos/cdc-pipeline/_resources/bundle_config.py index ed25da29..415d91b6 100644 --- a/product_demos/cdc-pipeline/_resources/bundle_config.py +++ b/product_demos/cdc-pipeline/_resources/bundle_config.py @@ -53,5 +53,20 @@ "title": "Delta Lake Performance & operation", "description": "Programatically ingest multiple CDC flows to synch all your database." } - ] + ], + "cluster": { + "spark_version": "14.3.x-scala2.12", + "spark_conf": { + "spark.master": "local[*, 4]", + "spark.databricks.cluster.profile": "singleNode" + }, + "custom_tags": { + "ResourceClass": "SingleNode" + }, + "num_workers": 0, + "single_user_name": "{{CURRENT_USER}}", + "data_security_mode": "SINGLE_USER", + "node_type_id": "m5.large", + "driver_node_type_id": "m5.large", + } } From 735269fcf9e7c8d97522779d4e4d82da037822e1 Mon Sep 17 00:00:00 2001 From: alexvanadio-databricks Date: Tue, 26 Mar 2024 17:29:29 +0000 Subject: [PATCH 02/12] modified gold use case, added documentation, changed some streaming semantics --- .../01-CDC-CDF-simple-pipeline.py | 221 +++++++++++++----- .../cdc-pipeline/_resources/00-setup.py | 4 +- 2 files changed, 169 insertions(+), 56 deletions(-) diff --git a/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py b/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py index 302cd5e0..4914182a 100644 --- a/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py +++ b/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py @@ -73,20 +73,18 @@ # DBTITLE 1,We need to keep the cdc information, however csv isn't a efficient storage. 
Let's put that in a Delta table instead: bronzeDF = (spark.readStream - .format("cloudFiles") - .option("cloudFiles.format", "csv") - #.option("cloudFiles.maxFilesPerTrigger", "1") #Simulate streaming, remove in production - .option("cloudFiles.inferColumnTypes", "true") - .option("cloudFiles.schemaLocation", cloud_storage_path+"/schema_cdc_raw") - .option("cloudFiles.schemaHints", "id bigint, operation_date timestamp") - .load(raw_data_location+'/user_csv')) + .format("cloudFiles") + .option("cloudFiles.format", "csv") + .option("cloudFiles.inferColumnTypes", "true") + .option("cloudFiles.schemaLocation", cloud_storage_path+"/schema_cdc_raw") + .option("cloudFiles.schemaHints", "id bigint, operation_date timestamp") + .load(raw_data_location+'/user_csv')) (bronzeDF.withColumn("file_name", input_file_name()).writeStream .option("checkpointLocation", cloud_storage_path+"/checkpoint_cdc_raw") - .trigger(processingTime='10 seconds') - .table(f"{catalog}.{db}.clients_cdc")) - -time.sleep(20) + .trigger(availableNow=True) + .table(f"`{catalog}`.`{dbName}`.clients_cdc") + .awaitTermination()) # COMMAND ---------- @@ -124,11 +122,26 @@ # COMMAND ---------- # DBTITLE 1,And run our MERGE statement the upsert the CDC information in our final table -#for each batch / incremental update from the raw cdc table, we'll run a MERGE on the silver table -def merge_stream(df, i): +def merge_stream(df: DataFrame, i): + """ + Processes a microbatch of CDC (Change Data Capture) data to merge it into the 'retail_client_silver' table. + This method performs deduplication and upserts or deletes records based on the operation specified in each row. + + Args: + df (DataFrame): The DataFrame representing the microbatch of CDC data. + i (int): The batch ID, not directly used in this process. + + The method performs these steps: + 1. Temporarily registers the DataFrame as 'clients_cdc_microbatch' to allow SQL operations. + 2. Deduplicates the incoming data by 'id', keeping the latest operation for each 'id'. + 3. Executes a MERGE SQL operation on 'retail_client_silver': + - Deletes records if the latest operation for an 'id' is 'DELETE'. + - Updates records for an 'id' if the latest operation is not 'DELETE'. + - Inserts new records if an 'id' does not exist in 'retail_client_silver' and the operation is not 'DELETE'. + """ + df.createOrReplaceTempView("clients_cdc_microbatch") - #First we need to dedup the incoming data based on ID (we can have multiple update of the same row in our incoming data) - #Then we run the merge (upsert or delete). We could do it with a window and filter on rank() == 1 too + df._jdf.sparkSession().sql("""MERGE INTO retail_client_silver target USING (select id, name, address, email, operation from @@ -140,15 +153,20 @@ def merge_stream(df, i): WHEN MATCHED AND source.operation != 'DELETE' THEN UPDATE SET * WHEN NOT MATCHED AND source.operation != 'DELETE' THEN INSERT *""") -spark.readStream \ - .table(f"{catalog}.{db}.clients_cdc") \ - .writeStream \ - .foreachBatch(merge_stream) \ - .option("checkpointLocation", cloud_storage_path+"/checkpoint_clients_cdc") \ - .trigger(processingTime='10 seconds') \ - .start() +def trigger_silver_stream(): + """ + Initiates a structured streaming process that reads change data capture (CDC) records from a specified table and processes them in batches using a custom merge function. The process is designed to handle streaming updates efficiently, applying changes to a 'silver' table based on the incoming stream. 
+ """ + (spark.readStream + .table(f"`{catalog}`.`{dbName}`.clients_cdc") + .writeStream + .foreachBatch(merge_stream) + .option("checkpointLocation", cloud_storage_path+"/checkpoint_clients_cdc") + .trigger(availableNow=True) + .start() + .awaitTermination()) -time.sleep(20) +trigger_silver_stream() # COMMAND ---------- @@ -166,18 +184,18 @@ def merge_stream(df, i): # DBTITLE 1,Let's UPDATE id=1000 and DELETE the row with id=2000 # MAGIC %sql # MAGIC insert into clients_cdc (id, name, address, email, operation_date, operation, _rescued_data, file_name) values -# MAGIC (1000, "Quentin", "Paris 75020", "quentin.ambard@databricks.com", now(), "UPDATE", null, null), -# MAGIC (2000, null, null, null, now(), "DELETE", null, null); +# MAGIC (1000, "Quentin", "123 Paper Street, VA 75020", "quentin.ambard@databricks.com", now(), "UPDATE", null, null), +# MAGIC (2000, null, null, null, now(), "DELETE", null, null); +# MAGIC # MAGIC select * from clients_cdc where id in (1000, 2000); # COMMAND ---------- -#wait for the stream to get the new data -time.sleep(20) +# explicitly trigger the stream in our example; It's equally easy to just have the stream run 24/7 +trigger_silver_stream() # COMMAND ---------- -# DBTITLE 1,Wait a few seconds for the stream to catch the new entry in the CDC table and check the results in the main table # MAGIC %sql # MAGIC select * from retail_client_silver where id in (1000, 2000); # MAGIC -- Note that ID 1000 has been updated, and ID 2000 is deleted @@ -209,7 +227,7 @@ def merge_stream(df, i): # MAGIC ALTER TABLE retail_client_silver SET TBLPROPERTIES (delta.enableChangeDataFeed = true); # MAGIC # MAGIC -- Delta Lake CDF works using table_changes function: -# MAGIC SELECT * FROM table_changes('retail_client_silver', 1) order by id +# MAGIC SELECT * FROM table_changes('retail_client_silver', 1) order by id # COMMAND ---------- @@ -237,16 +255,14 @@ def merge_stream(df, i): changes = spark.read.format("delta") \ .option("readChangeData", "true") \ .option("startingVersion", int(last_version) -1) \ - .table(f"{catalog}.{db}.retail_client_silver") + .table(f"`{catalog}`.`{dbName}`.retail_client_silver") display(changes) # COMMAND ---------- # MAGIC %md ### Synchronizing our downstream GOLD table based from the Silver changes # MAGIC -# MAGIC Let's now say that we want to perform another table enhancement and propagate these changes downstream. -# MAGIC -# MAGIC To keep this example simple, we'll just add a column name `gold_data` with random data, but in real world this could be an aggregation, a join with another datasource, an ML model etc. +# MAGIC Let's now say that we want to know how many people there currently are by state. # MAGIC # MAGIC The same logic as the Silver layer must be implemented. Since we now consume the CDF data, we also need to perform a deduplication stage. Let's do it using the python APIs this time for the example. # MAGIC @@ -256,47 +272,142 @@ def merge_stream(df, i): # DBTITLE 1,Let's create or final GOLD table: retail_client_gold # MAGIC %sql -# MAGIC CREATE TABLE IF NOT EXISTS retail_client_gold (id BIGINT NOT NULL, name STRING, address STRING, email STRING, gold_data STRING); +# MAGIC CREATE TABLE IF NOT EXISTS retail_client_gold (state STRING, count LONG); # COMMAND ---------- +# MAGIC %md +# MAGIC +# MAGIC Now we can create our initial Gold table using the latest version of our Siler table. Keep in mind that we are **not** looking at the Change Data Feed (CDF) here. 
We are utilizing the latest version of our silver table that is synced with our external table. # COMMAND ---------- from pyspark.sql.functions import regexp_extract state_pattern = "([A-Z]{2}) [0-9]{5}" (spark.read .table(f"`{catalog}`.`{dbName}`.retail_client_silver") .withColumn("state", regexp_extract("address", state_pattern, 1)) .groupBy("state") .count() .orderBy("state") .write .mode("overwrite") .saveAsTable(f"`{catalog}`.`{dbName}`.retail_client_gold")) spark.sql("SELECT * FROM retail_client_gold ORDER BY state").display() # COMMAND ---------- # DBTITLE 1,Define the MERGE behavior from pyspark.sql.window import Window -from pyspark.sql.functions import dense_rank, regexp_replace, lit +from pyspark.sql.functions import dense_rank, regexp_replace, lit, sum as psum, expr + +def updateGoldCounts(data: DataFrame, batchId): + """ + Updates gold counts for a dataset based on changes captured in a Change Data Feed (CDF). + It deduplicates records, extracts states from addresses, and calculates the net change in counts by state + before merging these changes into a gold table. + + Args: + data (DataFrame): The input DataFrame containing the CDF records. + batchId (str): The batch ID for the current update. Not directly used in the process. + + The method follows these steps: + 1. Deduplicates the data based on the 'id' field, keeping only the most recent update for each id. + 2. Extracts the 'state' from the 'address' field using a regular expression. + 3. Calculates a 'value' for each record to represent the net change in counts (1 for inserts and post-updates, + -1 for deletes and pre-updates, and a large negative number for unrecognized change types as an error state). + 4. Aggregates these values by 'state' to get the net change in counts per state. + 5. Merges these aggregated changes into the 'retail_client_gold' DeltaTable, updating the 'count' for each state + based on the calculated net change. + """ -#Function to upsert `microBatchOutputDF` into Delta table using MERGE -def upsertToDelta(data, batchId): - #First we need to deduplicate based on the id and take the most recent update windowSpec = Window.partitionBy("id").orderBy(col("_commit_version").desc()) - #Select only the first value - #getting the latest change is still needed if the cdc contains multiple time the same id.
We can rank over the id and get the most recent _commit_version - data_deduplicated = data.withColumn("rank", dense_rank().over(windowSpec)).where("rank = 1 and _change_type!='update_preimage'").drop("_commit_version", "rank") - - #Add some data cleaning for the gold layer to remove quotes from the address - data_deduplicated = data_deduplicated.withColumn("address", regexp_replace(col("address"), "\"", "")) + data_deduplicated = data.withColumn("rank", dense_rank().over(windowSpec)).where("rank = 1").drop("_commit_version", "rank") + + deduped_with_state = data_deduplicated.withColumn("state", regexp_extract("address", state_pattern, 1)) + + date_pre_aggregation = (deduped_with_state.withColumn("value", expr(""" + CASE + WHEN _change_type = 'insert' THEN 1 + WHEN _change_type = 'delete' THEN -1 + WHEN _change_type = 'update_preimage' THEN -1 + WHEN _change_type = 'update_postimage' THEN 1 + ELSE -9999 + END + """))) + + aggregated_by_state = (date_pre_aggregation.groupBy("state") + .agg(psum("value").alias("offset"))) - #run the merge in the gold table directly (DeltaTable.forName(spark, "retail_client_gold").alias("target") - .merge(data_deduplicated.alias("source"), "source.id = target.id") - .whenMatchedDelete("source._change_type = 'delete'") - .whenMatchedUpdateAll("source._change_type != 'delete'") - .whenNotMatchedInsertAll("source._change_type != 'delete'") + .merge(aggregated_by_state.alias("source"), "source.state = target.state") + .whenMatchedUpdate(set={ + "count": expr("target.count + source.offset") + }) .execute()) +# COMMAND ---------- + +# DBTITLE 1,Start the gold stream +last_version = str(DeltaTable.forName(spark, "retail_client_silver").history(1).head()["version"]) + (spark.readStream .option("readChangeData", "true") - .option("startingVersion", 1) - .table(f"{catalog}.{db}.retail_client_silver") - .withColumn("gold_data", lit("Delta CDF is Awesome")) + .option("startingVersion", last_version) + .table(f"`{catalog}`.`{dbName}`.retail_client_silver") .writeStream - .foreachBatch(upsertToDelta) + .trigger(processingTime="5 seconds") + .foreachBatch(updateGoldCounts) .start()) -time.sleep(20) +# COMMAND ---------- + +# MAGIC %sql +# MAGIC +# MAGIC insert into clients_cdc (id, name, address, email, operation_date, operation, _rescued_data, file_name) values +# MAGIC (77777, "Alexander", "0311 Donovan MewsHammondmouth, NJ 51685", "alexander@databricks.com", now(), "APPEND", null, null), +# MAGIC (88888, "Faith", "48764 Howard Forge Apt. 421Vanessaside, NJ 79393", "faith@databricks.com", now(), "APPEND", null, null), +# MAGIC (1000, null, null, null, now(), "DELETE", null, null); + +# COMMAND ---------- + +# pull the CDC changes from bronze through silver +trigger_silver_stream() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC Let's make sure the new records made it into silver and the deleted record is gone. + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC +# MAGIC select * from retail_client_silver where id in (77777, 88888, 1000); + +# COMMAND ---------- + +# wait for the gold stream to trigger +time.sleep(10) # COMMAND ---------- -# MAGIC %sql SELECT * FROM retail_client_gold +# MAGIC %md +# MAGIC If everything is working properly, we expect to see the NJ count increase by 2. We also deleted a person who lived in VA, so we should see that decrease by 1. +# MAGIC +# MAGIC Please feel free to experiment with other scenarios by inserting change reecords. For example, what should happen when someone from NJ updates their record but is still in NJ? 
What about when someone moves from one state to another? # COMMAND ---------- # MAGIC %sql # MAGIC SELECT * # MAGIC FROM retail_client_gold # MAGIC WHERE state in ('NJ', 'VA') # COMMAND ---------- diff --git a/product_demos/cdc-pipeline/_resources/00-setup.py b/product_demos/cdc-pipeline/_resources/00-setup.py index 1c66e80a..eee750b7 100644 --- a/product_demos/cdc-pipeline/_resources/00-setup.py +++ b/product_demos/cdc-pipeline/_resources/00-setup.py @@ -16,9 +16,11 @@ raw_data_location = cloud_storage_path+"/delta_cdf" if reset_all_data or is_folder_empty(raw_data_location+"/user_csv"): - spark.sql(f"USE {catalog}.{db}") + print(f"USING `{catalog}`.`{dbName}` to drop all tables") + spark.sql(f"USE `{catalog}`.`{dbName}`") spark.sql("""DROP TABLE if exists clients_cdc""") spark.sql("""DROP TABLE if exists retail_client_silver""") + spark.sql("""DROP TABLE if exists retail_client_gold""") #data generation on another notebook to avoid installing libraries (takes a few seconds to setup pip env) print(f"Generating data under {raw_data_location} , please wait a few sec...") path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get() From 48667edfe4f9c8f732fa5dbabcf99d9fcd88980a Mon Sep 17 00:00:00 2001 From: alexvanadio-databricks Date: Tue, 26 Mar 2024 18:40:17 +0000 Subject: [PATCH 03/12] more interesting narrative --- .../01-CDC-CDF-simple-pipeline.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py b/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py index 4914182a..4ff6666d 100644 --- a/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py +++ b/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py @@ -184,7 +184,7 @@ def trigger_silver_stream(): # DBTITLE 1,Let's UPDATE id=1000 and DELETE the row with id=2000 # MAGIC %sql # MAGIC insert into clients_cdc (id, name, address, email, operation_date, operation, _rescued_data, file_name) values -# MAGIC (1000, "Quentin", "123 Paper Street, VA 75020", "quentin.ambard@databricks.com", now(), "UPDATE", null, null), +# MAGIC (1000, "Quentin", "123 Paper Street, UT 75020", "quentin.ambard@databricks.com", now(), "UPDATE", null, null), # MAGIC (2000, null, null, null, now(), "DELETE", null, null); # MAGIC # MAGIC select * from clients_cdc where id in (1000, 2000); # COMMAND ---------- @@ -278,7 +278,7 @@ def trigger_silver_stream(): # MAGIC %md # MAGIC -# MAGIC Now we can create our initial Gold table using the latest version of our Silver table. Keep in mind that we are **not** looking at the Change Data Feed (CDF) here. We are utilizing the latest version of our silver table that is synced with our external table. +# MAGIC Now we can create our initial Gold table using the latest version of our Silver table. Keep in mind that we are **not** looking at the Change Data Feed (CDF) here. We are utilizing the latest version of our silver table that is synced with our external table. Also note that some of these states are not real, and only for demonstration.
# COMMAND ---------- @@ -296,7 +296,7 @@ def trigger_silver_stream(): .write .mode("overwrite") .saveAsTable(f"`{catalog}`.`{dbName}`.retail_client_gold")) -spark.sql("SELECT * FROM retail_client_gold ORDER BY state").display() +spark.sql("SELECT * FROM retail_client_gold ORDER BY count DESC LIMIT 10").display() # COMMAND ---------- @@ -370,8 +370,8 @@ def updateGoldCounts(data: DataFrame, batchId): # MAGIC %sql # MAGIC # MAGIC insert into clients_cdc (id, name, address, email, operation_date, operation, _rescued_data, file_name) values -# MAGIC (77777, "Alexander", "0311 Donovan MewsHammondmouth, NJ 51685", "alexander@databricks.com", now(), "APPEND", null, null), -# MAGIC (88888, "Faith", "48764 Howard Forge Apt. 421Vanessaside, NJ 79393", "faith@databricks.com", now(), "APPEND", null, null), +# MAGIC (77777, "Alexander", "0311 Donovan MewsHammondmouth, MT 51685", "alexander@databricks.com", now(), "APPEND", null, null), +# MAGIC (88888, "Faith", "48764 Howard Forge Apt. 421Vanessaside, MT 79393", "faith@databricks.com", now(), "APPEND", null, null), # MAGIC (1000, null, null, null, now(), "DELETE", null, null); # COMMAND ---------- @@ -398,16 +398,13 @@ def updateGoldCounts(data: DataFrame, batchId): # MAGIC %md -# MAGIC If everything is working properly, we expect to see the NJ count increase by 2. We also deleted a person who lived in VA, so we should see that decrease by 1. +# MAGIC If everything is working properly, we expect to see the MT count increase by 2. We also deleted a person who lived in UT, so we should see that decrease by 1. # MAGIC -# MAGIC Please feel free to experiment with other scenarios by inserting change records. For example, what should happen when someone from NJ updates their record but is still in NJ? What about when someone moves from one state to another? +# MAGIC Please feel free to experiment with other scenarios by inserting change records. For example, what should happen when someone from MT updates their record but is still in MT? What about when someone moves from one state to another?
# COMMAND ---------- -# MAGIC %sql -# MAGIC SELECT * -# MAGIC FROM retail_client_gold -# MAGIC WHERE state in ('NJ', 'VA') +# MAGIC %sql SELECT * FROM retail_client_gold ORDER BY count DESC LIMIT 10 # COMMAND ---------- From 99a9409f5ea6286880b717800d3a21b964c348ab Mon Sep 17 00:00:00 2001 From: Alexander Vanadio Date: Tue, 26 Mar 2024 15:42:58 -0400 Subject: [PATCH 04/12] updated setup scripts to use v2 global; added method to global --- _resources/00-global-setup-v2.py | 15 ++++++++++++++ .../cdc-pipeline/_resources/00-setup.py | 20 ++++++++++--------- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/_resources/00-global-setup-v2.py b/_resources/00-global-setup-v2.py index 88ea5e47..c5bfcdbf 100644 --- a/_resources/00-global-setup-v2.py +++ b/_resources/00-global-setup-v2.py @@ -16,9 +16,24 @@ import requests import collections import os +import re class DBDemos(): + @staticmethod + def get_current_user(remove_symbols: False) -> str: + current_user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user') + + if not remove_symbols: + return current_user + + if current_user.rfind('@') > 0: + current_user_no_at = current_user[:current_user.rfind('@')] + else: + current_user_no_at = current_user + + return re.sub(r'\W+', '_', current_user_no_at) + @staticmethod def setup_schema(catalog, db, reset_all_data, volume_name = None): diff --git a/product_demos/cdc-pipeline/_resources/00-setup.py b/product_demos/cdc-pipeline/_resources/00-setup.py index eee750b7..274b73f4 100644 --- a/product_demos/cdc-pipeline/_resources/00-setup.py +++ b/product_demos/cdc-pipeline/_resources/00-setup.py @@ -3,24 +3,26 @@ # COMMAND ---------- -# MAGIC %run ../../../_resources/00-global-setup $reset_all_data=$reset_all_data $db_prefix=retail +# MAGIC %run ../../../_resources/00-global-setup-v2 $reset_all_data=$reset_all_data # COMMAND ---------- -import json -import time -from pyspark.sql.window import Window -from pyspark.sql.functions import row_number - reset_all_data = dbutils.widgets.get("reset_all_data") == "true" + +current_user = DBDemos.get_current_user() +catalog = "dbdemos" +database = DBDemos.get_current_user(remove_symbols=True) +cloud_storage_path = f"/Users/{current_user}/demos/{catalog}_{database}" raw_data_location = cloud_storage_path+"/delta_cdf" -if reset_all_data or is_folder_empty(raw_data_location+"/user_csv"): - print(f"USING `{catalog}`.`{dbName}` to drop all tables") - spark.sql(f"USE `{catalog}`.`{dbName}`") +if reset_all_data or DBDemos.is_folder_empty(raw_data_location+"/user_csv"): + DBDemos.setup_schema(catalog=catalog, db=database, reset_all_data=reset_all_data) + + spark.sql(f"USE {catalog}.{database}") spark.sql("""DROP TABLE if exists clients_cdc""") spark.sql("""DROP TABLE if exists retail_client_silver""") spark.sql("""DROP TABLE if exists retail_client_gold""") + #data generation on another notebook to avoid installing libraries (takes a few seconds to setup pip env) print(f"Generating data under {raw_data_location} , please wait a few sec...") path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get() From 3de2493cdbdbf3bf90978951b350e8a992650013 Mon Sep 17 00:00:00 2001 From: alexvanadio-databricks Date: Wed, 27 Mar 2024 16:59:43 +0000 Subject: [PATCH 05/12] added workaround code to fix bug in global setup --- _resources/00-global-setup-v2.py | 13 +++++++++++-- product_demos/cdc-pipeline/_resources/00-setup.py | 5 +++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git 
a/_resources/00-global-setup-v2.py b/_resources/00-global-setup-v2.py index c5bfcdbf..049b765a 100644 --- a/_resources/00-global-setup-v2.py +++ b/_resources/00-global-setup-v2.py @@ -18,10 +18,19 @@ import os import re - class DBDemos(): @staticmethod - def get_current_user(remove_symbols: False) -> str: + def get_current_user(remove_symbols: bool = False) -> str: + """ + Retrieves the current Databricks notebook user's email address, with an option to remove symbols and replace them with underscores. + + Args: + remove_symbols (bool): If True, removes all non-word characters from the email address, replacing them with underscores. Defaults to False. + + Returns: + str: The current user's email address. If `remove_symbols` is True, returns the email with symbols replaced by underscores. + """ + current_user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user') if not remove_symbols: diff --git a/product_demos/cdc-pipeline/_resources/00-setup.py b/product_demos/cdc-pipeline/_resources/00-setup.py index 274b73f4..d4a7b859 100644 --- a/product_demos/cdc-pipeline/_resources/00-setup.py +++ b/product_demos/cdc-pipeline/_resources/00-setup.py @@ -12,6 +12,11 @@ current_user = DBDemos.get_current_user() catalog = "dbdemos" database = DBDemos.get_current_user(remove_symbols=True) + +# WE NEED THIS BECAUSE OF AN ENCAPSULATION ISSUE (BUG?) linked below. +# https://github.com/databricks-demos/dbdemos-notebooks/blob/main/_resources/00-global-setup-v2.py#L44 +dbName = database + cloud_storage_path = f"/Users/{current_user}/demos/{catalog}_{database}" raw_data_location = cloud_storage_path+"/delta_cdf" From 92a4c13c70bdf4b062e3a335c29fe9c44c7e536a Mon Sep 17 00:00:00 2001 From: alexvanadio-databricks Date: Thu, 28 Mar 2024 14:56:36 +0000 Subject: [PATCH 06/12] removed get_current_user method from global v2 --- _resources/00-global-setup-v2.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/_resources/00-global-setup-v2.py b/_resources/00-global-setup-v2.py index 049b765a..71b60b10 100644 --- a/_resources/00-global-setup-v2.py +++ b/_resources/00-global-setup-v2.py @@ -19,30 +19,6 @@ import re class DBDemos(): - @staticmethod - def get_current_user(remove_symbols: bool = False) -> str: - """ - Retrieves the current Databricks notebook user's email address, with an option to remove symbols and replace them with underscores. - - Args: - remove_symbols (bool): If True, removes all non-word characters from the email address, replacing them with underscores. Defaults to False. - - Returns: - str: The current user's email address. If `remove_symbols` is True, returns the email with symbols replaced by underscores. 
- """ - - current_user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user') - - if not remove_symbols: - return current_user - - if current_user.rfind('@') > 0: - current_user_no_at = current_user[:current_user.rfind('@')] - else: - current_user_no_at = current_user - - return re.sub(r'\W+', '_', current_user_no_at) - @staticmethod def setup_schema(catalog, db, reset_all_data, volume_name = None): From dc73a81706cfd389f76c977c9e9e3dcfe44f25a0 Mon Sep 17 00:00:00 2001 From: alexvanadio-databricks Date: Thu, 28 Mar 2024 14:56:52 +0000 Subject: [PATCH 07/12] moving towards volumes instead of dbfs --- .../cdc-pipeline/_resources/00-setup.py | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/product_demos/cdc-pipeline/_resources/00-setup.py b/product_demos/cdc-pipeline/_resources/00-setup.py index d4a7b859..f6848578 100644 --- a/product_demos/cdc-pipeline/_resources/00-setup.py +++ b/product_demos/cdc-pipeline/_resources/00-setup.py @@ -7,20 +7,21 @@ # COMMAND ---------- -reset_all_data = dbutils.widgets.get("reset_all_data") == "true" +# Note: We do not recommend to change the catalog here as it won't impact all the demo resources such as DLT pipeline and Dashboards. Instead, please re-install the demo with a specific catalog and schema using +# dbdemos.install("lakehouse-retail-c360", catalog="..", schema="...") -current_user = DBDemos.get_current_user() catalog = "dbdemos" -database = DBDemos.get_current_user(remove_symbols=True) +db = "cdc_pipeline" +volume_name = "data" +reset_all_data = dbutils.widgets.get("reset_all_data") == "true" -# WE NEED THIS BECAUSE OF AN ENCAPSULATION ISSUE (BUG?) linked below. -# https://github.com/databricks-demos/dbdemos-notebooks/blob/main/_resources/00-global-setup-v2.py#L44 -dbName = database -cloud_storage_path = f"/Users/{current_user}/demos/{catalog}_{database}" -raw_data_location = cloud_storage_path+"/delta_cdf" +DBDemos.setup_schema(catalog, db, reset_all_data, volume_name) +volume_folder = f"/Volumes/{catalog}/{db}/{volume_name}" + +# COMMAND ---------- -if reset_all_data or DBDemos.is_folder_empty(raw_data_location+"/user_csv"): +if reset_all_data or DBDemos.is_folder_empty(volume_folder+"/user_csv"): DBDemos.setup_schema(catalog=catalog, db=database, reset_all_data=reset_all_data) spark.sql(f"USE {catalog}.{database}") @@ -29,11 +30,11 @@ spark.sql("""DROP TABLE if exists retail_client_gold""") #data generation on another notebook to avoid installing libraries (takes a few seconds to setup pip env) - print(f"Generating data under {raw_data_location} , please wait a few sec...") + print(f"Generating data under {volume_folder} , please wait a few sec...") path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get() parent_count = path[path.rfind("Delta-Lake-CDC-CDF"):].count('/') - 1 prefix = "./" if parent_count == 0 else parent_count*"../" prefix = f'{prefix}_resources/' - dbutils.notebook.run(prefix+"01-load-data", 120, {"raw_data_location": raw_data_location}) + dbutils.notebook.run(prefix+"01-load-data", 120, {"raw_data_location": volume_folder}) else: print("data already existing. 
Run with reset_all_data=true to force a data cleanup for your local demo.") From cc6b441cc2d9e47c2ee67f2506cca8b694f16dd6 Mon Sep 17 00:00:00 2001 From: Alexander Vanadio Date: Wed, 27 Mar 2024 13:39:09 -0400 Subject: [PATCH 08/12] use db instead of dbName fixes #11 --- _resources/00-global-setup-v2.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/_resources/00-global-setup-v2.py b/_resources/00-global-setup-v2.py index 71b60b10..5d791142 100644 --- a/_resources/00-global-setup-v2.py +++ b/_resources/00-global-setup-v2.py @@ -41,24 +41,24 @@ def use_and_create_db(catalog, dbName, cloud_storage_path = None): spark.sql(f"CREATE CATALOG IF NOT EXISTS `{catalog}`") if catalog == 'dbdemos': spark.sql(f"ALTER CATALOG `{catalog}` OWNER TO `account users`") - use_and_create_db(catalog, dbName) + use_and_create_db(catalog, db) if catalog == 'dbdemos': try: - spark.sql(f"GRANT CREATE, USAGE on DATABASE `{catalog}`.`{dbName}` TO `account users`") - spark.sql(f"ALTER SCHEMA `{catalog}`.`{dbName}` OWNER TO `account users`") - for t in spark.sql(f'SHOW TABLES in {catalog}.{dbName}').collect(): + spark.sql(f"GRANT CREATE, USAGE on DATABASE `{catalog}`.`{db}` TO `account users`") + spark.sql(f"ALTER SCHEMA `{catalog}`.`{db}` OWNER TO `account users`") + for t in spark.sql(f'SHOW TABLES in {catalog}.{db}').collect(): try: - spark.sql(f'GRANT ALL PRIVILEGES ON TABLE {catalog}.{dbName}.{t["tableName"]} TO `account users`') - spark.sql(f'ALTER TABLE {catalog}.{dbName}.{t["tableName"]} OWNER TO `account users`') + spark.sql(f'GRANT ALL PRIVILEGES ON TABLE {catalog}.{db}.{t["tableName"]} TO `account users`') + spark.sql(f'ALTER TABLE {catalog}.{db}.{t["tableName"]} OWNER TO `account users`') except Exception as e: if "NOT_IMPLEMENTED.TRANSFER_MATERIALIZED_VIEW_OWNERSHIP" not in str(e) and "STREAMING_TABLE_OPERATION_NOT_ALLOWED.UNSUPPORTED_OPERATION" not in str(e) : - print(f'WARN: Couldn t set table {catalog}.{dbName}.{t["tableName"]} owner to account users, error: {e}') + print(f'WARN: Couldn t set table {catalog}.{db}.{t["tableName"]} owner to account users, error: {e}') except Exception as e: print("Couldn't grant access to the schema to all users:"+str(e)) - print(f"using catalog.database `{catalog}`.`{dbName}`") - spark.sql(f"""USE `{catalog}`.`{dbName}`""") + print(f"using catalog.database `{catalog}`.`{db}`") + spark.sql(f"""USE `{catalog}`.`{db}`""") if volume_name: spark.sql(f'CREATE VOLUME IF NOT EXISTS {volume_name};') From 04f20082b3e82070d5eafedea4a435664f89fb94 Mon Sep 17 00:00:00 2001 From: alexvanadio-databricks Date: Thu, 28 Mar 2024 15:44:18 +0000 Subject: [PATCH 09/12] demo is now working with volumes --- .../01-CDC-CDF-simple-pipeline.py | 30 +++++++++++-------- .../cdc-pipeline/_resources/00-setup.py | 22 +++++++++----- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py b/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py index 4ff6666d..130d5336 100644 --- a/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py +++ b/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py @@ -60,8 +60,14 @@ # COMMAND ---------- +from pyspark.sql.functions import input_file_name, col +from pyspark.sql import DataFrame +import time + +# COMMAND ---------- + # DBTITLE 1,Let's explore our incoming data. 
We receive CSV files with client information -cdc_raw_data = spark.read.option('header', "true").csv(raw_data_location+'/user_csv') +cdc_raw_data = spark.read.option('header', "true").csv(volume_folder+'/user_csv') display(cdc_raw_data) # COMMAND ---------- @@ -76,14 +82,14 @@ .format("cloudFiles") .option("cloudFiles.format", "csv") .option("cloudFiles.inferColumnTypes", "true") - .option("cloudFiles.schemaLocation", cloud_storage_path+"/schema_cdc_raw") + .option("cloudFiles.schemaLocation", volume_folder+"/schema_cdc_raw") .option("cloudFiles.schemaHints", "id bigint, operation_date timestamp") - .load(raw_data_location+'/user_csv')) + .load(volume_folder+'/user_csv')) (bronzeDF.withColumn("file_name", input_file_name()).writeStream - .option("checkpointLocation", cloud_storage_path+"/checkpoint_cdc_raw") + .option("checkpointLocation", volume_folder+"/checkpoint_cdc_raw") .trigger(availableNow=True) - .table(f"`{catalog}`.`{dbName}`.clients_cdc") + .table(f"`{catalog}`.`{db}`.clients_cdc") .awaitTermination()) # COMMAND ---------- @@ -158,10 +164,10 @@ def trigger_silver_stream(): Initiates a structured streaming process that reads change data capture (CDC) records from a specified table and processes them in batches using a custom merge function. The process is designed to handle streaming updates efficiently, applying changes to a 'silver' table based on the incoming stream. """ (spark.readStream - .table(f"`{catalog}`.`{dbName}`.clients_cdc") + .table(f"`{catalog}`.`{db}`.clients_cdc") .writeStream .foreachBatch(merge_stream) - .option("checkpointLocation", cloud_storage_path+"/checkpoint_clients_cdc") + .option("checkpointLocation", volume_folder+"/checkpoint_clients_cdc") .trigger(availableNow=True) .start() .awaitTermination()) @@ -255,7 +261,7 @@ def trigger_silver_stream(): changes = spark.read.format("delta") \ .option("readChangeData", "true") \ .option("startingVersion", int(last_version) -1) \ - .table(f"`{catalog}`.`{dbName}`.retail_client_silver") + .table(f"`{catalog}`.`{db}`.retail_client_silver") display(changes) # COMMAND ---------- @@ -287,14 +293,14 @@ def trigger_silver_stream(): state_pattern = "([A-Z]{2}) [0-9]{5}" (spark.read - .table(f"`{catalog}`.`{dbName}`.retail_client_silver") + .table(f"`{catalog}`.`{db}`.retail_client_silver") .withColumn("state", regexp_extract("address", state_pattern, 1)) .groupBy("state") .count() .orderBy("state") .write .mode("overwrite") - .saveAsTable(f"`{catalog}`.`{dbName}`.retail_client_gold")) + .saveAsTable(f"`{catalog}`.`{db}`.retail_client_gold")) spark.sql("SELECT * FROM retail_client_gold ORDER BY count DESC LIMIT 10").display() @@ -359,7 +365,7 @@ def updateGoldCounts(data: DataFrame, batchId): (spark.readStream .option("readChangeData", "true") .option("startingVersion", last_version) - .table(f"`{catalog}`.`{dbName}`.retail_client_silver") + .table(f"`{catalog}`.`{db}`.retail_client_silver") .writeStream .trigger(processingTime="5 seconds") .foreachBatch(updateGoldCounts) @@ -439,4 +445,4 @@ def updateGoldCounts(data: DataFrame, batchId): # COMMAND ---------- # DBTITLE 1,Make sure we stop all actives streams -stop_all_streams() +DBDemos.stop_all_streams() diff --git a/product_demos/cdc-pipeline/_resources/00-setup.py b/product_demos/cdc-pipeline/_resources/00-setup.py index f6848578..22de0368 100644 --- a/product_demos/cdc-pipeline/_resources/00-setup.py +++ b/product_demos/cdc-pipeline/_resources/00-setup.py @@ -3,28 +3,34 @@ # COMMAND ---------- -# MAGIC %run ../../../_resources/00-global-setup-v2 
$reset_all_data=$reset_all_data +# MAGIC %run ../../../_resources/00-global-setup-v2 # COMMAND ---------- -# Note: We do not recommend to change the catalog here as it won't impact all the demo resources such as DLT pipeline and Dashboards. Instead, please re-install the demo with a specific catalog and schema using -# dbdemos.install("lakehouse-retail-c360", catalog="..", schema="...") +# Note: End users should not modify this code. +# Instead, use dbdemos.install("cdc-pipeline", catalog="..", schema="...") catalog = "dbdemos" db = "cdc_pipeline" -volume_name = "data" +volume_name = "cdc_pipeline" reset_all_data = dbutils.widgets.get("reset_all_data") == "true" - DBDemos.setup_schema(catalog, db, reset_all_data, volume_name) volume_folder = f"/Volumes/{catalog}/{db}/{volume_name}" # COMMAND ---------- -if reset_all_data or DBDemos.is_folder_empty(volume_folder+"/user_csv"): - DBDemos.setup_schema(catalog=catalog, db=database, reset_all_data=reset_all_data) +total_databases = spark.sql(f"show databases in {catalog} like '{db}'").count() +assert (total_databases == 1), f"There should be exactly one database [{db}] within catalog [{catalog}]" + +total_volumes = spark.sql(f"show volumes in `{catalog}`.`{db}`").count() +assert (total_volumes == 1), f"There should be exactly one volume [{volume_name}] within {catalog}.{db}" - spark.sql(f"USE {catalog}.{database}") +# COMMAND ---------- + +if reset_all_data or DBDemos.is_folder_empty(volume_folder+"/user_csv"): + # delete all data + spark.sql(f"USE `{catalog}`.`{db}`") spark.sql("""DROP TABLE if exists clients_cdc""") spark.sql("""DROP TABLE if exists retail_client_silver""") spark.sql("""DROP TABLE if exists retail_client_gold""") From 92b0c7e70d1a1a1cd36e2090bf863efb38e50cd2 Mon Sep 17 00:00:00 2001 From: alexvanadio-databricks Date: Thu, 28 Mar 2024 16:03:53 +0000 Subject: [PATCH 10/12] replaced df._jdf.sparkSession() for df.sparkSession --- product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py b/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py index 130d5336..045fa9a8 100644 --- a/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py +++ b/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py @@ -148,7 +148,7 @@ def merge_stream(df: DataFrame, i): df.createOrReplaceTempView("clients_cdc_microbatch") - df._jdf.sparkSession().sql("""MERGE INTO retail_client_silver target + df.sparkSession.sql("""MERGE INTO retail_client_silver target USING (select id, name, address, email, operation from (SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY operation_date DESC) as rank from clients_cdc_microbatch) From b8ff426079609a7655a33744f92189209c03c4a2 Mon Sep 17 00:00:00 2001 From: alexvanadio-databricks Date: Thu, 28 Mar 2024 16:04:16 +0000 Subject: [PATCH 11/12] minimal UC modifications to full-multi-tables notebook --- .../02-CDC-CDF-full-multi-tables.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/product_demos/cdc-pipeline/02-CDC-CDF-full-multi-tables.py b/product_demos/cdc-pipeline/02-CDC-CDF-full-multi-tables.py index da070595..3dd32a70 100644 --- a/product_demos/cdc-pipeline/02-CDC-CDF-full-multi-tables.py +++ b/product_demos/cdc-pipeline/02-CDC-CDF-full-multi-tables.py @@ -57,8 +57,16 @@ # COMMAND ---------- +from concurrent.futures import ThreadPoolExecutor +from collections import deque +from delta.tables import * +from pyspark.sql.functions import 
input_file_name, col, row_number +from pyspark.sql.window import Window + +# COMMAND ---------- + # DBTITLE 1,Let's explore our raw cdc data. We have 2 tables we want to sync (transactions and users) -base_folder = f"{raw_data_location}/cdc" +base_folder = f"{volume_folder}/cdc" display(dbutils.fs.ls(base_folder)) # COMMAND ---------- @@ -67,7 +75,7 @@ # COMMAND ---------- -dbutils.fs.rm(f"{cloud_storage_path}/cdc_full", True) +dbutils.fs.rm(f"{volume_folder}/cdc_full", True) # COMMAND ---------- @@ -78,13 +86,13 @@ def update_bronze_layer(path, bronze_table): (spark.readStream .format("cloudFiles") .option("cloudFiles.format", "csv") - .option("cloudFiles.schemaLocation", f"{cloud_storage_path}/cdc_full/schemas/{bronze_table}") + .option("cloudFiles.schemaLocation", f"{volume_folder}/cdc_full/schemas/{bronze_table}") .option("cloudFiles.schemaHints", "id bigint, operation_date timestamp") .option("cloudFiles.inferColumnTypes", "true") .load(path) .withColumn("file_name", input_file_name()) .writeStream - .option("checkpointLocation", f"{cloud_storage_path}/cdc_full/checkpoints/{bronze_table}") + .option("checkpointLocation", f"{volume_folder}/cdc_full/checkpoints/{bronze_table}") .option("mergeSchema", "true") .trigger(once=True) .table(bronze_table).awaitTermination()) @@ -121,7 +129,7 @@ def merge_stream(updates, i): .table(bronze_table) .writeStream .foreachBatch(merge_stream) - .option("checkpointLocation", f"{cloud_storage_path}/cdc_full/checkpoints/{silver_table}") + .option("checkpointLocation", f"{volume_folder}/cdc_full/checkpoints/{silver_table}") .trigger(once=True) .start().awaitTermination()) @@ -133,10 +141,6 @@ def merge_stream(updates, i): # COMMAND ---------- -from concurrent.futures import ThreadPoolExecutor -from collections import deque -from delta.tables import * - def refresh_cdc_table(table): try: #update the bronze table @@ -189,4 +193,4 @@ def refresh_cdc_table(table): # COMMAND ---------- # DBTITLE 1,Make sure we stop all actives streams -stop_all_streams() +DBDemos.stop_all_streams() From 11af6896c515d3ec44d5486e04c040e3c84f2b11 Mon Sep 17 00:00:00 2001 From: alexvanadio-databricks Date: Wed, 10 Apr 2024 18:47:24 +0000 Subject: [PATCH 12/12] manually reverted back to original gold use case --- .../01-CDC-CDF-simple-pipeline.py | 151 +++++------------- .../cdc-pipeline/_resources/00-setup.py | 6 +- .../cdc-pipeline/_resources/01-load-data.py | 8 +- 3 files changed, 50 insertions(+), 115 deletions(-) diff --git a/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py b/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py index 045fa9a8..4efda25d 100644 --- a/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py +++ b/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py @@ -86,7 +86,7 @@ .option("cloudFiles.schemaHints", "id bigint, operation_date timestamp") .load(volume_folder+'/user_csv')) -(bronzeDF.withColumn("file_name", input_file_name()).writeStream +(bronzeDF.withColumn("file_name", col("_metadata.file_path")).writeStream .option("checkpointLocation", volume_folder+"/checkpoint_cdc_raw") .trigger(availableNow=True) .table(f"`{catalog}`.`{db}`.clients_cdc") @@ -123,7 +123,11 @@ # MAGIC email STRING, # MAGIC operation STRING, # MAGIC CONSTRAINT id_pk PRIMARY KEY(id)) -# MAGIC TBLPROPERTIES (delta.enableChangeDataFeed = true, delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true); +# MAGIC TBLPROPERTIES ( +# MAGIC delta.enableChangeDataFeed = true, +# MAGIC delta.autoOptimize.optimizeWrite = true, +# MAGIC 
delta.autoOptimize.autoCompact = true +# MAGIC ); # COMMAND ---------- @@ -268,7 +272,9 @@ def trigger_silver_stream(): # MAGIC %md ### Synchronizing our downstream GOLD table based from the Silver changes # MAGIC -# MAGIC Let's now say that we want to know how many people there currently are by state. +# MAGIC Let's now say that we want to perform another table enhancement and propagate these changes downstream. +# MAGIC +# MAGIC To keep this example simple, we'll just add a column name `gold_data` with random data, but in real world this could be an aggregation, a join with another datasource, an ML model etc. # MAGIC # MAGIC The same logic as the Silver layer must be implemented. Since we now consume the CDF data, we also need to perform a deduplication stage. Let's do it using the python APIs this time for the example. # MAGIC @@ -278,7 +284,13 @@ def trigger_silver_stream(): # DBTITLE 1,Let's create or final GOLD table: retail_client_gold # MAGIC %sql -# MAGIC CREATE TABLE IF NOT EXISTS retail_client_gold (state STRING, count LONG); +# MAGIC CREATE TABLE IF NOT EXISTS retail_client_gold ( +# MAGIC id BIGINT NOT NULL, +# MAGIC name STRING, +# MAGIC address STRING, +# MAGIC email STRING, +# MAGIC gold_data STRING, +# MAGIC CONSTRAINT gold_id_pk PRIMARY KEY(id)); # COMMAND ---------- @@ -288,129 +300,44 @@ def trigger_silver_stream(): # COMMAND ---------- -from pyspark.sql.functions import regexp_extract - -state_pattern = "([A-Z]{2}) [0-9]{5}" - -(spark.read - .table(f"`{catalog}`.`{db}`.retail_client_silver") - .withColumn("state", regexp_extract("address", state_pattern, 1)) - .groupBy("state") - .count() - .orderBy("state") - .write - .mode("overwrite") - .saveAsTable(f"`{catalog}`.`{db}`.retail_client_gold")) - -spark.sql("SELECT * FROM retail_client_gold ORDER BY count DESC LIMIT 10").display() - - -# COMMAND ---------- - -# DBTITLE 1,Define the MERGE behavior from pyspark.sql.window import Window -from pyspark.sql.functions import dense_rank, regexp_replace, lit, sum as psum, expr - -def updateGoldCounts(data: DataFrame, batchId): - """ - Updates gold counts for a dataset based on changes captured in a Change Data Feed (CDF). - It deduplicates records, extracts states from addresses, and calculates the net change in counts by state - before merging these changes into a gold table. - - Args: - data (DataFrame): The input DataFrame containing the CDF records. - batchId (str): The batch ID for the current update. Not directly used in the process - - The method follows these steps: - 1. Deduplicates the data based on the 'id' field, keeping only the most recent update for each id. - 2. Extracts the 'state' from the 'address' field using a regular expression. - 3. Calculates a 'value' for each record to represent the net change in counts (1 for inserts and post-updates, - -1 for deletes and pre-updates, and a large negative number for unrecognized change types as an error state). - 4. Aggregates these values by 'state' to get the net change in counts per state. - 5. Merges these aggregated changes into the 'retail_client_gold' DeltaTable, updating the 'count' for each state - based on the calculated net change. 
- """ +from pyspark.sql.functions import dense_rank, regexp_replace, lit +#Function to upsert `microBatchOutputDF` into Delta table using MERGE +def upsertToDelta(data, batchId): + #First we need to deduplicate based on the id and take the most recent update windowSpec = Window.partitionBy("id").orderBy(col("_commit_version").desc()) - data_deduplicated = data.withColumn("rank", dense_rank().over(windowSpec)).where("rank = 1").drop("_commit_version", "rank") - - deduped_with_state = data_deduplicated.withColumn("state", regexp_extract("address", state_pattern, 1)) - - date_pre_aggregation = (deduped_with_state.withColumn("value", expr(""" - CASE - WHEN _change_type = 'insert' THEN 1 - WHEN _change_type = 'delete' THEN -1 - WHEN _change_type = 'update_preimage' THEN -1 - WHEN _change_type = 'update_postimage' THEN 1 - ELSE -9999 - END - """))) - - aggregated_by_state = (date_pre_aggregation.groupBy("state") - .agg(psum("value").alias("offset"))) + #Select only the first value + #getting the latest change is still needed if the cdc contains multiple time the same id. We can rank over the id and get the most recent _commit_version + data_deduplicated = data.withColumn("rank", dense_rank().over(windowSpec)).where("rank = 1 and _change_type!='update_preimage'").drop("_commit_version", "rank") + + #Add some data cleaning for the gold layer to remove quotes from the address + data_deduplicated = data_deduplicated.withColumn("address", regexp_replace(col("address"), "\"", "")) - (DeltaTable.forName(spark, "retail_client_gold").alias("target") - .merge(aggregated_by_state.alias("source"), "source.state = target.state") - .whenMatchedUpdate(set={ - "count": expr("target.count + source.offset") - }) + #run the merge in the gold table directly + (DeltaTable.forName(data.sparkSession, "retail_client_gold").alias("target") + .merge(data_deduplicated.alias("source"), "source.id = target.id") + .whenMatchedDelete("source._change_type = 'delete'") + .whenMatchedUpdateAll("source._change_type != 'delete'") + .whenNotMatchedInsertAll("source._change_type != 'delete'") .execute()) -# COMMAND ---------- - -# DBTITLE 1,Start the gold stream -last_version = str(DeltaTable.forName(spark, "retail_client_silver").history(1).head()["version"]) - (spark.readStream .option("readChangeData", "true") - .option("startingVersion", last_version) - .table(f"`{catalog}`.`{db}`.retail_client_silver") + .option("startingVersion", 1) + .table("retail_client_silver") + .withColumn("gold_data", lit("Delta CDF is Awesome")) .writeStream - .trigger(processingTime="5 seconds") - .foreachBatch(updateGoldCounts) + .foreachBatch(upsertToDelta) .start()) -# COMMAND ---------- - -# MAGIC %sql -# MAGIC -# MAGIC insert into clients_cdc (id, name, address, email, operation_date, operation, _rescued_data, file_name) values -# MAGIC (77777, "Alexander", "0311 Donovan MewsHammondmouth, MT 51685", "alexander@databricks.com", now(), "APPEND", null, null), -# MAGIC (88888, "Faith", "48764 Howard Forge Apt. 421Vanessaside, MT 79393", "faith@databricks.com", now(), "APPEND", null, null), -# MAGIC (1000, null, null, null, now(), "DELETE", null, null); - -# COMMAND ---------- - -# pull the CDC changes from bronze through silver -trigger_silver_stream() - -# COMMAND ---------- - -# MAGIC %md -# MAGIC Let's make sure the new records made it into silver and the deleted record is gone. 
+time.sleep(20) # COMMAND ---------- -# MAGIC %sql -# MAGIC -# MAGIC select * from retail_client_silver where id in (77777, 88888, 1000); - -# COMMAND ---------- - -# wait for the gold stream to trigger -time.sleep(10) - -# COMMAND ---------- - -# MAGIC %md -# MAGIC If everything is working properly, we expect to see the MO count increase by 2. We also deleted a person who lived in UT, so we should see that decrease by 1. -# MAGIC -# MAGIC Please feel free to experiment with other scenarios by inserting change reecords. For example, what should happen when someone from MO updates their record but is still in MO? What about when someone moves from one state to another? - -# COMMAND ---------- - -# MAGIC %sql SELECT * FROM retail_client_gold ORDER BY count DESC LIMIT 10 +# DBTITLE 1,Start the gold stream +# MAGIC %sql SELECT * FROM retail_client_gold # COMMAND ---------- diff --git a/product_demos/cdc-pipeline/_resources/00-setup.py b/product_demos/cdc-pipeline/_resources/00-setup.py index 22de0368..fb77ab86 100644 --- a/product_demos/cdc-pipeline/_resources/00-setup.py +++ b/product_demos/cdc-pipeline/_resources/00-setup.py @@ -30,17 +30,19 @@ if reset_all_data or DBDemos.is_folder_empty(volume_folder+"/user_csv"): # delete all data + print("Dropping table...", end='') spark.sql(f"USE `{catalog}`.`{db}`") spark.sql("""DROP TABLE if exists clients_cdc""") spark.sql("""DROP TABLE if exists retail_client_silver""") spark.sql("""DROP TABLE if exists retail_client_gold""") + print(" Done") #data generation on another notebook to avoid installing libraries (takes a few seconds to setup pip env) - print(f"Generating data under {volume_folder} , please wait a few sec...") + print(f"Generating data under {volume_folder}. This may take several minutes...") path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get() parent_count = path[path.rfind("Delta-Lake-CDC-CDF"):].count('/') - 1 prefix = "./" if parent_count == 0 else parent_count*"../" prefix = f'{prefix}_resources/' - dbutils.notebook.run(prefix+"01-load-data", 120, {"raw_data_location": volume_folder}) + dbutils.notebook.run(path=prefix+"01-load-data", timeout_seconds=300, arguments={"raw_data_location": volume_folder}) else: print("data already existing. Run with reset_all_data=true to force a data cleanup for your local demo.") diff --git a/product_demos/cdc-pipeline/_resources/01-load-data.py b/product_demos/cdc-pipeline/_resources/01-load-data.py index 33f09783..cdd06b57 100644 --- a/product_demos/cdc-pipeline/_resources/01-load-data.py +++ b/product_demos/cdc-pipeline/_resources/01-load-data.py @@ -36,6 +36,9 @@ def create_dataset(df): df = df.withColumn("email", fake_email()) df = df.withColumn("address", fake_address()) return df + +print("Generating Data... ", end='') + #APPEND Faker.seed(0) df = spark.range(0, 10000) @@ -77,8 +80,11 @@ def cleanup_folder(path): df = df.withColumn("amount", (F.rand(10)*1000).cast('int')+10) df = df.withColumn("operation", F.lit('UPDATE')) df.repartition(1).write.mode("append").option("header", "true").format("csv").save(raw_data_location+"/cdc/transactions") +print("Done") +print("Cleaning up files... ", end='') cleanup_folder(raw_data_location+"/user_csv") cleanup_folder(raw_data_location+"/cdc/users") -cleanup_folder(raw_data_location+"/cdc/transactions") +cleanup_folder(raw_data_location+"/cdc/transactions") +print("Done")