In [1]:
# Configure spark-submit before any Spark JVM starts: the Kafka connector jar
# listed below is downloaded and loaded at runtime, when the notebook's
# SparkSession is first created.
import os

SCALA_VERSION = '2.12'
SPARK_VERSION = '3.5.3'

# --packages <groupId:artifactId:version>. The spark-sql-kafka artifact must
# match both the Scala build of Spark (artifact suffix) and the Spark version.
# NOTE(review): PySpark appears to require 'pyspark-shell' as the final token
# whenever PYSPARK_SUBMIT_ARGS is set - keep it last.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join((
    '--packages',
    'org.apache.spark:spark-sql-kafka-0-10_{}:{}'.format(SCALA_VERSION, SPARK_VERSION),
    'pyspark-shell',
))

In [1]:
# A plain Jupyter kernel does not pre-define `sc` (SparkContext).
# Probe for it and report the NameError instead of raising it, so that
# "Restart Kernel -> Run All" does not stop at this demonstration cell.
try:
    sc
except NameError as err:
    print(f"{type(err).__name__}: {err}")

''

In [2]:
# `spark` (SparkSession) is not initialized by default in a plain Jupyter kernel.
# Show the NameError without raising it, so "Run All" can continue to the cells
# that actually create the session.
try:
    spark
except NameError as err:
    print(f"{type(err).__name__}: {err}")

NameError: name 'spark' is not defined

In [2]:
# Local cluster
# Local cluster: make a locally installed Spark importable from this kernel.
# NOTE(review): findspark.init() presumably locates the Spark install (e.g. via
# SPARK_HOME) and adds its Python libraries to sys.path - confirm. That is why it
# runs before the `import pyspark` cell.
import findspark
findspark.init()

In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession

# Build (or reuse, via getOrCreate) the notebook's SparkSession.
#  - master "local[*]": run Spark in-process, with as many worker threads as the
#    machine has logical cores.
#  - spark.sql.shuffle.partitions = 4: keep the number of shuffle partitions
#    small (Spark's default is 200). Otherwise a laptop-sized streaming
#    aggregation schedules many tiny tasks per micro-batch.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("SparkStreamingKafkaInvoiceStream")
    .config('spark.sql.shuffle.partitions', 4)
    .getOrCreate()
)

In [6]:
spark  # the SparkSession built in the previous cell is now defined; Jupyter renders its summary

In [5]:
TOPIC = 'quickstart-events'
# Kafka broker list. It defaults to a broker on this machine; set the
# KAFKA_BOOTSTRAP_SERVERS environment variable to point elsewhere without
# editing the notebook.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')

# format("kafka") needs the spark-sql-kafka connector on the classpath.
# Databricks runtimes bundle it; a plain Spark download does not, which is why
# the first cell adds it through PYSPARK_SUBMIT_ARGS --packages.
kafkaDf = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", TOPIC)
    .load()
)

# kafkaDf is a streaming DataFrame. Batch actions such as kafkaDf.show() fail on
# it (show reads once, processes once and outputs once); results must go through
# writeStream instead. Printing the schema works, because it is known without
# reading any data.
kafkaDf.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# Columns produced by the Kafka source (see the printed schema above):
#   key, value     - record key and payload, both binary
#   topic          - name of the topic the record came from
#   partition      - Kafka partition number (integer)
#   offset         - record offset within that partition (long)
#   timestamp      - record timestamp (Spark timestamp type)
#   timestampType  - integer code for which kind of Kafka timestamp this is
# Keep only the timestamp (used for windowing later) and the payload decoded
# from bytes to a STRING. The key is ignored for now.
linesDf = kafkaDf.select(
    kafkaDf["timestamp"],
    kafkaDf["value"].cast("string").alias("value"),
)
linesDf.printSchema()  # timestamp + value as string

root
 |-- timestamp: timestamp (nullable = true)
 |-- value: string (nullable = true)



In [7]:
# Split each line into words, then turn every word into its own row.
# F.explode is the DataFrame counterpart of RDD flatMap: it emits one row per
# element of the input array, e.g. "welcome to spark" -> welcome / to / spark.
# The exploded column is aliased to "word" (otherwise it would be named "col").
import pyspark.sql.functions as F

# F.split takes a regex pattern. Splitting on runs of whitespace (r"\s+") instead
# of a single " " stops repeated spaces or tabs from producing empty "words".
# A leading or trailing separator can still yield an empty token, so empty
# strings are filtered out explicitly before they reach the word count.
wordsDf = (
    linesDf
    .select("timestamp", F.explode(F.split(linesDf.value, r"\s+")).alias("word"))
    .where(F.col("word") != "")
)

wordsDf.printSchema()  # timestamp, word

root
 |-- timestamp: timestamp (nullable = true)
 |-- word: string (nullable = false)



In [8]:
# Running word count per 1-minute window on the Kafka record timestamp: counts
# start from zero for each new minute instead of accumulating forever.
# (Non-windowed alternative, output columns word/count:
#  wordsDf.groupBy("word").count())
import pyspark.sql.functions as F

# Windowed output columns: window (struct with start/end), word, count.
wordCountsDf = wordsDf.groupBy(F.window("timestamp", "1 minute"), "word").count()

# Checkpoint directory for offsets and aggregation state. The default is the
# original local Windows path; set SPARK_CHECKPOINT_DIR to use another location.
# NOTE: a checkpoint belongs to the query that created it. Point this at a fresh
# directory after changing the aggregation (e.g. windowed <-> non-windowed).
CHECKPOINT_DIR = os.environ.get(
    'SPARK_CHECKPOINT_DIR',
    "file:///c:/tmp/spark/FileStore/tables/tmp/spark-wordcount-output",
)

# Streams are not shown with .show() or written once with .write; they are
# written continuously with .writeStream.
#  - outputMode("complete"): every micro-batch prints the whole result table,
#    not only the words that changed. Complete mode keeps the state of all past
#    windows, so memory grows for as long as the query runs.
#  - format("console"): prints to the Spark driver's stdout (the terminal or log
#    of the process running Spark), not to this notebook cell.
#  - truncate=false: the console sink cuts every cell to 20 characters by
#    default, which would chop the printed window {start, end} struct.
#  - No .trigger(): a new micro-batch starts as soon as the previous one has
#    finished. .trigger() sets the micro-batch interval; it is not a window.
# start() launches the query and returns immediately. The query keeps running
# in the background until echoOnconsole.stop() is called or the kernel stops.
echoOnconsole = (
    wordCountsDf.writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .option("checkpointLocation", CHECKPOINT_DIR)
    .start()
)