In [1]:
import os

# Scala / Spark versions used to pick the matching Kafka connector artifact.
SCALA_VERSION = '2.12'
SPARK_VERSION = '3.1.3'

# readStream.format("kafka") needs the Kafka connector ("driver"), which ships
# as a separate Maven jar rather than with Spark itself:
# https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12/3.1.3
# Passing the coordinate through --packages lets pyspark-shell download the jar
# behind the scenes when the Spark session starts.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join([
    '--packages',
    'org.apache.spark:spark-sql-kafka-0-10_{}:{}'.format(SCALA_VERSION, SPARK_VERSION),
    'pyspark-shell',
])

In [2]:
# Connect Kafka with Spark using a simple word-count example.
# After starting Kafka, run the following in a terminal to create the input
# topic and start a producer for typing test messages:
#     kafka-topics  --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
#     kafka-console-producer --bootstrap-server localhost:9092 --topic test

In [2]:
# Initialise findspark before `import pyspark` in the next cell.
# NOTE(review): findspark.init() is expected to locate the local Spark
# installation and put its pyspark package on sys.path — confirm for this setup.
import findspark
findspark.init()

In [3]:
import pyspark


from pyspark.sql import SparkSession

# Build (or reuse) a local, single-threaded Spark session for this demo.
# groupBy shuffles use spark.sql.shuffle.partitions, which defaults to 200;
# it is lowered to 4 here for the small demo workload. This should NOT be done
# in production.
spark = (
    SparkSession.builder
    .master("local[1]")
    .config('spark.sql.shuffle.partitions', 4)
    .appName("SparkStreamingKafkaBasic")
    .getOrCreate()
)

22/03/12 03:17:21 WARN Utils: Your hostname, ubuntu-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.80.128 instead (on interface ens33)
22/03/12 03:17:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark-3.1.3-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-220b936e-d61d-4ae2-b15d-b28cf38bf693;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.3 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.3 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 616ms :: artifacts dl 10ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;2.6.2 from central

In [4]:
# Read from Kafka: here Spark is the consumer of the Kafka topic called "test".
# Structured Streaming exposes the stream as a DataFrame / SQL table.
# "subscribe" names the Kafka topic to consume.
# The "kafka" format is not bundled with Spark; the connector jar is requested
# through PYSPARK_SUBMIT_ARGS in cell 1.
#
# Consumer group: a bare "group.id" option is not a recognised source option
# (Kafka consumer settings must carry the "kafka." prefix), so it had no effect.
# "groupIdPrefix" is used instead: Spark still generates a unique group id per
# query, but prefixes it with "wordcount-group" so the consumers are easy to
# identify. A fixed "kafka.group.id" is deliberately avoided because several
# queries in this notebook read from the same source and would otherwise
# compete for the topic's partitions.
kafkaDf = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("groupIdPrefix", "wordcount-group")
    .load()
)

In [5]:
# Print the schema of the raw Kafka stream.
# key and value are binary type, so they need to be CAST to STRING before use.
# The timestampType column describes how the timestamp column was assigned:
#   CreateTime: the timestamp is the message creation time as set by the Kafka client/producer
#   LogAppend: presumably the time the broker appended the message to its log — TODO confirm
kafkaDf.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# Kafka delivers key and value as binary, so the payload has to be type-cast
# to STRING. Only the value column is kept; every other column is ignored.
linesDf = kafkaDf.select(kafkaDf["value"].cast("string").alias("value"))
linesDf.printSchema()  # only value remains, now as a string

root
 |-- value: string (nullable = true)



In [7]:
import pyspark.sql.functions as F

# Split each line into a list of words, then explode (flatten) that list so
# every word becomes its own output row, similar to flatMap.
# The split pattern is a regular expression: splitting on runs of whitespace
# (instead of a single " ") and dropping empty tokens prevents consecutive,
# leading or trailing spaces from being counted as an empty-string "word".
wordsDf = (
    linesDf
    .select(F.explode(F.split(linesDf.value, r"\s+")).alias("word"))
    .where(F.col("word") != "")
)

# Running count of occurrences per distinct word.
wordCountsDf = wordsDf.groupBy("word").count()

In [10]:
# Echo the word counts to the console. "complete" output mode writes the whole
# result table on every batch. start() launches the query, at which point Spark
# subscribes to the topic and begins consuming data.
echoOnconsole = (
    wordCountsDf.writeStream
    .format("console")
    .outputMode("complete")
    .start()
)

22/03/12 03:19:13 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-7be2c071-00c3-4f3b-b46a-6ec3a1ecd7cc. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


-------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+
|word|count|
+----+-----+
+----+-----+



In [11]:
# Prepare the word-count result (word, count columns) for publishing to the
# Kafka topic "word-counts" in JSON format, e.g.
# {"word": "kafka", "count": 8}

# F is the alias for pyspark.sql.functions; columns are reachable as F.col
import pyspark.sql.functions as F

# struct(...) wraps all columns (word, count) into a single struct column,
# to_json serialises that struct into a JSON string, and the result is named
# "value" because that is the value part of a Kafka message.
wordCountsToKafkaDf = wordCountsDf.select(
    F.to_json(F.struct(*wordCountsDf.columns)).alias("value")
)

wordCountsToKafkaDf.printSchema()

root
 |-- value: string (nullable = true)



In [13]:
# Publish the JSON word counts to the Kafka topic "word-counts".
# checkpointLocation stores the query's state so it can resume after a system
# restart or a failure in between.
# To watch the output, run a console consumer on the "word-counts" topic, e.g.
#     kafka-console-consumer --bootstrap-server localhost:9092 --topic word-counts
#
# The StreamingQuery handle returned by start() is kept so the query can be
# monitored or stopped later with wordCountsToKafkaQuery.stop(); it is still
# shown as the cell output.
wordCountsToKafkaQuery = (
    wordCountsToKafkaDf.writeStream.format("kafka")
    .outputMode("complete")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "word-counts")
    .option("checkpointLocation", "file:///tmp/spark")
    .start()
)
wordCountsToKafkaQuery
                    

22/03/12 03:24:52 WARN StreamingQueryManager: Stopping existing streaming query [id=87df4503-54eb-4954-8aac-9cbf7b4ae8c0, runId=9606aa0f-e89e-40cc-a060-0d435eb09627], as a new run is being started.


<pyspark.sql.streaming.StreamingQuery at 0x7f805c57c990>

22/03/12 03:25:06 WARN HDFSBackedStateStoreProvider: The state for version 14 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
22/03/12 03:25:06 WARN HDFSBackedStateStoreProvider: The state for version 14 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.


-------------------------------------------
Batch: 9
-------------------------------------------
+--------+-----+
|    word|count|
+--------+-----+
|  Friday|    1|
|      eh|    1|
|    will|    1|
|   cause|    1|
| working|    1|
|     its|    2|
|      to|    1|
|    part|    1|
|      be|    1|
|starting|    1|
|somewhat|    1|
|  almost|    1|
|      is|    1|
|     and|    1|
|    time|    1|
|   about|    1|
|    this|    1|
|     hmm|    1|
|       i|    1|
|     you|    1|
+--------+-----+
only showing top 20 rows



22/03/12 03:25:06 WARN HDFSBackedStateStoreProvider: The state for version 14 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
22/03/12 03:25:06 WARN HDFSBackedStateStoreProvider: The state for version 14 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
                                                                                

-------------------------------------------
Batch: 10
-------------------------------------------
+--------+-----+
|    word|count|
+--------+-----+
|  Friday|    1|
|      eh|    1|
|    will|    1|
|   cause|    1|
| working|    1|
|     its|    2|
|      to|    1|
|    part|    1|
|      be|    1|
|starting|    1|
|somewhat|    1|
|  almost|    1|
|      is|    1|
|     and|    1|
|    time|    1|
|   about|    2|
|     how|    1|
|    this|    1|
|     hmm|    1|
|       i|    1|
+--------+-----+
only showing top 20 rows



                                                                                

-------------------------------------------
Batch: 11
-------------------------------------------
+--------+-----+
|    word|count|
+--------+-----+
|  Friday|    1|
|      eh|    1|
|    will|    1|
|   cause|    1|
| working|    1|
|     its|    2|
|      to|    1|
|    part|    1|
|      be|    1|
|starting|    1|
|somewhat|    1|
|      is|    1|
|    time|    1|
|   about|    2|
|     you|    1|
|  almost|    1|
|     and|    1|
|     how|    1|
|    this|    1|
|     hmm|    1|
+--------+-----+
only showing top 20 rows



                                                                                

-------------------------------------------
Batch: 12
-------------------------------------------
+--------+-----+
|    word|count|
+--------+-----+
|  Friday|    1|
|      eh|    1|
|    will|    1|
|   cause|    1|
| working|    1|
|     its|    2|
|      to|    1|
|    part|    1|
|      be|    1|
|starting|    1|
|somewhat|    1|
|      is|    1|
|    time|    1|
|   about|    2|
|      in|    1|
|     you|    1|
|  almost|    1|
|     and|    1|
|     how|    1|
|    this|    1|
+--------+-----+
only showing top 20 rows



22/03/13 00:26:21 WARN NettyRpcEnv: Ignored message: HeartbeatResponse(false)
22/03/13 00:26:21 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from 192.168.80.128:34975 in 10000 milliseconds. This timeout is controlled by spark.executor.heartbeatInterval
	at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at scala.util.Failure.recover(Try.scala:234)
	at scala.concurrent.Future.$anonfun$recover$1(Future.scala:395)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concu