The goal of this notebook is to present to the user the changes that occurred in a delta table.
It takes advantage of delta versioning and dynamically generates the base SQL for the results.
For this to work, an "information_schema" table needs to be filled with the metadata of each table to process. This assumes there's no information schema to get this information from i.e. no Unity Catalog.

In [49]:
%%pyspark
# Notebook-wide verbosity switch: cells guard their diagnostic print() calls with `if(debug)`.
debug = True

StatementMeta(smallpool3dot3, 1, 2, Finished, Available)

Some information was taken from: https://learn.microsoft.com/en-us/azure/databricks/delta/delta-change-data-feed

In [50]:
-- Build the "information_schema" metadata table: one row per column of every table to process.
-- NOTE: PK columns MUST be 1st in the list (lowest column_pos values).
DROP TABLE IF EXISTS information_schema;

CREATE TABLE information_schema (
    table_name  STRING,
    column_name STRING,
    column_pos  INT,
    is_key      BOOLEAN,
    is_ignore   BOOLEAN
);

INSERT INTO information_schema VALUES
    ('students', 'id',    1, true,  false),
    ('students', 'name',  2, false, true),
    ('students', 'grade', 3, false, false),
    ('students', 'year',  4, false, false),
    ('students', 'age',   5, false, false);


StatementMeta(, 1, -1, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [51]:
-- Sample data table whose changes will be fetched through the delta change data feed

-- Dropped first so that no delta history from a previous run is left behind
DROP TABLE IF EXISTS students;

-- Version 0: table creation (change data feed enabled from the start)
CREATE TABLE students (
    id    INT,
    name  STRING,
    grade DOUBLE,
    year  INT,
    age   INT
)
USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- Version 1: initial insert
INSERT INTO students VALUES
    (4, 'Ted',     4.7, 2020, 20),
    (5, 'Tiffany', 5.5, 2021, 21),
    (6, 'Vini',    6.3, 2022, 22);

-- Version 2: update of every student whose name starts with "T"
UPDATE students
SET grade = grade + 1,
    year  = year + 1
WHERE name LIKE 'T%';

SELECT * FROM students;

StatementMeta(, 1, -1, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 1 rows and 1 fields>

<Spark SQL result set with 3 rows and 5 fields>

In [52]:
-- The "Provider" property in the output shows that this is a delta table
DESCRIBE EXTENDED students

StatementMeta(smallpool3dot3, 1, 11, Finished, Available)

<Spark SQL result set with 15 rows and 3 fields>

In [53]:
-- List the delta versions (changes) recorded for the table.
-- The table is referenced unqualified, like in every other cell, so this resolves
-- against the same current database the table was created in.
DESCRIBE HISTORY students

StatementMeta(smallpool3dot3, 1, 12, Finished, Available)

<Spark SQL result set with 3 rows and 15 fields>

In [54]:
%%pyspark

# Ways of reading the change data feed of a delta table

# 1) Bounded by two delta versions (ints or longs), e.g. changes from version 1 to 2
#df = spark.read.format("delta").option("readChangeFeed", "true").option("startingVersion", 1).option("endingVersion", 2).table('students')

# 2) Bounded by two timestamps (string formatted timestamps)
#df = spark.read.format("delta").option("readChangeFeed", "true").option("startingTimestamp", '2021-04-21 05:45:46').option("endingTimestamp", '2021-05-21 12:00:00').table('students')

# 3) Open-ended: only the starting version/timestamp is provided (the variant used here)
df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 2)
    .table('students')
)

# SQL alternative, with database/schema names inside the table-name string
# (backticks escape dots and special characters):
# SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

# SQL alternative for path based tables:
# SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')

display(df)

StatementMeta(smallpool3dot3, 1, 13, Finished, Available)

SynapseWidget(Synapse.DataFrame, aa603a14-ceb3-45fd-91c6-6b35696382a0)

In [55]:
%%pyspark

# Expose the change-feed DataFrame to SQL cells as the temporary view V_CHANGE_students
df.createOrReplaceTempView(name='V_CHANGE_students')

StatementMeta(smallpool3dot3, 1, 14, Finished, Available)

In [56]:
-- Hand-written target: this is the query the notebook wants to generate dynamically.
-- It pairs each "before" row (s1) with the "after" row (s2) of the same id and shows
-- 'old -> new' for every compared column whose value differs ('' when unchanged).

SELECT
    s1.id,
    CASE WHEN s1.grade <> s2.grade THEN concat(s1.grade, ' -> ', s2.grade) ELSE '' END AS grade,
    CASE WHEN s1.year  <> s2.year  THEN concat(s1.year,  ' -> ', s2.year)  ELSE '' END AS year,
    CASE WHEN s1.age   <> s2.age   THEN concat(s1.age,   ' -> ', s2.age)   ELSE '' END AS age,
    now() AS sysdate
FROM V_CHANGE_students AS s1
JOIN V_CHANGE_students AS s2
  ON s2.id = s1.id
WHERE s1._change_type IN ('update_preimage', 'insert')
  AND s2._change_type = 'update_postimage'
  AND (s1.grade <> s2.grade OR s1.year <> s2.year OR s1.age <> s2.age)

StatementMeta(smallpool3dot3, 1, 15, Finished, Available)

<Spark SQL result set with 2 rows and 5 fields>

In [62]:
%%pyspark
# Dynamically generate the SQL queries for all tables in the "information_schema" table.
# Result: `final`, a list with one [table_name, full_sql, id_vars, value_vars] entry per table.

import pandas as pd


def _build_change_sql(table_name, columns):
    """Build the change-report query for one table.

    Parameters
    ----------
    table_name : str
        Name of the table; its change view is expected to be ``V_CHANGE_<table_name>``.
    columns : iterable of (column_name, is_key) pairs
        Columns to report, in output order, with ignored columns already removed.

    Returns
    -------
    tuple (full_sql, id_vars, value_vars)
        ``id_vars`` are the key columns (join condition / pivot ids) and
        ``value_vars`` the compared columns (pivot values).
        ``full_sql`` is ``None`` when there is no key column or no compared
        column, because the self join or the comparison predicate would then
        be empty and the statement would not be valid SQL.
    """
    select_items = []  # one entry per projected column
    id_vars = []       # list of PK's for the pivot operation
    value_vars = []    # list of value columns for the pivot operation

    for column_name, is_key in columns:
        if is_key:
            select_items.append(f"s1.{column_name}")
            id_vars.append(column_name)
        else:
            select_items.append(
                f"CASE WHEN s1.{column_name} <> s2.{column_name} THEN "
                f"concat(s1.{column_name}, ' -> ', s2.{column_name}) "
                f"ELSE '' END AS {column_name}"
            )
            value_vars.append(column_name)

    if not id_vars or not value_vars:
        return None, id_vars, value_vars

    select_items.append("now() as sysdate")

    # the condition for the self join
    key_sql = " AND ".join(f"s2.{name} = s1.{name}" for name in id_vars)
    # the comparison between all value columns
    # NOTE(review): `<>` evaluates to NULL when either side is NULL, so changes
    # from/to NULL are not reported; confirm whether that is acceptable.
    pred_sql = " OR ".join(f"s1.{name} <> s2.{name}" for name in value_vars)

    full_sql = "\n".join([
        "SELECT ",
        ",\n".join(select_items),
        f"FROM V_CHANGE_{table_name} s1",
        f"INNER JOIN V_CHANGE_{table_name} s2 ON ({key_sql})",
        "WHERE s1._change_type IN ('update_preimage', 'insert') AND s2._change_type = 'update_postimage' AND",
        f"({pred_sql})",
    ])
    return full_sql, id_vars, value_vars


table_list = spark.sql('SELECT DISTINCT(table_name) AS table_name FROM information_schema').toPandas()
if debug: print(table_list)

final = []  # list of final results

for table_name in table_list['table_name']:
    print('Processing table ' + table_name)

    # Non-ignored columns of this table, in their declared position order
    sql = "SELECT column_name, is_key FROM information_schema WHERE table_name = '" + table_name + "' AND is_ignore = false ORDER BY column_pos"
    column_list = spark.sql(sql).toPandas()
    if debug: print(column_list)

    full_sql, id_vars, value_vars = _build_change_sql(
        table_name,
        zip(column_list['column_name'], column_list['is_key']),
    )

    if full_sql is None:
        # Nothing to join on or nothing to compare: no valid query can be built.
        print('Skipping table ' + table_name + ': at least one key column and one compared column are required')
        continue

    if debug: print('SQL:\n' + full_sql)

    final.append([table_name, full_sql, id_vars, value_vars])

    if debug: print('id_vars:', id_vars)
    if debug: print('value_vars:', value_vars)

if debug: print('final:', final)


StatementMeta(smallpool3dot3, 1, 21, Finished, Available)

  table_name
0   students
Processing table students
  column_name  is_key
0          id    True
1       grade   False
2        year   False
3         age   False
SQL:
SELECT 
s1.id,
CASE WHEN s1.grade <> s2.grade THEN concat(s1.grade, ' -> ', s2. grade) ELSE '' END AS grade,
CASE WHEN s1.year <> s2.year THEN concat(s1.year, ' -> ', s2. year) ELSE '' END AS year,
CASE WHEN s1.age <> s2.age THEN concat(s1.age, ' -> ', s2. age) ELSE '' END AS age,
now() as sysdate
FROM V_CHANGE_students s1
INNER JOIN V_CHANGE_students s2 ON (s2.id = s1.id)
WHERE s1._change_type IN ('update_preimage', 'insert') AND s2._change_type = 'update_postimage' AND
(s1.grade <> s2.grade OR s1.year <> s2.year OR s1.age <> s2.age)
id_vars: ['id']
value_vars: ['grade', 'year', 'age']
final: [['students', "SELECT \ns1.id,\nCASE WHEN s1.grade <> s2.grade THEN concat(s1.grade, ' -> ', s2. grade) ELSE '' END AS grade,\nCASE WHEN s1.year <> s2.year THEN concat(s1.year, ' -> ', s2. year) ELSE '' END AS year,\nCASE WHEN s1.ag

In [64]:
-- Generated query for "students" (copied from the generator output), run as-is
SELECT
    s1.id,
    CASE WHEN s1.grade <> s2.grade THEN concat(s1.grade, ' -> ', s2.grade) ELSE '' END AS grade,
    CASE WHEN s1.year <> s2.year THEN concat(s1.year, ' -> ', s2.year) ELSE '' END AS year,
    CASE WHEN s1.age <> s2.age THEN concat(s1.age, ' -> ', s2.age) ELSE '' END AS age,
    now() AS sysdate
FROM V_CHANGE_students s1
INNER JOIN V_CHANGE_students s2
    ON (s2.id = s1.id)
WHERE s1._change_type IN ('update_preimage', 'insert')
  AND s2._change_type = 'update_postimage'
  AND (s1.grade <> s2.grade OR s1.year <> s2.year OR s1.age <> s2.age)

StatementMeta(smallpool3dot3, 1, 23, Finished, Available)

<Spark SQL result set with 2 rows and 5 fields>

In [70]:
%%pyspark

# Run the generated query of the first table, then melt (unpivot) and clean the result

table_name, full_sql, id_vars, value_vars = final[0]
if debug:
    for label, value in (
        ('table_name:', table_name),
        ('full_sql:',   full_sql),
        ('id_vars:',    id_vars),
        ('value_vars:', value_vars),
    ):
        print(label, value)

# Execute the SQL query and bring the result to pandas
df = spark.sql(full_sql).toPandas()
display(df)

# Unpivot: one row per (key, column) pair, ordered by key and column name
sort_columns = id_vars + ['variable']
df_melted = df.melt(id_vars=id_vars, value_vars=value_vars).sort_values(by=sort_columns)
display(df_melted)

# Keep only the columns that actually changed (unchanged ones carry an empty string)
has_change = df_melted['value'] != ''
df_melted_clean = df_melted[has_change]
display(df_melted_clean)


StatementMeta(smallpool3dot3, 1, 29, Finished, Available)

table_name: students
full_sql: SELECT 
s1.id,
CASE WHEN s1.grade <> s2.grade THEN concat(s1.grade, ' -> ', s2. grade) ELSE '' END AS grade,
CASE WHEN s1.year <> s2.year THEN concat(s1.year, ' -> ', s2. year) ELSE '' END AS year,
CASE WHEN s1.age <> s2.age THEN concat(s1.age, ' -> ', s2. age) ELSE '' END AS age,
now() as sysdate
FROM V_CHANGE_students s1
INNER JOIN V_CHANGE_students s2 ON (s2.id = s1.id)
WHERE s1._change_type IN ('update_preimage', 'insert') AND s2._change_type = 'update_postimage' AND
(s1.grade <> s2.grade OR s1.year <> s2.year OR s1.age <> s2.age)
id_vars: ['id']
value_vars: ['grade', 'year', 'age']


  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


SynapseWidget(Synapse.DataFrame, d28cb1da-c115-4e44-97d4-2d78cfb1057b)

  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]


SynapseWidget(Synapse.DataFrame, a3a9e541-cbc3-49dd-8856-22d11825276c)

SynapseWidget(Synapse.DataFrame, 04410818-9eaf-43c7-a066-baae744973b4)

If we want to select the changes between 2 versions (or timestamps), we may find several changes applied to the same PK.
This means we need to extract only the oldest and the newest record for each PK.

In [78]:
%%pyspark

# Read every change recorded between versions 1 and 2, ordered by key and commit version

df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .option("endingVersion", 2)
    .table('students')
    .orderBy("ID", "_commit_version")
)

# Make the full change list available to the SQL cells
df.createOrReplaceTempView('v_all_changes_students')

display(df)

StatementMeta(smallpool3dot3, 1, 37, Finished, Available)

SynapseWidget(Synapse.DataFrame, c9b6bce0-8923-46cb-8c06-c9f2eddfde26)

In [81]:
-- Create a temporary view that reduces a result holding multiple changes for each PK
-- to 1-2 records per PK (oldest and newest)

-- We can recreate the same view as before as it has the same structure
-- This view can also be generated dynamically
--
-- The MIN/MAX subqueries deliberately look at ALL change types of the PK: restricting them
-- to the outer row's change type would also keep intermediate 'update_preimage' rows.
CREATE OR REPLACE TEMP VIEW V_CHANGE_students AS
SELECT * FROM v_all_changes_students s1
WHERE (
  -- newest state: the resulting row of the latest commit that touched this PK
  s1._commit_timestamp = (SELECT MAX(s2._commit_timestamp) FROM v_all_changes_students s2 WHERE s2.id = s1.id)
  AND s1._change_type IN ('update_postimage', 'insert')
) OR (
  -- oldest state: the starting row of the earliest commit that touched this PK
  s1._commit_timestamp = (SELECT MIN(s2._commit_timestamp) FROM v_all_changes_students s2 WHERE s2.id = s1.id)
  AND s1._change_type IN ('update_preimage', 'insert')
)
ORDER BY id, _commit_timestamp DESC;

SELECT * FROM V_CHANGE_students ORDER BY ID, _commit_version

-- We can now use this view as an input to the generation of changes, as before

StatementMeta(, 1, -1, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 5 rows and 8 fields>