# Spark Session Start
### This project was run on Databricks Community Edition with Databricks Runtime 15.3 (Apache Spark 3.5.0, Scala 2.12).

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

# Obtain the Spark session for this notebook; getOrCreate() returns an
# already-running session if there is one instead of starting a new one.
spark = (
    SparkSession.builder
    .appName("KafkaDebezium")
    .getOrCreate()
)

# Schema of the Debezium change events

In [None]:
# Parse schema for a single Debezium change-event message read from Kafka.
# The message is an envelope with two top-level parts:
#   "schema"  - a description of the record layout
#   "payload" - the change itself (row images, source metadata, op code)
# Only "payload" is consumed by the later cells; "schema" is declared so the
# whole message can be inspected when debugging.
from pyspark.sql.types import ArrayType

# Column layout of one customers row. The same layout is used for both the
# pre-change ("before") and post-change ("after") row images, so it is
# defined once and reused below.
customer_row_type = StructType([
    StructField("id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone", StringType(), True),
    # Sample output shows 16-digit values (e.g. 1726082790625207), which look
    # like microseconds since the epoch - TODO confirm against the connector.
    StructField("created_at", LongType(), True),
])

schema = StructType([
    StructField("schema", StructType([
        StructField("type", StringType(), True),
        # "fields" is a JSON *array* of field descriptors (one per envelope
        # field), so it must be declared as ArrayType. A plain StructType does
        # not match the message, and from_json cannot parse this part of the
        # envelope into the declared type.
        StructField("fields", ArrayType(StructType([
            StructField("type", StringType(), True),
            # Descriptor flags such as "optional" are read as their raw JSON
            # text because the column type is StringType.
            StructField("optional", StringType(), True),
            StructField("default", StringType(), True),
            StructField("field", StringType(), True),
        ])), True),
        StructField("optional", StringType(), True),
        StructField("name", StringType(), True),
        StructField("version", StringType(), True),
    ]), True),

    StructField("payload", StructType([
        # Row image before the change.
        StructField("before", customer_row_type, True),
        # Row image after the change; this is what gets flattened later.
        StructField("after", customer_row_type, True),
        # Connector/source metadata. The lsn/xmin fields suggest the
        # PostgreSQL connector - presumably, confirm against the deployment.
        StructField("source", StructType([
            StructField("version", StringType(), True),
            StructField("connector", StringType(), True),
            StructField("name", StringType(), True),
            StructField("ts_ms", LongType(), True),
            StructField("snapshot", StringType(), True),
            StructField("db", StringType(), True),
            StructField("sequence", StringType(), True),
            StructField("schema", StringType(), True),
            StructField("table", StringType(), True),
            StructField("txId", LongType(), True),
            StructField("lsn", LongType(), True),
            StructField("xmin", LongType(), True),
        ]), True),
        # Operation code; the downstream filter keeps only "u" (update).
        StructField("op", StringType(), True),
        StructField("ts_ms", LongType(), True),
        StructField("transaction", StructType([
            StructField("id", StringType(), True),
            StructField("total_order", LongType(), True),
            StructField("data_collection_order", LongType(), True),
        ]), True),
    ]), True),
])

## Read the data from Kafka

In [None]:
# Subscribe to the Debezium change-event topic for public.customers.
# Replace <VM-External-IP> with the externally reachable Kafka broker host.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<VM-External-IP>:9092")
    .option("subscribe", "dbserver1.public.customers")
    .load()
    # The Kafka source exposes the message value as binary; decode it to a
    # string column (still named "value") so it can be parsed as JSON.
    .selectExpr("CAST(value AS STRING)")
)

### Parse the JSON messages into structured columns and keep only update events

In [None]:
# Parse every message into a struct column called "data" using the Debezium
# schema, then keep only update operations (op == "u").
json_df = (
    kafka_df
    .select(from_json(col("value"), schema).alias("data"))
    .where(col("data.payload.op") == "u")
)

### Flatten the updated rows into columns and write them to an in-memory table

In [None]:
# Promote each column of the post-change ("after") row image to a top-level
# column with the same name.
after_columns = ["id", "first_name", "last_name", "email", "phone", "created_at"]
json_df = json_df.select(
    *[col(f"data.payload.after.{name}").alias(name) for name in after_columns]
)

# Stream the flattened rows into an in-memory table named "customers" that can
# be queried with Spark SQL. Append mode adds each new row once. The memory
# sink is intended for debugging/small volumes.
query = (
    json_df.writeStream
    .outputMode("append")
    .format("memory")
    .queryName("customers")
    .start()
)

### Query the data

In [None]:
# An example of how to query the stream
# Display the rows the stream has written to the in-memory "customers" table
# so far; rows arriving later appear on a re-run of this cell.
spark.table("customers").show()


+---+----------+---------+------------------+---------+----------------+
| id|first_name|last_name|             email|    phone|      created_at|
+---+----------+---------+------------------+---------+----------------+
|  3|    Alice3|      Doe|example4@email.com|123456789|1726082790625207|
|  3|    Alice2|      Doe|example4@email.com|123456789|1726082790625207|
|  3|    Alice1|      Doe|example4@email.com|123456789|1726082790625207|
+---+----------+---------+------------------+---------+----------------+

