# Streaming from Kafka

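<b>Configure magic</b> - this loads the Spark Kafka connector package (`spark-sql-kafka-0-10`) built for the cluster's Scala (2.11) and Spark (2.2.0) versions. The `-f` flag forces the session to be recreated with this configuration if it is already running.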

In [None]:
%%configure -f
{
    "conf": { 
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0"
    }
}


The following command starts the Spark session and context (if not already started) and returns the Spark version number.<br/> 
Use it to check that the version in the package name above (2.2.0) matches your cluster. You only need to run it if the magic cell above produced errors.

In [None]:
sc.version
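The package coordinate has three colon-separated parts: the group (`org.apache.spark`), the artifact name with the Scala version as a suffix (`_2.11`), and the Spark version (`2.2.0`). The plain-Python sketch below uses two hypothetical helpers (`kafka_package` and `package_matches`, not part of Spark) to build the coordinate and compare its version with a string such as the one `sc.version` returns. The Scala suffix cannot be checked this way; it has to match the Scala build of your Spark.

```python
def kafka_package(spark_version: str, scala_version: str = "2.11") -> str:
    """Build the spark.jars.packages coordinate for the Kafka source.

    Format: groupId:artifactId_<scalaVersion>:<sparkVersion>
    """
    return "org.apache.spark:spark-sql-kafka-0-10_{}:{}".format(scala_version, spark_version)


def package_matches(package: str, spark_version: str) -> bool:
    """Compare the version at the end of the coordinate with a Spark version string.

    Only major.minor.patch are compared, in case a vendor build appends extra
    components to sc.version (an assumption, not a documented format).
    """
    package_version = package.rsplit(":", 1)[-1]
    return package_version.split(".")[:3] == spark_version.split(".")[:3]


configured = "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0"
print(package_matches(configured, "2.2.0"))  # True
print(kafka_package("2.2.0") == configured)  # True
```

In the notebook you would call `package_matches(configured, sc.version)`.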

Import the required modules.<br/>

In [None]:
import time
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, LongType


This helper makes sure only one SparkSession object is created and then reused.<br/>

In [None]:
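def getSparkSessionInstance():
    # Cache the session in a global so repeated calls return the same object.
    # If a session already exists (e.g. one created by the notebook kernel),
    # getOrCreate() returns that session instead of building a new one.
    if 'sparkSessionSingletonInstance' not in globals():
        globals()['sparkSessionSingletonInstance'] = SparkSession \
            .builder \
            .appName("Structured Streaming") \
            .master("local[*]") \
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']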


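Starts a session (if one is not already running, or if you have stopped the previous one)<br/>
Gets the SparkContext from the session<br/>
Sets the log level to ERROR so that INFO and WARN lines are not shown<br/>
Creates a DStream StreamingContext with a 1-second batch interval (the Structured Streaming code below does not use it)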

In [None]:
spark = getSparkSessionInstance()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)

<b>Assign Values</b> - Overwrite the kafkaBrokers value with the host:port of your "main" Azure Kafka broker. Several brokers can be listed, separated by commas.

In [None]:
kafkaBrokers = "192.168.0.10:9092"
kafkaTopic = "SensorReadings"
print("Done!")

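This defines the input stream from the Kafka topic on your Azure cluster, starting from the earliest offset still retained in the topic. Nothing is actually read until a streaming query is started in the last cell.<br/>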

In [None]:
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", kafkaTopic) \
    .option("startingOffsets", "earliest") \
    .load()
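Besides `"earliest"` and `"latest"`, the Kafka source also accepts a JSON string for `startingOffsets` that gives a starting offset per partition, where `-2` means earliest and `-1` means latest. The sketch below builds such a string with a hypothetical helper, `starting_offsets`; the partition numbers and offsets are made-up examples.

```python
import json

# Special per-partition offsets understood by Spark's Kafka source
EARLIEST, LATEST = -2, -1


def starting_offsets(topic: str, partition_offsets: dict) -> str:
    """Build a startingOffsets JSON string for one topic.

    partition_offsets maps partition number -> offset (or EARLIEST/LATEST).
    JSON object keys must be strings, so partition numbers are converted.
    """
    return json.dumps({topic: {str(p): o for p, o in sorted(partition_offsets.items())}})


# Hypothetical example: partition 0 from offset 100, partition 1 from the start
offsets = starting_offsets("SensorReadings", {0: 100, 1: EARLIEST})
print(offsets)  # {"SensorReadings": {"0": 100, "1": -2}}
```

The result would be passed as `.option("startingOffsets", offsets)`. It only applies when a new query starts; a query restarted from a checkpoint resumes from the offsets recorded there.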


In the Kafka source, both the key and the value are byte arrays (binary columns), so we cast them to strings.<br/> 
The timestamp column holds the timestamp of each Kafka record.

In [None]:
df1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
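`CAST(... AS STRING)` reads the raw bytes as UTF-8 text, and a record produced without a key has a NULL key, which stays NULL after the cast. The plain-Python analogue below (the helper `cast_to_string` and the sample record are hypothetical) shows the same idea.

```python
from typing import Optional


def cast_to_string(raw: Optional[bytes]) -> Optional[str]:
    """Mimic CAST(x AS STRING) on a binary column: NULL stays NULL,
    otherwise the bytes are decoded as UTF-8 text."""
    return None if raw is None else raw.decode("utf-8")


# Hypothetical Kafka record (not real sensor output)
raw_key = b"Sensor1"
raw_value = b'{"sensor": "Sensor1", "machine": "M1", "units": "C", "time": 1514764800000, "value": 21.5}'

print(cast_to_string(raw_key))   # Sensor1
print(cast_to_string(raw_value))
print(cast_to_string(None))      # None
```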

Our sensor data is structured as a JSON string within the value field. The structure is defined in the schema below.

In [None]:
value_schema = StructType([
        StructField("sensor",StringType(),True),
        StructField("machine",StringType(),True),
        StructField("units",StringType(),True),
        StructField("time", LongType(), True),
        StructField("value", DoubleType(), True)
])
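To make the schema concrete, the sketch below parses a hypothetical message with these five fields in plain Python. `parse_reading` is an illustrative stand-in for `from_json(value, value_schema)`, not Spark code: malformed JSON gives `None` (Spark gives a NULL struct), a missing field gives `None` (Spark gives NULL for that field), and fields not in the schema are dropped. Unlike Spark, it does not check the field types.

```python
import json
from typing import Optional

# Field names taken from value_schema above
FIELDS = ["sensor", "machine", "units", "time", "value"]


def parse_reading(text: str) -> Optional[dict]:
    """Rough plain-Python analogue of from_json(value, value_schema)."""
    try:
        obj = json.loads(text)
    except json.JSONDecodeError:
        return None              # malformed JSON -> no record
    if not isinstance(obj, dict):
        return None              # anything but a JSON object is rejected here
    # Keep only schema fields; missing ones become None; types are not checked
    return {name: obj.get(name) for name in FIELDS}


# Hypothetical sample messages
good = parse_reading('{"sensor": "Sensor1", "machine": "M1", "units": "C", "time": 1514764800000, "value": 21.5}')
print(good["value"])                                    # 21.5
print(parse_reading('{"sensor": "Sensor2"}')["value"])  # None
print(parse_reading("not json"))                        # None
```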

Here we parse the JSON in the value field using the schema above and flatten its fields into columns.<br/>
The Kafka timestamp column is already of Spark's timestamp type, so it can be used directly as the event-time column needed for the windowed streaming later.<br/>
We also create a temporary view over the dataframe so that the following SQL command can query it.

In [None]:
# Parse the JSON string into a struct, then expand its fields (sensor, machine, units, time, value)
df2 = df1.withColumn("jsonData", from_json(col("value"), value_schema)) \
        .select("key", "jsonData.*", "timestamp")

df2.createOrReplaceTempView("sensor_find")


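This SQL query builds a dataframe whose key combines the sensor name (the Kafka key) with its units, so that, for example, a key of Sensor1 with units C is shown as Sensor1 (C).<br/>
This simply gives the values more meaning in the scenario output.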

In [None]:
streamingDataFrame = spark.sql('SELECT timestamp, concat(key, " (", units, ")") AS key, value FROM sensor_find')
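One detail of Spark SQL's `concat` is that it returns NULL if any of its arguments is NULL, so a reading with no key or no units would get a NULL label. The plain-Python sketch below (the helper `sensor_label` is hypothetical) mirrors that rule.

```python
from typing import Optional


def sensor_label(key: Optional[str], units: Optional[str]) -> Optional[str]:
    """Mimic concat(key, " (", units, ")"): NULL if any input is NULL."""
    if key is None or units is None:
        return None
    return key + " (" + units + ")"


print(sensor_label("Sensor1", "C"))  # Sensor1 (C)
print(sensor_label("Sensor1", None))  # None
```

If such readings should still be labelled, one option would be wrapping the column in `coalesce(units, '?')` inside the SQL.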

Two output options are given:<br/>
<b>Grouped</b> - Shows the average value per sensor over all readings received so far, one row per sensor.<br/>
<b>Windowed</b> - Probably more realistic for a similar real-life scenario, but (depending on the window settings) it displays a long list, because each sensor gets one row per time window.<br/>
For display in a notebook the shorter list is preferable, so the windowed option is commented out.

In [None]:
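# Option 1 (grouped): average value per sensor over all readings so far
outStream = streamingDataFrame. \
    groupBy("key"). \
    avg('value')

# Option 2 (windowed): average per sensor over 10-minute windows sliding every 5 minutes
#outStream = streamingDataFrame. \
#    withWatermark("timestamp", "1 minute"). \
#    groupBy(window("timestamp", "10 minutes", "5 minutes"), "key"). \
#    avg('value')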
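The windowed option produces a longer list because, with 10-minute windows sliding every 5 minutes, each reading falls into two overlapping windows, and every (window, sensor) pair gets its own row. The plain-Python sketch below (the helper `sliding_windows` and the sample time are hypothetical) lists the `[start, end)` windows containing a timestamp, assuming window starts are aligned to multiples of the slide since the Unix epoch, as with `window()` and its default start time.

```python
from datetime import datetime, timedelta, timezone


def sliding_windows(ts: datetime, window: timedelta, slide: timedelta):
    """Return the [start, end) windows that contain ts, with starts aligned
    to multiples of `slide` since the Unix epoch."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    offset = (ts - epoch) % slide    # time since the latest aligned start
    start = ts - offset              # latest window start at or before ts
    windows = []
    while start > ts - window:       # [start, start + window) still contains ts
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


ts = datetime(2018, 1, 1, 12, 7, tzinfo=timezone.utc)
for start, end in sliding_windows(ts, timedelta(minutes=10), timedelta(minutes=5)):
    print(start.strftime("%H:%M"), "-", end.strftime("%H:%M"))
# 12:00 - 12:10
# 12:05 - 12:15
```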


The aggregation creates a new column named avg(value). We rename it to value for readability.

In [None]:
outStream = outStream.withColumnRenamed("avg(value)", "value")

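This cell starts the query. Each time a batch of records arrives, the aggregate is updated and the complete result table (up to 1000 rows, without truncating column values) is printed to the console; the cell then blocks while it waits for the next batch.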

In [None]:
query = outStream.writeStream. \
    outputMode('complete'). \
    option("numRows", 1000). \
    option("truncate", "false"). \
    format("console"). \
    start()

# Keep query as the StreamingQuery handle; block until it is stopped or fails
query.awaitTermination()
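In `complete` output mode, every trigger emits the whole result table, including sensors that received no new readings in that batch; `update` mode would emit only the rows that changed. The plain-Python sketch below (the helper `run_complete_mode` and the two micro-batches are hypothetical) keeps roughly the per-key state needed for an average, a running sum and count, and emits the full table after each batch.

```python
from collections import defaultdict


def run_complete_mode(batches):
    """Sketch of groupBy("key").avg("value") in complete output mode.

    State per key is a running [sum, count]; after every micro-batch the
    whole result table (every key seen so far) is emitted.
    """
    state = defaultdict(lambda: [0.0, 0])
    outputs = []
    for batch in batches:
        for key, value in batch:
            state[key][0] += value
            state[key][1] += 1
        outputs.append({k: s / n for k, (s, n) in sorted(state.items())})
    return outputs


# Two hypothetical micro-batches of (key, value) rows
batches = [
    [("Sensor1 (C)", 20.0), ("Sensor2 (%)", 40.0)],
    [("Sensor1 (C)", 22.0)],
]
for table in run_complete_mode(batches):
    print(table)
# {'Sensor1 (C)': 20.0, 'Sensor2 (%)': 40.0}
# {'Sensor1 (C)': 21.0, 'Sensor2 (%)': 40.0}
```

In the second batch only Sensor1 received data, yet Sensor2 still appears, which is what `complete` mode prints to the console.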
