# In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Incremental replication of `billinfo` into a staging table.
#
# Reads the source table (`billinfo`) and the replica (`billrepli`) over JDBC,
# finds source rows that are missing from the replica or differ from it in any
# compared column, and writes those rows to `billrepli_staging`.

# 1. Start Spark
spark = (
    SparkSession.builder
    .appName("MySQL Replication with Update on Change")
    .config("spark.jars", "/Users/mugesh_krishna/Downloads/mysql-connector-j-9.3.0/mysql-connector-j-9.3.0.jar")
    .getOrCreate()
)

jdbc_url = "jdbc:mysql://localhost:3306/samplebill"
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a secrets store before sharing this script.
properties = {
    "user": "root",
    "password": "mugesh585",
    "driver": "com.mysql.cj.jdbc.Driver",
}

# 2. Load source and target tables
billinfo_df = spark.read.jdbc(url=jdbc_url, table="billinfo", properties=properties)
billrepli_df = spark.read.jdbc(url=jdbc_url, table="billrepli", properties=properties)

# Non-key columns whose values decide whether a row counts as "changed".
compare_columns = ["name", "origin", "desti", "billdate", "deliverydate", "ispackage"]

# 3. Join on primary key (billid)
# A left join is enough: only source rows can be new or changed, and rows that
# exist solely in the replica must not be emitted. The explicit key condition
# (instead of on="billid") keeps both src.billid and tgt.billid addressable.
joined_df = billinfo_df.alias("src").join(
    billrepli_df.alias("tgt"),
    on=col("src.billid") == col("tgt.billid"),
    how="left",
)

# 4. Identify records that are new or changed
# Plain `!=` evaluates to NULL when either side is NULL, which silently drops
# rows whose value changed to or from NULL. eqNullSafe treats NULL as a
# comparable value, so its negation catches those changes too.
needs_sync = col("tgt.billid").isNull()  # new record: no match in the replica
for column_name in compare_columns:
    needs_sync = needs_sync | (
        ~col(f"src.{column_name}").eqNullSafe(col(f"tgt.{column_name}"))
    )

changed_df = joined_df.filter(needs_sync).select(
    "src.billid", *[f"src.{column_name}" for column_name in compare_columns]
)

# 5. Write updated records back to billrepli
# Note: Replace/merge logic requires either:
# - temp staging table + manual update
# - OR truncate and reinsert all changed (if safe)

# Option 1: Upsert using overwrite for changed rows only
# Cache before counting so the JDBC reads and the join run once, and so the
# reported count describes exactly the rows that get written.
changed_df = changed_df.cache()
changed_count = changed_df.count()

if changed_count > 0:
    changed_df.write.jdbc(
        url=jdbc_url,
        table="billrepli_staging",
        mode="overwrite",
        properties=properties,
    )

    print(f"{changed_count} records written to staging table.")
else:
    # The staging table is not touched on this path, so it may still hold rows
    # written by an earlier run.
    print("No new or changed records found.")

changed_df.unpersist()
