In [1]:
#%%capture
#!wget "https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.5.0/spark-sql-kafka-0-10_2.12-3.5.0.jar"
#!wget "https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.12/3.5.0/spark-streaming-kafka-0-10_2.12-3.5.0.jar"
!wget "https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.5.0/spark-avro_2.12-3.5.0.jar"

In [2]:
!pip install confluent_kafka

In [3]:
import os
#os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-avro_2.12:3.5.0 pyspark-shell' 

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.sql.functions as func

In [5]:
from confluent_kafka.schema_registry import SchemaRegistryClient
from pyspark.sql.avro.functions import from_avro, to_avro
spark = SparkSession.builder.appName("readFromKafka").master("local").getOrCreate()

In [6]:
kafka_url = "broker:29092"
kafka_topic = "quickstart-jdbc-test"
schema_registry_url = "http://schema-registry:8081"
schema_registry_subject = f"{kafka_topic}-value"

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka_url) \
  .option("subscribe", kafka_topic) \
  .option("startingOffsets", "earliest") \
  .load()

df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [7]:
df2 = df.withColumn("magicByte", func.expr("substring(value, 1, 1)"))
df2 = df2.withColumn("valueSchemaId", func.expr("substring(value, 2, 4)"))
df2 = df2.withColumn("fixedValue", func.expr("substring(value, 6, length(value)-5)"))

# creating a new df with magicBytes, valueSchemaId & fixedValue
df3 = df2.select("magicByte", "valueSchemaId", "fixedValue")
df3.printSchema()

root
 |-- magicByte: binary (nullable = true)
 |-- valueSchemaId: binary (nullable = true)
 |-- fixedValue: binary (nullable = true)



In [8]:
def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject):
    sr = SchemaRegistryClient({'url': schema_registry_url})
    latest_version = sr.get_latest_version(schema_registry_subject)

    return sr, latest_version

In [9]:

# get schema using subject name
_, latest_version_avro = get_schema_from_schema_registry(schema_registry_url, schema_registry_subject)

# deserialize data 
fromAvroOptions = {"mode":"PERMISSIVE"}
decoded_output = df2.select(
    from_avro(
        func.col("fixedValue"), latest_version_avro.schema.schema_str, fromAvroOptions
    )
    .alias("data")
)
df3 = decoded_output.select("data.*")
df3.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- department: string (nullable = true)
 |-- modified: timestamp (nullable = true)



In [10]:
# To check with console output
'''df3 \
    .writeStream \
    .format("console") \
    .trigger(processingTime='1 second') \
    .outputMode("append") \
    .option("truncate", "false") \
    .start() \
    .awaitTermination()'''

'df3     .writeStream     .format("console")     .trigger(processingTime=\'1 second\')     .outputMode("append")     .option("truncate", "false")     .start()     .awaitTermination()'

In [11]:
# other imports
# UDF function
binary_to_string_udf = func.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())
# x->value, y->len
int_to_binary_udf = func.udf(lambda value, byte_size: (value).to_bytes(byte_size, byteorder='big'), BinaryType())

# other constants
kafka_analyzed_topic = "quickstart-clean"
schema_registry_analyzed_data_subject = f"{kafka_analyzed_topic}-value"

_, latest_version_analyzed_data = get_schema_from_schema_registry(schema_registry_url, schema_registry_analyzed_data_subject)

# convert dataframe to binary data
df3 = df3 \
.select(to_avro(func.struct(
    func.col("id"),
    func.col("name"),
    func.col("email"),
    func.col("department"),
    func.col("modified")
), latest_version_analyzed_data.schema.schema_str).alias("value"))

# add magicbyte & schemaId to binary data
magicByteBinary = int_to_binary_udf(func.lit(0), func.lit(1))
schemaIdBinary = int_to_binary_udf(func.lit(latest_version_analyzed_data.schema_id), func.lit(4))
df3 = df3.withColumn("value", func.concat(magicByteBinary, schemaIdBinary, func.col("value")))
df3.printSchema()

root
 |-- value: binary (nullable = true)



In [12]:
# Write to Kafka Sink
df3 \
    .writeStream \
    .format("kafka") \
    .trigger(processingTime='1 second') \
    .outputMode("append") \
    .option("kafka.bootstrap.servers", kafka_url) \
    .option("topic", kafka_analyzed_topic) \
    .option("checkpointLocation", "checkpoint1") \
    .start() \
    .awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 