# Bro IDS to Spark
In this notebook we're going to explore how to put Bro IDS log data into Spark.


## Install Local Spark (NEW as of July 17th 2017!)
- pip install pyspark

You can test whether Spark is installed correctly by starting up the Spark shell.
- $ spark-shell

You will see some warnings, but if you get the banner shown below, you have successfully installed Spark.
You can quit the shell by typing ':quit' at the scala> prompt.

<div style="margin: 20px 20px 20px 20px"><img align="left" src="images/spark.png" width="500px"></div>
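
The same check can be approximated from Python without starting the Scala shell. The sketch below uses only the standard library and reports whether the `pyspark` package is importable and whether a `spark-shell` executable is on the PATH. `check_spark_install` is a hypothetical helper name for illustration, not part of PySpark.

```python
import importlib.util
import shutil


def check_spark_install():
    """Report whether pyspark is importable and spark-shell is on the PATH."""
    return {
        # find_spec returns None when the top-level package cannot be found
        'pyspark_importable': importlib.util.find_spec('pyspark') is not None,
        # shutil.which returns the full path, or None if the command is missing
        'spark_shell_path': shutil.which('spark-shell'),
    }


print(check_spark_install())
```

This only confirms that the pieces are present; it does not start a JVM, so a working `spark-shell` session is still the stronger test.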

In [2]:
from pyspark.sql import SparkSession

In [3]:
from brothon import bro_log_reader
import pandas as pd

In [4]:
# Convert Bro IDS log to Pandas DataFrame
reader = bro_log_reader.BroLogReader('../data/dns.log')
dns_df = pd.DataFrame(reader.readrows())
dns_df.head()

Successfully monitoring ../data/dns.log...


Unnamed: 0,AA,RA,RD,TC,TTLs,Z,answers,id.orig_h,id.orig_p,id.resp_h,...,qclass_name,qtype,qtype_name,query,rcode,rcode_name,rejected,trans_id,ts,uid
0,False,True,True,False,36.000000,0,54.245.228.191,192.168.33.10,1030,4.2.2.3,...,C_INTERNET,1,A,guyspy.com,0,NOERROR,False,44949,2013-09-15 17:44:27.631940,CZGShC2znK1sV7jdI7
1,False,True,True,False,"1000.000000,36.000000",0,"guyspy.com,54.245.228.191",192.168.33.10,1030,4.2.2.3,...,C_INTERNET,1,A,www.guyspy.com,0,NOERROR,False,50071,2013-09-15 17:44:27.696869,CZGShC2znK1sV7jdI7
2,False,True,True,False,"60.000000,60.000000,60.000000,60.000000,60.000...",0,"54.230.86.87,54.230.86.18,54.230.87.160,54.230...",192.168.33.10,1030,4.2.2.3,...,C_INTERNET,1,A,devrubn8mli40.cloudfront.net,0,NOERROR,False,39062,2013-09-15 17:44:28.060639,CZGShC2znK1sV7jdI7
3,False,True,True,False,"60.000000,60.000000,60.000000,60.000000,60.000...",0,"54.230.86.87,54.230.86.18,54.230.84.20,54.230....",192.168.33.10,1030,4.2.2.3,...,C_INTERNET,1,A,d31qbv1cthcecs.cloudfront.net,0,NOERROR,False,7312,2013-09-15 17:44:28.141795,CZGShC2znK1sV7jdI7
4,False,True,True,False,"4993.000000,129.000000,129.000000,129.000000",0,"cdn.entrust.net.c.footprint.net,192.221.123.25...",192.168.33.10,1030,4.2.2.3,...,C_INTERNET,1,A,crl.entrust.net,0,NOERROR,False,41872,2013-09-15 17:44:28.422704,CZGShC2znK1sV7jdI7


In [5]:
# Spark needs super clean data
import numpy as np

# Replace Bro '-' with NAs
dns_df.replace('-', np.nan, inplace=True)
dns_df.dropna(inplace=True)
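
To make the effect of the cell above concrete, here is a small sketch on a made-up DataFrame. The rows and the name `toy_df` are illustrative and are not taken from dns.log. Calling `dropna()` with no arguments drops every row that has a missing value in any column, which can discard a lot of data; passing `subset=` limits the check to the columns you care about.

```python
import numpy as np
import pandas as pd

# Hypothetical rows in the style of a Bro log, where '-' marks an unset field
toy_df = pd.DataFrame({
    'query': ['example.com', '-', 'example.org'],
    'answers': ['10.0.0.1', '10.0.0.2', '-'],
    'qtype': [1, 1, 28],
})

# Replace the Bro '-' placeholder with NaN
cleaned = toy_df.replace('-', np.nan)

# Drop rows that have a missing value in any column
strict = cleaned.dropna()

# Drop rows only when the 'query' column is missing
lenient = cleaned.dropna(subset=['query'])

print(len(toy_df), len(strict), len(lenient))
```

Whether the strict or the lenient variant is appropriate depends on which columns the later Spark analysis actually needs.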

In [6]:
# Spin up a local Spark session
spark = SparkSession.builder.appName('my_awesome').getOrCreate()

In [7]:
# Convert to Spark DF
spark_df = spark.createDataFrame(dns_df)

In [8]:
spark_df.count()

51

In [9]:
# Try a generator
def data_generator():
    for _ in range(10):
        yield {'a': 3, 'b': 'foo', 'c': 2.4}

In [10]:
# Convert to Spark DF
gen_spark_df = spark.createDataFrame(data_generator())



In [11]:
gen_spark_df.count()

10

# Streaming
Okay, so at this point I can create a Spark DataFrame, and now I want to use my Python generator to create an 'unbounded DataFrame' (as discussed in https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html).

Details:
- From Cell 4 above: reader.readrows() is a Python generator that dynamically 'yields' Python dictionaries when new data comes in.
- I'd like to use this Python Generator as a Streaming 'source'
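
One possible way to bridge a generator and Spark is to cut the generator into micro-batches. As far as the PySpark source suggests, `spark.createDataFrame()` turns a non-list iterable into a list before building the DataFrame, so an unbounded generator passed directly would never return. The sketch below is plain Python: `fake_readrows` is a hypothetical stand-in for `reader.readrows()`, and `micro_batches` is an illustrative helper, not a Spark or brothon API.

```python
from itertools import islice


def fake_readrows(n_rows=25):
    """Hypothetical stand-in for reader.readrows(): yields one dict per log row."""
    for i in range(n_rows):
        yield {'uid': 'C{:04d}'.format(i), 'query': 'host{}.example.com'.format(i)}


def micro_batches(rows, batch_size):
    """Group an iterator of rows into lists of at most batch_size rows."""
    rows = iter(rows)
    while True:
        batch = list(islice(rows, batch_size))
        if not batch:
            return
        yield batch


for batch in micro_batches(fake_readrows(), batch_size=10):
    # Each batch is a plain list of dicts. In the notebook it could be passed to
    # spark.createDataFrame(batch) (assumption: not run here, needs a SparkSession).
    print(len(batch), batch[0]['uid'])
```

With a live reader that blocks while waiting for new log lines, `islice` would wait until a full batch has arrived, so a quiet log would need a time-based cutoff instead of a purely count-based one.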

Resources that I've looked at:
- https://spark.apache.org/docs/latest/streaming-programming-guide.html
- https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
- https://spark.apache.org/docs/latest/streaming-custom-receivers.html
- http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/Structured%20Streaming%20using%20Python%20DataFrames%20API.html
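
One option that appears in the Structured Streaming material above is the file source, which treats files arriving in a directory as a stream. A generator could feed it by writing batches of rows as JSON-lines files. The sketch below covers only the Python side and uses the standard library; the directory layout, the file naming, and the `write_batch` helper are assumptions for illustration. Each file is written in a staging directory and then moved, so that a reader of the watched directory never sees a half-written file.

```python
import json
import os
import tempfile


def write_batch(rows, staging_dir, incoming_dir, batch_id):
    """Write rows as JSON lines, then move the finished file into incoming_dir."""
    name = 'batch-{:06d}.json'.format(batch_id)
    staging_path = os.path.join(staging_dir, name)
    with open(staging_path, 'w') as handle:
        for row in rows:
            # default=str so values such as datetimes are written as strings
            handle.write(json.dumps(row, default=str) + '\n')
    final_path = os.path.join(incoming_dir, name)
    # os.replace is atomic when both directories are on the same filesystem
    os.replace(staging_path, final_path)
    return final_path


# Hypothetical directory layout for the sketch
root = tempfile.mkdtemp()
staging_dir = os.path.join(root, 'staging')
incoming_dir = os.path.join(root, 'incoming')
os.makedirs(staging_dir)
os.makedirs(incoming_dir)

rows = [{'a': 3, 'b': 'foo', 'c': 2.4} for _ in range(10)]
path = write_batch(rows, staging_dir, incoming_dir, batch_id=0)
print(path)

# On the Spark side (assumption, not run here), the directory could be read with
# spark.readStream.schema(some_schema).json(incoming_dir)
```

The trade-off is extra disk I/O and some latency compared with a custom receiver, in exchange for not having to write any JVM code.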

Any thoughts/suggestions/pointers are greatly appreciated.

In [12]:
from pyspark.streaming import StreamingContext
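# StreamingContext expects a SparkContext (not a SparkSession) and a batch interval in seconds
ssc = StreamingContext(spark.sparkContext, 60)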

In [None]:
ssc.