# Finding Web Servers in Streaming Network Data With Spark

It's time to run our streaming experiment. In this notebook, we'll do a few things:

1. Install Java, Spark, Findspark and PySpark, and mount Google Drive in Colab.
2. Import the modules we need.
3. Set up our constants and our Spark session.
4. Test our hypothesis about web servers by batch-processing the directory of JSON files.
5. Finally, stream the JSON files through Spark, run the same query, and compare the results.

In [1]:
# Install Java, Spark, Findspark and PySpark
# Spark 2.4 runs on Java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download mirrors drop old releases; the Apache archive keeps every version
!wget -q https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"

!pip install -q findspark
# Pin PySpark to the same version as the Spark download above
!pip install pyspark==2.4.0

# mount Google Drive
from google.colab import drive
drive.mount('/content/gdrive')

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/88/01/a37e827c2d80c6a754e40e99b9826d978b55254cc6c6672b5b08f2e18a7f/pyspark-2.4.0.tar.gz (213.4MB)
    100% |████████████████████████████████| 213.4MB 110kB/s
Collecting py4j==0.10.7 (from pyspark)
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
    100% |████████████████████████████████| 204kB 29.2MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Stored in directory: /root/.cache/pip/wheels/cd/54/c2/abfcc942eddeaa7101228ebd6127a30dbdf903c72db4235b23
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.0

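Next, we import everything the project needs. Note the data types imported from `pyspark.sql.types`: we need them to define the schema of our streaming JSON. This is probably the most important part of the streaming setup, and it's critical to get it right up front, because a file-based streaming source won't infer a schema for us by default.

All of the streaming in this notebook uses Structured Streaming, which runs through the same `SparkSession` (via `spark.readStream`), so we don't need the older DStream-based `StreamingContext`.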

In [0]:
from pyspark.sql import SparkSession                # entry point for batch and streaming queries
from pyspark.sql.functions import desc, col         # column helpers used in our query
from pyspark.sql.types import (StructType, StructField,  # types for the streaming JSON schema
                               StringType, LongType, TimestampType)

import os    # count the input files
import time  # pause while the stream runs

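Next, we define our input path. Yours may differ from the one below; just make sure it's the path to the data **inside** your mounted Drive, as in the example.

We also count the files in the directory and compute an "offset" value that we'll use later to detect when the streaming query has processed every file. Since we'll stream one file per micro-batch and the file source numbers its offsets from 0, the last file's offset is the file count minus one.

Finally, we start the `SparkSession`.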

In [3]:
inputPath = "/content/gdrive/My Drive/Colab Datasets/lanl/"

numFiles = len(os.listdir(inputPath))
numFileOffset = numFiles - 1

print(f"There are {numFiles} files in our inputPath, which gives an offset of {numFileOffset}.")

APP_NAME = "Web Server Hypothesis Test"
SPARK_URL = "local[*]"

spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()

There are 50 files in our inputPath, which gives an offset of 49.
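The offset arithmetic is easy to check in isolation. Below is a small sketch, assuming (as in this notebook) that every file is already in the directory when the query starts and that the file source numbers its micro-batch offsets from 0. The helper `final_log_offset` and its `max_files_per_trigger` parameter are hypothetical names for illustration; with one file per trigger it reduces to `numFiles - 1`.

```python
import math


def final_log_offset(num_files: int, max_files_per_trigger: int = 1) -> int:
    """Expected logOffset of the last micro-batch of a file-stream source.

    Hypothetical helper. Assumes all files exist before the query starts,
    each micro-batch takes up to `max_files_per_trigger` new files, and
    the source numbers its batch offsets starting at 0.
    """
    if num_files < 1:
        raise ValueError("need at least one file to produce a batch")
    num_batches = math.ceil(num_files / max_files_per_trigger)
    return num_batches - 1


print(final_log_offset(50))     # one file per trigger, as in this notebook
print(final_log_offset(50, 4))  # hypothetical: four files per trigger
```

Under the same assumptions, raising `maxFilesPerTrigger` would shrink the stopping offset to the number of batches minus one rather than the number of files minus one.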


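Define the schema for the streaming dataframe. Each `StructField` names a key in the JSON records, gives its Spark type, and (with `True`) marks it as nullable.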

In [0]:
flowSchema = StructType([
    StructField('time', TimestampType(), True),
    StructField('duration', LongType(), True),
    StructField('srcdevice', StringType(), True),
    StructField('dstdevice', StringType(), True),
    StructField('protocol', LongType(), True),
    StructField('srcport', StringType(), True),
    StructField('dstport', StringType(), True),
    StructField('srcpackets', LongType(), True),
    StructField('dstpackets', LongType(), True),
    StructField('srcbytes', LongType(), True),
    StructField('dstbytes', LongType(), True)
])
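If you prefer, the same schema can be written as a DDL string, since `DataStreamReader.schema()` also accepts a DDL-formatted string in Spark 2.3 and later. The sketch below builds one from a list of column names and Spark SQL types (`BIGINT` for `LongType`, `TIMESTAMP` for `TimestampType`). `FLOW_COLUMNS` and `flowSchemaDDL` are hypothetical names, and the backticks simply quote each column name.

```python
# Hypothetical alternative to flowSchema: the same columns as a DDL string.
# DDL columns are nullable by default, matching nullable=True in flowSchema.
FLOW_COLUMNS = [
    ("time", "TIMESTAMP"),   # TimestampType
    ("duration", "BIGINT"),  # LongType
    ("srcdevice", "STRING"),
    ("dstdevice", "STRING"),
    ("protocol", "BIGINT"),
    ("srcport", "STRING"),
    ("dstport", "STRING"),
    ("srcpackets", "BIGINT"),
    ("dstpackets", "BIGINT"),
    ("srcbytes", "BIGINT"),
    ("dstbytes", "BIGINT"),
]

# Backtick-quote each name so no column is mistaken for a SQL keyword.
flowSchemaDDL = ", ".join(f"`{name}` {sql_type}" for name, sql_type in FLOW_COLUMNS)
print(flowSchemaDDL)
```

You would then pass it as `.schema(flowSchemaDDL)` in place of `.schema(flowSchema)`; both describe the same eleven nullable columns.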

Now that we've set up the session and all supporting variables, it's time to run some queries.

First, we'll create a static dataframe from all the files in the directory. Spark makes this easy: `spark.read.json` accepts either a single file or a directory of files and handles both the same way.

In [0]:
# Static DataFrame representing data in the JSON files
staticInputDF = spark.read.json(inputPath)

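Let's check the schema Spark inferred. We don't apply `flowSchema` to this static dataframe: for a batch read, Spark scans the JSON and infers the schema itself. Notice that the inferred types differ from `flowSchema` (for example, the numeric columns, including `time`, come back as `double`). That doesn't affect our query, which only uses the string columns `dstdevice` and `dstport`.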

In [6]:
staticInputDF.printSchema()

root
 |-- dstbytes: double (nullable = true)
 |-- dstdevice: string (nullable = true)
 |-- dstpackets: double (nullable = true)
 |-- dstport: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- protocol: double (nullable = true)
 |-- srcbytes: double (nullable = true)
 |-- srcdevice: string (nullable = true)
 |-- srcpackets: double (nullable = true)
 |-- srcport: string (nullable = true)
 |-- time: double (nullable = true)
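Notice that every numeric column comes back as `double`. The sketch below is a deliberately simplified model of JSON type inference (it is not Spark's code and ignores decimals, nested values, and reader options): integers infer as `long`, floating-point numbers as `double`, a mix of the two widens to `double`, other conflicts fall back to `string`, and all-null columns become `string`. Under that model, a `double` column suggests at least some values were written with a decimal point. The function name and the records in the example are hypothetical.

```python
import json


def infer_column_type(values):
    """Simplified model of JSON schema inference for one column (hypothetical)."""
    inferred = None  # no non-null value seen yet
    for v in values:
        if v is None:
            continue
        if isinstance(v, bool):  # check bool first: bool is a subclass of int
            t = "boolean"
        elif isinstance(v, int):
            t = "long"
        elif isinstance(v, float):
            t = "double"
        else:
            t = "string"
        if inferred is None or inferred == t:
            inferred = t
        elif {inferred, t} == {"long", "double"}:
            inferred = "double"  # mixed integers and floats widen to double
        else:
            inferred = "string"  # other conflicts fall back to string
    return inferred or "string"  # all-null columns end up as string


# Hypothetical records, for illustration only.
rows = [json.loads(line) for line in [
    '{"dstbytes": 1200, "dstdevice": "Comp576843"}',
    '{"dstbytes": 88.0, "dstdevice": "EnterpriseAppServer"}',
]]
for column in ("dstbytes", "dstdevice"):
    print(column, infer_column_type(r.get(column) for r in rows))
```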



Time to build our query. It's a pretty simple, SQL-like query.

Recall that each row in the dataset represents a single conversation (flow) between two devices. A web server should receive disproportionately more connections on ports 80 (HTTP) and 443 (HTTPS) than a device that isn't a web server.

We select the `dstdevice` column, keeping only the rows where `dstport` is `80` or `443`. Then we group by `dstdevice` and count the rows for each device.

Next, we sort by count, descending, and show the top 20 candidate web servers.

In [7]:
# dstport is a string column, so compare it against string literals
staticInputDF.where(col('dstport').isin(['80', '443'])) \
    .select('dstdevice') \
    .groupBy('dstdevice') \
    .count() \
    .sort(desc('count')) \
    .show(20)

+-------------------+-----+
|          dstdevice|count|
+-------------------+-----+
|EnterpriseAppServer|14495|
|         Comp576843|14153|
|         Comp186884|12681|
|         Comp501516| 5859|
|         Comp393033| 3795|
|         Comp916004| 3332|
|         Comp498128| 2831|
|         Comp573929| 2555|
|         Comp611862| 2404|
|         Comp370444| 2385|
|         Comp097048| 1991|
|         Comp847595| 1886|
|         Comp574103| 1629|
|         Comp657655| 1590|
|         Comp309567| 1576|
|         Comp216677| 1528|
|         Comp509586| 1516|
|         Comp336938| 1501|
|         Comp146745| 1451|
|         Comp457448| 1180|
+-------------------+-----+
only showing top 20 rows
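For a sanity check without Spark, the same logic fits in a few lines of plain Python. This is a sketch, assuming each flow is a dict with string `dstdevice` and `dstport` values (matching the inferred schema above, where `dstport` is a string). `top_web_servers`, `WEB_PORTS`, and the sample flows are hypothetical.

```python
from collections import Counter

WEB_PORTS = {"80", "443"}  # dstport holds strings in this dataset


def top_web_servers(flows, n=20):
    """Count flows per dstdevice whose dstport is 80 or 443, largest first."""
    counts = Counter(f["dstdevice"] for f in flows if f.get("dstport") in WEB_PORTS)
    return counts.most_common(n)


# Hypothetical sample flows, for illustration only.
sample = [
    {"dstdevice": "EnterpriseAppServer", "dstport": "443"},
    {"dstdevice": "EnterpriseAppServer", "dstport": "80"},
    {"dstdevice": "Comp576843", "dstport": "443"},
    {"dstdevice": "Comp576843", "dstport": "22"},  # not a web port, so ignored
]
print(top_web_servers(sample))
```

`Counter.most_common` returns `(device, count)` pairs in descending order of count, playing the role of `groupBy(...).count().sort(desc('count'))`.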



Now that we have our static baseline in place, let's try to replicate it in the streaming context.

The good news is that Structured Streaming lets us use the same DataFrame API on a streaming dataframe as on a static (batch) one, so the code below should look very familiar.

First, we set up a streaming input dataframe, which reads the rows from our JSON files one file at a time.

We tell Spark to read a stream, pick up at most one file per trigger (`maxFilesPerTrigger`), apply `flowSchema`, and use the JSON reader on the directory in `inputPath`.

In [0]:
streamingInputDF = (
  spark
    .readStream                       
    .schema(flowSchema)               # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

Streaming handles things _slightly_ differently: we can't call actions such as `.show()` on a streaming dataframe. Instead, we derive a streaming counts dataframe and later start a query that writes it out.

This is where we define that query. Note that it's almost identical to the static query we ran earlier. We then check `isStreaming` to confirm that our counts dataframe is indeed a streaming dataframe.

In [9]:
streamingCountsDF = streamingInputDF \
    .where(col('dstport').isin(['80', '443'])) \
    .select('dstdevice') \
    .groupBy('dstdevice') \
    .count() \
    .sort(desc('count'))

# Is this DF actually a streaming DF?
streamingCountsDF.isStreaming

True

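Now it's time to start our streaming engine. We do this by creating a streaming query, `query`, that writes the stream into an in-memory table called `counts`. This is the table we'll monitor during streaming to follow the per-device counts as they grow.

We also set the number of shuffle partitions (`spark.sql.shuffle.partitions`, 200 by default) to a small value. Our micro-batches are small and run on one local machine, so hundreds of partitions per aggregation would mostly add overhead. We use the `complete` output mode, which rewrites the whole result table after every trigger; Spark only allows sorting a streaming aggregation in this mode.

The streaming process only begins when we call `.start()` on `streamingCountsDF.writeStream`, which returns the `StreamingQuery` handle we store in `query`.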

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

query = (
  streamingCountsDF
    .writeStream
    .format("memory")       
    .queryName("counts")     # counts = name of the in-memory table
    .outputMode("complete")  # complete = rewrite the whole counts table each trigger (needed for the sort)
    .start()
)
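Why should the final streaming table match the static one? A streaming aggregation keeps running counts as state across micro-batches, and in `complete` mode the memory sink receives the whole updated table after each trigger. The plain-Python simulation below models that behavior; it is not how Spark implements state, and `simulate_complete_mode` and the two-file stream are hypothetical.

```python
from collections import Counter

WEB_PORTS = {"80", "443"}


def simulate_complete_mode(files):
    """Yield the full result table, sorted by count, after each micro-batch.

    Each element of `files` stands in for one JSON file (a list of flow
    dicts), mirroring maxFilesPerTrigger=1: one file per micro-batch.
    """
    state = Counter()  # running counts kept across micro-batches
    for batch in files:
        state.update(f["dstdevice"] for f in batch if f["dstport"] in WEB_PORTS)
        # complete mode: emit the whole table, not just the rows that changed
        yield sorted(state.items(), key=lambda kv: kv[1], reverse=True)


# Hypothetical two-file stream, for illustration only.
files = [
    [{"dstdevice": "A", "dstport": "80"}, {"dstdevice": "B", "dstport": "443"}],
    [{"dstdevice": "B", "dstport": "80"}, {"dstdevice": "C", "dstport": "22"}],
]
tables = list(simulate_complete_mode(files))
for batch_index, table in enumerate(tables):
    print(batch_index, table)
```

Because the state carries over between batches, the table after the last file contains the same counts a single batch pass over all files would produce.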

Finally, we monitor the query as it proceeds. We let it run for four seconds so there's something in its progress reports, then loop once per second, printing the current `counts` table each time, until all the files have been processed. A file-stream query never stops on its own (it keeps waiting for new files to arrive), so there's no direct signal that it has worked through every file. Instead, we compare `numFileOffset` with the `logOffset` of the file source in the query's most recent progress report; once they're equal, every file in the directory has been processed and we break out of the loop.
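This loop only ends when the expected offset appears, so if `numFiles` overcounts (for example, a hidden file that `os.listdir` sees but Spark's file listing skips), it would spin forever. A hedged alternative is to poll with a timeout. `poll_until` below is a hypothetical, framework-agnostic helper; the demo uses stand-in predicates rather than a real query.

```python
import time


def poll_until(predicate, timeout_s=300.0, interval_s=1.0):
    """Call predicate() every interval_s seconds until it returns True.

    Returns True if the predicate succeeded, False if timeout_s elapsed first.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval_s)
    return False


# Demo with a stand-in predicate that succeeds on its third call.
calls = {"n": 0}


def ready():
    calls["n"] += 1
    return calls["n"] >= 3


print(poll_until(ready, timeout_s=5.0, interval_s=0.01))
print(poll_until(lambda: False, timeout_s=0.05, interval_s=0.01))
```

In the notebook, the predicate would read `query.lastProgress` and compare the file source's `logOffset` with `numFileOffset`; after it returns you would still call `query.stop()`. PySpark's `StreamingQuery.processAllAvailable()`, which blocks until all data currently available in the source has been processed (the docs describe it as intended for testing), is another option.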

In [11]:
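# Let the query run for a bit to ensure there's data in its progress reports.
time.sleep(4)

# Monitor the progress of the query. The last table should be identical to the static query.
while True:
    time.sleep(1)
    # lastProgress is the most recent progress report as a dict, or None before the first one
    progress = query.lastProgress
    endOffset = progress['sources'][0]['endOffset'] if progress else None
    finished = endOffset is not None and endOffset['logOffset'] == numFileOffset

    spark.sql("select * from counts").show(20)
    if finished:
        break

# A file-stream query keeps waiting for new files, so stop it explicitly.
query.stop()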

+-------------------+-----+
|          dstdevice|count|
+-------------------+-----+
|EnterpriseAppServer| 1173|
|         Comp186884| 1162|
|         Comp576843| 1101|
|         Comp501516|  497|
|         Comp393033|  361|
|         Comp916004|  282|
|         Comp611862|  205|
|         Comp370444|  198|
|         Comp847595|  193|
|         Comp097048|  185|
|         Comp573929|  172|
|         Comp309567|  150|
|         Comp574103|  145|
|         Comp509586|  144|
|         Comp216677|  142|
|         Comp336938|  127|
|         Comp043650|  104|
|         Comp162943|  103|
|         Comp457448|   96|
|         Comp253298|   96|
+-------------------+-----+
only showing top 20 rows

+-------------------+-----+
|          dstdevice|count|
+-------------------+-----+
|EnterpriseAppServer| 1482|
|         Comp576843| 1389|
|         Comp186884| 1358|
|         Comp573929|  653|
|         Comp501516|  573|
|         Comp393033|  446|
|         Comp916004|  345|
|         Comp611862| 

So that's it!

If you compare the last table in the streaming example with the output of the static query, you'll see that the two are identical.