In [1]:
#%%capture
#!wget "https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.5.0/spark-sql-kafka-0-10_2.12-3.5.0.jar"
#!wget "https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.12/3.5.0/spark-streaming-kafka-0-10_2.12-3.5.0.jar"
#!wget "https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.5.0/spark-avro_2.12-3.5.0.jar"

In [2]:
import os
# Ask the PySpark launcher to pull the Kafka (streaming + SQL source) and Avro
# connector packages, all built for Scala 2.12 / Spark 3.5.0.
# This has to run before the SparkSession is created for it to take effect.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages '
    'org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,'
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,'
    'org.apache.spark:spark-avro_2.12:3.5.0'
    ' pyspark-shell'
)

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.sql.functions as func

In [4]:
#!pip install confluent_kafka

In [5]:
from confluent_kafka.schema_registry import SchemaRegistryClient
from pyspark.sql.avro.functions import from_avro, to_avro
# Build (or reuse) the session named "readFromKafka" against a local master.
spark = (
    SparkSession.builder
    .appName("readFromKafka")
    .master("local")
    .getOrCreate()
)

In [6]:
# Connection settings for the broker and the schema registry.
kafka_url = "broker:29092"
kafka_topic = "quickstart-jdbc-test"
schema_registry_url = "http://schema-registry:8081"
# Subject is the topic name plus a "-value" suffix
# (presumably the registry's topic-name subject convention — confirm).
schema_registry_subject = f"{kafka_topic}-value"

# Streaming source over the topic, reading from the earliest available offset.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_url)
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .load()
)
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [7]:
def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject):
    """Look up the most recent schema version registered under a subject.

    Args:
        schema_registry_url: Base URL handed to ``SchemaRegistryClient``.
        schema_registry_subject: Subject name passed to ``get_latest_version``.

    Returns:
        A ``(client, latest_version)`` tuple: the ``SchemaRegistryClient``
        that was created and whatever ``get_latest_version`` returned for
        the subject.
    """
    registry_client = SchemaRegistryClient({'url': schema_registry_url})
    return registry_client, registry_client.get_latest_version(schema_registry_subject)

In [8]:
# Slice the raw Kafka value into three binary pieces:
#   byte 1      -> magicByte
#   bytes 2-5   -> valueSchemaId
#   bytes 6-end -> fixedValue (the remainder that is Avro-decoded later)
wikimedia_df = (
    df
    .withColumn("magicByte", func.expr("substring(value, 1, 1)"))
    .withColumn("valueSchemaId", func.expr("substring(value, 2, 4)"))
    .withColumn("fixedValue", func.expr("substring(value, 6, length(value)-5)"))
)

# Keep only the three derived columns and show their types.
wikimedia_value_df = wikimedia_df.select("magicByte", "valueSchemaId", "fixedValue")
wikimedia_value_df.printSchema()

root
 |-- magicByte: binary (nullable = true)
 |-- valueSchemaId: binary (nullable = true)
 |-- fixedValue: binary (nullable = true)



In [10]:
# Fetch the newest schema registered under the subject; only the schema
# metadata is used here, so the client handle is discarded.
_, latest_version_wikimedia = get_schema_from_schema_registry(schema_registry_url, schema_registry_subject)

# PERMISSIVE mode: records that cannot be decoded come back as null instead
# of failing the stream.
fromAvroOptions = {"mode": "PERMISSIVE"}

# Decode the payload bytes with the latest schema.
# The source is wikimedia_df (which also carries "fixedValue") rather than
# wikimedia_value_df: this cell rebinds wikimedia_value_df below, so reading
# from it would make a second run of the cell fail on the missing column.
# NOTE(review): every record is decoded with the latest schema; the
# per-record valueSchemaId is not consulted.
decoded_output = wikimedia_df.select(
    from_avro(
        func.col("fixedValue"),
        latest_version_wikimedia.schema.schema_str,
        fromAvroOptions,
    ).alias("wikimedia")
)

# Flatten the decoded struct into top-level columns.
wikimedia_value_df = decoded_output.select("wikimedia.*")
wikimedia_value_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- department: string (nullable = true)
 |-- modified: timestamp (nullable = true)



In [None]:
# Stream the decoded rows to the console, emitting only newly arrived rows.
query = (
    wikimedia_value_df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Blocks this cell until the streaming query stops.
query.awaitTermination()

In [None]:
# One output row per space-separated token of the Kafka value, paired with
# the record's timestamp.
words = df.select(
    explode(split(df.value, " ")).alias("word"),
    col("timestamp"),
)

# Count occurrences per (word, timestamp) with a 10-minute watermark on the
# timestamp column.
# NOTE(review): the grouping key is the raw timestamp, not a time window —
# confirm this is intended.
wordCounts = (
    words
    .withWatermark("timestamp", "10 minutes")
    .groupBy("word", "timestamp")
    .count()
)

In [None]:
# Print the column layout of the aggregated word-count frame.
wordCounts.printSchema()

In [None]:
#!pyspark --version

In [None]:
# Disabled experiment: the writer below is wrapped in a bare string literal,
# so none of it executes. It would have written `df` to the Kafka topic
# "output" using the "checkpoint" directory.
'''query = df \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "192.168.1.107:8097") \
    .option("topic", "output") \
    .option("checkpointLocation", "checkpoint") \
    .start() 
query.awaitTermination()'''

In [None]:
# Publish the word counts to the Kafka topic "output": the word becomes the
# record key and the count (as a string) becomes the record value. A
# micro-batch is triggered every second and progress is tracked in the
# "checkpoint" directory.
# NOTE(review): the bootstrap servers here are hard-coded and differ from
# `kafka_url` used by the source — confirm this is the intended cluster.
#
# start() and awaitTermination() are deliberately separate statements:
# chaining them bound `query` to awaitTermination()'s return value (None)
# instead of the StreamingQuery handle.
query = (
    wordCounts.selectExpr("CAST(word AS STRING) AS key", "CAST(count AS STRING)  AS value")
    .writeStream
    .trigger(processingTime='1 seconds')
    .format("kafka")
    .option("kafka.bootstrap.servers", "192.168.1.107:8097,192.168.1.107:8098,192.168.1.107:8099")
    .option("topic", "output")
    .option("checkpointLocation", "checkpoint")
    .start()
)

# Blocks this cell until the streaming query stops.
query.awaitTermination()

In [None]:
# Print the full, updated word-count table to the console on every trigger,
# with the word as `key` and the count (as a string) as `value`.
query = (
    wordCounts
    .selectExpr("CAST(word AS STRING) AS key", "CAST(count AS STRING)  AS value")
    .writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

# Blocks this cell until the streaming query stops.
query.awaitTermination()