Finding Web Servers in Streaming Network Data With Spark
-----------------------------------------------------

It's time to run our streaming experiment. In this notebook, we'll do a few things:
1. Import all the modules we need to run this in Spark.
2. Set up our constants and our Spark session.
3. First, test our hypothesis about web servers on the directory of JSON files using batch processing.
4. Finally, stream the JSON files through Spark, run our query, and compare our results.

---------------------------------------------------------------------

First, import everything we need for our project.
Note that we're importing a number of data types - this is needed to define our streaming JSON schema. This is probably the most important part of the streaming setup. It's critical to get it right up front.

We handle our streaming through the `StreamingContext`.

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.functions import desc, col, window

from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType
from pyspark.streaming import StreamingContext

import json
import time
import os

Now define your input path - yours may differ from the one below. Remember, though - it's important that this be the path **inside** your container; not the local one on your laptop.

We also get the number of files in the directory, then get an "offset" value we'll use later on to signal when our query is completed.

Finally, start your `SparkSession`.

In [3]:
inputPath = "/home/ds/notebooks/lanl/day03/json/"

numFiles = len(os.listdir(inputPath))
numFileOffset = numFiles - 1

print(f"There are {numFiles} files in our inputPath, which gives an offset of {numFileOffset}.")

APP_NAME = "Web Server Hypothesis Test"
SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()

There are 50 files in our inputPath, which gives an offset of 49.


Define our schema for the streaming data frame.

In [4]:
flowSchema = StructType([
    StructField('time', TimestampType(), True),
    StructField('duration', LongType(), True),
    StructField('srcdevice', StringType(), True),
    StructField('dstdevice', StringType(), True),
    StructField('protocol', LongType(), True),
    StructField('srcport', StringType(), True),
    StructField('dstport', StringType(), True),
    StructField('srcpackets', LongType(), True),
    StructField('dstpackets', LongType(), True),
    StructField('srcbytes', LongType(), True),
    StructField('dstbytes', LongType(), True)
])

Now that we've set up the session and all supporting variables, it's time to run some queries.

First, we'll create a static dataframe from all files in the directory. Spark makes this easy since we can pass it either a single filename, or a directory that contains our files, and it handles them the same in each case.

In [5]:
# Static DataFrame representing data in the JSON files
staticInputDF = spark.read.json(inputPath)

Check the schema. We don't need to apply our schema to this static data frame, so we don't do so at this time.

In [6]:
staticInputDF.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- dstbytes: string (nullable = true)
 |-- dstdevice: string (nullable = true)
 |-- dstpackets: string (nullable = true)
 |-- dstport: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- srcbytes: string (nullable = true)
 |-- srcdevice: string (nullable = true)
 |-- srcpackets: string (nullable = true)
 |-- srcport: string (nullable = true)
 |-- time: string (nullable = true)



Time to build our query. It's a pretty simple SQL-like query.

Recall that each row in the dataset represents a single conversation between two devices. A web server should be queried on ports 80 and 443 disproportionately more than devices that are not web servers.

We select the `dstdevice` column, but limit only to the rows where the `dstport` is either `80` or `443`. Then we group by the `dstdevice` and get the count of each `dstdevice`.

Finally we sort by the count descending, and show the top 20 web servers.

In [7]:
staticInputDF.select('dstdevice') \
    .where(col('dstport').isin([80, 443])) \
    .groupby('dstdevice') \
    .count() \
    .sort(desc('count')) \
    .show(20)

+-------------------+-----+
|          dstdevice|count|
+-------------------+-----+
|EnterpriseAppServer|14199|
|         Comp576843|13837|
|         Comp186884|12497|
|         Comp501516| 5742|
|         Comp393033| 3712|
|         Comp916004| 3263|
|         Comp498128| 2813|
|         Comp573929| 2554|
|         Comp611862| 2351|
|         Comp370444| 2345|
|         Comp097048| 1954|
|         Comp847595| 1863|
|         Comp574103| 1598|
|         Comp309567| 1551|
|         Comp509586| 1494|
|         Comp216677| 1480|
|         Comp336938| 1463|
|         Comp657655| 1452|
|         Comp146745| 1413|
|         Comp457448| 1159|
+-------------------+-----+
only showing top 20 rows



Now that we have our static baseline in place, let's try to replicate it in the streaming context.

The good news here is that Spark treats a Streaming dataframe just like a static / batch dataframe. So the code you'll see below should be very familiar.

First we set up a streaming input data frame. This gets the rows from our JSON, one file at a time.

We simply tell Spark to read a stream limited to one file at a time, apply the defined schema, and use the JSON interpreter on the directory specified in `inputPath`.

In [8]:
streamingInputDF = (
  spark
    .readStream                       
    .schema(flowSchema)               # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

Streaming handles things _slightly_ different - we need to create a streaming counts dataframe; we can't query the input dataframe directly.

This is where we define our query - note that it looks almost identical to the static dataframe we saw earlier - and we confirm to Spark that yes, our counts dataframe is indeed a streaming dataframe.

In [9]:
streamingCountsDF = streamingInputDF \
    .select('dstdevice') \
    .where(col('dstport').isin([80, 443])) \
    .groupBy(streamingInputDF.dstdevice) \
    .count() \
    .sort(desc('count'))

# Is this DF actually a streaming DF?
streamingCountsDF.isStreaming

True

Finally it's time to start our streaming engine. We do this by creating an object named `query` that writes the stream into an in-memory table called `counts`. This is the table we'll monitor during streaming to see the progress of our count of web servers.

We also set a shuffles partition to a small value here.

The streaming process only begins when we execute the `.start()` method on the `streamingCountsDF` object.

In [10]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

query = (
  streamingCountsDF
    .writeStream
    .format("memory")       
    .queryName("counts")     # counts = name of the in-memory table
    .outputMode("complete")  # complete = all the counts should be in the table
    .start()
)

Finally we monitor the query as it proceeds. We let the query run a little by sleeping for four seconds; then we fall into a loop that updates each second until all files have been processed and the query has stopped. Unfortunately there isn't an easy way to see that the query has stopped, so we use our `numFileOffset` value to match with the `logOffset` in the `recentProgress` structure. Once they're equal, we terminate our loop, knowing we've run through all of the files in the directory.

In [11]:
# let the query run for a bit to insure there is data in the recent progress structure.
time.sleep(4)

# Monitor the progress of the query. The last table should be identical to the static query.
while True:
    spark.sql("select * from counts").show(20)
    time.sleep(1)
    if query.recentProgress[-1]['sources'][0]['endOffset']['logOffset'] == numFileOffset:
        break

+-------------------+-----+
|          dstdevice|count|
+-------------------+-----+
|         Comp576843| 5254|
|EnterpriseAppServer| 5250|
|         Comp186884| 4163|
|         Comp501516| 2627|
|         Comp498128| 1656|
|         Comp657655| 1452|
|         Comp370444| 1301|
|         Comp916004| 1206|
|         Comp393033| 1049|
|         Comp611862|  862|
|         Comp097048|  846|
|         Comp573929|  745|
|         Comp574103|  605|
|         Comp146745|  567|
|         Comp847595|  564|
|         Comp457448|  556|
|         Comp509586|  455|
|         Comp309567|  447|
|         Comp216677|  424|
|         Comp523500|  411|
+-------------------+-----+
only showing top 20 rows

+-------------------+-----+
|          dstdevice|count|
+-------------------+-----+
|EnterpriseAppServer| 6377|
|         Comp576843| 6275|
|         Comp186884| 5567|
|         Comp501516| 3114|
|         Comp498128| 1856|
|         Comp370444| 1531|
|         Comp916004| 1466|
|         Comp657655| 

So that's it!

If you compare the last table in the streaming example to the static dataframe query, you'll see that the two are identical. 