In [None]:
# Create the SparkSession
from pyspark.sql import SparkSession

spark = SparkSession \
  .builder \
  .appName("Spark-Kafka-Integration") \
  .master("local") \
  .getOrCreate()

In [None]:
# Define the schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

mySchema = StructType([
  StructField("id", IntegerType()),
  StructField("name", StringType()),
  StructField("year", IntegerType()),
  StructField("rating", DoubleType()),
  StructField("duration", IntegerType())
])

In [None]:
#Create the Streaming Dataframe
streamingDataFrame = spark.readStream.schema(mySchema).csv("path of your directory like home/Desktop/dir/")

In [None]:
# Publish the stream to Kafka: the id becomes the record key, the whole row (as JSON) the value
streamingDataFrame.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
  .writeStream \
  .format("kafka") \
  .option("topic", "topicName") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("checkpointLocation", "path to your local dir") \
  .start()

In [None]:
# Subscribe to the stream from Kafka
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "topicName") \
  .load()

In [None]:
# Parse the JSON value according to mySchema and keep the Kafka record timestamp
from pyspark.sql.functions import from_json, col

df1 = df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)") \
  .select(from_json(col("value"), mySchema).alias("data"), col("timestamp")) \
  .select("data.*", "timestamp")
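
To make the round trip concrete, here is a plain-Python sketch (no Spark or Kafka involved) of what the publish and subscribe steps above do to a single record: publishing turns each row into a string key plus a JSON-encoded value, and subscribing parses that JSON back into typed columns. The sample row and the helper names `to_kafka_record` and `from_kafka_record` are made up for illustration; in the cells above, Spark's `to_json` and `from_json` do the real work.

```python
import json

# Column name -> Python type, mirroring mySchema above
SCHEMA = {"id": int, "name": str, "year": int, "rating": float, "duration": int}

def to_kafka_record(row):
    """Mimic CAST(id AS STRING) AS key and to_json(struct(*)) AS value."""
    return str(row["id"]), json.dumps(row)

def from_kafka_record(value):
    """Mimic from_json(value, schema): parse the JSON and cast each field."""
    parsed = json.loads(value)
    return {name: cast(parsed[name]) for name, cast in SCHEMA.items()}

# Hypothetical sample row, not taken from any real dataset
row = {"id": 1, "name": "Some Movie", "year": 1999, "rating": 8.5, "duration": 120}

key, value = to_kafka_record(row)
restored = from_kafka_record(value)

print(key)              # the record key, as a string
print(restored == row)  # whether the row survived the round trip
```

The sketch only covers the happy path; it does not reproduce how Spark handles malformed JSON or missing fields.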

In [None]:
#Here, we just print our data to the console.

df1.writeStream \
    .format("console") \
    .option("truncate", "false") \
    .start() \
    .awaitTermination()

#### Structured Streaming 
##### https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
##### https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
Structured Streaming was introduced in Spark 2.0 (and became stable in 2.2) as an extension built on top of Spark SQL, so it benefits from Spark SQL's code and memory optimizations. It also offers powerful abstractions: the Dataset/DataFrame APIs as well as SQL. No more dealing with RDDs directly! By default it uses micro-batches, processing a data stream as a series of small batch jobs with low latency. In other words, you can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine takes care of running it incrementally and continuously, updating the final result as streaming data continues to arrive.
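
The incremental model can be illustrated without Spark. The following is a minimal plain-Python sketch, not Spark code: `run_incrementally` and the sample micro-batches are hypothetical names invented for this illustration. It keeps a running count per key and updates it as each micro-batch arrives, ending with the same result that a one-shot batch count over all the data would give.

```python
from collections import Counter

def run_incrementally(micro_batches):
    """Yield the updated running counts after each micro-batch is processed."""
    state = Counter()          # result carried over between micro-batches
    for batch in micro_batches:
        state.update(batch)    # fold only the newly arrived records into the state
        yield dict(state)      # snapshot of the result so far

# Hypothetical stream, already cut into three micro-batches
micro_batches = [["a", "b"], ["a"], ["b", "b", "c"]]

snapshots = list(run_incrementally(micro_batches))

# One-shot batch computation over the same records, for comparison
all_records = [record for batch in micro_batches for record in batch]
batch_result = dict(Counter(all_records))

print(snapshots[-1] == batch_result)
```

The point of the comparison at the end is that the query logic (a count per key) is the same in both cases; only the way it is executed differs.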

In [None]:
# Creating a Kafka Source for Streaming Queries
# Subscribe to 1 topic (for multiple topics use .option("subscribe", "topic1,topic2"))
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [None]:
# Creating a Kafka Source for Batch Queries
# Subscribe to multiple topics, specifying explicit Kafka offsets (-2 means earliest, -1 means latest).
# Default values for startingOffsets and endingOffsets are "earliest" and "latest" respectively.
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
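
The per-partition offset strings are easy to mistype by hand. As a small sketch (plain Python, no Spark needed), they can be built from dictionaries with `json.dumps`; per the Kafka integration guide, `-2` stands for the earliest offset and `-1` for the latest. The variable and constant names below are illustrative only.

```python
import json

EARLIEST, LATEST = -2, -1   # special offset values accepted by the Kafka source

# topic -> {partition (as a string) -> offset}
starting_offsets = json.dumps({"topic1": {"0": 23, "1": EARLIEST}, "topic2": {"0": EARLIEST}})
ending_offsets = json.dumps({"topic1": {"0": 50, "1": LATEST}, "topic2": {"0": LATEST}})

print(starting_offsets)
print(ending_offsets)
```

The resulting strings could then be passed as `.option("startingOffsets", starting_offsets)` and `.option("endingOffsets", ending_offsets)`.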

In [None]:
#Creating a Kafka Sink for Streaming Queries
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option.
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
ds = df \
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .start()

In [None]:
#Writing the output of Batch Queries to Kafka
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .save()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .save()

In [None]:
# The code above is saved in the file spark_test.py and the topic is meetup. The program is submitted with the command below:
# ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 spark_test.py localhost:2181 meetup

#### Spark Streaming - using DStreams
##### https://spark.apache.org/docs/2.1.1/streaming-kafka-0-8-integration.html
Spark Streaming went alpha with Spark 0.7.0. It’s based on the idea of discretized streams, or DStreams. Each DStream is represented as a sequence of RDDs, so it’s easy to use if you’re coming from low-level RDD-backed batch workloads. DStreams have seen many improvements since then, but challenges remain, primarily because it’s a very low-level API. Since a DStream is just a sequence of RDDs, it’s typically used for low-level transformations and processing. Adding the DataFrame API on top of that provides very powerful abstractions like SQL, but requires a bit more configuration. If you have a simple use case, Spark Structured Streaming might be a better solution in general!
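
As a rough mental model (plain Python, not the Spark Streaming API), discretization means cutting a stream of timestamped records into one batch per interval, where each batch plays the role of one RDD. The function `discretize` and the sample events are hypothetical, chosen only to illustrate the idea; for brevity, intervals with no events are simply omitted here.

```python
from collections import defaultdict

def discretize(events, batch_duration):
    """Group (timestamp_seconds, record) pairs into consecutive fixed-length batches."""
    batches = defaultdict(list)
    for timestamp, record in events:
        batches[int(timestamp // batch_duration)].append(record)
    # Return the batches in time order; each list stands in for one RDD
    return [batches[index] for index in sorted(batches)]

# Hypothetical events: (arrival time in seconds, payload)
events = [(5, "a"), (42, "b"), (61, "c"), (119, "d"), (130, "e")]

dstream = discretize(events, batch_duration=60)

# A low-level, per-batch transformation, similar in spirit to a map over each RDD
upper = [[record.upper() for record in batch] for batch in dstream]
print(upper)
```

The `batch_duration=60` value mirrors the 60-second batch interval passed to `StreamingContext` in the cell below.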

In [None]:
# Note: pyspark.streaming.kafka (the Kafka 0.8 integration) exists only in Spark 2.x; it was removed in Spark 3.0
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession


# Creating the Spark context ourselves since we are submitting from the command line. This is basically receiver code.
'''
spark = SparkSession \
        .builder \
        .appName("MeetupStreaming") \
        .getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
sc = SparkContext(appName="MeetupStreaming")
sc.setLogLevel("FATAL")
ssc = StreamingContext(sc, 60)
'''

sc = SparkContext("local[*]",appName="MeetupStreaming")
sc.setLogLevel("WARN")

#Creating the streaming context
ssc = StreamingContext(sc, batchDuration=60)

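# Receiver-based approach. The ZooKeeper quorum and the topic come from the command line
# (see the spark-submit command at the end of this cell).
import sys
zkQuorum, topic = sys.argv[1:]
# "spark-streaming-consumer" is a placeholder consumer group id; {topic: 1} maps the topic
# to the number of Kafka partitions to consume.
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})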
'''
brokers='localhost:9092'
topic='tweets'
directKafkaStream = KafkaUtils.createDirectStream(ssc, topics=[topic], kafkaParams={"metadata.broker.list": brokers}) # for receiver-less “direct” approach.
'''

# Print a few records of each batch, then start the computation and wait for it to stop
kafkaStream.pprint()
ssc.start()
ssc.awaitTermination()

# The code above is saved in the file spark_test.py and the topic is meetup. The program is submitted with the command below:
# ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 spark_test.py localhost:2181 meetup