In [None]:
# Job configuration for the "1A" Kafka -> Kafka stream.
#   spark.stream.read.0A.*  -> options for the Kafka source (input topic)
#   spark.stream.write.1A.* -> options for the Kafka sink (output topic)
# The checkpoint path, the consumer group id and the output topic are all
# derived from the application name.
properties = {'spark.app.name': "1A-kafka-kafka-stream"}

properties.update({
    # Checkpoint path on HDFS, suffixed with the application name.
    'spark.sql.streaming.checkpointLocation':
        f"hdfs://172.21.1.190:8020/var/workspace/spark/app/checkpoint/{properties['spark.app.name']}",

    # Source: bridge topic, starting from "earliest", rate-limited per trigger.
    'spark.stream.read.0A.kafka.bootstrap.servers': "172.21.1.110:39092",
    'spark.stream.read.0A.subscribe': "kc_tradingexpert_fixlogtracer_ullink_ulbridge_bridge_v1",
    'spark.stream.read.0A.startingOffsets': "earliest",
    'spark.stream.read.0A.maxOffsetsPerTrigger': 10000,
    'spark.stream.read.0A.kafka.group.id': properties['spark.app.name'],

    # Sink: same broker, topic named after the application.
    'spark.stream.write.1A.kafka.bootstrap.servers': "172.21.1.110:39092",
    'spark.stream.write.1A.topic': properties['spark.app.name'],

    'spark.driver.memory': "512m",
})

properties

In [None]:
from kc.annotation.connectivity import ConnectivitySpark

In [None]:
@ConnectivitySpark(configuration={
    key: properties[key]
    for key in (
        "spark.app.name",
        "spark.sql.streaming.checkpointLocation",
        "spark.driver.memory",
    )
})
class Application:
    """Empty application shell configured through ``ConnectivitySpark``.

    Only the app name, the streaming checkpoint location and the driver
    memory from ``properties`` are handed to the decorator; the Kafka
    read/write settings are applied later as per-stream options.
    NOTE(review): later cells use ``app.spark``, which presumably is a
    SparkSession the decorator builds from this configuration — confirm
    against ``kc.annotation.connectivity``.
    """

In [None]:
# Instantiate the decorated application. Later cells use `app.spark`, which
# presumably is the SparkSession that ConnectivitySpark builds from the
# configuration passed to the decorator — TODO confirm.
app = Application()

In [None]:
###################################################################################################################

In [None]:
# Stage "0A" source: subscribe to the input Kafka topic as a streaming DataFrame.
# All option values come from the `spark.stream.read.0A.*` entries of `properties`.
# Parentheses are used instead of trailing backslashes: a `\` on the last line
# continued onto the following blank line and became a SyntaxError as soon as
# that line was removed or the backslash was followed by whitespace.
# NOTE(review): per Spark's Kafka integration guide, `startingOffsets` only
# applies when a query starts without an existing checkpoint, and a shared
# `kafka.group.id` across concurrent queries can make them interfere.
df_source = (
    app.spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", properties["spark.stream.read.0A.kafka.bootstrap.servers"])
    .option("subscribe", properties["spark.stream.read.0A.subscribe"])
    .option("startingOffsets", properties["spark.stream.read.0A.startingOffsets"])
    .option("maxOffsetsPerTrigger", properties["spark.stream.read.0A.maxOffsetsPerTrigger"])
    .option("kafka.group.id", properties["spark.stream.read.0A.kafka.group.id"])
    .load()
)

# Expose the stream to Spark SQL under the name `df_source`.
df_source.createOrReplaceTempView("df_source")

In [None]:
###################################################################################################################

In [None]:
# Stage 0: cast the Kafka `value` column to a STRING column named `json` and
# carry the Kafka record `timestamp` as _KAFKA_SOURCE_MESSAGE_TIMESTAMP.
df0 = app.spark.sql(
    """----------------------
SELECT  
      CAST(value AS STRING) AS json
    , timestamp             AS _KAFKA_SOURCE_MESSAGE_TIMESTAMP

FROM df_source
----------------------
"""
)
df0.createOrReplaceTempView("df0")

In [None]:
###################################################################################################################

In [None]:
# Stage 1: parse the `json` text from `df0` against an explicit schema into a
# single struct column `dict`, keeping the Kafka source timestamp alongside.
# Top-level schema fields: `metadata` (with a nested `message0` struct),
# `spec` (FIX routing details), and `@timestamp` (parsed as a Spark timestamp).
# NOTE(review): rows that do not match the schema presumably yield a null
# `dict` (from_json's default permissive behaviour) rather than failing the
# query — confirm; likewise the `@timestamp` text format assumed by the parser.
df1 = app.spark.sql("""\
----------------------
SELECT
    from_json(json, 'struct< `metadata`   : struct< `namespace` : string 
                                                   , `name`      : string
                                                   , `name0`     : string 
                                                   , `size0`     : int
                                                   , `message0`  : struct< `id`           : string
                                                                         , `type`         : string
                                                                         , `timestamp`    : string
                                                                         , `date`         : string
                                                                         , `time`         : string
                                                                         , `process_name` : string
                                                                         , `loglevel`     : string
                                                                         , `process_id`   : string
                                                                         >
                                                   >
                            , `spec`       : struct< `fix`                  : string
                                                   , `source_process_name`  : string
                                                   , `sink_process_name`    : string
                                                   , `source_CLORDID`       : string
                                                   , `sink_CLORDID`         : string
                                                   , `action`               : string
                            >
                            , `@timestamp` : timestamp
                            >
                     ')               AS dict
    , _KAFKA_SOURCE_MESSAGE_TIMESTAMP

FROM df0
----------------------
""")

# Register for the next SQL stage.
df1.createOrReplaceTempView("df1")

In [None]:
###################################################################################################################

In [None]:
# Stage 2: flatten the parsed `dict` struct into top-level columns
# (`metadata`, `spec`, `@timestamp`) next to the Kafka source timestamp.
df2 = app.spark.sql(
    """----------------------
SELECT
      dict.*
    , _KAFKA_SOURCE_MESSAGE_TIMESTAMP

FROM df1
----------------------
"""
)
df2.createOrReplaceTempView("df2")

In [None]:
###################################################################################################################

In [None]:
import pyspark.sql

def __DumpBatchDF_SinkGroup_2(batch_DF: pyspark.sql.DataFrame, batch_id: int) -> None:
    """Write one micro-batch to the "1A" Kafka sink (``foreachBatch`` callback).

    Every row of ``batch_DF`` is serialized into a single JSON document with
    ``to_json(struct(*))`` and written as the Kafka record ``value``; no key
    is set. Bootstrap servers and topic come from the
    ``spark.stream.write.1A.*`` entries of the global ``properties`` dict.

    Args:
        batch_DF: The micro-batch DataFrame handed over by Spark.
        batch_id: The micro-batch id supplied by Spark (not used here).
    """
    # NOTE(review): with a single write below, caching is not strictly needed;
    # presumably kept so further sinks in this group can reuse the batch.
    batch_DF.persist()
    try:
        (
            batch_DF.selectExpr("to_json(struct(*)) AS value")
            .write.format("kafka")
            .option("kafka.bootstrap.servers", properties['spark.stream.write.1A.kafka.bootstrap.servers'])
            .option("topic", properties['spark.stream.write.1A.topic'])
            .mode("append")
            .save()
        )
    finally:
        # Always release the cached batch, even when the Kafka write raises,
        # so failing/retried micro-batches don't accumulate cached data.
        batch_DF.unpersist()

In [None]:
###################################################################################################################

In [None]:
# Start the streaming query: each micro-batch of `df2` is handed to
# `__DumpBatchDF_SinkGroup_2`, with a processing-time trigger of 1 second.
#
# The checkpoint location is set explicitly on the query. When only the session
# conf `spark.sql.streaming.checkpointLocation` is available, Spark treats it as
# a parent directory and appends the query name — or a random UUID when no name
# is set — so each restart got a brand-new checkpoint and re-read the input
# topic from `startingOffsets` ("earliest"), re-emitting duplicates downstream.
# NOTE(review): the first run after this change starts a fresh checkpoint and
# therefore also begins from `startingOffsets`; older UUID sub-directories under
# this path can be cleaned up.
pipeline = (
    df2.writeStream
    .foreachBatch(__DumpBatchDF_SinkGroup_2)
    .option("checkpointLocation", properties['spark.sql.streaming.checkpointLocation'])
    .outputMode("append")
    .trigger(processingTime='1 second')
    .start()
)

In [None]:
###################################################################################################################