## Change data capture: hashing rows from the incoming batch and the existing table

In [1]:
from pyspark.sql import SparkSession

import os

# Connection settings for the Iceberg JDBC catalog. They are read from the
# environment so real credentials never have to be written into the notebook;
# the fallbacks are the local example values that used to be inlined here.
CATALOG_JDBC_URI = os.environ.get("ICEBERG_JDBC_URI", "jdbc:mysql://localhost:3306/exampledb")
CATALOG_JDBC_USER = os.environ.get("ICEBERG_JDBC_USER", "exampleuser")
CATALOG_JDBC_PASSWORD = os.environ.get("ICEBERG_JDBC_PASSWORD", "examplepass")
CATALOG_WAREHOUSE = os.environ.get("ICEBERG_WAREHOUSE", "hdfs://namenode:9000/user/hive/warehouse")
HDFS_DEFAULT_FS = os.environ.get("HDFS_DEFAULT_FS", "hdfs://namenode:9000")

spark = (
    SparkSession.builder.appName("IcebergWithPySpark")
    # Jars resolved from Maven at startup: the Iceberg runtime for Spark 3.3 /
    # Scala 2.12, the MySQL JDBC driver, and hadoop-aws + the AWS SDK bundle.
    .config(
        "spark.jars.packages",
        "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.2,"
        "mysql:mysql-connector-java:8.0.33,"
        "org.apache.hadoop:hadoop-aws:3.3.4,"
        "com.amazonaws:aws-java-sdk-bundle:1.12.262",
    )
    # Iceberg's Spark SQL extensions.
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    # Register an Iceberg catalog named `mysql` whose metadata lives in MySQL
    # via JDBC; table data files are written under the HDFS warehouse path.
    .config("spark.sql.catalog.mysql", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.mysql.type", "jdbc")
    .config("spark.sql.catalog.mysql.uri", CATALOG_JDBC_URI)
    .config("spark.sql.catalog.mysql.jdbc.user", CATALOG_JDBC_USER)
    .config("spark.sql.catalog.mysql.jdbc.password", CATALOG_JDBC_PASSWORD)
    .config("spark.sql.catalog.mysql.jdbc.driver", "com.mysql.cj.jdbc.Driver")
    .config("spark.sql.catalog.mysql.warehouse", CATALOG_WAREHOUSE)
    .config("spark.hadoop.fs.defaultFS", HDFS_DEFAULT_FS)
    .getOrCreate()
)

25/07/09 09:28:02 WARN Utils: Your hostname, rohitkarki resolves to a loopback address: 127.0.1.1; using 10.13.163.99 instead (on interface wlp4s0)
25/07/09 09:28:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/rohitkarki/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/rohitkarki/.ivy2/cache
The jars for the packages stored in: /home/rohitkarki/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.3_2.12 added as a dependency
mysql#mysql-connector-java added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-490e8767-99c4-40a5-952d-316d3fd814e1;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.3_2.12;1.5.2 in central
	found mysql#mysql-connector-java;8.0.33 in central
	found com.mysql#mysql-connector-j;8.0.33 in central
	found com.google.protobuf#protobuf-java;3.21.9 in central
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 310ms :: artifacts dl 11ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bu

25/07/09 09:28:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
# Show the session's current catalog and namespace (before switching).
spark.sql("SHOW CURRENT NAMESPACE").show()

# Make sure the namespace exists in the `mysql` Iceberg catalog. This is DDL and
# returns an empty result, so there is nothing to show().
spark.sql("CREATE NAMESPACE IF NOT EXISTS mysql.default")

spark.sql("SHOW NAMESPACES IN mysql").show()

# Switch the session to the `default` namespace of the `mysql` catalog and list its tables.
spark.sql("USE mysql.default")
spark.sql("SHOW TABLES").show()

+-------------+---------+
|      catalog|namespace|
+-------------+---------+
|spark_catalog|  default|
+-------------+---------+

25/07/09 09:28:11 WARN JdbcCatalog: JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1
++
||
++
++

+---------+
|namespace|
+---------+
|  default|
+---------+

+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|  default|customer_dim|      false|
|  default|       hello|      false|
|  default|      movie_|      false|
+---------+------------+-----------+



In [3]:
from pyspark.sql.functions import md5, sha2, concat_ws, col, lit

def add_hash_column(df, columns_to_hash, hash_type="md5", hash_col="row_hash", null_sentinel=None):
    """Return ``df`` with an extra column holding a hash of ``columns_to_hash``.

    The values of ``columns_to_hash`` are joined with "|" via ``concat_ws`` and
    the resulting string is hashed, so column order affects the hash.

    Args:
        df: Input Spark DataFrame. It is not modified; a new DataFrame is returned.
        columns_to_hash: Ordered names (or Columns) of the values to hash.
        hash_type: "md5" (default), "sha1", "sha224", "sha256" (alias "sha2"),
            "sha384" or "sha512". Case-insensitive; dashes are ignored ("SHA-256").
        hash_col: Name of the output column (default "row_hash"). An existing
            column with that name is replaced.
        null_sentinel: If None (default), NULLs are skipped by ``concat_ws``, so
            e.g. ("a", NULL, "b") and ("a", "b", NULL) hash identically. Pass a
            string that never occurs in the data to substitute it for NULLs
            (each column is cast to string first) and keep positions distinct.

    Returns:
        A new DataFrame with ``hash_col`` added.

    Raises:
        ValueError: if ``hash_type`` is not a supported algorithm. (Previously
            any value other than "md5" silently produced SHA-256.)
    """
    sha2_bits = {"sha2": 256, "sha224": 224, "sha256": 256, "sha384": 384, "sha512": 512}
    algorithm = hash_type.lower().replace("-", "")
    # Validate before building any Spark expression so a typo fails fast.
    if algorithm not in sha2_bits and algorithm not in ("md5", "sha1"):
        raise ValueError(
            f"Unsupported hash_type {hash_type!r}; expected 'md5', 'sha1' or one of {sorted(sha2_bits)}"
        )

    if null_sentinel is None:
        parts = list(columns_to_hash)
    else:
        from pyspark.sql.functions import coalesce

        parts = [coalesce(col(c).cast("string"), lit(null_sentinel)) for c in columns_to_hash]
    joined = concat_ws("|", *parts)

    if algorithm == "md5":
        digest = md5(joined)
    elif algorithm == "sha1":
        from pyspark.sql.functions import sha1

        digest = sha1(joined)
    else:
        digest = sha2(joined, sha2_bits[algorithm])
    return df.withColumn(hash_col, digest)

In [3]:
# Fully qualified so this cell does not depend on an earlier `USE mysql.default`.
spark.sql("""
SELECT * FROM mysql.default.hello
""").show()

+---+-----+--------+
| id| data|category|
+---+-----+--------+
|  1|hello|       A|
|  2|world|       B|
|  3|  foo|       A|
+---+-----+--------+



                                                                                

In [None]:
# Sample incoming batch: id 1 has a modified `data` value, id 4 is a new key.
incoming_data = [
    (1, "Lol", "A"),  # changed
    (4, "hello", "B"),  # new
]

existing_df = spark.read.table("mysql.default.hello")
existing_df.show()

new_df = spark.createDataFrame(incoming_data, ["id", "data", "category"])
new_df.show()

# Columns to include in the hash: everything except the CDC metadata columns
# (row_hash, updated_at, active). The loop variable is `c` rather than `col`
# so it does not reuse the name of the imported pyspark `col` function.
columns_to_hash = [c for c in existing_df.columns if c not in ["row_hash", "updated_at", "active"]]


+---+-----+--------+
| id| data|category|
+---+-----+--------+
|  1|hello|       A|
|  2|world|       B|
|  3|  foo|       A|
+---+-----+--------+

+---+-----+--------+
| id| data|category|
+---+-----+--------+
|  1|  Lol|       A|
|  4|hello|       B|
+---+-----+--------+



In [None]:
# NOTE(review): earlier draft of `perform_cdc`, kept commented out for reference;
# the active implementation is in the next cell. If uncommented as-is:
#   * the example call passes `target_df`, which is not defined anywhere in this
#     notebook (the incoming batch is `new_df`);
#   * its final line overwrites the table with `result_df`, which contains only
#     the new and changed rows -- presumably unchanged rows would be lost; confirm
#     before reviving.
# The output printed under this cell does not come from this code: it lists
# id 1 as "new" and id 4 as "changed", but the left-anti join on the primary key
# below would classify id 1 (already in the table) as not new.
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import sha2, concat_ws, col, lit

# def perform_cdc(existing_df, new_df, primary_key="id"):
#     # Read data
#     # perform reading from Iceberg table

#     # Get columns to hash (exclude metadata columns)
#     hash_columns = [c for c in existing_df.columns if c not in [primary_key, "row_hash", "updated_at", "active"]]
    
#     # Add hash columns
#     target_hashed = existing_df.withColumn("row_hash", sha2(concat_ws("|", *hash_columns), 256))
#     source_hashed = new_df.withColumn("row_hash", sha2(concat_ws("|", *hash_columns), 256))
    
#     # Find new
#     new_rows = source_hashed.join(
#         target_hashed.select(primary_key, "row_hash"),
#         [primary_key],
#         "left_anti"
#     )
#     print("New DATAS")
#     new_rows.show()
#     # Detect changed rows: same ID, but different row_hash
#     source_alias = source_hashed.alias("src")
#     target_alias = target_hashed.alias("tgt")

#     changed_rows = source_alias.join(
#         target_alias.select(primary_key, "row_hash"),
#         on=primary_key,
#         how="inner"
#     ).filter(
#         col("src.row_hash") != col("tgt.row_hash")
#     ).select("src.*")
    
#     print("Changed DATAS")
#     changed_rows.show()
#     # Union new and changed rows
#     result_df = new_rows.union(changed_rows)

#     return result_df

# # Example usage
# result_df = perform_cdc(
#     existing_df,
#     target_df
# )
# result_df.show()
# result_df.drop("row_hash").writeTo("hello").overwritePartitions()

New DATAS
+---+----+--------+--------------------+
| id|data|category|            row_hash|
+---+----+--------+--------------------+
|  1| Lol|       A|b310add6c3e16f67b...|
+---+----+--------+--------------------+

Changed DATAS
+---+-----+--------+--------------------+
| id| data|category|            row_hash|
+---+-----+--------+--------------------+
|  4|hello|       B|ee6f2af077718ff5c...|
+---+-----+--------+--------------------+

+---+-----+--------+--------------------+
| id| data|category|            row_hash|
+---+-----+--------+--------------------+
|  1|  Lol|       A|b310add6c3e16f67b...|
|  4|hello|       B|ee6f2af077718ff5c...|
+---+-----+--------+--------------------+



                                                                                

In [None]:
# CDC naming: the *source* is the incoming batch (`new_df` below) and the
# *target* is the data already in the table (`existing_df` below).

from pyspark.sql import functions as F


# Incoming batch (CDC source): id 1 carries a modified `data` value, id 4 is new.
incoming_data = [
    (1, "Lol", "A"),   # changed
    (4, "hello", "B"), # new
]

# Load and display the current table contents (CDC target).
existing_df = spark.read.table("mysql.default.hello")
print("EXISTING DATA")
existing_df.show()

# Build the incoming DataFrame with the same column names as the table.
incoming_columns = ["id", "data", "category"]
new_df = spark.createDataFrame(incoming_data, incoming_columns)
print("new data")
new_df.show()

from pyspark.sql import SparkSession
from pyspark.sql.functions import sha2, concat_ws, col, lit, current_timestamp

def perform_cdc(existing_df, new_df, primary_key="id", verbose=True):
    """Merge an incoming batch into a table as SCD type-2 style versioned rows.

    Change detection is hash based: each row gets ``row_hash`` = SHA-256 of its
    data columns joined with "|". Incoming rows are classified by ``primary_key``:

    * new      -- key has no active row in the target; emitted with active=1.
    * changed  -- key has an active target row with a different hash; the
                  incoming row is emitted with active=1 and the old active row
                  is re-emitted with active=0 and a fresh ``updated_at``.
    * retained -- every other target row passes through unchanged (active rows
                  whose key did not change, including keys re-sent with
                  identical data, plus all previously inactive history rows).

    Args:
        existing_df: Current table contents (the CDC *target*). If it lacks any
            of ``row_hash``/``updated_at``/``active``, hashes are computed and
            every row is treated as active (active=1, updated_at=now).
        new_df: Incoming batch (the CDC *source*). Must contain every data
            column of ``existing_df``; column order may differ.
        primary_key: Name of the column that identifies a row across versions.
        verbose: If True (default), print and ``show()`` the new rows and the
            old/new versions of changed rows. Each ``show()`` runs a Spark job.

    Returns:
        DataFrame made of new rows, new versions of changed rows, old versions
        of changed rows, and retained rows (in that order), combined by name.
    """
    # NOTE(review): assumes new_df holds at most one row per primary key; a
    # duplicated key would produce several active versions. Deduplicate upstream.
    metadata_columns = ["row_hash", "updated_at", "active"]

    # Hash the SAME ordered column list on both sides. Hashing new_df.columns on
    # one side and existing_df's columns on the other made identical rows look
    # "changed" whenever the two DataFrames listed their columns differently.
    hash_columns = [c for c in existing_df.columns if c not in metadata_columns]
    row_hash = sha2(concat_ws("|", *hash_columns), 256)

    source_hashed = (
        new_df.withColumn("row_hash", row_hash)
        .withColumn("updated_at", current_timestamp())
        .withColumn("active", lit(1))
    )

    # Reuse stored hashes/metadata when the target already carries all of them.
    if all(c in existing_df.columns for c in metadata_columns):
        target_hashed = existing_df
    else:
        target_hashed = (
            existing_df.withColumn("row_hash", row_hash)
            .withColumn("updated_at", current_timestamp())
            .withColumn("active", lit(1))
        )

    # Only the current version of each key takes part in change detection.
    # eqNullSafe makes a NULL `active` count as inactive rather than dropping the row.
    is_active = col("active").eqNullSafe(1)
    target_active = target_hashed.filter(is_active)
    target_history = target_hashed.filter(~is_active)

    # New rows: keys with no active version in the target.
    new_rows = source_hashed.join(target_active.select(primary_key), primary_key, "left_anti")
    if verbose:
        print("New DATA:")
        new_rows.show()

    # Changed rows: same key as an active target row, but a different hash.
    changed_rows_new = (
        source_hashed.alias("src")
        .join(target_active.select(primary_key, "row_hash").alias("tgt"), primary_key, "inner")
        .filter(~col("src.row_hash").eqNullSafe(col("tgt.row_hash")))
        .select("src.*")
    )
    changed_keys = changed_rows_new.select(primary_key)

    # Old versions of changed rows: flip only the ACTIVE target row to active=0.
    # A left-semi join keeps the target's own column order and never duplicates rows.
    changed_rows_old = (
        target_active.join(changed_keys, primary_key, "left_semi")
        .withColumn("active", lit(0))
        .withColumn("updated_at", current_timestamp())
    )
    if verbose:
        print("OLD Versions (marked active=0):")
        changed_rows_old.show()
        print("NEW Versions (active=1):")
        changed_rows_new.show()

    # Retained rows: the whole target minus the active rows superseded above.
    # (Anti-joining on *all* incoming keys used to drop rows re-sent with
    # identical data, and filtering to active == 1 discarded the version history.)
    retained_rows = target_active.join(changed_keys, primary_key, "left_anti").unionByName(
        target_history
    )

    # unionByName (not positional union) so differing column order between the
    # parts cannot silently put values under the wrong column.
    return (
        new_rows.unionByName(changed_rows_new)
        .unionByName(changed_rows_old)
        .unionByName(retained_rows)
    )

# Perform CDC
result_df = perform_cdc(existing_df, new_df)
print("Final Result:")
result_df.show()

# Write back to table.
# Keep `row_hash` (and `updated_at`/`active`) when writing: perform_cdc reuses
# stored metadata only when all three columns exist. Dropping `row_hash` makes
# the next run recompute every hash and reset every row to active=1, which
# erases the active/inactive version state.
# NOTE(review): this replaces the same table the input was read from -- confirm
# the catalog handles reading and replacing one table in a single job as intended.
# result_df.write.mode("overwrite").saveAsTable("mysql.default.hello")

EXISTING DATA
+---+-----+--------+
| id| data|category|
+---+-----+--------+
|  1|hello|       A|
|  2|world|       B|
|  3|  foo|       A|
+---+-----+--------+

new data
+---+-----+--------+
| id| data|category|
+---+-----+--------+
|  1|  Lol|       A|
|  4|hello|       B|
+---+-----+--------+

New DATA:
+---+-----+--------+--------------------+--------------------+------+
| id| data|category|            row_hash|          updated_at|active|
+---+-----+--------+--------------------+--------------------+------+
|  4|hello|       B|ef9cb88bf62ccf2b7...|2025-07-08 16:16:...|     1|
+---+-----+--------+--------------------+--------------------+------+

OLD Versions (marked active=0):
+---+-----+--------+--------------------+--------------------+------+
| id| data|category|            row_hash|          updated_at|active|
+---+-----+--------+--------------------+--------------------+------+
|  1|hello|       A|01de35f326f8d903c...|2025-07-08 16:16:...|     0|
+---+-----+--------+---------

In [81]:
# Create the demo table. Fully qualified so it does not depend on the session's
# current namespace; IF NOT EXISTS keeps the cell re-runnable (without it, a
# second run fails because the table already exists). DDL returns an empty
# result, so there is nothing to show().
spark.sql("""
CREATE TABLE IF NOT EXISTS mysql.default.hello (
    id bigint,
    data string,
    category string)
""")

++
||
++
++



In [82]:
# Seed the demo rows only when the table is empty, so re-running this cell (or
# Restart & Run All) does not append duplicate copies of ids 1-3.
if not spark.table("mysql.default.hello").head(1):
    spark.sql("""
    INSERT INTO mysql.default.hello VALUES
    (1, "hello", "A"),
    (2, "world", "B"),
    (3, "foo", "A")
    """)

++
||
++
++

