# 1. Setup 

## 1.1. Check that the confluent-kafka package is installed

In [0]:
!pip show confluent-kafka

## 1.2. Kafka bootstrap server and topic

In [0]:
# Connection settings referenced by the Kafka read/write cells in this notebook.
topic_name = "New_Topic"  # topic written to and subscribed to
bootstrap_servers = "pkc-4rn2p.canadacentral.azure.confluent.cloud:9092"  # broker host:port



In [0]:
# Databricks notebook source
import pyspark.sql.functions as F
from  pyspark.sql.functions import col, struct, to_json
from pyspark.sql.types import StructField, StructType, StringType, MapType


# 2. Batch Processing

## 2.1. Create Batch Data

In [0]:
# Five (name, occupation) sample records used for the batch example.
dataDictionary = [
    ("James", "driver"),
    ("Michael", "teacher"),
    ("Robert", "engineer"),
    ("Washington", "architect"),
    ("Jefferson", "CEO"),
]
column_names = ["name", "occupation"]
df = spark.createDataFrame(dataDictionary, column_names)

# Print the inferred schema, then show the rows without truncating cell values.
df.printSchema()
df.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- occupation: string (nullable = true)

+----------+----------+
|name      |occupation|
+----------+----------+
|James     |driver    |
|Michael   |teacher   |
|Robert    |engineer  |
|Washington|architect |
|Jefferson |CEO       |
+----------+----------+



In [0]:
# COMMAND ----------

# Preview the batch data in key/value form: `name` becomes the key and the
# whole row, serialised as JSON, becomes the value.
kafka_record_exprs = ["name AS key", "to_json(struct(*)) AS value"]
df2 = df.selectExpr(*kafka_record_exprs)
display(df2)

key,value
James,"{""name"":""James"",""occupation"":""driver""}"
Michael,"{""name"":""Michael"",""occupation"":""teacher""}"
Robert,"{""name"":""Robert"",""occupation"":""engineer""}"
Washington,"{""name"":""Washington"",""occupation"":""architect""}"
Jefferson,"{""name"":""Jefferson"",""occupation"":""CEO""}"


## 2.2. Write Batch Data

In [0]:
# Write the batch DataFrame to the Kafka topic: `name` is the record key and
# the JSON-serialised row is the record value.
#
# SECURITY: the Confluent Cloud API key and secret are read from a Databricks
# secret scope rather than being embedded in the notebook source. The scope and
# key names below are placeholders -- create them (or adjust the names) before
# running. Any key/secret that was ever committed in plain text must be rotated.
kafka_api_key = dbutils.secrets.get(scope="kafka", key="confluent-api-key")
kafka_api_secret = dbutils.secrets.get(scope="kafka", key="confluent-api-secret")
kafka_jaas_config = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="{kafka_api_key}" password="{kafka_api_secret}";'
)

(
    df.selectExpr("name AS key", "to_json(struct(*)) AS value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("topic", topic_name)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", kafka_jaas_config)
    .save()
)

## 2.3. Read Batch Data

In [0]:
# Read every message currently in the topic as a batch DataFrame
# (from the earliest offset up to the latest offset).
#
# SECURITY: the Confluent Cloud API key and secret are read from a Databricks
# secret scope rather than being embedded in the notebook source. The scope and
# key names below are placeholders -- create them (or adjust the names) before
# running. Any key/secret that was ever committed in plain text must be rotated.
kafka_api_key = dbutils.secrets.get(scope="kafka", key="confluent-api-key")
kafka_api_secret = dbutils.secrets.get(scope="kafka", key="confluent-api-secret")
kafka_jaas_config = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="{kafka_api_key}" password="{kafka_api_secret}";'
)

dfread = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("subscribe", topic_name)
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", kafka_jaas_config)
    .load()
)
display(dfread)

key,value,topic,partition,offset,timestamp,timestampType
V2FzaGluZ3Rvbg==,eyJuYW1lIjoiV2FzaGluZ3RvbiIsIm9jY3VwYXRpb24iOiJhcmNoaXRlY3QifQ==,New_Topic,2,0,2024-03-31T07:11:34.701+0000,0
SmVmZmVyc29u,eyJuYW1lIjoiSmVmZmVyc29uIiwib2NjdXBhdGlvbiI6IkNFTyJ9,New_Topic,2,1,2024-03-31T07:11:34.701+0000,0
Um9iZXJ0,eyJuYW1lIjoiUm9iZXJ0Iiwib2NjdXBhdGlvbiI6ImVuZ2luZWVyIn0=,New_Topic,1,0,2024-03-31T07:11:34.701+0000,0
SmFtZXM=,eyJuYW1lIjoiSmFtZXMiLCJvY2N1cGF0aW9uIjoiZHJpdmVyIn0=,New_Topic,1,1,2024-03-31T07:11:34.701+0000,0
TWljaGFlbA==,eyJuYW1lIjoiTWljaGFlbCIsIm9jY3VwYXRpb24iOiJ0ZWFjaGVyIn0=,New_Topic,1,2,2024-03-31T07:11:34.699+0000,0


In [0]:
# Schema used to parse the JSON message value: two nullable string fields.
json_schema = (
    StructType()
    .add("name", StringType(), True)
    .add("occupation", StringType(), True)
)

In [0]:
# Display the data read from the topic: cast the Kafka `value` column to a
# string, parse it with json_schema, and select the parsed fields as columns.
parsed_value = F.from_json(F.col("value").cast("string"), json_schema)
df3 = dfread.withColumn("value", parsed_value).select(
    F.col("value.name"),
    F.col("value.occupation"),
)
display(df3)

name,occupation
Washington,architect
Jefferson,CEO
Robert,engineer
James,driver
Michael,teacher


# 3. Stream

In [0]:
# Databricks notebook source
import pyspark.sql.functions as F
from  pyspark.sql.functions import col, struct, to_json
from pyspark.sql.types import StructField, StructType, StringType, MapType


In [0]:
# COMMAND ----------

# Schema for the streaming DataFrame: two nullable string fields.
name_field = StructField("name", StringType(), True)
occupation_field = StructField("occupation", StringType(), True)
jsonSchema = StructType([name_field, occupation_field])


## 3.1. Commented-Out Code: Used to Generate the Streaming JSON Data

In [0]:
# NOTE: this cell is intentionally commented out; it is kept only as a record
# of how the JSON input for the streaming example was generated.
# # write json file and save it 
# import json

# # Generate data
# dataDictionary = [
#     ('James', 'driver'),
#     ('Michael', 'teacher'),
#     ('Robert', 'engineer'),
#     ('Washington', 'architect'),
#     ('Jefferson', 'CEO')
# ]

# # Generating more entries (for demonstration purposes)
# for i in range(16):  # 21 total entries: the original 5 plus 16 generated (Name0..Name15)
#     dataDictionary.append(('Name' + str(i), 'Occupation' + str(i)))

# # Convert data to dictionary
# data = [{"name": name, "occupation": occupation} for name, occupation in dataDictionary]

# # Write data to JSON file
# with open("data.json", "w") as json_file:
#     json.dump(data, json_file, indent=4)

# print("JSON file generated successfully.")



JSON file generated successfully.


## 3.2. Streaming Data to Kafka

In [0]:
# MAGIC %fs head dbfs:/FileStore/databricks/data.json
# NOTE(review): the path in the line above differs from inputPath below -- confirm which is current.
inputPath = "dbfs:/FileStore/shared_uploads/arham.anwar@mail.mcgill.ca/"

# Configure the reader first, then point it at the input directory.
json_stream_reader = (
    spark.readStream
    .option("multiline", "true")
    .schema(jsonSchema)               # apply the explicit schema to the JSON data
    .option("maxFilesPerTrigger", 1)  # pick up one file per trigger so a set of files behaves like a stream
)
streamingInputDF = json_stream_reader.json(inputPath)


In [0]:
# COMMAND ----------

# Preview the stream in key/value form: `name` as the key and the
# JSON-serialised row as the value.
streaming_kafka_preview = streamingInputDF.selectExpr(
    "name AS key",
    "to_json(struct(*)) AS value",
)
display(streaming_kafka_preview)


key,value
James,"{""name"":""James"",""occupation"":""driver""}"
Michael,"{""name"":""Michael"",""occupation"":""teacher""}"
Robert,"{""name"":""Robert"",""occupation"":""engineer""}"
Washington,"{""name"":""Washington"",""occupation"":""architect""}"
Jefferson,"{""name"":""Jefferson"",""occupation"":""CEO""}"
Name0,"{""name"":""Name0"",""occupation"":""Occupation0""}"
Name1,"{""name"":""Name1"",""occupation"":""Occupation1""}"
Name2,"{""name"":""Name2"",""occupation"":""Occupation2""}"
Name3,"{""name"":""Name3"",""occupation"":""Occupation3""}"
Name4,"{""name"":""Name4"",""occupation"":""Occupation4""}"


## 3.3. Writing the Stream to Kafka

In [0]:
# Push the streaming data into the Kafka topic: `name` is the record key and
# the JSON-serialised row is the record value.
#
# SECURITY: the Confluent Cloud API key and secret are read from a Databricks
# secret scope rather than being embedded in the notebook source. The scope and
# key names below are placeholders -- create them (or adjust the names) before
# running. Any key/secret that was ever committed in plain text must be rotated.
kafka_api_key = dbutils.secrets.get(scope="kafka", key="confluent-api-key")
kafka_api_secret = dbutils.secrets.get(scope="kafka", key="confluent-api-secret")
kafka_jaas_config = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="{kafka_api_key}" password="{kafka_api_secret}";'
)

# NOTE(review): this rebinds `df` (previously the batch DataFrame) to the
# streaming query handle returned by start(); re-running the batch cells after
# this one will fail. Consider a dedicated name once all users of `df` are known.
df = (
    streamingInputDF.selectExpr("name AS key", "to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("topic", topic_name)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    # The "chekpoint" spelling is kept as-is so existing checkpoint state at this path is still used.
    .option("checkpointLocation", "dbfs:/FileStore/chekpoint/")
    .option("kafka.sasl.jaas.config", kafka_jaas_config)
    .start()
)