# In [None]:
import os

# Import required PySpark functions
from pyspark.sql import SparkSession
from pyspark.sql.functions import when

# Build (or reuse) the Spark session for this job. The MySQL Connector/J jar
# is registered through "spark.jars" so the JDBC driver class used by the
# final write can be loaded.
_mysql_connector_jar = r"C:\Users\chint\OneDrive\Desktop\connector\mysql-connector-j-8.0.33\mysql-connector-j-8.0.33.jar"

spark = (
    SparkSession.builder
    .appName("pyspark project")
    .config("spark.jars", _mysql_connector_jar)
    .getOrCreate()
)

# -------------------------
# FULL LOAD (Base data)
# -------------------------
# Load the base (full) dataset. The CSV files carry no header row, so Spark
# assigns the positional names _c0, _c1, _c2; these are renamed straight away
# to the names the rest of the script relies on. Column types are inferred
# from the data.
full_load = (
    spark.read.csv(
        r"C:\Users\chint\PycharmProjects\PythonProject\officeData",
        header=False,
        inferSchema=True,
    )
    .withColumnRenamed("_c0", "id")
    .withColumnRenamed("_c1", "fullname")
    .withColumnRenamed("_c2", "city")
)

# -------------------------
# UPDATED / CDC LOAD
# -------------------------

# Load the incremental / CDC feed. Like the base data it has no header row,
# so columns arrive as _c0.._c3 with inferred types.
Updated_load = spark.read.csv(
    r"C:\Users\chint\PycharmProjects\PythonProject\officeD",
    header=False,
    inferSchema=True
)

# Positional CSV name -> meaningful name.
# "status" is the operation type of the record: I (insert), U (update),
# D (delete); the remaining columns mirror the base dataset.
_cdc_column_names = {
    "_c0": "status",
    "_c1": "id",
    "_c2": "fullname",
    "_c3": "city",
}
for _default_name, _meaningful_name in _cdc_column_names.items():
    Updated_load = Updated_load.withColumnRenamed(_default_name, _meaningful_name)

# -------------------------
# APPLY CDC LOGIC
# -------------------------

# Apply the CDC records to full_load one at a time, in the order collect()
# returns them, so each record sees the effect of the ones before it.
#
# NOTE: collect() brings the whole CDC feed onto the driver, and every record
# adds another transformation on top of full_load, so this approach is only
# suitable for small CDC feeds.

# Data columns shared by the base dataset and the CDC feed (status excluded).
data_columns = ["id", "fullname", "city"]

# Schema used for inserted rows. Resolved lazily, on the first insert, so that
# an empty CDC feed (which may not even have these columns) stays a no-op.
insert_schema = None

for row in Updated_load.collect():
    status = row["status"]

    if status == "U":
        # UPDATE: overwrite fullname and city on every row whose id matches
        # the CDC record; all other rows keep their current values.
        for name in ("fullname", "city"):
            full_load = full_load.withColumn(
                name,
                when(full_load["id"] == row["id"], row[name])
                .otherwise(full_load[name])
            )

    elif status == "I":
        # INSERT: append the record (without its status column) as a new row.
        # The row is built with the CDC feed's own column types instead of
        # inferring them from the Python values, because inference on a
        # single row fails as soon as one of the values is None.
        if insert_schema is None:
            insert_schema = Updated_load.select(*data_columns).schema

        new_row = spark.createDataFrame(
            [tuple(row[name] for name in data_columns)],
            schema=insert_schema,
        )
        # union() matches columns by position, so new_row must list its
        # columns in the same order as full_load.
        full_load = full_load.union(new_row)

    elif status == "D":
        # DELETE: drop every row whose id equals the CDC record's id.
        # A plain `id != value` evaluates to NULL for rows with a NULL id
        # (and for every row when the value itself is NULL), and filter()
        # discards NULL results -- so unrelated rows would be deleted too.
        # The null-safe comparison always yields True/False, so only real
        # matches are removed.
        full_load = full_load.filter(~full_load["id"].eqNullSafe(row["id"]))

# -------------------------
# WRITE FINAL DATA TO MYSQL
# -------------------------

# Write the final DataFrame into the MySQL table mani_schema.Persons.
#
# Connection settings can be supplied through the environment variables
# MYSQL_JDBC_URL, MYSQL_USER and MYSQL_PASSWORD so that credentials do not
# have to live in source control. The literals below are only fallbacks that
# keep existing local runs working.
# TODO: remove the hard-coded password fallback once MYSQL_PASSWORD is set
# wherever this job runs.
jdbc_options = {
    "url": os.environ.get(
        "MYSQL_JDBC_URL", "jdbc:mysql://localhost:3306/mani_schema"
    ),
    "dbtable": "mani_schema.Persons",
    "user": os.environ.get("MYSQL_USER", "root"),
    "password": os.environ.get("MYSQL_PASSWORD", "manikanta396@"),
    "driver": "com.mysql.cj.jdbc.Driver",
}

# mode("overwrite") replaces whatever the target table currently contains
# with the contents of full_load.
(
    full_load.write
    .format("jdbc")
    .options(**jdbc_options)
    .mode("overwrite")
    .save()
)