# Structured Streaming with Kafka Lab

## Prerequisites
* Web browser: **Chrome**
* A cluster configured with **8 cores** and **DBR 6.2**
* External Services
  - Familiarity with Kafka is helpful!

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Classroom-Setup

For each lesson to execute correctly, please make sure to run the **`Classroom-Setup`** cell at the<br/>
start of each lesson (see the next cell) and the **`Classroom-Cleanup`** cell at the end.

In [None]:
%run "../Includes/Classroom-Setup"

Define the name of the stream that we will use later in this lesson:

In [None]:
myStreamName = "lab04a_ps"

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png">Use Kafka to read a stream</h2>

In this example, we are looking at a series of `ERROR`, `WARNING` and `INFO` log messages that are coming in via our Kafka server.

We want to analyze how many log messages are coming from each IP address.

Create `initialDF` with the following Kafka parameters:

1. `format` is `kafka`
2. `kafka.bootstrap.servers` (pick the server closest to you)
  * is `server1.databricks.training:9092` US (Oregon)
  * or `server2.databricks.training:9092` Singapore
3. `subscribe` is `logdata`
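
Option names are passed to the reader as plain strings, so a misspelled name is easy to overlook. The sketch below is a hypothetical plain-Python helper (it is not part of Spark or of this course's includes) that compares option names against the handful used in this lab and suggests the closest match. `KNOWN_OPTIONS` and `find_misspelled` are illustrative names, and the set is deliberately not the full list of options that the Kafka source supports.

```python
from difflib import get_close_matches

# Option names used in this lab only (an assumption of this sketch,
# not the complete set accepted by the Kafka source).
KNOWN_OPTIONS = {
    "kafka.bootstrap.servers",
    "subscribe",
    "startingOffsets",
    "maxOffsetsPerTrigger",
}

def find_misspelled(options):
    """Return {unknown_name: closest_known_name_or_None}."""
    problems = {}
    for name in options:
        if name not in KNOWN_OPTIONS:
            close = get_close_matches(name, list(KNOWN_OPTIONS), n=1)
            problems[name] = close[0] if close else None
    return problems

kafka_options = {
    "kafka.bootstrap.servers": "server1.databricks.training:9092",
    "subscribe": "logdata",
    "startingOffsets": "earliest",
    "maxOffsetsPerTrigger": 1000,
}

print(find_misspelled(kafka_options))
print(find_misspelled({"startingOffets": "earliest"}))
```

Once the names check out, the same dictionary could be handed to the reader in one call, for example `spark.readStream.format("kafka").options(**kafka_options).load()`.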

When you are done, run the TEST cell that follows to verify your results.

In [None]:
# TODO
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

kafkaServer = "server1.databricks.training:9092" # Specify "kafka" bootstrap server

# Create our initial DataFrame
initialDF = (spark.readStream
 .format("kafka")              # Specify "kafka" as the type of the stream
 .option("kafka.bootstrap.servers", kafkaServer)           # Set the location of the kafka server
 .option("subscribe", "logdata")              # Indicate which topics to listen to
 .option("startingOffsets", "earliest")              # Rewind stream to beginning when we restart notebook
 .option("maxOffsetsPerTrigger", 1000)              # Limit the number of offsets read from Kafka per trigger
 .load()              # Load the input data stream in as a DataFrame
)

In [None]:
# TEST - Run this cell to test your solution.
initSchemaStr = str(initialDF.schema)

dbTest("SS-04-key",       True, "(key,BinaryType,true)" in initSchemaStr)
dbTest("SS-04-value",     True, "(value,BinaryType,true)" in initSchemaStr)
dbTest("SS-04-topic",     True, "(topic,StringType,true)" in initSchemaStr)
dbTest("SS-04-partition", True, "(partition,IntegerType,true)" in initSchemaStr)
dbTest("SS-04-offset",    True, "(offset,LongType,true)" in initSchemaStr)
dbTest("SS-04-timestamp", True, "(timestamp,TimestampType,true)" in initSchemaStr)
dbTest("SS-04-timestampType", True, "(timestampType,IntegerType,true)" in initSchemaStr)

print("Tests passed!")

In [None]:
display(initialDF, streamName = myStreamName)

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png">Do Some ETL Processing</h2>

Perform the following ETL steps:

1. Cast `value` column to STRING
2. `ts_string` is the 24-character substring of `value` that starts at position 14
3. `epoc` is derived from `unix_timestamp` of `ts_string` using the format "yyyy/MM/dd HH:mm:ss.SSS"
4. `capturedAt` is derived from casting `epoc` to `timestamp` format
5. `logData` is created by applying `regexp_extract` to `value` with the pattern `"""^.*\]\s+(.*)$"""`
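
The steps above can be tried on a single string before running them on the stream. The following is a minimal plain-Python sketch, not the Spark solution: `spark_substr` is a hypothetical helper that mimics `Column.substr(startPos, length)` (a 1-based start position followed by a length), and `sample` is an invented log line laid out so that its timestamp begins at position 14. The real messages on the `logdata` topic may look different.

```python
import re
from datetime import datetime

def spark_substr(s, start_pos, length):
    """Mimic Column.substr(startPos, length): 1-based start, then a length."""
    begin = start_pos - 1
    return s[begin:begin + length]

# Hypothetical log line (an assumption of this sketch).
sample = ("[ERROR] pid1 2017/04/05 12:34:56.789 [worker-3] "
          "connection from 10.20.30.40 refused")

# Step 2: 24 characters starting at position 14.
ts_string = spark_substr(sample, 14, 24)

# Steps 3 and 4: parse the text into a timestamp. Python's parser is strict,
# so this sketch strips the trailing space before parsing.
captured_at = datetime.strptime(ts_string.strip(), "%Y/%m/%d %H:%M:%S.%f")

# Step 5: keep everything after the last "]" that is followed by whitespace.
match = re.match(r"^.*\]\s+(.*)$", sample)
log_data = match.group(1) if match else ""

print(repr(ts_string))
print(captured_at)
print(log_data)
```

Note that `unix_timestamp` returns whole seconds, so the Spark version of `capturedAt` does not carry the millisecond part that the Python `datetime` keeps here.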

When you are done, run the TEST cell that follows to verify your results.

In [None]:
# TODO
from pyspark.sql.functions import *

cleanDF = (initialDF
 .withColumn("value", col("value").cast("String"))  # Cast the binary "value" column to STRING
 .withColumn("ts_string", col("value").substr(14,24))  # Take 24 characters of "value" starting at position 14 and name the result "ts_string"
 .withColumn("epoc", unix_timestamp("ts_string", "yyyy/MM/dd HH:mm:ss.SSS"))  # Parse "ts_string" into Unix epoch seconds and name the result "epoc"
 .withColumn("capturedAt", col("epoc").cast("timestamp"))  # Cast "epoc" to a timestamp and name the result "capturedAt"
 .withColumn("logData", regexp_extract("value","""^.*\]\s+(.*)$""",1))  # Extract capture group 1 of the regexp from "value" and name the result "logData"
)

In [None]:
# TEST - Run this cell to test your solution.
schemaStr = str(cleanDF.schema)

dbTest("SS-04-schema-value",     True, "(value,StringType,true)" in schemaStr)
dbTest("SS-04-schema-ts_string",  True, "(ts_string,StringType,true)" in schemaStr)
dbTest("SS-04-schema-epoc",   True, "(epoc,LongType,true)" in schemaStr)
dbTest("SS-04-schema-capturedAt", True, "(capturedAt,TimestampType,true)" in schemaStr)
dbTest("SS-04-schema-logData",  True, "(logData,StringType,true)" in schemaStr)

print("Tests passed!")

In [None]:
display(cleanDF, streamName = myStreamName)

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png">Classify and Count IP Addresses Over a 10s Window</h2>

To solve this problem, you need to:

1. Parse the first part of an IP address from the column `logData` with the `regexp_extract()` function
  * You will need a regular expression which we have already provided below as `IP_REG_EX`
2. Filter out the records that don't contain IP addresses
3. Form another column called `ipClass` that classifies IP addresses based on the first part of an IP address 
  * 1 to 126: "Class A"
  * 127: "Loopback"
  * 128 to 191: "Class B"
  * 192 to 223: "Class C"
  * 224 to 239: "Class D"
  * 240 to 255: "Class E"
  * anything else is invalid
4. Perform an aggregation over a window of time, grouping by the `capturedAt` window and `ipClass`
  * For this lab, use a 10-second window
5. Count the number of IP values that belong to a specific `ipClass`
6. Sort by `ipClass`

In [None]:
# TODO
from pyspark.sql.functions import col, length, regexp_extract, when, window

# This is the regular expression pattern that we will use
IP_REG_EX = """^.*\s+(\d{1,3})\.\d{1,3}\.\d{1,3}\.\d{1,3}.*$"""

ipDF = (cleanDF
 .withColumn("ip", regexp_extract("logData", IP_REG_EX, 1))      # extract group 1 of IP_REG_EX (the first octet) from "logData" as "ip"
 .filter(length(col("ip")) > 0)                                  # keep only rows where an IP address was found
 .withColumn("ipClass",                                          # classify the address based on its first octet
             when((col("ip") > 0) & (col("ip") < 127), "Class A")
             .when(col("ip") == 127, "Loopback")
             .when((col("ip") > 127) & (col("ip") < 192), "Class B")
             .when((col("ip") > 191) & (col("ip") < 224), "Class C")
             .when((col("ip") > 223) & (col("ip") < 240), "Class D")
             .when((col("ip") > 239) & (col("ip") < 256), "Class E")
             .otherwise("invalid")                               # anything else is invalid
            )
 .groupBy(window("capturedAt", "10 seconds").alias("time"), col("ipClass"))  # group by 10-second windows of "capturedAt" (named "time") and by "ipClass"
 .count()                                                        # count the rows in each group
 .orderBy(col("ipClass"))                                        # sort by IP class
)

In [None]:
# TEST - Run this cell to test your solution.
schemaStr = str(ipDF.schema)

dbTest("SS-04-schema-ipClass", True, "(ipClass,StringType,false)" in schemaStr)
dbTest("SS-04-schema-count",   True, "(count,LongType,false)" in schemaStr)
dbTest("SS-04-schema-start",   True, "(start,TimestampType,true)" in schemaStr)
dbTest("SS-04-schema-end",     True, "(end,TimestampType,true)" in schemaStr)

print("Tests passed!")

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png">Display a LIVE Plot</h2>

The `DataFrame` that you pass to `display()` should have three columns:

* `time`: The time window structure
* `ipClass`: The class the first part of the IP address belongs to
* `count`: The number of times that said class of IP address appeared in the window
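
To picture how those three columns come about, here is a small plain-Python sketch of a 10-second tumbling window count. It is an illustration only: the events are invented, `window_bounds` is a hypothetical helper, and it assumes windows aligned to the Unix epoch with an inclusive start and an exclusive end, which is how `window()` behaves when no start offset is given.

```python
from collections import Counter

WINDOW_SECONDS = 10

def window_bounds(epoch_seconds, size=WINDOW_SECONDS):
    """Return the (start, end) of the tumbling window containing the event."""
    start = epoch_seconds - (epoch_seconds % size)
    return (start, start + size)

# Hypothetical (capturedAt in epoch seconds, ipClass) events.
events = [
    (1000, "Class A"),
    (1003, "Class A"),
    (1009, "Class C"),
    (1010, "Class A"),
    (1012, "Class C"),
]

# Group by (window, ipClass) and count, like groupBy(...).count().
counts = Counter((window_bounds(ts), cls) for ts, cls in events)

# Shape the result like the three columns: time, ipClass, count.
rows = sorted(
    (
        {"time": {"start": w[0], "end": w[1]}, "ipClass": cls, "count": n}
        for (w, cls), n in counts.items()
    ),
    key=lambda r: (r["ipClass"], r["time"]["start"]),
)

for row in rows:
    print(row)
```

An event stamped exactly at 1010 falls into the window that starts at 1010, not the one that ends there.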

Under <b>Plot Options</b>, use the following:
* <b>Keys:</b> `ipClass`
* <b>Values:</b> `count`

<b>Display type:</b>
* <b>Pie Chart</b>

<img src="https://files.training.databricks.com/images/eLearning/Structured-Streaming/plot-options-pie.png"/>

In [None]:
# TODO
display(ipDF, streamName = myStreamName)

In [None]:
# TEST - Run this cell to test your solution.
dbTest("SS-04-numActiveStreams", True, len(spark.streams.active) > 0)

print("Tests passed!")

Wait until the stream is done initializing...

In [None]:
untilStreamIsReady(myStreamName)

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png">Stop streaming jobs</h2>

Before we can conclude, we need to shut down all active streams.

In [None]:
for s in spark.streams.active:   # Iterate over all the active streams
  try:
    s.stop()                     # Stop the stream
    s.awaitTermination()         # Wait for it to stop

  except Exception as e:
    # In extreme cases, this function may throw an ignorable error.
    print("An [ignorable] error has occurred while stopping the stream.\n{}".format(e))

In [None]:
# TEST - Run this cell to test your solution.
dbTest("SS-04-numActiveStreams", 0, len(spark.streams.active))

print("Tests passed!")

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Classroom-Cleanup

Run the **`Classroom-Cleanup`** cell below to remove any artifacts created by this lesson.

In [None]:
%run "../Includes/Classroom-Cleanup"