In [0]:
from datetime import datetime, timedelta
from delta.tables import DeltaTable
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

In [0]:
%sql
-- Target table for the incremental-load examples in this notebook.
-- NOTE: "create or replace" redefines the table, so re-running this cell
-- starts again from an empty table.
create or replace table demo_catalog.demo_schema.members(
    id long,                      -- merge key used by the upsert cells ("new.id = current.id")
    member_id string,
    name string,
    email string,
    phone string,
    favorite_store_id int,
    last_purchase_date date,
    member_type_rfm string,
    member_category string,
    status int,
    verified int,
    photo_id int,
    created_at timestamp,
    updated_at timestamp,         -- change-tracking column; Example 1 filters the source on it
    deleted_at timestamp
)

In [0]:
# Function to read data from MySQL; returns a DataFrame.
# Don't add ; at the end of the query.
# Set useLegacyDatetimeCode to false if you want UTC timestamps from timestamp columns.
# Use Databricks secrets for the username and password.

def read_from_mysql(database_name, query):
    """Run ``query`` against MySQL over JDBC and return the result as a DataFrame.

    Args:
        database_name: Database name appended to the JDBC URL.
        query: A single SELECT statement. It is embedded as a derived table
            (``(<query>) as result``), where a trailing ``;`` would be a
            syntax error, so surrounding whitespace and trailing semicolons
            are stripped before use.

    Returns:
        The DataFrame produced by the JDBC reader.

    Raises:
        Exception: Any error from the secrets lookup or the JDBC reader is
            propagated to the caller. Previously the exception object was
            *returned*, which hid the real failure behind a later
            ``AttributeError`` on e.g. ``.display()``.
    """
    # Tolerate "select ...;" and stray whitespace; the subquery wrapper cannot contain ';'.
    subquery = query.strip().rstrip("; \t\r\n")

    return (
        spark.read.format("jdbc")
        .option("driver", "org.mariadb.jdbc.Driver")
        .option(
            "url",
            f"host_address/{database_name}?useLegacyDatetimeCode=false",
        )
        .option("dbtable", f"({subquery}) as result")
        # Credentials come from the Databricks secret scope "jdbc".
        .option("user", dbutils.secrets.get("jdbc", "demo_username"))
        .option("password", dbutils.secrets.get("jdbc", "demo_password"))
        .load()
    )


In [0]:
# check schema
# Sanity check: read a 10-row sample from the source members table and
# display it to inspect the columns coming back from MySQL.
df = read_from_mysql("demo_database", "select * from demo_database.members limit 10")
df.display()

id,member_id,name,email,phone,favorite_store_id,last_purchase_date,member_type_rfm,member_category,status,verified,photo_id,created_at,updated_at,deleted_at
1,10001,member1,demo@gmail.com,0,1,2023-11-21,,,1,1,10001,2023-11-21T00:00:00Z,2023-11-21T00:00:00Z,
2,10002,member2,demo@gmail.com,0,1,2023-11-21,,,1,1,10002,2023-11-21T05:11:30Z,2023-11-21T05:11:30Z,
3,10003,member3,demo@gmail.com,0,1,2023-11-21,,,1,1,10003,2023-11-21T05:35:45Z,2023-11-21T05:35:45Z,
4,10004,member4,demo@gmail.com,0,1,2023-11-21,,New,1,1,10004,2023-11-21T05:38:33Z,2023-11-21T05:38:33Z,
5,10005,member5,demo@gmail.com,0,1,2023-11-21,,New,1,1,10005,2023-11-21T05:38:56Z,2023-11-21T05:38:56Z,
6,10006,member6,demo@gmail.com,0,1,2023-11-21,,New,1,1,10006,2023-11-21T05:39:07Z,2023-11-21T05:39:07Z,
7,10007,member7,demo@gmail.com,0,1,2023-11-21,,New,1,1,10007,2023-11-21T05:39:18Z,2023-11-21T05:39:18Z,
8,10008,member8,demo@gmail.com,0,1,2023-11-21,,New,1,1,10008,2023-11-21T05:39:35Z,2023-11-21T05:39:35Z,
9,10009,member9,demo@gmail.com,0,1,2023-11-21,,New,1,1,10009,2023-11-21T05:39:45Z,2023-11-21T05:39:45Z,
10,10010,member10,demo@gmail.com,0,1,2023-11-21,,New,1,1,10010,2023-11-21T05:40:05Z,2023-11-21T05:40:05Z,


Example 1 <br>
When rows in the source table can be updated, we can add an updated_at column to the source table and use it to read only the changed rows.

In [0]:
# Upper bound of the read window, captured before the source is queried.
# NOTE(review): datetime.now() is naive local time while the source timestamps
# are read as UTC - confirm the cluster time zone is UTC.
current_timestamp = datetime.now()

# get maximum updated timestamp from the existing table
max_ts_row = spark.sql(
    "select max(updated_at) as max_timestamp from demo_catalog.demo_schema.members"
).collect()[0]
max_timestamp = max_ts_row["max_timestamp"]

# max() over an empty table is NULL (None in Python). Without a fallback the
# source query would compare against the literal string 'None' and the initial
# load would not be a clean full read. Use a very early lower bound instead
# (one day past the epoch, to stay inside MySQL's TIMESTAMP range).
if max_timestamp is None:
    max_timestamp = datetime(1970, 1, 2)

print("current timestamp:", current_timestamp)
print("max timestamp:", max_timestamp)

current timestamp: 2023-11-23 02:25:18.875175
max timestamp: 2023-11-23 02:18:24


In [0]:
# Read only the rows changed since the last load from the MySQL source.
# The lower bound is inclusive (>=), so the row(s) carrying the current max
# timestamp are read again on every run; the upper bound caps the window at
# the timestamp captured in the previous cell.
query = f"select * from demo_database.members where updated_at <= '{current_timestamp}' and updated_at >= '{max_timestamp}'"

new_data = read_from_mysql("demo_database", query)
new_data.display()

id,member_id,name,email,phone,favorite_store_id,last_purchase_date,member_type_rfm,member_category,status,verified,photo_id,created_at,updated_at,deleted_at
345,10345,membe345,demo@gmail.com,0,1,2023-11-23,,New,1,1,10345,2023-11-23T02:18:24Z,2023-11-23T02:18:24Z,
346,10346,membe346,demo@gmail.com,0,1,2023-11-23,,New,1,1,10346,2023-11-23T02:19:20Z,2023-11-23T02:19:20Z,
347,10347,membe347,demouser@gmail.com,0,1,2023-11-23,,New,1,1,10347,2023-11-23T02:19:34Z,2023-11-23T02:20:33Z,
348,10348,membe348,demo@gmail.com,0,1,2023-11-23,,New,1,1,10348,2023-11-23T02:19:52Z,2023-11-23T02:19:52Z,
349,10349,membe349,demo@gmail.com,0,1,2023-11-23,,New,1,1,10349,2023-11-23T02:23:27Z,2023-11-23T02:23:27Z,
350,10350,membe350,demo@gmail.com,0,1,2023-11-23,,New,1,1,10350,2023-11-23T02:23:46Z,2023-11-23T02:23:46Z,


In [0]:
# Spark/Delta config for schema auto-merge, so that schema changes in the
# source do not have to be handled by hand in the merge cells.
AUTO_MERGE_KEY = "spark.databricks.delta.schema.autoMerge.enabled"

# Read the setting once. The "false" default keeps the lookup from raising
# when the key has never been set in this session.
auto_merge = spark.conf.get(AUTO_MERGE_KEY, "false").lower()

if auto_merge == "false":
    spark.conf.set(AUTO_MERGE_KEY, "true")
    print("set to true")
elif auto_merge == "true":
    print("true")
else:
    # Anything other than true/false is left untouched and only reported.
    print("unknown value")

In [0]:
# Upsert the freshly read source rows into the Delta table, matching on id.
deltaTable = DeltaTable.forName(spark, "demo_catalog.demo_schema.members")

merge_builder = deltaTable.alias("current").merge(
    new_data.alias("new"),
    "new.id = current.id",
)

# Matched ids are overwritten with the source row; unseen ids are inserted.
merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

In [0]:
# We need to use merge instead of append
# because the read query uses an inclusive lower bound => where updated_at >= '{max_timestamp}',
# so the row with the current max timestamp is always included in the query again.
# If we used a strict greater-than instead and two rows in the source table had exactly the same timestamp, we could load only one of them.
# We just need to make sure we get all the data and don't miss any row.
# Even though the Kimball book mentions that this approach is wrong, I think it is okay if we know our data and know how to handle it.

In [0]:
# df = spark.table("demo_catalog.demo_schema.members")
# df.limit(10).display()

In [0]:
#

Example 2 <br>
When the source table is insert-only, we can use a watermark table (or any other way of tracking the current max id), so that only the newly inserted rows are read.

In [0]:
%sql
-- Watermark table: one row per source table, recording the max id used as
-- the starting point for the next incremental read.
-- NOTE: "create or replace" plus the insert below resets the "members"
-- watermark to 0 every time this cell is re-run.
create or replace table demo_catalog.demo_schema.watermark_table(
  table_name string,
  max_id long
);

-- start with 0
insert into demo_catalog.demo_schema.watermark_table (table_name, max_id) values ("members", 0);

In [0]:
# Max id recorded for "members" in the watermark table.
watermark_row = spark.sql(
    "select max_id from demo_catalog.demo_schema.watermark_table where table_name = 'members' "
).collect()[0]
current_max_id = watermark_row["max_id"]

# Max id currently present in the MySQL source table.
source_row = read_from_mysql(
    "demo_database", "select max(id) as max_id from demo_database.members "
).collect()[0]
source_max_id = source_row["max_id"]

print("current max id:", current_max_id)
print("source max id:", source_max_id)

In [0]:
# Read only the rows inserted since the last load: ids above the watermark,
# up to the source max id captured in the previous cell. (This cell used to
# repeat the updated_at filter from Example 1, ignoring the ids just computed.)

# max(id) over an empty source table is NULL (None); fall back to the
# watermark so the range is simply empty instead of producing invalid SQL.
upper_id = source_max_id if source_max_id is not None else current_max_id

query = f"select * from demo_database.members where id > {current_max_id} and id <= {upper_id}"

new_data = read_from_mysql("demo_database", query)
new_data.display()

In [0]:
# Upsert the newly read source rows into the Delta table, matching on id.
deltaTable = DeltaTable.forName(spark, "demo_catalog.demo_schema.members")

merge_builder = deltaTable.alias("current").merge(
    new_data.alias("new"),
    "new.id = current.id",
)

# Matched ids are overwritten with the source row; unseen ids are inserted.
merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

In [0]:
# Advance the watermark to the source max id captured before the read,
# instead of a hard-coded value, so the next run starts after the rows that
# were just loaded. This is a Python cell because the value comes from a
# Python variable.
if source_max_id is not None:
    spark.sql(
        f"update demo_catalog.demo_schema.watermark_table set max_id = {source_max_id} where table_name = 'members'"
    )

In [0]:
#