Create the database and the Slowly Changing Dimension (SCD) Type 1 table

In [0]:
%sql
-- Schema that holds the country lookup tables.
-- Each statement is terminated with ';' so the cell is parsed as three
-- separate statements rather than one malformed statement.
create database if not exists look_up;

-- SCD Type 1 country dimension: one row per country, overwritten in place.
create table if not exists look_up.dim_country_scd1(
  country                  STRING,
  country_code_2_digit     STRING,
  country_code_3_digit     STRING,
  continent                STRING,
  population               INT
);

-- Turn on the Change Data Feed so row-level changes made to this table can be
-- read back with table_changes().
alter table look_up.dim_country_scd1 set tblproperties ('delta.enableChangeDataFeed' = 'true');

In [0]:
# Fetch the SQL Server connection settings from the "jdbc_stock" secret scope,
# in the same order as the names on the left-hand side.
jdbcHostname, jdbcDatabase, jdbcUsername, jdbcPassword = (
    dbutils.secrets.get(scope="jdbc_stock", key=secret_key)
    for secret_key in ("jdbcHostname", "jdbcDatabase", "jdbcUsername", "jdbcPassword")
)

jdbcUrl = f"jdbc:sqlserver://{jdbcHostname};database={jdbcDatabase}"

# Read the source table look_up.dim_country over JDBC.
country_reader = spark.read.format("jdbc").options(
    url=jdbcUrl,
    dbtable="look_up.dim_country",
    user=jdbcUsername,
    password=jdbcPassword,
)
country_df = country_reader.load()

# Expose the source rows to the %sql cells under the view name "country".
country_df.createOrReplaceTempView("country")

Ingest data to SCD Type 1 table from Azure SQL Database

In [0]:
%sql

-- Load / refresh the SCD Type 1 table from the "country" temp view.
--
-- Matched rows are updated only when at least one non-key column differs.
-- The comparison uses the NULL-safe operator (<=>): with plain '<>' a NULL on
-- either side evaluates to NULL, so a NULL target value could never be
-- replaced and a source NULL could never be propagated.
-- The key column (country) is not compared or reassigned because the ON
-- clause already guarantees it is equal for matched rows.
MERGE INTO look_up.dim_country_scd1 AS target
USING country AS source
ON target.country = source.country
WHEN MATCHED AND NOT (
      target.country_code_2_digit <=> source.country_code_2_digit
  AND target.country_code_3_digit <=> source.country_code_3_digit
  AND target.continent            <=> source.continent
  AND target.population           <=> source.population
) THEN
  UPDATE SET
    target.country_code_2_digit = source.country_code_2_digit,
    target.country_code_3_digit = source.country_code_3_digit,
    target.continent            = source.continent,
    target.population           = source.population

-- Countries present in the source but not yet in the target.
WHEN NOT MATCHED THEN
    INSERT *

-- Countries that disappeared from the source are removed from the target.
WHEN NOT MATCHED BY SOURCE THEN
    DELETE

In [0]:
# Version number of the most recent Delta commit made in this Spark session
# (presumably the MERGE into dim_country_scd1 run just before this cell).
last_version = spark.conf.get('spark.databricks.delta.lastCommitVersionInSession')

# Read the change feed of dim_country_scd1 for exactly that one version.
initial_changes_sql = (
    f"select * from table_changes('look_up.dim_country_scd1', {last_version}, {last_version})"
)
change_df1 = spark.sql(initial_changes_sql)

# Make the change rows available to %sql cells.
change_df1.createOrReplaceTempView('change_df1_view')

Create SCD Type 2 table

In [0]:
%sql
-- SCD Type 2 country dimension: the dimension columns plus validity tracking
-- (start_date, end_date, is_current).
CREATE TABLE IF NOT EXISTS look_up.dim_country_scd2 (
  country                  STRING,
  country_code_2_digit     STRING,
  country_code_3_digit     STRING,
  continent                STRING,
  population               INT,
  start_date               TIMESTAMP,
  end_date                 TIMESTAMP,
  is_current               BOOLEAN
);

-- Initial load: every 'insert' row from the change feed view whose country is
-- not already in the table becomes an open-ended, current row.
MERGE INTO look_up.dim_country_scd2 AS target
USING change_df1_view AS source
ON target.country = source.country
WHEN NOT MATCHED AND source._change_type = 'insert' THEN
  INSERT (
    country,
    country_code_2_digit,
    country_code_3_digit,
    continent,
    population,
    start_date,
    end_date,
    is_current
  )
  VALUES (
    source.country,
    source.country_code_2_digit,
    source.country_code_3_digit,
    source.continent,
    source.population,
    CURRENT_TIMESTAMP,
    NULL,
    TRUE
  );

Make changes to the country table in the Azure SQL database

In [0]:
# ODBC driver name; the braces are part of the connection-string value.
odbcDriver = "{ODBC Driver 17 for SQL Server}"

# Server, database and credentials come from the "jdbc_stock" secret scope.
odbcServer, jdbcDatabase, jdbcUsername, jdbcPassword = (
    dbutils.secrets.get(scope="jdbc_stock", key=secret_key)
    for secret_key in ("odbcServer", "jdbcDatabase", "jdbcUsername", "jdbcPassword")
)

# Assemble the pyodbc connection string one attribute per line.
connection_string = (
    f"DRIVER={odbcDriver};"
    f"SERVER={odbcServer};"
    f"DATABASE={jdbcDatabase};"
    f"UID={jdbcUsername};"
    f"PWD={jdbcPassword};"
    "TrustServerCertificate=Yes"
)

In [0]:
import pyodbc

# Apply one insert, one update and one delete to look_up.dim_country in the
# SQL database as a single transaction.
#
# The connection and cursor are released in `finally` blocks so they are not
# leaked when a statement fails, and a failure rolls back the partial work
# before the error is re-raised.
#
# NOTE(review): population is passed as the string literal '20000' in the
# INSERT; presumably the server converts it to the column type - confirm.

# Establish connection
conn = pyodbc.connect(connection_string)
try:
    # create cursor object
    cursor = conn.cursor()
    try:
        # Insert, update, and delete one row in SQL Database table
        cursor.execute("INSERT INTO look_up.dim_country(country, country_code_2_digit, country_code_3_digit, continent, population) VALUES ('c','cc','ccc','cde','20000')")

        cursor.execute("UPDATE look_up.dim_country SET population = 38928341 WHERE country_code_2_digit = 'AF'")

        cursor.execute("DELETE FROM look_up.dim_country WHERE country_code_2_digit = 'DZ'")

        # Make all three changes visible together.
        conn.commit()
    except Exception:
        # Undo any statements that already ran, then surface the error.
        conn.rollback()
        raise
    finally:
        cursor.close()
finally:
    conn.close()

Sync the changes to the dim_country_scd1 table in Delta Lake.

In [0]:
# Re-read look_up.dim_country over JDBC (after the source-side changes),
# sorted by country. Relies on jdbcUrl / jdbcUsername / jdbcPassword defined
# in earlier cells.
source_reader = spark.read.format("jdbc").options(
    url=jdbcUrl,
    dbtable="look_up.dim_country",
    user=jdbcUsername,
    password=jdbcPassword,
)
combo_df = source_reader.load().orderBy('country')

# Show the schema and the rows for inspection.
combo_df.printSchema()
display(combo_df)

# Expose the refreshed source rows to the %sql cells.
combo_df.createOrReplaceTempView("country_combo")

In [0]:
%sql

-- Sync the SCD Type 1 table with the refreshed source view.
--
-- A matched row is rewritten only when a non-key column actually changed.
-- The NULL-safe operator (<=>) is used because 'a <> b' yields NULL when
-- either side is NULL, which would hide NULL -> value and value -> NULL changes.
-- The key column is not compared: the ON clause already makes it equal.
MERGE INTO look_up.dim_country_scd1 AS target
USING country_combo AS source
ON target.country = source.country
WHEN MATCHED AND NOT (
      target.country_code_2_digit <=> source.country_code_2_digit
  AND target.country_code_3_digit <=> source.country_code_3_digit
  AND target.continent            <=> source.continent
  AND target.population           <=> source.population
)
  THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE THEN DELETE;

Use the Change Data Feed (CDF) output to sync the dim_country_scd2 table

In [0]:
# Version number of the most recent Delta commit made in this Spark session
# (presumably the sync MERGE into dim_country_scd1 run just before this cell).
last_version = spark.conf.get('spark.databricks.delta.lastCommitVersionInSession')

# Read the change feed of dim_country_scd1 for exactly that one version.
sync_changes_sql = (
    f"select * from table_changes('look_up.dim_country_scd1', {last_version}, {last_version})"
)
change_df3 = spark.sql(sync_changes_sql)

# Make the change rows available to %sql cells.
change_df3.createOrReplaceTempView('change_df3_view')

In [0]:
%sql

-- Apply the change feed to the SCD Type 2 table while preserving history.
--
-- Instead of hard-deleting rows and overwriting population in place (which
-- discards history and ignores changes to the other columns), this merge:
--   * expires the current row of a deleted or updated country by setting
--     end_date and is_current = FALSE;
--   * inserts a new current row for every inserted or updated country.
--
-- The source is built from two branches:
--   1. merge_key = country : used to find and expire the current target row
--      ('delete' and 'update_postimage' changes).
--   2. merge_key = NULL    : can never satisfy the ON clause, so these rows
--      always fall through to the INSERT branch ('insert' and
--      'update_postimage' changes).
-- 'update_preimage' rows are not needed and are left out by both branches.
--
-- Run once per change-feed version: re-applying the same version would expire
-- and re-insert the updated rows again.
MERGE INTO look_up.dim_country_scd2 AS target
USING (
  SELECT country AS merge_key, *
  FROM change_df3_view
  WHERE _change_type IN ('delete', 'update_postimage')

  UNION ALL

  SELECT CAST(NULL AS STRING) AS merge_key, *
  FROM change_df3_view
  WHERE _change_type IN ('insert', 'update_postimage')
) AS source
ON target.country = source.merge_key AND target.is_current = TRUE

-- Close out the version that was current until this change.
WHEN MATCHED THEN
  UPDATE SET
    target.end_date   = CURRENT_TIMESTAMP,
    target.is_current = FALSE

-- Open a new current version for inserted and updated countries.
WHEN NOT MATCHED AND source.merge_key IS NULL THEN
  INSERT(
    country,
    country_code_2_digit,
    country_code_3_digit,
    continent,
    population,
    start_date,
    end_date,
    is_current
  ) VALUES (
    source.country,
    source.country_code_2_digit,
    source.country_code_3_digit,
    source.continent,
    source.population,
    CURRENT_TIMESTAMP,
    NULL,
    TRUE
  );