<div style="float: right; margin: 20px 20px 20px 20px"><img src="images/bro.png" width="100px"></div>

# Bro to Spark: Clustering
In this notebook we will pull Bro data into Spark then do some analysis and clustering. The first step is to convert your Bro log data into a Parquet file, for instructions on how to do this (just a few lines of Python code using the BAT package) please see this notebook:

<div style="float: right; margin: 0px 0px 0px 0px"><img src="images/parquet.png" width="300px"></div>

### Bro logs to Parquet Notebook
- [Bro to Parquet to Spark](https://github.com/Kitware/bat/blob/master/notebooks/Bro_to_Parquet_to_Spark.ipynb)

Apache Parquet is a columnar storage format focused on performance. Parquet data is often used within the Hadoop ecosystem and we will specifically be using it for loading data into Spark.

<div style="float: right; margin: 0px 0px 0px 0px"><img src="images/mllib.png" width="200px"></div>
<div style="float: right; margin: 30px 0px 0px 0px"><img src="images/spark.png" width="200px"></div>

### Software
- Bro Analysis Tools (BAT): https://github.com/Kitware/bat
- Parquet: https://parquet.apache.org
- Spark: https://spark.apache.org
- Spark MLLib: https://spark.apache.org/mllib/

### Data
- Sec Repo: http://www.secrepo.com (no headers on these)
- Kitware: [data.kitware.com](https://data.kitware.com/#collection/58d564478d777f0aef5d893a) (with headers)

In [2]:
# Third Party Imports
import pyspark
from pyspark.sql import SparkSession
import pyarrow

# Local imports
import bat
from bat.log_to_parquet import log_to_parquet

# Good to print out versions of stuff
print('BAT: {:s}'.format(bat.__version__))
print('PySpark: {:s}'.format(pyspark.__version__))
print('PyArrow: {:s}'.format(pyarrow.__version__))

BAT: 0.2.9
PySpark: 2.2.0
PyArrow: 0.6.0


<div style="float: right; margin: 20px 20px 20px 20px"><img src="images/spark.png" width="200px"></div>

# Spark It!
### Spin up Spark with 4 Parallel Executors
Here we're spinning up a local spark server with 4 parallel executors, although this might seem a bit silly since we're probably running this on a laptop, there are a couple of important observations:

<div style="float: right; margin: 20px 20px 20px 20px"><img src="images/spark_jobs.png" width="400px"></div>

- If you have 4/8 cores use them!
- It's the exact same code logic as if we were running on a distributed cluster.
- We run the same code on **DataBricks** (www.databricks.com) which is awesome BTW.



In [3]:
# Spin up a local Spark Session (with 4 executors!)
spark = SparkSession.builder.master("local[4]").appName('my_awesome').getOrCreate()

<div style="float: right; margin: 50px 20px 20px -20px"><img src="images/parquet.png" width="350px"></div>
##  Read in our Parquet File
Here we're loading in a Bro HTTP log with ~2 million rows to demonstrate the functionality and do some analysis and clustering on the data. For more information on converting Bro logs to Parquet files please see our Bro to Parquet notebook:

#### Bro logs to Parquet Notebook
- [Bro to Parquet to Spark](https://github.com/Kitware/bat/blob/master/notebooks/Bro_to_Parquet_to_Spark.ipynb)

In [5]:
# Have Spark read in the Parquet File
spark_df = spark.read.parquet("http.parquet")

<div style="float: left; margin: 20px 20px 20px 20px"><img src="images/eyeball.jpeg" width="150px"></div>
# Lets look at our data
We should always inspect out data when it comes in. Look at both the data values and the data types to make sure you're getting exactly what you should be.

In [6]:
# Get information about the Spark DataFrame
num_rows = spark_df.count()
print("Number of Rows: {:d}".format(num_rows))
columns = spark_df.columns
print("Columns: {:s}".format(','.join(columns)))

Number of Rows: 2048442
Columns: filename,host,id.orig_h,id.orig_p,id.resp_h,id.resp_p,info_code,info_msg,method,orig_fuids,orig_mime_types,password,proxied,referrer,request_body_len,resp_fuids,resp_mime_types,response_body_len,status_code,status_msg,tags,trans_depth,uid,uri,user_agent,username,ts


In [7]:
spark_df.groupby('method','status_code').count().sort('count', ascending=False).show()

+-------+-----------+-------+
| method|status_code|  count|
+-------+-----------+-------+
|   HEAD|        404|1294022|
|    GET|        404| 429361|
|   POST|        200| 125638|
|    GET|        200|  88636|
|   POST|          0|  32918|
|    GET|        400|  29152|
|    GET|        303|  10858|
|    GET|        403|   8530|
|   POST|        404|   4277|
|    GET|        304|   3851|
|    GET|        302|   3250|
|    GET|          0|   2823|
|    GET|        401|   2159|
|OPTIONS|        200|   1897|
|   POST|        302|   1226|
|   HEAD|        503|   1010|
|   POST|        206|    869|
|    GET|        301|    642|
|   HEAD|          0|    606|
|    GET|        503|    550|
+-------+-----------+-------+
only showing top 20 rows



<div style="float: right; margin: 50px 0px 0px 20px"><img src="images/deep_dive.jpeg" width="350px"></div>

# Data looks good, lets take a deeper dive
Spark has a powerful SQL engine as well as a Machine Learning library. So now that we've loaded our Bro data we're going to utilize the Spark SQL commands to do some investigation of our data including clustering from the MLLib.

<div style="float: left; margin: 20px 0px 0px 0px"><img src="images/spark_sql.jpg" width="180px"></div>
<div style="float: left; margin: 0px 50px 0px 0px"><img src="images/mllib.png" width="180px"></div>

In [None]:
    # Trains a k-means model.
    kmeans = KMeans().setK(2).setSeed(1)
    model = kmeans.fit(dataset)

    # Evaluate clustering by computing Within Set Sum of Squared Errors.
    wssse = model.computeCost(dataset)
    print("Within Set Sum of Squared Errors = " + str(wssse))

    # Shows the result.
    centers = model.clusterCenters()
    print("Cluster Centers: ")
    for center in centers:
        print(center)

# More Coming...

<div style="float: right; margin: 50px 0px 0px -20px"><img src="https://www.kitware.com/img/small_logo_over.png" width="250px"></div>
## Wrap Up
Well that's it for this notebook, we pulled in Bro data from a Parquet file, then did some digging with high speed, parallel SQL operations and we clustered our data to organize the restuls.

If you liked this notebook please visit the [BAT](https://github.com/Kitware/bat) project for more notebooks and examples.