In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
import json
import pathlib
from pyspark.sql.functions import from_json, col, row_number, date_from_unix_date
from pyspark.sql import Window
from delta import DeltaTable


class SimpleParser():
    """Consume Debezium CDC events for one table from Kafka into a Delta table.

    Per batch: parse the JSON envelope -> keep the latest change per ``id``
    -> convert ``date`` -> MERGE into the Delta table at ``table_path``
    (update/insert for c/u/r events, delete for d events).
    """

    # Name of the helper column that carries the Debezium ``op`` code from
    # ``_processing_deduplicate(keep_op=True)`` to ``_processing_upsert``.
    # Prefixed to avoid clashing with a real column of the captured table.
    OP_COL = "_cdc_op"

    @staticmethod
    def _spark_conf():
        """Return the Spark settings applied to the session builder, in order."""
        return {
            'spark.jars.packages': 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,io.delta:delta-spark_2.12:3.1.0',
            "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
            "spark.sql.session.timeZone": "UTC",
            "spark.sql.shuffle.partitions": "1",
            "spark.databricks.delta.snapshotPartitions": "2",
            "spark.ui.showConsoleProgress": "false",
            "spark.ui.enabled": "false",
            "spark.ui.dagGraph.retainedRootRDDs": "1",
            "spark.ui.retainedJobs": "1",
            "spark.ui.retainedStages": "1",
            "spark.ui.retainedTasks": "1",
            "spark.sql.ui.retainedExecutions": "1",
            "spark.worker.ui.retainedExecutors": "1",
            "spark.worker.ui.retainedDrivers": "1",
            "spark.driver.memory": "4g",
            "spark.sql.autoBroadcastJoinThreshold": "-1",
            # A second .config() with the same key overwrites the first, so
            # every driver JVM flag must live in this single value.
            # NOTE(review): CMS was removed in JDK 14; on newer JVMs the CMS
            # flag is presumably warned about and ignored — confirm and drop.
            "spark.driver.extraJavaOptions": (
                "-Ddelta.log.cacheSize=3 "
                "-XX:+CMSClassUnloadingEnabled -XX:+UseCompressedOops"
            ),
        }

    def __init__(
        self,
        bootstrap_servers="kafka1:9092",
        topic="debezium.public.cdctable",
        table_path="./spark_data_consumer/data/cdctable",
        checkpoint_path="./spark_data_consumer/checkpoints/cdctable",
        schema_path="spark_data_consumer/app/schema/cdctable.json",
    ):
        """Create (or reuse) the local Spark session and store I/O locations.

        :param bootstrap_servers: value for the Kafka source's
            ``kafka.bootstrap.servers`` option.
        :param topic: Debezium topic to subscribe to.
        :param table_path: location of the target Delta table.
        :param checkpoint_path: checkpoint directory of the streaming query.
        :param schema_path: JSON file holding the Spark schema of the envelope.
        """
        builder = SparkSession.builder.master("local[*]").appName('spark_demo')
        # NOTE(review): static/JVM settings (jars, driver memory, JVM options)
        # presumably only take effect if no Spark JVM is running yet.
        for key, value in self._spark_conf().items():
            builder = builder.config(key, value)
        self.spark = builder.getOrCreate()
        self.spark.sparkContext.setLogLevel("ERROR")

        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.table_path = table_path
        self.checkpoint_path = checkpoint_path
        self.schema_path = schema_path

    def get_schema(self):
        """Load the Spark ``StructType`` of the Debezium envelope from JSON."""
        schema_json = pathlib.Path(self.schema_path).read_text()
        return StructType.fromJson(json.loads(schema_json))

    @staticmethod
    def _parse_envelope(raw_df, schema):
        """Decode the Kafka ``value`` bytes as JSON and flatten the envelope."""
        return (
            raw_df
            .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
            .select("parsed_value.*")
        )

    def read_data(self):
        """Batch-read the CDC topic and return the parsed envelope columns."""
        schema = self.get_schema()
        raw_df = (
            self.spark
            .read
            .format("kafka")
            .option("kafka.bootstrap.servers", self.bootstrap_servers)
            .option("subscribe", self.topic)
            .load()
        )
        return self._parse_envelope(raw_df, schema)

    def read_streaming_data(self):
        """Stream the CDC topic from the earliest offset as parsed envelopes."""
        schema = self.get_schema()
        raw_df = (
            self.spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", self.bootstrap_servers)
            .option("subscribe", self.topic)
            # The Kafka source positions itself via startingOffsets; a bare
            # "auto.offset.reset" option was never used, so it is omitted.
            .option("startingOffsets", "earliest")
            .load()
        )
        return self._parse_envelope(raw_df, schema)

    def _processing_deduplicate(self, df, keep_op=False):
        """Collapse a batch of Debezium envelopes to one row per ``id``.

        Create (c), update (u) and snapshot-read (r) events contribute their
        ``after`` image; delete (d) events contribute their ``before`` image.
        For each ``id`` only the event with the highest ``source.txId`` is kept.

        :param df: parsed envelopes with ``op``, ``before``, ``after`` and
            ``source`` columns (as returned by ``read_data``).
        :param keep_op: if True, keep the winning event's ``op`` code in the
            ``OP_COL`` column so ``_processing_upsert`` can apply deletes.
        :return: one row per ``id`` with the row-image columns (plus
            ``OP_COL`` when ``keep_op`` is True).
        """
        # Filter on the envelope before projecting so ``op`` is still a real
        # column. Rows with a null ``op`` (e.g. null-valued Kafka messages)
        # match neither filter and are dropped.
        upserts = (
            df.where("op in ('c', 'u', 'r')")
            .select("after.*", "source.txId", col("op").alias(self.OP_COL))
        )
        deletes = (
            df.where("op = 'd'")
            .select("before.*", "source.txId", col("op").alias(self.OP_COL))
        )

        # NOTE(review): several events for the same id inside one transaction
        # share a txId, so the winner among them is arbitrary; a tie-breaker
        # such as source.lsn (if present in the schema) would make it exact.
        latest_first = Window.partitionBy('id').orderBy(col('txId').desc())

        helper_cols = ["rn", "txId"] if keep_op else ["rn", "txId", self.OP_COL]
        return (
            upserts.union(deletes)
            .withColumn("rn", row_number().over(latest_first))
            .where("rn = 1")
            .drop(*helper_cols)
        )

    def _processing_upsert(self, df):
        """Apply a deduplicated batch to the Delta table at ``table_path``.

        If ``df`` has an ``OP_COL`` column, rows whose op is ``'d'`` delete the
        matching target row and are never inserted; all other rows are
        upserted by ``id``. Without ``OP_COL`` every row is upserted. The first
        batch creates the table. The batch is printed afterwards.
        """
        has_op = self.OP_COL in df.columns
        data_cols = [c for c in df.columns if c != self.OP_COL]
        is_delete = f"s.`{self.OP_COL}` = 'd'"

        if DeltaTable.isDeltaTable(self.spark, self.table_path):
            merge = (
                DeltaTable.forPath(self.spark, self.table_path)
                .alias("t")
                .merge(df.alias("s"), "s.id = t.id")
            )
            if has_op:
                # Explicit column maps keep the helper OP_COL out of the table.
                row_values = {c: f"s.`{c}`" for c in data_cols}
                merge = (
                    merge
                    .whenMatchedDelete(condition=is_delete)
                    .whenMatchedUpdate(set=row_values)
                    .whenNotMatchedInsert(condition=f"NOT ({is_delete})", values=row_values)
                )
            else:
                merge = merge.whenMatchedUpdateAll().whenNotMatchedInsertAll()
            merge.execute()
        else:
            # Nothing to delete from a table that does not exist yet.
            rows = df.where(f"`{self.OP_COL}` <> 'd'").drop(self.OP_COL) if has_op else df
            rows.repartition(1).write.format("delta").save(self.table_path)

        df.show(truncate=False)

    def _processing_transform(self, df):
        """Convert ``date`` from days since 1970-01-01 to a date column."""
        return df.withColumn("date", date_from_unix_date("date"))

    def foreach_batch_function(self, df, epoch_id):
        """``foreachBatch`` callback: dedup, transform and merge one micro-batch.

        The batch is cached for the count and the processing pipeline, and
        always unpersisted, even if processing raises.
        """
        df.cache()
        try:
            print("Rows to process:", df.count())

            deduplicated = self._processing_deduplicate(df, keep_op=True)
            transformed = self._processing_transform(deduplicated)
            self._processing_upsert(transformed)
        finally:
            df.unpersist()

    def start_streaming(self):
        """Start the CDC stream into Delta and block until it terminates."""
        query = (
            self.read_streaming_data()
            .writeStream
            .option("checkpointLocation", self.checkpoint_path)
            .foreachBatch(self.foreach_batch_function)
            .start()
        )
        query.awaitTermination()
        

In [2]:
# Set to True to consume the CDC topic continuously; start_streaming() blocks
# until the streaming query terminates.
RUN_STREAMING = False

# Constructing the parser creates (or reuses) the local Spark session.
sp = SimpleParser()
if RUN_STREAMING:
    sp.start_streaming()

your 131072x1 screen size is bogus. expect trouble
24/02/29 16:21:37 WARN Utils: Your hostname, LT479333 resolves to a loopback address: 127.0.1.1; using 192.168.68.60 instead (on interface eth1)
24/02/29 16:21:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/avolok/repos/temp/k/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/avolok/.ivy2/cache
The jars for the packages stored in: /home/avolok/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7df20099-9165-4f14-b2d4-2cdd50423a0e;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
	found io.delt

In [5]:
# Interactive check: batch-read the CDC topic and keep only the latest
# change per id (same logic the streaming job applies per micro-batch).
df = sp.read_data()
dedup = sp._processing_deduplicate(df)

In [13]:

# Reuse the parser's own transform so this preview matches what the
# streaming job writes to Delta (date: days since epoch -> date).
sp._processing_transform(dedup).show()

+----+------+----------+
|  id|  name|      date|
+----+------+----------+
| 226|NHWU3A|2022-10-31|
|1241|USY0B9|2019-10-29|
|1776|R27V31|2020-02-07|
|1892|9LBLP4|2019-03-06|
|1977|R11Q2P|2018-09-10|
|2258|LBMJQ7|2019-05-03|
|2381|GR0ZYX|2022-04-05|
|4009|LXU6A7|2022-03-11|
|4202|L2U3BA|2020-03-04|
|5264|XHJRD3|2023-06-10|
|5546|3GZH3S|2020-10-20|
|6164|SJWFJW|2021-08-29|
|6312|5TAPY2|2021-10-15|
|6481|2GQPGR|2023-03-06|
|7425|ICEFMO|2021-01-02|
|7442|Q1FHQ8|2019-12-03|
|7506|WMR1IH|2021-10-13|
|7630|R4Z4CX|2023-12-06|
|7794|BM4ZZX|2021-06-07|
|8077|FG64OV|2020-08-30|
+----+------+----------+
only showing top 20 rows



In [7]:
# Raw deduplicated rows; `date` is still an integer day count here.
# NOTE(review): dedup is not cached, so each show() presumably re-reads Kafka;
# row counts can differ between cells run at different times.
dedup.show()

+----+------+-----+
|  id|  name| date|
+----+------+-----+
|2258|LBMJQ7|18019|
|5264|XHJRD3|19518|
|7425|ICEFMO|18629|
|7442|Q1FHQ8|18233|
|7794|BM4ZZX|18785|
|9488|8O6BKK|17814|
+----+------+-----+

