In [1]:
pip install delta-spark==3.2.0

Collecting delta-spark==3.2.0
  Downloading delta_spark-3.2.0-py3-none-any.whl.metadata (2.0 kB)
Collecting py4j==0.10.9.7 (from pyspark<3.6.0,>=3.5.0->delta-spark==3.2.0)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading delta_spark-3.2.0-py3-none-any.whl (21 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: py4j, delta-spark
Successfully installed delta-spark-3.2.0 py4j-0.10.9.7
Note: you may need to restart the kernel to use updated packages.


In [None]:
import json, os, re
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-spark_2.12:3.2.0,org.apache.hadoop:hadoop-common:3.3.4,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262,org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'

from pyspark.sql import SparkSession


from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *

from delta import *
from delta.tables import *

# Initialize Delta Spark Session
#
# The MinIO connection settings are read from the environment so that real
# credentials never have to be committed with the notebook. The fallbacks are
# the values this notebook previously hardcoded, so behaviour is unchanged
# when the variables are not set.
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000/")

spark = (
    SparkSession.builder
    .appName("write to minio")
    .master('spark://spark-master:7077')
    # Resource limits requested from the standalone cluster.
    .config('spark.executor.cores', '1')
    .config('spark.executor.instances', '1')
    .config('spark.driver.cores', '1')
    .config('spark.cores.max', '3')
    .config('spark.executor.memory', '512m')
    # S3A connector settings pointing at the MinIO endpoint.
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    # Register the Delta Lake SQL extension and catalog.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Schema applied to the Kafka message value by read_stream_from_kafka_topic:
# two nullable string fields, "before" and "after".
schema = (
    StructType()
    .add("before", StringType(), True)
    .add("after", StringType(), True)
)

def read_stream_from_kafka_topic(topic):
    """Open a streaming DataFrame over the Kafka topics matching ``topic``.

    The message value is cast to a string and parsed as JSON using the
    module-level ``schema``; its fields are then flattened next to the Kafka
    metadata columns.

    Args:
        topic: Pattern passed to the Kafka source's ``subscribePattern`` option.

    Returns:
        Streaming DataFrame with columns ``kafka_offset``, ``kafka_ts``,
        ``kafka_key``, the fields of the parsed value, and ``topic``.
    """
    kafka_options = {
        "kafka.bootstrap.servers": "kafka:9092",
        "subscribePattern": topic,
        "startingOffsets": "earliest",
        "failOnDataLoss": "false",
    }

    reader = spark.readStream.format("kafka")
    for option_name, option_value in kafka_options.items():
        reader = reader.option(option_name, option_value)
    raw_events = reader.load()

    # Kafka delivers the value as binary: cast it to a string, then parse it.
    value_as_text = raw_events.withColumn("value", expr("string(value)"))
    parsed_events = value_as_text.withColumn("value", from_json(col("value"), schema))

    # The offset must stay the first selected column: write_to_delta takes
    # max() over a struct of all columns, and that comparison starts with the
    # first field.
    output_columns = [
        expr("offset as kafka_offset"),
        expr("timestamp as kafka_ts"),
        expr("string(key) as kafka_key"),
        "value.*",
        "topic",
    ]
    return parsed_events.select(*output_columns)


def get_schema_from_table(df):
    """Infer the Spark schema of the JSON documents in ``df``'s ``after`` column.

    Args:
        df: DataFrame with an ``after`` column whose values are JSON strings.

    Returns:
        The ``StructType`` that Spark's JSON reader infers over all ``after``
        values.
    """
    after_json = df.select("after").rdd.map(lambda row: row.after)
    # DataFrame.schema is already a StructType, so it is returned directly
    # instead of being serialised to JSON and parsed back into the same type.
    return spark.read.json(after_json, multiLine=True).schema

def write_to_delta(df, batch_id):
    """Apply one micro-batch of change events to per-topic Delta tables.

    For every distinct topic in the batch, the latest event per ``kafka_key``
    is selected by taking ``max`` over a struct of all columns. That struct
    comparison starts with the first column, which the stream reader sets to
    ``kafka_offset``. The target table for a topic is
    ``s3a://mysql/brozes/<last dot-separated segment of the topic>``.

    * Keys whose latest event has a non-null ``after`` are upserted: ``after``
      is parsed with a schema inferred from this batch, then merged into the
      Delta table (or written as a new table when none exists yet).
    * Keys whose latest event has a null ``after`` are deleted from the Delta
      table, if that table exists.

    Args:
        df: Micro-batch DataFrame with columns ``kafka_offset``, ``kafka_ts``,
            ``kafka_key``, ``before``, ``after`` and ``topic``.
        batch_id: Micro-batch id supplied by ``foreachBatch``; unused.
    """
    delta_table_path = "s3a://mysql/brozes/"

    # The batch is evaluated by several actions below (show, topic discovery
    # and the per-topic writes). Persisting it avoids recomputing the batch
    # for each of them.
    df.persist()
    try:
        print("Data input")
        df.show()

        # Deduplicate on the cluster so only one row per topic reaches the
        # driver, instead of collecting the topic of every event.
        unique_topics = [row["topic"] for row in df.select("topic").distinct().collect()]

        for path in unique_topics:
            name = path.split(".")[-1]
            delta_paths = delta_table_path + name
            print(delta_paths)

            # Latest event per key for this topic, computed once and shared
            # by the upsert and delete branches.
            latest_per_key = (df.filter(col("topic") == path)
                .select("kafka_key", expr("struct(*) as d"))
                .groupBy("kafka_key")
                .agg(expr("max(d) d"))
                .select('d.kafka_key',
                        'd.kafka_offset',
                        'd.kafka_ts',
                        'd.after',
                        'd.before',
                        'd.topic'
                ))

            # A key appears in exactly one of these two frames, so the order
            # of the upsert and delete steps does not matter.
            df_null_data = latest_per_key.filter(col("after").isNull())
            df_notNull_data = latest_per_key.filter(col("after").isNotNull())

            if not df_notNull_data.isEmpty():
                # Schema of the row payload is inferred from this batch only.
                # NOTE(review): a batch whose inferred columns differ from the
                # existing table may make the merge fail; confirm whether
                # schema evolution needs to be enabled.
                after_schema = get_schema_from_table(df_notNull_data)
                df_notNull_data = (df_notNull_data
                        .withColumn('after', from_json(col("after"), after_schema))
                        .select('kafka_key',
                                'kafka_offset',
                                'kafka_ts',
                                'before',
                                'after.*'
                        ))
                # Write to Delta Lake
                if DeltaTable.isDeltaTable(spark, delta_paths):
                    (DeltaTable
                    .forPath(spark, delta_paths)
                    .alias("now")
                    .merge(df_notNull_data.alias("pre"), "pre.kafka_key = now.kafka_key")
                    .whenMatchedUpdateAll()
                    .whenNotMatchedInsertAll()
                    .execute())
                else:
                    (df_notNull_data
                    .write
                    .format("delta")
                    .mode("append")
                    .save(delta_paths))

                print("Save successfully")

            if not df_null_data.isEmpty():
                if DeltaTable.isDeltaTable(spark, delta_paths):
                    # Remove every deleted key in a single Delta transaction,
                    # rather than collecting the keys to the driver and
                    # issuing one DELETE commit per row.
                    (DeltaTable
                    .forPath(spark, delta_paths)
                    .alias("now")
                    .merge(df_null_data.alias("pre"), "pre.kafka_key = now.kafka_key")
                    .whenMatchedDelete()
                    .execute())
                print("Save successfully")
    finally:
        df.unpersist()


if __name__ == "__main__":
    # Pattern for the Kafka source's subscribePattern option.
    topic = "tlcn.tlcn.*"
    df = read_stream_from_kafka_topic(topic)

    # Hand each micro-batch to write_to_delta, then block until the streaming
    # query terminates.
    # NOTE(review): no checkpointLocation is configured here; confirm whether
    # progress should be retained across restarts.
    streaming_query = (
        df.writeStream
        .foreachBatch(write_to_delta)
        .outputMode("update")
        .start()
    )
    streaming_query.awaitTermination()


Data input
+------------+--------------------+---------+------+--------------------+-----------------+
|kafka_offset|            kafka_ts|kafka_key|before|               after|            topic|
+------------+--------------------+---------+------+--------------------+-----------------+
|           0|2024-11-05 04:18:...| {"id":1}|  NULL|{"id":1,"name":"T...|tlcn.tlcn.teacher|
|           0|2024-11-05 04:18:...| {"id":1}|  NULL|{"id":1,"name":"D...|tlcn.tlcn.student|
|           0|2024-11-05 04:18:...| {"id":1}|  NULL|{"id":1,"name":"m...|tlcn.tlcn.subject|
|           1|2024-11-05 04:18:...| {"id":2}|  NULL|{"id":2,"name":"H...|tlcn.tlcn.subject|
|           2|2024-11-05 04:18:...| {"id":3}|  NULL|{"id":3,"name":"H...|tlcn.tlcn.subject|
|           3|2024-11-05 04:18:...| {"id":4}|  NULL|{"id":4,"name":"L...|tlcn.tlcn.subject|
|           4|2024-11-05 04:18:...| {"id":5}|  NULL|{"id":5,"name":"v...|tlcn.tlcn.subject|
|           5|2024-11-05 04:18:...| {"id":6}|  NULL|{"id":6,"name":"s