In [None]:
# Connection settings for the Kafka cluster. Replace every placeholder before
# running the cells below.
# `endpoint` is passed unchanged as the `kafka.bootstrap.servers` option.
endpoint = "<insert-endpoint-here>"
# `username` and `password` are interpolated into the SCRAM JAAS login string.
# NOTE(review): these are plain-text credentials in the notebook; consider
# loading them from a secret store or environment variables instead.
username = "<insert-username-here>"
password = "<insert-password-here>"

In [None]:
import sys
import pyspark.sql.functions as F
from pyspark.sql import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType

# Create (or reuse) the Spark session that drives this job.
spark = SparkSession.builder.appName("kafka_spark_poc").getOrCreate()

# Keep Spark's own logging down to warnings and errors.
spark.sparkContext.setLogLevel("WARN")

# SCRAM login string handed to the Kafka client through `kafka.sasl.jaas.config`.
jaas_config = f'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="{username}" password="{password}";'

## Kafka configs
# Connection and authentication options shared by the reader and the writer,
# kept in one place so the two sides cannot drift apart.
_kafka_common_config = {
    "kafka.sasl.jaas.config": jaas_config,
    "kafka.bootstrap.servers": f"{endpoint}",
    "kafka.sasl.mechanism": "SCRAM-SHA-256",
    "kafka.security.protocol": "SASL_SSL",
}

# Reader options: the shared settings plus the `source` topic subscription.
kafka_source_config = dict(
    _kafka_common_config,
    subscribe="source",
    startingOffsets="latest",
    failOnDataLoss="false",
)

# Writer options: the shared settings plus the `sink` topic and checkpoint path.
# NOTE(review): Spark uses `checkpointLocation` as a directory, so the `.txt`
# suffix is misleading - confirm the intended path.
kafka_sink_config = dict(
    _kafka_common_config,
    topic="sink",
    checkpointLocation="./checkpoint.txt",
)

## Source Schema
# (column name, Spark type) for each field expected in the incoming JSON
# payload; every field is declared nullable.
_source_fields = [
    ("name", StringType()),
    ("address", StringType()),
    ("age", IntegerType()),
    # ("timestamp", TimestampType()),  # currently disabled
]
df_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _source_fields]
)

In [None]:
## Read Stream
# Open a streaming reader on the Kafka topic described by `kafka_source_config`.
streaming_df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_config)
    .load()
)

## Prepare Sink DF
# Pipeline: Kafka `value` -> JSON parsed with `df_schema` -> add `is_minor`
# -> re-serialised as a JSON string in a single `value` column for the sink.
sink_df = (
    streaming_df
    .select(
        # Cast the Kafka `value` column to a string once, then parse it as
        # JSON using the source schema.
        F.from_json(F.col("value").cast("string"), df_schema).alias("value")
    )
    # Flatten the parsed struct into top-level columns.
    .select("value.*")
    # Derived flag computed from `age`.
    .withColumn("is_minor", F.expr("age < 18"))
    .select(
        # Convert everything back to a JSON string for the Kafka `value` field.
        F.to_json(
            F.struct("name", "address", "age", "is_minor")
        ).alias("value")
    )
)

## Write Sink DF
# Start the streaming query that publishes `sink_df` using `kafka_sink_config`.
write = (
    sink_df.writeStream
    .format("kafka")
    .options(**kafka_sink_config)
    .start()
)

# Block here so the streaming application keeps running until either:
# 1. an exception is raised in the running query, or
# 2. it is interrupted manually.
write.awaitTermination()