### Simulate Data Changes - AdventureWorks

This is the second of two Databricks notebooks in the Azure Synapse Lakehouse Sync tutorial. It simulates inserts, updates, deletes, and a schema change across several AdventureWorks tables to demonstrate the Delta Lake Change Data Feed feature.

**FactInternetSales**
  1. Merge records (updates and inserts)
  2. Delete records
  
**FactResellerSales**
  1. Delete records
  2. Merge records (updates and inserts)

**DimCustomer**
  1. Merge records (updates and inserts)
  
**DimProduct**
  1. No changes
  
**DimPromotion**
  1. Delete records
  2. Merge records (updates and inserts)
  3. Increase the data size of the EnglishPromotionName column by merging a value that is 40 characters long
  
**DimDate**
  1. No changes
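
Delta Lake's Change Data Feed labels each changed row with a `_change_type` value of `insert`, `update_preimage`, `update_postimage`, or `delete`. As a rough, plain-Python model (no Spark; the function `change_feed_rows` and the sample keys are hypothetical), diffing two keyed snapshots shows which change rows the inserts, updates, and deletes listed above should produce. It is a simplification: it reports an update only when values differ, whereas a Delta MERGE may record matched rows even when their values did not change.

```python
from typing import Dict, List, Tuple

Key = Tuple
Row = Dict[str, object]


def change_feed_rows(before: Dict[Key, Row], after: Dict[Key, Row]) -> List[Tuple[str, Key]]:
    """Return (change_type, key) pairs describing how `before` became `after`."""
    changes: List[Tuple[str, Key]] = []
    for key, row in before.items():
        if key not in after:
            changes.append(("delete", key))
        elif after[key] != row:
            # An update is reported as a before image followed by an after image.
            changes.append(("update_preimage", key))
            changes.append(("update_postimage", key))
    for key in after:
        if key not in before:
            changes.append(("insert", key))
    return changes


# Hypothetical rows keyed by (SalesOrderNumber, SalesOrderLineNumber)
before = {
    ("SO43697", 1): {"OrderQuantity": 1},
    ("SO43698", 1): {"OrderQuantity": 1},
}
after = {
    ("SO43697", 1): {"OrderQuantity": 2},  # updated
    ("SO99999", 1): {"OrderQuantity": 5},  # inserted
}                                          # ("SO43698", 1) was deleted
print(change_feed_rows(before, after))
```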

### Notebook Parameters

The parameters below are populated at runtime by the **SynapseLakehouseSync_Tutorial** pipeline in the Synapse Analytics workspace.

- **DeltaDataFolderPathFull**: The Azure Data Lake Storage (ADLS) path that contains the sample parquet tables and will contain the Delta 2.0 Gold Zone tables
- **DatabaseName**: The Delta 2.0 database name, which is also the name of the Synapse Dedicated SQL database to synchronize to
- **SynapseWorkspaceName**: The name of the Azure Synapse Analytics workspace
- **ParquetDataADLSFullPath**: The full ADLS path under which the AdventureWorks change dataset exists in parquet format
- **ParquetDataDatabricksKeyVaultScope**: The Azure Databricks secret scope name
- **ParquetDataAzureKeyVaultSecretName**: The Azure Key Vault secret name
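
The cells below only read `DatabaseName` and `ParquetDataADLSFullPath` from the parameter JSON. A small, hypothetical helper (`parse_parameters` is not part of the tutorial) could stand in for the bare `json.loads` call and fail fast when the pipeline omits one of them; the required-key list is an assumption based on the paths the later cells build.

```python
import json
from typing import Dict, Iterable


def parse_parameters(raw: str, required: Iterable[str]) -> Dict[str, str]:
    """Parse the JSON parameter string; raise KeyError if a required key is missing or empty."""
    params = json.loads(raw)
    missing = [name for name in required if not params.get(name)]
    if missing:
        raise KeyError(f"Missing notebook parameters: {', '.join(missing)}")
    return params


raw = '''
{
    "DatabaseName": "AdventureWorks",
    "SynapseWorkspaceName": "synapsesyncqbq",
    "ParquetDataADLSFullPath": "abfss://gold@enterprisedatalakeqbq.dfs.core.windows.net/Sample/"
}
'''
params = parse_parameters(raw, ["DatabaseName", "ParquetDataADLSFullPath"])
print(params["DatabaseName"])  # AdventureWorks
```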

In [None]:
SynapseLakehouseSyncParameters = '''
{
    "DatabaseName": "AdventureWorks",
    "SynapseWorkspaceName": "synapsesyncqbq",
    "ParquetDataADLSFullPath": "abfss://gold@enterprisedatalakeqbq.dfs.core.windows.net/Sample/"
}
'''

In [33]:
import json

SynapseLakehouseSyncParameters = json.loads(SynapseLakehouseSyncParameters)

print(json.dumps(SynapseLakehouseSyncParameters, indent=4))

In [34]:
%run "/Synapse Lakehouse Sync/Synapse Lakehouse Sync Functions"

In [35]:
deleteDF_FactResellerSales = spark.read.parquet(f'{SynapseLakehouseSyncParameters["ParquetDataADLSFullPath"]}/{SynapseLakehouseSyncParameters["DatabaseName"]}_changes/FactResellerSales_deletes')
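
Every read in this notebook builds a path of the form `{ParquetDataADLSFullPath}/{DatabaseName}_changes/{Table}_{kind}`. Because the sample `ParquetDataADLSFullPath` already ends in `/`, the f-strings produce a double slash (`Sample//AdventureWorks_changes`), which Hadoop-style paths generally normalize but which is easy to avoid. A sketch of a hypothetical `change_path` helper that strips the trailing slash:

```python
def change_path(base_path: str, database: str, table: str, kind: str) -> str:
    """Build '<base>/<database>_changes/<table>_<kind>' without a doubled slash."""
    if kind not in {"inserts", "updates", "deletes"}:
        raise ValueError(f"Unexpected change kind: {kind}")
    return f"{base_path.rstrip('/')}/{database}_changes/{table}_{kind}"


base = "abfss://gold@enterprisedatalakeqbq.dfs.core.windows.net/Sample/"
print(change_path(base, "AdventureWorks", "FactResellerSales", "deletes"))
# abfss://gold@enterprisedatalakeqbq.dfs.core.windows.net/Sample/AdventureWorks_changes/FactResellerSales_deletes
```

In the notebook, the result could be passed straight to `spark.read.parquet`, which accepts several paths, for example the `inserts` and `updates` paths of one table.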

In [36]:
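from delta.tables import DeltaTable

# Delete target rows whose (SalesOrderNumber, SalesOrderLineNumber) key appears in the deletes dataset
tableName = 'FactResellerSales'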
print(f'Merge - {tableName}')
deltaTable = DeltaTable.forName(spark, f'{SynapseLakehouseSyncParameters["DatabaseName"]}.{tableName}')

deltaTable.alias("target").merge(
  deleteDF_FactResellerSales.alias("source"), 'source.SalesOrderNumber = target.SalesOrderNumber AND source.SalesOrderLineNumber = target.SalesOrderLineNumber' ) \
  .whenMatchedDelete() \
  .execute()
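
A MERGE with only `whenMatchedDelete()` removes every target row whose key matches a source row and ignores source rows with no match. The plain-Python model below (hypothetical function name and rows, no Spark) mirrors that behavior for the composite key `(SalesOrderNumber, SalesOrderLineNumber)`:

```python
from typing import Dict, List, Sequence


def merge_delete(target: List[Dict], source: List[Dict], key_cols: Sequence[str]) -> List[Dict]:
    """Keep only the target rows whose composite key does not appear in the source."""
    delete_keys = {tuple(row[c] for c in key_cols) for row in source}
    return [row for row in target if tuple(row[c] for c in key_cols) not in delete_keys]


keys = ("SalesOrderNumber", "SalesOrderLineNumber")
target = [
    {"SalesOrderNumber": "SO1", "SalesOrderLineNumber": 1},
    {"SalesOrderNumber": "SO1", "SalesOrderLineNumber": 2},
    {"SalesOrderNumber": "SO2", "SalesOrderLineNumber": 1},
]
source = [
    {"SalesOrderNumber": "SO1", "SalesOrderLineNumber": 2},  # matches: deleted
    {"SalesOrderNumber": "SO9", "SalesOrderLineNumber": 1},  # no match: ignored
]
remaining = merge_delete(target, source, keys)
print([(r["SalesOrderNumber"], r["SalesOrderLineNumber"]) for r in remaining])
# [('SO1', 1), ('SO2', 1)]
```

Matching on both columns matters: a condition on `SalesOrderNumber` alone would delete every line of an order rather than the single line listed in the deletes dataset.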


In [37]:
mergeDF_FactInternetSales = spark.read.parquet(*[f'{SynapseLakehouseSyncParameters["ParquetDataADLSFullPath"]}/{SynapseLakehouseSyncParameters["DatabaseName"]}_changes/FactInternetSales_inserts', f'{SynapseLakehouseSyncParameters["ParquetDataADLSFullPath"]}/{SynapseLakehouseSyncParameters["DatabaseName"]}_changes/FactInternetSales_updates'])
mergeDF_FactResellerSales = spark.read.parquet(*[f'{SynapseLakehouseSyncParameters["ParquetDataADLSFullPath"]}/{SynapseLakehouseSyncParameters["DatabaseName"]}_changes/FactResellerSales_inserts', f'{SynapseLakehouseSyncParameters["ParquetDataADLSFullPath"]}/{SynapseLakehouseSyncParameters["DatabaseName"]}_changes/FactResellerSales_updates'])
mergeDF_DimCustomer = spark.read.parquet(*[f'{SynapseLakehouseSyncParameters["ParquetDataADLSFullPath"]}/{SynapseLakehouseSyncParameters["DatabaseName"]}_changes/DimCustomer_inserts', f'{SynapseLakehouseSyncParameters["ParquetDataADLSFullPath"]}/{SynapseLakehouseSyncParameters["DatabaseName"]}_changes/DimCustomer_updates'])
mergeDF_DimPromotion = spark.read.parquet(*[f'{SynapseLakehouseSyncParameters["ParquetDataADLSFullPath"]}/{SynapseLakehouseSyncParameters["DatabaseName"]}_changes/DimPromotion_inserts', f'{SynapseLakehouseSyncParameters["ParquetDataADLSFullPath"]}/{SynapseLakehouseSyncParameters["DatabaseName"]}_changes/DimPromotion_updates'])
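
Each merge DataFrame above combines an `_inserts` folder and an `_updates` folder. Delta Lake's MERGE fails when more than one source row matches and tries to modify the same target row, and duplicate keys that match nothing would be inserted twice, so checking the combined source for repeated keys is a cheap safeguard. A plain-Python sketch of that check (hypothetical function name and rows); in Spark the equivalent would be a `groupBy` on the key columns followed by `count()`:

```python
from collections import Counter
from typing import Dict, List, Sequence, Tuple


def duplicate_keys(rows: List[Dict], key_cols: Sequence[str]) -> List[Tuple]:
    """Return the composite keys that occur more than once in `rows`."""
    counts = Counter(tuple(row[c] for c in key_cols) for row in rows)
    return sorted(key for key, n in counts.items() if n > 1)


inserts = [{"CustomerAlternateKey": "AW00099001"}]
updates = [{"CustomerAlternateKey": "AW00011000"}, {"CustomerAlternateKey": "AW00099001"}]
print(duplicate_keys(inserts + updates, ["CustomerAlternateKey"]))
# [('AW00099001',)]
```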

In [38]:
from delta.tables import DeltaTable

# Upsert each table: update rows whose key matches the source, insert the rest
tableName = 'FactInternetSales'
print(f'Merge - {tableName}')
deltaTable = DeltaTable.forName(spark, f'{SynapseLakehouseSyncParameters["DatabaseName"]}.{tableName}')

deltaTable.alias("target").merge(
  mergeDF_FactInternetSales.alias("source"), 'source.SalesOrderNumber = target.SalesOrderNumber AND source.SalesOrderLineNumber = target.SalesOrderLineNumber' ) \
  .whenMatchedUpdate(set =
{'ProductKey': 'source.ProductKey', 'OrderDateKey': 'source.OrderDateKey', 'DueDateKey': 'source.DueDateKey', 'ShipDateKey': 'source.ShipDateKey', 'CustomerKey': 'source.CustomerKey', 'PromotionKey': 'source.PromotionKey', 'CurrencyKey': 'source.CurrencyKey', 'SalesTerritoryKey': 'source.SalesTerritoryKey', 'SalesOrderNumber': 'source.SalesOrderNumber', 'SalesOrderLineNumber': 'source.SalesOrderLineNumber', 'RevisionNumber': 'source.RevisionNumber', 'OrderQuantity': 'source.OrderQuantity', 'UnitPrice': 'source.UnitPrice', 'ExtendedAmount': 'source.ExtendedAmount', 'UnitPriceDiscountPct': 'source.UnitPriceDiscountPct', 'DiscountAmount': 'source.DiscountAmount', 'ProductStandardCost': 'source.ProductStandardCost', 'TotalProductCost': 'source.TotalProductCost', 'SalesAmount': 'source.SalesAmount', 'TaxAmt': 'source.TaxAmt', 'Freight': 'source.Freight', 'CarrierTrackingNumber': 'source.CarrierTrackingNumber', 'CustomerPONumber': 'source.CustomerPONumber', 'OrderDate': 'source.OrderDate', 'DueDate': 'source.DueDate', 'ShipDate': 'source.ShipDate'}) \
  .whenNotMatchedInsert(values =
{'ProductKey': 'source.ProductKey', 'OrderDateKey': 'source.OrderDateKey', 'DueDateKey': 'source.DueDateKey', 'ShipDateKey': 'source.ShipDateKey', 'CustomerKey': 'source.CustomerKey', 'PromotionKey': 'source.PromotionKey', 'CurrencyKey': 'source.CurrencyKey', 'SalesTerritoryKey': 'source.SalesTerritoryKey', 'SalesOrderNumber': 'source.SalesOrderNumber', 'SalesOrderLineNumber': 'source.SalesOrderLineNumber', 'RevisionNumber': 'source.RevisionNumber', 'OrderQuantity': 'source.OrderQuantity', 'UnitPrice': 'source.UnitPrice', 'ExtendedAmount': 'source.ExtendedAmount', 'UnitPriceDiscountPct': 'source.UnitPriceDiscountPct', 'DiscountAmount': 'source.DiscountAmount', 'ProductStandardCost': 'source.ProductStandardCost', 'TotalProductCost': 'source.TotalProductCost', 'SalesAmount': 'source.SalesAmount', 'TaxAmt': 'source.TaxAmt', 'Freight': 'source.Freight', 'CarrierTrackingNumber': 'source.CarrierTrackingNumber', 'CustomerPONumber': 'source.CustomerPONumber', 'OrderDate': 'source.OrderDate', 'DueDate': 'source.DueDate', 'ShipDate': 'source.ShipDate'}) \
  .execute()

#############################################################################################
#############################################################################################
#############################################################################################

tableName = 'FactResellerSales'
print(f'Merge - {tableName}')
deltaTable = DeltaTable.forName(spark, f'{SynapseLakehouseSyncParameters["DatabaseName"]}.{tableName}')

deltaTable.alias("target").merge(
  mergeDF_FactResellerSales.alias("source"), 'source.SalesOrderNumber = target.SalesOrderNumber AND source.SalesOrderLineNumber = target.SalesOrderLineNumber' ) \
  .whenMatchedUpdate(set =
{'ProductKey': 'source.ProductKey', 'OrderDateKey': 'source.OrderDateKey', 'DueDateKey': 'source.DueDateKey', 'ShipDateKey': 'source.ShipDateKey', 'ResellerKey': 'source.ResellerKey', 'EmployeeKey': 'source.EmployeeKey', 'PromotionKey': 'source.PromotionKey', 'CurrencyKey': 'source.CurrencyKey', 'SalesTerritoryKey': 'source.SalesTerritoryKey', 'SalesOrderNumber': 'source.SalesOrderNumber', 'SalesOrderLineNumber': 'source.SalesOrderLineNumber', 'RevisionNumber': 'source.RevisionNumber', 'OrderQuantity': 'source.OrderQuantity', 'UnitPrice': 'source.UnitPrice', 'ExtendedAmount': 'source.ExtendedAmount', 'UnitPriceDiscountPct': 'source.UnitPriceDiscountPct', 'DiscountAmount': 'source.DiscountAmount', 'ProductStandardCost': 'source.ProductStandardCost', 'TotalProductCost': 'source.TotalProductCost', 'SalesAmount': 'source.SalesAmount', 'TaxAmt': 'source.TaxAmt', 'Freight': 'source.Freight', 'CarrierTrackingNumber': 'source.CarrierTrackingNumber', 'CustomerPONumber': 'source.CustomerPONumber', 'OrderDate': 'source.OrderDate', 'DueDate': 'source.DueDate', 'ShipDate': 'source.ShipDate'}) \
  .whenNotMatchedInsert(values =
{'ProductKey': 'source.ProductKey', 'OrderDateKey': 'source.OrderDateKey', 'DueDateKey': 'source.DueDateKey', 'ShipDateKey': 'source.ShipDateKey', 'ResellerKey': 'source.ResellerKey', 'EmployeeKey': 'source.EmployeeKey', 'PromotionKey': 'source.PromotionKey', 'CurrencyKey': 'source.CurrencyKey', 'SalesTerritoryKey': 'source.SalesTerritoryKey', 'SalesOrderNumber': 'source.SalesOrderNumber', 'SalesOrderLineNumber': 'source.SalesOrderLineNumber', 'RevisionNumber': 'source.RevisionNumber', 'OrderQuantity': 'source.OrderQuantity', 'UnitPrice': 'source.UnitPrice', 'ExtendedAmount': 'source.ExtendedAmount', 'UnitPriceDiscountPct': 'source.UnitPriceDiscountPct', 'DiscountAmount': 'source.DiscountAmount', 'ProductStandardCost': 'source.ProductStandardCost', 'TotalProductCost': 'source.TotalProductCost', 'SalesAmount': 'source.SalesAmount', 'TaxAmt': 'source.TaxAmt', 'Freight': 'source.Freight', 'CarrierTrackingNumber': 'source.CarrierTrackingNumber', 'CustomerPONumber': 'source.CustomerPONumber', 'OrderDate': 'source.OrderDate', 'DueDate': 'source.DueDate', 'ShipDate': 'source.ShipDate'}) \
  .execute()

#############################################################################################
#############################################################################################
#############################################################################################

tableName = 'DimCustomer'
print(f'Merge - {tableName}')
deltaTable = DeltaTable.forName(spark, f'{SynapseLakehouseSyncParameters["DatabaseName"]}.{tableName}')

deltaTable.alias("target").merge(
  mergeDF_DimCustomer.alias("source"), 'source.CustomerAlternateKey = target.CustomerAlternateKey' ) \
  .whenMatchedUpdate(set =
{'CustomerKey': 'source.CustomerKey', 'GeographyKey': 'source.GeographyKey', 'CustomerAlternateKey': 'source.CustomerAlternateKey', 'Title': 'source.Title', 'FirstName': 'source.FirstName', 'MiddleName': 'source.MiddleName', 'LastName': 'source.LastName', 'NameStyle': 'source.NameStyle', 'BirthDate': 'source.BirthDate', 'MaritalStatus': 'source.MaritalStatus', 'Suffix': 'source.Suffix', 'Gender': 'source.Gender', 'EmailAddress': 'source.EmailAddress', 'YearlyIncome': 'source.YearlyIncome', 'TotalChildren': 'source.TotalChildren', 'NumberChildrenAtHome': 'source.NumberChildrenAtHome', 'EnglishEducation': 'source.EnglishEducation', 'SpanishEducation': 'source.SpanishEducation', 'FrenchEducation': 'source.FrenchEducation', 'EnglishOccupation': 'source.EnglishOccupation', 'SpanishOccupation': 'source.SpanishOccupation', 'FrenchOccupation': 'source.FrenchOccupation', 'HouseOwnerFlag': 'source.HouseOwnerFlag', 'NumberCarsOwned': 'source.NumberCarsOwned', 'AddressLine1': 'source.AddressLine1', 'AddressLine2': 'source.AddressLine2', 'Phone': 'source.Phone', 'DateFirstPurchase': 'source.DateFirstPurchase', 'CommuteDistance': 'source.CommuteDistance'}) \
  .whenNotMatchedInsert(values =
{'CustomerKey': 'source.CustomerKey', 'GeographyKey': 'source.GeographyKey', 'CustomerAlternateKey': 'source.CustomerAlternateKey', 'Title': 'source.Title', 'FirstName': 'source.FirstName', 'MiddleName': 'source.MiddleName', 'LastName': 'source.LastName', 'NameStyle': 'source.NameStyle', 'BirthDate': 'source.BirthDate', 'MaritalStatus': 'source.MaritalStatus', 'Suffix': 'source.Suffix', 'Gender': 'source.Gender', 'EmailAddress': 'source.EmailAddress', 'YearlyIncome': 'source.YearlyIncome', 'TotalChildren': 'source.TotalChildren', 'NumberChildrenAtHome': 'source.NumberChildrenAtHome', 'EnglishEducation': 'source.EnglishEducation', 'SpanishEducation': 'source.SpanishEducation', 'FrenchEducation': 'source.FrenchEducation', 'EnglishOccupation': 'source.EnglishOccupation', 'SpanishOccupation': 'source.SpanishOccupation', 'FrenchOccupation': 'source.FrenchOccupation', 'HouseOwnerFlag': 'source.HouseOwnerFlag', 'NumberCarsOwned': 'source.NumberCarsOwned', 'AddressLine1': 'source.AddressLine1', 'AddressLine2': 'source.AddressLine2', 'Phone': 'source.Phone', 'DateFirstPurchase': 'source.DateFirstPurchase', 'CommuteDistance': 'source.CommuteDistance'}) \
  .execute()

#############################################################################################
#############################################################################################
#############################################################################################

tableName = 'DimPromotion'
print(f'Merge - {tableName}')
deltaTable = DeltaTable.forName(spark, f'{SynapseLakehouseSyncParameters["DatabaseName"]}.{tableName}')

deltaTable.alias("target").merge(
  mergeDF_DimPromotion.alias("source"), 'source.PromotionAlternateKey = target.PromotionAlternateKey' ) \
  .whenMatchedUpdate(set =
{'PromotionKey': 'source.PromotionKey', 'PromotionAlternateKey': 'source.PromotionAlternateKey', 'EnglishPromotionName': 'source.EnglishPromotionName', 'SpanishPromotionName': 'source.SpanishPromotionName', 'FrenchPromotionName': 'source.FrenchPromotionName', 'DiscountPct': 'source.DiscountPct', 'EnglishPromotionType': 'source.EnglishPromotionType', 'SpanishPromotionType': 'source.SpanishPromotionType', 'FrenchPromotionType': 'source.FrenchPromotionType', 'EnglishPromotionCategory': 'source.EnglishPromotionCategory', 'SpanishPromotionCategory': 'source.SpanishPromotionCategory', 'FrenchPromotionCategory': 'source.FrenchPromotionCategory', 'StartDate': 'source.StartDate', 'EndDate': 'source.EndDate', 'MinQty': 'source.MinQty', 'MaxQty': 'source.MaxQty'}) \
  .whenNotMatchedInsert(values =
{'PromotionKey': 'source.PromotionKey', 'PromotionAlternateKey': 'source.PromotionAlternateKey', 'EnglishPromotionName': 'source.EnglishPromotionName', 'SpanishPromotionName': 'source.SpanishPromotionName', 'FrenchPromotionName': 'source.FrenchPromotionName', 'DiscountPct': 'source.DiscountPct', 'EnglishPromotionType': 'source.EnglishPromotionType', 'SpanishPromotionType': 'source.SpanishPromotionType', 'FrenchPromotionType': 'source.FrenchPromotionType', 'EnglishPromotionCategory': 'source.EnglishPromotionCategory', 'SpanishPromotionCategory': 'source.SpanishPromotionCategory', 'FrenchPromotionCategory': 'source.FrenchPromotionCategory', 'StartDate': 'source.StartDate', 'EndDate': 'source.EndDate', 'MinQty': 'source.MinQty', 'MaxQty': 'source.MaxQty'}) \
  .execute()
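
Each `whenMatchedUpdate(set=...)` and `whenNotMatchedInsert(values=...)` pair in this cell passes the same dictionary, mapping every target column to `source.<column>`. A minimal plain-Python sketch (hypothetical names and rows, no Spark) shows how such a mapping could be generated from a column list and what the upsert does with it: matched keys are overwritten, unmatched keys are added. Like Delta's MERGE, it assumes each key appears at most once in the source.

```python
from typing import Dict, List, Sequence


def source_mapping(columns: Sequence[str]) -> Dict[str, str]:
    """Build {'Col': 'source.Col', ...}, the shape passed to set= and values=."""
    return {c: f"source.{c}" for c in columns}


def upsert(target: List[Dict], source: List[Dict], key_cols: Sequence[str],
           mapping: Dict[str, str]) -> List[Dict]:
    """Update target rows whose key matches a source row; insert the others."""
    def key(row: Dict) -> tuple:
        return tuple(row[c] for c in key_cols)

    def project(src: Dict) -> Dict:
        # Resolve each 'source.Col' expression to the source row's value.
        return {col: src[expr.split(".", 1)[1]] for col, expr in mapping.items()}

    result = {key(row): dict(row) for row in target}
    for src in source:
        k = key(src)
        if k in result:
            result[k].update(project(src))  # whenMatchedUpdate
        else:
            result[k] = project(src)        # whenNotMatchedInsert
    return list(result.values())


cols = ["PromotionAlternateKey", "EnglishPromotionName", "DiscountPct"]
mapping = source_mapping(cols)
target = [{"PromotionAlternateKey": 1, "EnglishPromotionName": "No Discount", "DiscountPct": 0.0}]
source = [
    {"PromotionAlternateKey": 1, "EnglishPromotionName": "No Discount", "DiscountPct": 0.05},
    {"PromotionAlternateKey": 17, "EnglishPromotionName": "New Promo", "DiscountPct": 0.1},
]
merged = upsert(target, source, ["PromotionAlternateKey"], mapping)
print(merged)
```

Delta's Python API also offers `whenMatchedUpdateAll()` and `whenNotMatchedInsertAll()`, but those expect the source columns to correspond to the target's; an explicit mapping keeps extra source columns, such as `_Id` in the DimPromotion schema-change cell, out of the target.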

In [39]:
deleteDF_FactInternetSales = spark.read.parquet(f'{SynapseLakehouseSyncParameters["ParquetDataADLSFullPath"]}/{SynapseLakehouseSyncParameters["DatabaseName"]}_changes/FactInternetSales_deletes')

In [40]:
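from delta.tables import DeltaTable

# Delete target rows whose (SalesOrderNumber, SalesOrderLineNumber) key appears in the deletes dataset
tableName = 'FactInternetSales'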
print(f'Merge - {tableName}')
deltaTable = DeltaTable.forName(spark, f'{SynapseLakehouseSyncParameters["DatabaseName"]}.{tableName}')

deltaTable.alias("target").merge(
  deleteDF_FactInternetSales.alias("source"), 'source.SalesOrderNumber = target.SalesOrderNumber AND source.SalesOrderLineNumber = target.SalesOrderLineNumber' ) \
  .whenMatchedDelete() \
  .execute()


In [41]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType, IntegerType, DoubleType
import datetime
from delta.tables import DeltaTable

# Schema of the single DimPromotion change row; _Id is carried in the DataFrame but is not mapped into the target table
mySchema = StructType([
    StructField('_Id', LongType(), True),
    StructField('PromotionKey', IntegerType(), True),
    StructField('PromotionAlternateKey', IntegerType(), True),
    StructField('EnglishPromotionName', StringType(), True),
    StructField('SpanishPromotionName', StringType(), True),
    StructField('FrenchPromotionName', StringType(), True),
    StructField('DiscountPct', DoubleType(), True),
    StructField('EnglishPromotionType', StringType(), True),
    StructField('SpanishPromotionType', StringType(), True),
    StructField('FrenchPromotionType', StringType(), True),
    StructField('EnglishPromotionCategory', StringType(), True),
    StructField('SpanishPromotionCategory', StringType(), True),
    StructField('FrenchPromotionCategory', StringType(), True),
    StructField('StartDate', TimestampType(), True),
    StructField('EndDate', TimestampType(), True),
    StructField('MinQty', IntegerType(), True),
    StructField('MaxQty', IntegerType(), True),
])

# One row for PromotionAlternateKey 1 whose EnglishPromotionName is 40 characters long
data = [(1, 1, 1, '_' * 40, 'Sin descuento', 'Aucune remise', float(0), 'No Discount', 'Sin descuento', 'Aucune remise', 'No Discount', 'Sin descuento', 'Aucune remise', datetime.datetime.strptime('2010-11-29', '%Y-%m-%d'), datetime.datetime.strptime('2014-06-30', '%Y-%m-%d'), 0, None )]

mergeDF_DimPromotion_DatatypeChange = spark.createDataFrame(data=data, schema=mySchema)


tableName = 'DimPromotion'
print(f'Merge - {tableName}')
deltaTable = DeltaTable.forName(spark, f'{SynapseLakehouseSyncParameters["DatabaseName"]}.{tableName}')

deltaTable.alias("target").merge(
  mergeDF_DimPromotion_DatatypeChange.alias("source"), 'source.PromotionAlternateKey = target.PromotionAlternateKey' ) \
  .whenMatchedUpdate(set =
{'PromotionKey': 'source.PromotionKey', 'PromotionAlternateKey': 'source.PromotionAlternateKey', 'EnglishPromotionName': 'source.EnglishPromotionName', 'SpanishPromotionName': 'source.SpanishPromotionName', 'FrenchPromotionName': 'source.FrenchPromotionName', 'DiscountPct': 'source.DiscountPct', 'EnglishPromotionType': 'source.EnglishPromotionType', 'SpanishPromotionType': 'source.SpanishPromotionType', 'FrenchPromotionType': 'source.FrenchPromotionType', 'EnglishPromotionCategory': 'source.EnglishPromotionCategory', 'SpanishPromotionCategory': 'source.SpanishPromotionCategory', 'FrenchPromotionCategory': 'source.FrenchPromotionCategory', 'StartDate': 'source.StartDate', 'EndDate': 'source.EndDate', 'MinQty': 'source.MinQty', 'MaxQty': 'source.MaxQty'}) \
  .whenNotMatchedInsert(values =
{'PromotionKey': 'source.PromotionKey', 'PromotionAlternateKey': 'source.PromotionAlternateKey', 'EnglishPromotionName': 'source.EnglishPromotionName', 'SpanishPromotionName': 'source.SpanishPromotionName', 'FrenchPromotionName': 'source.FrenchPromotionName', 'DiscountPct': 'source.DiscountPct', 'EnglishPromotionType': 'source.EnglishPromotionType', 'SpanishPromotionType': 'source.SpanishPromotionType', 'FrenchPromotionType': 'source.FrenchPromotionType', 'EnglishPromotionCategory': 'source.EnglishPromotionCategory', 'SpanishPromotionCategory': 'source.SpanishPromotionCategory', 'FrenchPromotionCategory': 'source.FrenchPromotionCategory', 'StartDate': 'source.StartDate', 'EndDate': 'source.EndDate', 'MinQty': 'source.MinQty', 'MaxQty': 'source.MaxQty'}) \
  .execute()
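
Per the change list at the top, this merge exists to increase the data size of `EnglishPromotionName` by writing a 40-character value. A hedged plain-Python sketch of the corresponding check: find the longest value per string column and flag the columns that exceed a declared width. The widths below are hypothetical placeholders, not the actual Synapse table definition.

```python
from typing import Dict, List


def columns_exceeding_width(rows: List[Dict], declared: Dict[str, int]) -> Dict[str, int]:
    """Return {column: max_length} for string columns longer than their declared width."""
    too_long: Dict[str, int] = {}
    for col, width in declared.items():
        lengths = [len(r[col]) for r in rows if isinstance(r.get(col), str)]
        if lengths and max(lengths) > width:
            too_long[col] = max(lengths)
    return too_long


rows = [{"EnglishPromotionName": "_" * 40, "SpanishPromotionName": "Sin descuento"}]
declared = {"EnglishPromotionName": 30, "SpanishPromotionName": 30}  # hypothetical widths
print(columns_exceeding_width(rows, declared))  # {'EnglishPromotionName': 40}
```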


In [42]:
# Optional (commented out): drop the EmailAddress column from DimCustomer to simulate a schema change.

# dfDropColumn = DeltaTable.forPath(spark, 'abfss://bronzezone@adlsbrmyers.dfs.core.windows.net/AdventureWorks/DimCustomer').toDF()
# dfDropColumn.printSchema()

# https://stackoverflow.com/questions/54457068/how-to-drop-a-column-from-a-databricks-delta-table
# spark.sql('ALTER TABLE AdventureWorks.DimCustomer DROP COLUMN EmailAddress')
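
The commented-out `DROP COLUMN` call in this cell would fail on a Delta table unless column mapping is enabled (`delta.columnMapping.mode = 'name'`, which requires reader version 2 and writer version 5). A hypothetical helper that builds the two SQL statements as strings, each of which could be passed to `spark.sql`, might look like this. Check the Delta Lake and Databricks Runtime documentation for your version: some versions restrict reading the Change Data Feed on column-mapped tables, particularly across non-additive changes such as a dropped column.

```python
from typing import List


def drop_column_statements(database: str, table: str, column: str) -> List[str]:
    """Return the SQL needed to drop `column` from a Delta table via column mapping."""
    qualified = f"{database}.{table}"
    return [
        # Name-based column mapping must be enabled before DROP COLUMN is allowed.
        f"ALTER TABLE {qualified} SET TBLPROPERTIES ("
        "'delta.minReaderVersion' = '2', "
        "'delta.minWriterVersion' = '5', "
        "'delta.columnMapping.mode' = 'name')",
        f"ALTER TABLE {qualified} DROP COLUMN {column}",
    ]


for statement in drop_column_statements("AdventureWorks", "DimCustomer", "EmailAddress"):
    print(statement)  # in the notebook: spark.sql(statement)
```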


In [43]:
# Optional (commented out): drop the Size column from DimProduct to simulate a schema change.

# dfDropColumn = DeltaTable.forPath(spark, 'abfss://bronzezone@adlsbrmyers.dfs.core.windows.net/AdventureWorks/DimProduct').toDF()
# dfDropColumn.printSchema()

# https://stackoverflow.com/questions/54457068/how-to-drop-a-column-from-a-databricks-delta-table
# spark.sql('ALTER TABLE AdventureWorks.DimProduct DROP COLUMN Size')


