In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructField, StructType, StringType
import py4j

# Create (or reuse) the session on the standalone master. The Kafka source and
# the PostgreSQL JDBC driver are requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("pyspark-notebook-kafka-struct-stream")
    .master("spark://spark:7077")
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.postgresql:postgresql:42.5.1')
    .getOrCreate()
)

/usr/local/lib/python3.9/dist-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found


:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6d55f236-d27a-4124-a9ad-ef8bbc1eef9f;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commo

In [None]:
class StructuredStreamingKafkaConnector:
    """Small wrapper around a Structured Streaming query fed by one Kafka subscription.

    The streaming DataFrame is created once (lazily) and can then either be
    dumped to the console (``log_stream``) or decoded from JSON and handed to
    a per-batch callback (``write_stream``).
    """

    def __init__(self, spark_session, kafka_servers, subscribe_topic, decode_schema, fun, checkpoint_directory = None):
        """Store the configuration; no stream is opened here.

        Args:
            spark_session: session whose ``readStream`` is used to open the source.
            kafka_servers: value for the ``kafka.bootstrap.servers`` option.
            subscribe_topic: value for the ``subscribe`` option.
            decode_schema: schema passed to ``from_json`` when decoding the record value.
            fun: callback handed to ``foreachBatch`` in ``write_stream``.
            checkpoint_directory: optional ``checkpointLocation`` for ``write_stream``;
                when ``None`` the option is not set.
        """
        self.__spark = spark_session
        self.__kafka_servers = kafka_servers
        self.__subscribe_topic = subscribe_topic
        self.__decode_schema = decode_schema
        self.__process_stream_function = fun
        self.__checkpoint_directory = checkpoint_directory
        self.__streaming_df = None

    def read_stream(self):
        """Open the Kafka source once and print its schema.

        Subsequent calls are no-ops, so this is safe to call repeatedly.
        """
        if self.__streaming_df is None:
            self.__streaming_df = self.__spark \
                                    .readStream \
                                    .format("kafka") \
                                    .option("kafka.bootstrap.servers", self.__kafka_servers) \
                                    .option("subscribe", self.__subscribe_topic) \
                                    .load()
            self.__streaming_df.printSchema()

    def log_stream(self, checkpoint_directory="/fs/log_checkpoints"):
        """Write the raw records to the console sink and wait for the query to end.

        Args:
            checkpoint_directory: ``checkpointLocation`` used for the console
                query. Defaults to the path this method has always used.
        """
        # Without this, calling log_stream() before read_stream() failed with
        # an AttributeError on None.
        self.read_stream()

        query = self.__streaming_df \
                    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset", "timestamp") \
                    .writeStream \
                    .format("console") \
                    .option("checkpointLocation", checkpoint_directory) \
                    .start()
        query.awaitTermination()

    def write_stream(self):
        """Decode each record's value as JSON and pass every micro-batch to the callback.

        The decoded struct is flattened into top-level columns before the
        batch reaches the callback. Blocks until the query terminates.
        """
        # Same guard as log_stream(): make the method usable on a fresh object.
        self.read_stream()

        writer = self.__streaming_df \
                    .selectExpr("CAST(value AS STRING)") \
                    .withColumn("value", from_json("value", self.__decode_schema)) \
                    .select(col('value.*')) \
                    .writeStream \
                    .foreachBatch(self.__process_stream_function)

        if self.__checkpoint_directory is not None:
            writer = writer.option("checkpointLocation", self.__checkpoint_directory)

        query = writer.start()
        query.awaitTermination()
            

In [None]:
# Layout of the JSON document carried in each Kafka record value:
# two string fields, val1 and val2.
schema = StructType([StructField(field_name, StringType()) for field_name in ("val1", "val2")])

# JDBC sink settings. The %d in the URL is filled with the database index.
# NOTE(review): credentials are hard-coded here; consider reading them from
# the environment before sharing this notebook.
mode = "append"
url = "jdbc:postgresql://test-postgres-db-%d:5432/db"
properties = {
    "user": "auser",
    "password": "1234",
    "driver": "org.postgresql.Driver",
}

import time

def _write_batch_to_db(batch_df, db_index):
    """Append one batch to table ``tbl`` of database ``db_index``; report, don't raise, on JVM errors."""
    try:
        batch_df.write.jdbc(url=url%(db_index), table="tbl", mode=mode, properties=properties)
        print("Wrote data to test-postgres-db-%d" % db_index)
    except py4j.protocol.Py4JJavaError:
        print("Exception will be handled....")


def test_fun(x,y):
    """Per-batch callback: write the batch to both PostgreSQL test databases.

    Args:
        x: the micro-batch DataFrame.
        y: the value passed as second argument by the caller (printed as-is).

    Each database is written exactly once per batch, and a failure on one
    database does not prevent the attempt on the other.
    """
    # The batch is used by several actions (two writes and a show), so cache it.
    x.persist()
    try:
        _write_batch_to_db(x, 1)
        # The original repeated the db-1 write here outside any try block. That
        # appended every batch twice and let a db-1 outage abort the stream
        # despite the handler above.

        print(y)
        x.show()
        time.sleep(5)

        _write_batch_to_db(x, 2)

        time.sleep(5)
    finally:
        # Release the cached batch even if something above raised.
        x.unpersist()

In [None]:
# Connector for topic "test" on kafka:9092; every decoded micro-batch is
# forwarded to test_fun.
ssKafka = StructuredStreamingKafkaConnector(
    spark_session=spark,
    kafka_servers="kafka:9092",
    subscribe_topic="test",
    decode_schema=schema,
    fun=lambda batch_df, batch_id: test_fun(batch_df, batch_id),
    checkpoint_directory="/fs/checkpoints",
)

In [None]:
# Open the Kafka source and print the schema of the streaming DataFrame.
ssKafka.read_stream()

In [None]:
# Debug aid, disabled by default: dumps the raw records to the console.
# It waits for the query to terminate, so the cells below would not run.
#ssKafka.log_stream()

In [None]:
# Start the foreachBatch query; this call waits until the query terminates.
ssKafka.write_stream()