-sandbox
<img src="https://files.training.databricks.com/images/Apache-Spark-Logo_TM_200px.png" style="float: left: margin: 20px"/>

# Structured Streaming with Kafka Lab

#### In this lesson, I will practice:
* Establish a connection with Kafka to stream data
* Do some ETL jobs and time window function

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Getting Started</h2>

Run the following cell to configure our "classroom."

In [0]:
%run "../Includes/Classroom-Setup"

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise 1: Use Kafka to read a stream</h2>

In this example, we are looking at a series of `ERROR`, `WARNING` and `INFO` log messages that are coming in via our Kafka server.

We want to analyze how many log messages are coming from each IP address?

Create `initialDF` with the following Kafka parameters:

0. `format` is `kafka`
0. `kafka.bootstrap.server` (pick the server closest to you)
  * is `server1.databricks.training:9092` US (Oregon)
  * or `server2.databricks.training:9092` Singapore
0. `subscribe` is `logdata`

When you are done, run the TEST cell that follows to verify your results.

In [0]:
# TODO
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

kafkaServer = "server2.databricks.training:9092" # Specify "kafka" bootstrap server

# Create our initial DataFrame
initialDF = (spark.readStream
 .format("kafka")                                            # Specify "kafka" as the type of the stream
 .option("kafka.bootstrap.servers", kafkaServer)             # Set the location of the kafka server
 .option("subscribe", "logdata")                             # Indicate which topics to listen to
 .option("startingOffset", "earliest")                       # Rewind stream to beginning when we restart notebook
 .option("maxOffsetPerTrigger", 1000)                        # Throttle Kafka's processing of the streams
 .load()                                                     # Load the input data stream in as a DataFrame
)

display(initialDF, streamName = "initialStream")

key,value,topic,partition,offset,timestamp,timestampType
,eyJtZXNzYWdlIiAiWzIwMjEvMDUvMjggMDg6NTY6NTguNjQzXSAoSU5GTykgRGV2aWNlIDB4MWE0YyBpcyBvbmxpbmUuIiwgInRpbWVzdGFtcCI6IDE2MjIxOTIyMTg2NDN9,logdata,0,23986504,2021-05-28T08:57:49.599+0000,0
,eyJtZXNzYWdlIiAiWzIwMjEvMDUvMjggMDg6NTc6MTYuOTU3XSAoSU5GTykgUGluZ2luZyB3YXRjaGRvZyB0aW1lciBwcm9jZXNzOiBQcm9jZXNzIGlzIGFsaXZlLiIsICJ0aW1lc3RhbXAiOiAxNjIyMTkyMjM2OTU3fQ==,logdata,0,23986505,2021-05-28T08:57:49.599+0000,0
,eyJtZXNzYWdlIiAiWzIwMjEvMDUvMjggMDg6NTc6MzQuODYzXSAoSU5GTykgRGV2aWNlIDB4Njg4ZiBpcyBvbmxpbmUuIiwgInRpbWVzdGFtcCI6IDE2MjIxOTIyNTQ4NjN9,logdata,0,23986506,2021-05-28T08:57:49.599+0000,0
,eyJtZXNzYWdlIiAiWzIwMjEvMDUvMjggMDg6NTc6MzkuMjAxXSAoSU5GTykgTmV0d29yayBoZWFydGJlYXQgY2hlY2s6IEFsbCBPSy4iLCAidGltZXN0YW1wIjogMTYyMjE5MjI1OTIwMX0=,logdata,0,23986507,2021-05-28T08:57:49.599+0000,0
,eyJtZXNzYWdlIiAiWzIwMjEvMDUvMjggMDg6NTc6MzYuNTU4XSAoSU5GTykgVXNlciAiZ293NDA3MCIgbG9nZ2VkIGluIGZyb20gMTAuNjYuMTMwLjE5MC4iLCAidGltZXN0YW1wIjogMTYyMjE5MjI1NjU1OH0=,logdata,0,23986508,2021-05-28T08:57:49.599+0000,0
,eyJtZXNzYWdlIiAiWzIwMjEvMDUvMjggMDg6NTc6NDYuNzQyXSAoSU5GTykgQWxpZW4gbW9uaXRvciAoYWxpZW5kKTogRGV0ZWN0ZWQgbm8gc3BhY2UgYWxpZW5zLiIsICJ0aW1lc3RhbXAiOiAxNjIyMTkyMjY2NzQyfQ==,logdata,0,23986509,2021-05-28T08:57:49.599+0000,0
,eyJtZXNzYWdlIiAiWzIwMjEvMDUvMjggMDg6NTc6MjMuNTA4XSAoSU5GTykgQ2hlY2twb2ludGluZyBvZmYtaGVhcCBzZXJ2ZXIgbWVtb3J5Li4uIiwgInRpbWVzdGFtcCI6IDE2MjIxOTIyNDM1MDh9,logdata,0,23986510,2021-05-28T08:57:49.599+0000,0
,eyJtZXNzYWdlIiAiWzIwMjEvMDUvMjggMDg6NTc6MDguNDQzXSAoSU5GTykgU2VuZGluZyBTSUdURVJNIHRvIDIgZXhjZXNzIHdvcmtlcnMuIiwgInRpbWVzdGFtcCI6IDE2MjIxOTIyMjg0NDN9,logdata,0,23986511,2021-05-28T08:57:49.599+0000,0
,eyJtZXNzYWdlIiAiWzIwMjEvMDUvMjggMDg6NTc6NTcuNjQ0XSAoSU5GTykgVXNlciAiZXl1Njk3OCIgbG9nZ2VkIGluIGZyb20gMTcyLjQxLjE0LjE5OC4iLCAidGltZXN0YW1wIjogMTYyMjE5MjI3NzY0NH0=,logdata,0,23986512,2021-05-28T08:57:49.599+0000,0
,eyJtZXNzYWdlIiAiWzIwMjEvMDUvMjggMDg6NTY6NTUuMTEyXSAoSU5GTykgV2FpdGluZyBmb3IgbWFzdGVyIHByb2Nlc3MgKFBJRCAxMTIxNSkgdG8gZGllLiIsICJ0aW1lc3RhbXAiOiAxNjIyMTkyMjE1MTEyfQ==,logdata,0,23986513,2021-05-28T08:57:49.599+0000,0


In [0]:
# TEST - Run this cell to test your solution.
initSchemaStr = str(initialDF.schema)

dbTest("SS-04-key",       True, "(key,BinaryType,true)" in initSchemaStr)
dbTest("SS-04-value",     True, "(value,BinaryType,true)" in initSchemaStr)
dbTest("SS-04-topic",     True, "(topic,StringType,true)" in initSchemaStr)
dbTest("SS-04-partition", True, "(partition,IntegerType,true)" in initSchemaStr)
dbTest("SS-04-offset",    True, "(offset,LongType,true)" in initSchemaStr)
dbTest("SS-04-timestamp", True, "(timestamp,TimestampType,true)" in initSchemaStr)
dbTest("SS-04-timestampType", True, "(timestampType,IntegerType,true)" in initSchemaStr)

print("Tests passed!")

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise 2: Do Some ETL Processing</h2>

Perform the following ETL steps:

0. Cast `value` column to STRING
0. `ts_string` is derived from `value` at positions 14 to 24,
0. `epoc` is derived from `unix_timestamp` of `ts_string` using format "yyyy/MM/dd HH:mm:ss.SSS"
0. `capturedAt` is derived from casting `epoc` to `timestamp` format
0. `logData` is created by applying `regexp_extract` on `value`.. use this string `"""^.*\]\s+(.*)$"""`

When you are done, run the TEST cell that follows to verify your results.

In [0]:
# TODO
from pyspark.sql.functions import col, unix_timestamp, regexp_extract
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

cleanDF = (initialDF
 .withColumn("value", col("value").cast("STRING"))                             # Select the "value" column, cast "value" column to STRING
 .withColumn("ts_string", col("value").substr(14,24))                          # Select the "value" column, pull substring(14, 24) from it and rename to "ts_string"
 .withColumn("epoc", unix_timestamp("ts_string", "yyyy/MM/dd HH:mm:ss.SSS"))   # Select the "ts_string" column, apply unix_timestamp to it and rename to "epoc"
 .withColumn("capturedAt", col("epoc").cast("timestamp"))                      # Select the "epoc" column and cast to a timestamp and rename it to "capturedAt"
 .withColumn("logData", regexp_extract("value", "^.*\]\s+(.*)$", 1))           # Select the "logData" column and apply the regexp `"""^.*\]\s+(.*)$"""`
)
display(cleanDF, streamName = "cleanStream")

key,value,topic,partition,offset,timestamp,timestampType,ts_string,epoc,capturedAt,logData
,"{""message"" ""[2021/05/28 08:57:14.279] (WARN) Space on disk /dev/sdb2 is low: Only 480 MB available."", ""timestamp"": 1622192234279}",logdata,0,23986603,2021-05-28T08:57:52.599+0000,0,2021/05/28 08:57:14.279],1622192234,2021-05-28T08:57:14.000+0000,"(WARN) Space on disk /dev/sdb2 is low: Only 480 MB available."", ""timestamp"": 1622192234279}"
,"{""message"" ""[2021/05/28 08:57:41.146] (INFO) Alien monitor (aliend): Detected no space aliens."", ""timestamp"": 1622192261146}",logdata,0,23986604,2021-05-28T08:57:52.599+0000,0,2021/05/28 08:57:41.146],1622192261,2021-05-28T08:57:41.000+0000,"(INFO) Alien monitor (aliend): Detected no space aliens."", ""timestamp"": 1622192261146}"
,"{""message"" ""[2021/05/28 08:57:22.144] (WARN) Master took 550 ms to respond (threshold=500 ms)"", ""timestamp"": 1622192242144}",logdata,0,23986605,2021-05-28T08:57:52.599+0000,0,2021/05/28 08:57:22.144],1622192242,2021-05-28T08:57:22.000+0000,"(WARN) Master took 550 ms to respond (threshold=500 ms)"", ""timestamp"": 1622192242144}"
,"{""message"" ""[2021/05/28 08:56:58.214] (INFO) Your mother called. She says you never call her."", ""timestamp"": 1622192218214}",logdata,0,23986606,2021-05-28T08:57:52.599+0000,0,2021/05/28 08:56:58.214],1622192218,2021-05-28T08:56:58.000+0000,"(INFO) Your mother called. She says you never call her."", ""timestamp"": 1622192218214}"
,"{""message"" ""[2021/05/28 08:56:53.030] (INFO) Rolling 3 log file(s)..."", ""timestamp"": 1622192213030}",logdata,0,23986607,2021-05-28T08:57:52.599+0000,0,2021/05/28 08:56:53.030],1622192213,2021-05-28T08:56:53.000+0000,"(INFO) Rolling 3 log file(s)..."", ""timestamp"": 1622192213030}"
,"{""message"" ""[2021/05/28 08:57:27.786] (INFO) Pinging watchdog timer process: Process is alive."", ""timestamp"": 1622192247786}",logdata,0,23986608,2021-05-28T08:57:52.599+0000,0,2021/05/28 08:57:27.786],1622192247,2021-05-28T08:57:27.000+0000,"(INFO) Pinging watchdog timer process: Process is alive."", ""timestamp"": 1622192247786}"
,"{""message"" ""[2021/05/28 08:58:48.505] (INFO) Sending SIGTERM to 4 excess workers."", ""timestamp"": 1622192328505}",logdata,0,23986609,2021-05-28T08:57:52.599+0000,0,2021/05/28 08:58:48.505],1622192328,2021-05-28T08:58:48.000+0000,"(INFO) Sending SIGTERM to 4 excess workers."", ""timestamp"": 1622192328505}"
,"{""message"" ""[2021/05/28 08:57:43.194] (INFO) Waiting for worker process (PID 43411) to die."", ""timestamp"": 1622192263194}",logdata,0,23986610,2021-05-28T08:57:52.599+0000,0,2021/05/28 08:57:43.194],1622192263,2021-05-28T08:57:43.000+0000,"(INFO) Waiting for worker process (PID 43411) to die."", ""timestamp"": 1622192263194}"
,"{""message"" ""[2021/05/28 08:58:33.447] (INFO) Waiting for supervisor process (PID 55955) to die."", ""timestamp"": 1622192313447}",logdata,0,23986611,2021-05-28T08:57:52.599+0000,0,2021/05/28 08:58:33.447],1622192313,2021-05-28T08:58:33.000+0000,"(INFO) Waiting for supervisor process (PID 55955) to die."", ""timestamp"": 1622192313447}"
,"{""message"" ""[2021/05/28 08:58:27.393] (INFO) Flushed 72 buffers to disk."", ""timestamp"": 1622192307393}",logdata,0,23986612,2021-05-28T08:57:52.599+0000,0,2021/05/28 08:58:27.393],1622192307,2021-05-28T08:58:27.000+0000,"(INFO) Flushed 72 buffers to disk."", ""timestamp"": 1622192307393}"


In [0]:
# TEST - Run this cell to test your solution.
schemaStr = str(cleanDF.schema)

dbTest("SS-04-schema-value",     True, "(value,StringType,true)" in schemaStr)
dbTest("SS-04-schema-ts_string",  True, "(ts_string,StringType,true)" in schemaStr)
dbTest("SS-04-schema-epoc",   True, "(epoc,LongType,true)" in schemaStr)
dbTest("SS-04-schema-capturedAt", True, "(capturedAt,TimestampType,true)" in schemaStr)
dbTest("SS-04-schema-logData",  True, "(logData,StringType,true)" in schemaStr)

print("Tests passed!")

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise 3: Classify and Count IP Addresses Over a 10s Window</h2>

To solve this problem, you need to:

0. Parse the first part of an IP address from the column `logData` with the `regexp_extract()` function
  * You will need a regular expression which we have already provided below as `IP_REG_EX`
  * Hint: take the 1st matching value
0. Filter out the records that don't contain IP addresses
0. Form another column called `ipClass` that classifies IP addresses based on the first part of an IP address 
  * 1 to 126: "Class A"
  * 127: "Loopback"
  * 128 to 191: "Class B"
  * 192 to 223: "Class C"
  * 224 to 239: "Class D"
  * 240 to 256: "Class E"
  * anything else is invalid
0. Perform an aggregation over a window of time, grouping by the `capturedAt` window and `ipClass`
  * For this lab, use a 10-second window
0. Count the number of IP values that belong to a specific `ipClass`
0. Sort by `ipClass`

In [0]:
# TODO
from pyspark.sql.functions import col, length, window, when

# This is the regular expression pattern that we will use 
IP_REG_EX = """^.*\s+(\d{1,3})\.\d{1,3}\.\d{1,3}\.\d{1,3}.*$"""

ipDF = (cleanDF
 .withColumn("ip", regexp_extract("logData", IP_REG_EX, 1))                        # apply regexp_extract on IP_REG_EX with value of 1 to "logData" and rename it "ip"
 .filter(length(col("ip")) > 0)                                                    # keep only "ip" that have non-zero length
 .withColumn("ipClass"  
             ,when((col("ip") > 0) & (col("ip") < 127), "Class A")
             .when(col("ip") == 127, "Loopback")
             .when((col("ip") > 127) & (col("ip") < 192), "Class B")
             .when((col("ip") >= 192) & (col("ip") < 224), "Class C")
             .when((col("ip") >= 224) & (col("ip") < 240), "Class D")
             .when((col("ip") >= 240) & (col("ip") < 256), "Class E")
             .otherwise("Invalid"))                                                # figure out class of IP address based on first two octets
 .groupBy(window(col("CapturedAt"), "10 seconds").alias("time"), col("ipClass"))   # gather in 10 second windows of "capturedAt", call them "time" and "ipClass" 
 .count()                                                                          # add up total
 .orderBy(col("ipClass"))                                                          # sort by IP class
       )

In [0]:
# TEST - Run this cell to test your solution.
schemaStr = str(ipDF.schema)

dbTest("SS-04-schema-ipClass", True, "(ipClass,StringType,false)" in schemaStr)
dbTest("SS-04-schema-count",   True, "(count,LongType,false)" in schemaStr)
dbTest("SS-04-schema-start",   True, "(start,TimestampType,true)" in schemaStr)
dbTest("SS-04-schema-end",     True, "(end,TimestampType,true)" in schemaStr)

print("Tests passed!")

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise 4: Display a LIVE Plot</h2>

The `DataFrame` that you pass to `display()` should have three columns:

* `time`: The time window structure
* `ipClass`: The class the first part of the IP address belongs to
* `count`: The number of times that said class of IP address appeared in the window

Under <b>Plot Options</b>, use the following:
* <b>Keys:</b> `ipClass`
* <b>Values:</b> `count`

<b>Display type:</b> is 
* <b>Pie Chart</b>

<img src="https://files.training.databricks.com/images/eLearning/Structured-Streaming/plot-options-pie.png"/>

In [0]:
# TODO
display(ipDF, streamName = "ipStream")

time,ipClass,count
"List(2021-05-28T09:04:10.000+0000, 2021-05-28T09:04:20.000+0000)",Class A,5
"List(2021-05-28T08:58:50.000+0000, 2021-05-28T08:59:00.000+0000)",Class A,17
"List(2021-05-28T09:00:50.000+0000, 2021-05-28T09:01:00.000+0000)",Class A,10
"List(2021-05-28T09:02:10.000+0000, 2021-05-28T09:02:20.000+0000)",Class A,18
"List(2021-05-28T08:57:10.000+0000, 2021-05-28T08:57:20.000+0000)",Class A,5
"List(2021-05-28T08:58:40.000+0000, 2021-05-28T08:58:50.000+0000)",Class A,17
"List(2021-05-28T09:00:10.000+0000, 2021-05-28T09:00:20.000+0000)",Class A,18
"List(2021-05-28T09:01:20.000+0000, 2021-05-28T09:01:30.000+0000)",Class A,17
"List(2021-05-28T08:59:30.000+0000, 2021-05-28T08:59:40.000+0000)",Class A,14
"List(2021-05-28T09:01:40.000+0000, 2021-05-28T09:01:50.000+0000)",Class A,19


In [0]:
# TEST - Run this cell to test your solution.
dbTest("SS-04-numActiveStreams", True, len(spark.streams.active) > 0)

print("Tests passed!")

Wait until stream is done initializing...

In [0]:
untilStreamIsReady("ipStream")

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise 5: Stop streaming jobs</h2>

Before we can conclude, we need to shut down all active streams.

In [0]:
# TODO
for s in spark.streams.active:     # Iterate over all the active streams
    print("Stopping: ", s.name)
    s.stop()                       # Stop the stream

In [0]:
# TEST - Run this cell to test your solution.
dbTest("SS-04-numActiveStreams", 0, len(spark.streams.active))

print("Tests passed!")