<div style="float: right; margin: 30px 30px 30px 30px"><img src="images/future_ui.png" width="400px"><div>What my brain looks like while writing this notebook...</div></div>

# Checking out Brim
Someone recently suggested I check out Brim (https://www.brimsecurity.com/). I did, and I found the product/tools/stack so interesting that I decided to write up a notebook about it. This is obviously a **'here's what I think'** notebook, so the normal disclaimers apply.

In general I'll try to do my best to properly represent the functionality but please feel free to correct/improve/clarify any content where 'I'm not getting it' or 'getting it wrong' (send email to briford.wylie _ at _ gmail.com).
<br><br>

# Brim Architecture/Stack
Here's a simplified diagram of the software stack provided by the Brim product.

<div style="margin: 30px 300px 30px 30px"><img src="images/brim_arch.png" width="500px"></div>
    
## Main Components
- **Brim App/UI**
- **PCAP slicer to Wireshark**
- **Command Line Interface (CLI)**
- **ZQ/ZQL (Query Engine/Pipeline)**
- **ZNG (Indexed File Format)**
    
## Details on Components

### Brim App/UI = Great
The User Interface on the Brim product is fantastic. Like *Slack*, it's built on the popular Electron toolkit (https://www.electronjs.org/). It looks a bit like Splunk/ELK in some cases, but the Brim interface has been vertically tailored to Zeek output, which makes it **WAY** better. The pivot and drilldown functionality is super nice. As part of the tailored interface they even take care of little details like turning the connection state history from something opaque like 'ShADadR' into a nice connection history diagram that's much easier to interpret. Anyway, the UI is great. In my opinion it's worth getting the product just for this beautiful, easy, and functional interface (https://www.brimsecurity.com/).

### PCAP slicer to Wireshark = Nice
<div style="float: right; margin: 10px 10px 10px 10px"><img src="images/wireshark.png" width="200px"></div>

From the connection information in the Zeek conn.log you can grab the timestamps and the network '5-tuple'. These can be used to filter the PCAP down to just the packets relevant to your query/drilldown. When they first process the PCAP they compute a Packet Index file. I'm assuming this index stores time/5-tuple info mapped to byte offsets into the PCAP. When the user does a 'drill down' in the UI, they can pull out just the packets they want from the PCAP. Although PCAP slicing has been around for a while, the Brim implementation looks to be nice, fast, and functional.
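To make my guess about the Packet Index concrete, here's a minimal sketch of what such an index could look like. This is not Brim's implementation: the `PacketIndex` class, its methods, the ephemeral port numbers, and the byte offsets are all hypothetical, and it only models the lookup (time range + 5-tuple to byte ranges), not the PCAP parsing.

```python
from bisect import bisect_left, bisect_right
from collections import defaultdict


class PacketIndex:
    """Hypothetical index: 5-tuple -> time-sorted list of (ts, byte_offset, length)."""

    def __init__(self):
        self._by_flow = defaultdict(list)

    @staticmethod
    def _flow_key(src, sport, dst, dport, proto):
        # Sort the two endpoints so both directions of a connection share one key
        a, b = sorted([(src, sport), (dst, dport)])
        return (a, b, proto)

    def add(self, ts, src, sport, dst, dport, proto, offset, length):
        key = self._flow_key(src, sport, dst, dport, proto)
        self._by_flow[key].append((ts, offset, length))

    def finalize(self):
        # Sort each flow by timestamp so lookups can use binary search
        for entries in self._by_flow.values():
            entries.sort()

    def lookup(self, src, sport, dst, dport, proto, start_ts, end_ts):
        """Return (offset, length) for packets of this flow within [start_ts, end_ts]."""
        entries = self._by_flow.get(self._flow_key(src, sport, dst, dport, proto), [])
        lo = bisect_left(entries, (start_ts,))
        hi = bisect_right(entries, (end_ts, float('inf'), float('inf')))
        return [(offset, length) for _, offset, length in entries[lo:hi]]


# Made-up packets: timestamps, ports 50000/50001, offsets, and lengths are illustrative
index = PacketIndex()
index.add(1.0, '192.168.202.88', 50000, '192.168.27.253', 22, 'tcp', offset=24, length=90)
index.add(1.5, '192.168.27.253', 22, '192.168.202.88', 50000, 'tcp', offset=114, length=70)
index.add(2.0, '192.168.202.88', 50001, '192.168.202.1', 53, 'udp', offset=184, length=80)
index.add(9.0, '192.168.202.88', 50000, '192.168.27.253', 22, 'tcp', offset=264, length=90)
index.finalize()

# Byte ranges to read from the PCAP for one connection in a time window
slices = index.lookup('192.168.202.88', 50000, '192.168.27.253', 22, 'tcp', 0.5, 2.0)
print(slices)
```

With byte ranges like these in hand, a slicer would only need to seek and read those spans rather than scan the whole capture.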

**Note to Self:** Spin up something equivalent with dpkt (https://github.com/kbandla/dpkt), Parquet, and a few helper classes... perhaps call it the 'Zeek PCAP Scalpel' (https://github.com/SuperCowPowers/zps).

### Command Line Interface = Mixed
When I first saw the Command Line Interface for ZQ/ZQL I thought it was cool. I think it will be useful for fast hunting and experimenting. As mentioned above, the hunting/searching aspect in the App/UI is really nice, so here we're just talking about the command line interface. From a user experience perspective the CLI will be useful for simple filtering, grouping, and counting. Using the CLI to construct a complicated pipeline for meaningful analysis will be tenuous at best and frustrating or not even possible at worst. After about a day you're probably going to really want some sort of scripting interface to ZQL.

### ZQ/ZQL + ZNG = Confused
<div style="float: right; margin: 0px 0px 0px -30px"><img src="images/confused.jpg" width="250px"></div>

ZQ/ZQL Query Engine/Pipeline + ZNG Indexed File Format. Here the query engine and file format are somewhat interlinked. The storage format, indexing, and partitioning of the on-disk data often drive the functionality and performance of the query engine/data pipeline. In their YouTube demo they compare ZNG to several different file formats (https://youtu.be/ldrEadAQYTM?t=1315). They don't include Parquet, so I'll cover the Parquet alternative below. One of their 'good' check boxes is the support for heterogeneous objects/rows...

**Heterogeneous Objects: Is that really a good idea?** 

At a higher level this question is really ROW vs COLUMN or if we're looking at popular open source formats perhaps **Avro** vs **Parquet** (Avro: http://avro.apache.org/ Parquet: https://parquet.apache.org/). Zeek logs have a well defined, documented, and **stable** set of output fields/types. Each log (conn, dns, http, x509, etc) has a well known set of columns/fields with associated data types. That's a good thing in my opinion. In particular, columnar storage allows the vectorization of many operations, increases performance, and significantly reduces disk usage (random example blog: https://www.cloudforecast.io/blog/using-parquet-on-athena-to-save-money-on-aws/). 
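As a tiny illustration of the ROW vs COLUMN distinction, here's a sketch in plain Python. The three records are made up, and lists of dicts versus a dict of lists are only stand-ins for real on-disk layouts like Avro and Parquet.

```python
# Three made-up conn-style records (values are illustrative only)
rows = [
    {'uid': 'C1', 'proto': 'tcp', 'orig_bytes': 100, 'resp_bytes': 900},
    {'uid': 'C2', 'proto': 'udp', 'orig_bytes': 40, 'resp_bytes': 60},
    {'uid': 'C3', 'proto': 'tcp', 'orig_bytes': 0, 'resp_bytes': 0},
]

# Row layout: summing one field still means visiting every whole record
row_total = sum(record['orig_bytes'] for record in rows)

# Column layout: one homogeneous list per field, so a query touches only what it needs
columns = {name: [record[name] for record in rows] for name in rows[0]}
col_total = sum(columns['orig_bytes'])

print(row_total, col_total)
```

Both layouts give the same answer. The difference is that the column version reads a single list of same-typed values, which is the property that vectorized engines and column compression build on.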

<div style="float: right; margin: 10px 10px 10px 10px"><img src="images/foot.jpeg" width="300px"></div>

**Rolling your Own File Format?**

For the types of operations the Brim App conducts (groups, aggregations, counts, timespans, etc.), columnar seems to be the way to go. At this point I'm confused... there's obviously something that I'm missing. Looking for answers, I went to check out the ZNG spec (https://github.com/brimsec/zq/blob/master/zng/docs/spec.md#3-zng-binary-format-zng). As part of this documentation they call out the benefits of supporting 'different data types'...



> ZNG gets more interesting when different data types are interleaved in the stream. For example, consider this TZNG stream:

<div style="float: right; margin: 10px 10px 10px 10px"><img src="images/confused_2.jpeg" width="300px"></div>

```
#35:string
35:hello, world
#36:int64
36:42
35:there's a fly in my soup!
35:no, there isn't.
36:3
```
My first reaction is this feels like a bad case of **rolling your own**. I could definitely be convinced that perhaps in a streaming data use case **Avro** is better than **Parquet**.. but rolling your own file format? Really?
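To see what 'interleaved' means here, this is a toy reader for the sample above. It is not a TZNG parser: I'm only inferring from the snippet that `#N:type` lines declare a type and `N:value` lines carry a value of that type, and the `demux` function is my own hypothetical helper.

```python
from collections import defaultdict

sample = """\
#35:string
35:hello, world
#36:int64
36:42
35:there's a fly in my soup!
35:no, there isn't.
36:3
"""


def demux(stream):
    """Regroup an interleaved stream into one list of values per declared type."""
    types = {}                  # numeric tag -> type name, e.g. '35' -> 'string'
    values = defaultdict(list)  # type name -> values in stream order
    for line in stream.splitlines():
        if not line:
            continue
        tag, _, body = line.partition(':')
        if tag.startswith('#'):
            types[tag[1:]] = body        # type declaration line
        else:
            type_name = types[tag]       # value line
            values[type_name].append(int(body) if type_name == 'int64' else body)
    return dict(values)


by_type = demux(sample)
print(by_type)
```

Once the stream is regrouped like this, each type ends up as its own homogeneous list, which is pretty much what a columnar format stores to begin with.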


**The first obvious question (at least for me) is why not Parquet?**

Instead of ZNG + ZQL why not `Parquet + Pandas` or `Parquet + Spark` or `Parquet + Drill` or `Parquet + Athena` or a zillion other combinations? In particular I'm a big fan of using/leveraging popular toolkits/libraries whenever possible. One of the goals of my software is to help 'bridge' from one cool thing to another. For instance **Zeek to Parquet** or **Zeek to Spark** or **Zeek to Kafka to Spark**. When you're leveraging popular open source you're often taking advantage of **>10k person hours of work**, so NOT doing this should be viewed with a large amount of scepticism. :)

***

<div style="float: right; margin: 30px 0px 0px 0px"><img src="images/parquet.png" width="280px"></div>

## Okay so is Parquet/Spark the 'right thing'?
Maybe.. obviously the 'right thing' is based on your use cases.. but from my experience using popular open source libraries/formats like Spark and Parquet gives you significant benefits for **most** use cases. In particular both Spark and Parquet 'scale up' from laptop, to beefy server, to AWS/Azure mega clusters. So from a development tool chain perspective, you can prototype on your laptop and then use the same code/approaches as you scale up.

So cutting to the chase...
- **Parquet** > ZNG
- **Spark** >> ZQ/ZQL
<br><br><br>

<div style="float: right; margin: -80px 0px 0px 0px"><img src="images/spark.png" width="220px"></div>

## Let's get to the functionality/code
Enough blah-blah, let's actually do a bit of coding. The rest of this notebook replicates the ZQ/ZQL functionality shown in this 'Zeek at Home' Demo (from Brim Security):
- https://www.youtube.com/watch?v=ldrEadAQYTM. 

In this notebook we'll load up some Zeek logs, convert them to Parquet files, and perform some data exploration, querying, and analysis with Spark. 

### Notebook Install Requirements
```pip install zat```

### Notes
- Zeek Analysis Tools (ZAT) was shown at BroCon 2017: https://youtu.be/pG5lU9CLnIU
- ZAT is simply a 'helper' library, it helps you convert Zeek logs to Parquet and makes loading Zeek data into Spark easy.
- In addition to the tiny bit of functionality shown here, Spark has awesome data exploration, transformations, pipelines, and of course, machine learning libraries.
  - https://nbviewer.jupyter.org/github/SuperCowPowers/zat/blob/master/notebooks


### Software
- Zeek Analysis Tools (ZAT): https://github.com/SuperCowPowers/zat
- Parquet: https://parquet.apache.org
- Spark: https://spark.apache.org

### Data
- This notebook uses a Zeek CONN log with ~23 million rows and also an HTTP log
- Both datasets are available here: https://data.kitware.com/#collection/58d564478d777f0aef5d893a

In [1]:
# Third Party Imports
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# ZAT imports
import zat
from zat import log_to_sparkdf

# Good to print out versions of stuff
print('ZAT: {:s}'.format(zat.__version__))
print('PySpark: {:s}'.format(pyspark.__version__))

ZAT: 0.3.9
PySpark: 3.0.0


<div style="float: right; margin: 20px 20px 20px 20px"><img src="images/spark.png" width="200px"></div>

# Spark It!
### Spin up Spark with 4 Parallel Executors
Here we're spinning up a local Spark server with 4 parallel executors (in local mode, `local[4]` means 4 worker threads). This might seem a bit silly since we're probably running this on a laptop, but there are a couple of important observations:

<div style="float: right; margin: 20px 20px 20px 20px"><img src="images/spark_jobs.png" width="400px"></div>

- If you have 4/8 cores use them!
- It's the exact same code logic as if we were running on a distributed cluster.
- We run the same code on **Databricks** (www.databricks.com) which is awesome BTW.



In [2]:
# Spin up a local Spark Session (with 4 executors)
spark = SparkSession.builder.master('local[4]').appName('my_awesome').getOrCreate()

In [3]:
# Use the ZAT class to load our log file into a Spark dataframe
spark_it = log_to_sparkdf.LogToSparkDF(spark)

# Read in Zeek HTTP Log
data_path = '/Users/briford/data/bro/http.log'  # Obviously you'll need to change this :)
http_df = spark_it.create_dataframe(data_path)

<div style="float: right; margin: 0px 0px 0px -80px"><img src="images/spark_distributed.png" width="500px"></div>

# Spark Workers and Data Partitions
Spark will read in and partition the data out to our workers. Our DataFrame (backed by an RDD) will have some number of partitions that are divided up amongst the worker pool. Each worker will operate on only a subset of the data and Spark will manage the 'magic' for how that work gets run, aggregated, and presented.


**Image Credit:** Jacek Laskowski, please see his excellent book - Mastering Apache Spark  https://jaceklaskowski.gitbooks.io/mastering-apache-spark
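If the partition 'magic' feels abstract, here's a rough sketch of the idea in plain Python. This is not how Spark is implemented: the records are made up, the round-robin split is just for illustration, and the 'workers' run one after another. It only shows the shape of the work, which is to count each partition independently and then merge the partial results.

```python
from collections import Counter
from functools import reduce

# Made-up (method, status_code) pairs standing in for http.log rows
records = [('GET', 200), ('HEAD', 404), ('GET', 404), ('HEAD', 404),
           ('POST', 200), ('GET', 200), ('HEAD', 404), ('GET', 404)]


def partition(data, num_partitions):
    """Deal records out round-robin (illustrative; Spark chooses partitions differently)."""
    return [data[i::num_partitions] for i in range(num_partitions)]


def count_partition(part):
    """The work one worker does: count only the records in its own partition."""
    return Counter(part)


partitions = partition(records, 4)

# Sequential stand-in for 4 workers running in parallel
partial_counts = [count_partition(part) for part in partitions]

# Merge the partial results into the final answer
totals = reduce(lambda a, b: a + b, partial_counts, Counter())
print(totals.most_common(1))
```

Because each partial count depends only on its own partition, the middle step can run anywhere, whether that's 4 threads on a laptop or a few hundred nodes in a cluster.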

In [4]:
http_df.rdd.getNumPartitions()

11

## Replicate functionality shown at https://youtu.be/ldrEadAQYTM?t=923
We're using HTTP data instead of SMB but this basically replicates the group/aggregate/count functionality they showed in the demo.

In [5]:
http_df.groupby('method', 'status_code').count().sort('count', ascending=False).show()

+-------+-----------+-------+
| method|status_code|  count|
+-------+-----------+-------+
|   HEAD|        404|1294022|
|    GET|        404| 429283|
|   POST|        200| 125638|
|    GET|        200|  88631|
|   POST|          0|  32918|
|    GET|        400|  29152|
|    GET|        303|  10858|
|    GET|        403|   8530|
|   POST|        404|   4277|
|    GET|        304|   3851|
|    GET|        302|   3250|
|    GET|          0|   2906|
|    GET|        401|   2159|
|OPTIONS|        200|   1897|
|   POST|        302|   1226|
|   HEAD|        503|   1010|
|   POST|        206|    869|
|    GET|        301|    642|
|   HEAD|          0|    606|
|    GET|        503|    550|
+-------+-----------+-------+
only showing top 20 rows



<div style="float: right; margin: 30px 0px 0px 0px"><img src="images/parquet.png" width="400px"></div>

# Convert my Zeek logs to Parquet files
Apache Parquet is a columnar storage format focused on performance. Here we're going to convert our Zeek log to a Parquet file in one line of code. The conversion is super scalable since we're using Spark distributed executors to do the conversions.

In [6]:
# Read in a Zeek CONN Log
data_path = '/Users/briford/data/bro/conn.log'  # Obviously you'll need to change this :)
conn_df = spark_it.create_dataframe(data_path)

In [None]:
# DataFrames can be saved as Parquet files, maintaining the schema information.
conn_df.write.parquet('conn.parquet', compression='gzip')

In [8]:
# Have Spark read in the Parquet File
conn_df = spark.read.parquet('conn.parquet')

<div style="float: right; margin: 20px 20px 20px 20px"><img src="images/compressed.jpeg" width="300px"></div>

# Parquet files are compressed
Here we see the first benefit of Parquet, which stores data in a compressed columnar format. There are several compression options available (including uncompressed).

## Original conn.log = 2.5 GB 
## conn.parquet = ~420MB
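Part of the reason a columnar layout compresses well is that similar values end up next to each other. Here's a small sketch of that effect using only the standard library. Everything in it is an assumption for illustration: the 5000 rows are synthetic, `zlib` stands in for the gzip codec used above, and it doesn't model the dictionary and run-length encodings that Parquet applies on top, so the sizes it prints say nothing about real Zeek logs.

```python
import hashlib
import zlib

protos = ['tcp', 'udp']
services = ['http', 'dns', 'ssl']
states = ['SF', 'S0', 'REJ', 'RSTO', 'OTH']

# Synthetic records: one high-entropy field (uid) plus three low-cardinality fields
rows = []
for i in range(5000):
    uid = hashlib.md5(str(i).encode()).hexdigest()[:12]
    rows.append((uid, protos[i % 2], services[i % 3], states[i % 5]))

# Row layout: records written one after another
row_bytes = '\n'.join(','.join(record) for record in rows).encode()

# Column layout: all uids, then all protos, then all services, then all states
col_bytes = '\n'.join(','.join(column) for column in zip(*rows)).encode()

row_size = len(zlib.compress(row_bytes, 9))
col_size = len(zlib.compress(col_bytes, 9))

print('raw bytes (both layouts): {:d}'.format(len(row_bytes)))
print('compressed row layout:    {:d}'.format(row_size))
print('compressed column layout: {:d}'.format(col_size))
```

Both layouts hold exactly the same bytes before compression. On this synthetic data the column layout should come out smaller because the repetitive fields collapse into long runs instead of being interrupted by a uid on every row.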

<div style="float: left; margin: 20px 20px 20px 20px"><img src="images/nuked_crop.jpg" width="150px"></div>


# Light it Up!
Now that we have our Parquet data loaded into Spark, we're going to demonstrate just a few simple Spark operations but obviously you now have the full power of the Death Star in your hands.

<div style="float: left; margin: 20px 0px 0px 50px"><img src="images/spark_sql.jpg" width="150px"></div>
<div style="float: left; margin: 0px 0px 0px 50px"><img src="images/mllib.png" width="150px"></div>

In [9]:
# Get information about the Spark DataFrame
num_rows = conn_df.count()
print("Number of Rows: {:d}".format(num_rows))
columns = conn_df.columns
print("Columns: {:s}".format(','.join(columns)))

Number of Rows: 22694356
Columns: ts,uid,id_orig_h,id_orig_p,id_resp_h,id_resp_p,proto,service,duration,orig_bytes,resp_bytes,conn_state,local_orig,missed_bytes,history,orig_pkts,orig_ip_bytes,resp_pkts,resp_ip_bytes,tunnel_parents


<div style="float: right; margin: 20px 20px 20px 20px"><img src="images/fast.jpg" width="350px"></div>

# Did we mention fast?
The query below was executed on 4 workers. The data contains over 22 million Zeek conn log entries and the time to complete was a **fraction of a second** running on my Mac Laptop :)

In [12]:
# Let's look at some 'service' breakdowns in our Zeek conn log
conn_df = conn_df.filter(conn_df['service'] != '-')
%timeit -r 1 -n 1 conn_df.groupby('proto','service').count().sort('count', ascending=False).show()   

+-----+--------+------+
|proto| service| count|
+-----+--------+------+
|  tcp|    http|445214|
|  udp|     dns|160559|
|  tcp|     ssl| 49017|
|  tcp|     ssh|  4778|
|  udp|    dhcp|  3052|
|  tcp|ftp-data|  2880|
|  tcp|     ftp|  2675|
|  tcp|     dns|   706|
|  tcp|    smtp|   194|
|  tcp|    pop3|     2|
+-----+--------+------+

544 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


## Replicate 'power user' functionality shown at https://youtu.be/ldrEadAQYTM?t=941
We'll do a 'pipeline' of operations similar to the demo
- First select just the columns we want (CUT)
- Next create a new column with an aggregation of other columns
- Last do the avg and sum of the total bytes

In [13]:
# First select just the columns we want (CUT)
cut_df = conn_df.select(['uid', 'orig_bytes', 'resp_bytes'])

# Next create a new column with an aggregation of other columns
cut_df = cut_df.withColumn('total_bytes', cut_df['orig_bytes'] + cut_df['resp_bytes'])
cut_df.show(5)

# Last do the avg and sum of the total bytes
cut_df.select(F.avg(F.col('total_bytes')), F.sum(F.col('total_bytes'))).show()

+------------------+----------+----------+-----------+
|               uid|orig_bytes|resp_bytes|total_bytes|
+------------------+----------+----------+-----------+
|Cn3lP61UA4uyZ6QbO9|         0|         0|          0|
|Cbr8993YzQRCmNEYEj|         0|         0|          0|
|CIMBNv2q31wxrrar2i|         0|         0|          0|
| CWWPZTJRlk4vgaF89|         0|         0|          0|
| C7bMbs9ZbZOfKk2o7|     29737|     15395|      45132|
+------------------+----------+----------+-----------+
only showing top 5 rows

+-----------------+----------------+
| avg(total_bytes)|sum(total_bytes)|
+-----------------+----------------+
|68829.53709214335|     46052260189|
+-----------------+----------------+
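For anyone who wants to see the three steps without Spark in the way, here's the same cut / new column / aggregate pipeline in plain Python. It's only a sketch: it runs on the five rows printed by `cut_df.show(5)` above rather than the full dataset, so its average and sum will not match the Spark totals.

```python
from statistics import mean

# The five rows printed by cut_df.show(5): (uid, orig_bytes, resp_bytes)
rows = [
    ('Cn3lP61UA4uyZ6QbO9', 0, 0),
    ('Cbr8993YzQRCmNEYEj', 0, 0),
    ('CIMBNv2q31wxrrar2i', 0, 0),
    ('CWWPZTJRlk4vgaF89', 0, 0),
    ('C7bMbs9ZbZOfKk2o7', 29737, 15395),
]

# The 'cut' is already done (three fields per row); add the derived column
with_total = [(uid, orig, resp, orig + resp) for uid, orig, resp in rows]

# Aggregate the new column
totals = [row[3] for row in with_total]
avg_total = mean(totals)
sum_total = sum(totals)
print(avg_total, sum_total)
```

The Spark version above does the same thing, just spread across the partitions of all 22 million rows.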



# Okay now perhaps something a bit more useful
- First let's look at high traffic volume hosts
- Then we'll drill down on services and ports with aggregation based on host, port, and service

In [14]:
# Let's look at high volume orig_bytes hosts
conn_df.groupby('id_orig_h') \
    .agg(F.sum('resp_bytes'), F.sum('orig_bytes')) \
    .sort('sum(orig_bytes)', ascending=False).show(10)

+---------------+---------------+---------------+
|      id_orig_h|sum(resp_bytes)|sum(orig_bytes)|
+---------------+---------------+---------------+
| 192.168.202.88|        1427057|      491383902|
| 192.168.203.63|      199089308|      381871343|
|192.168.202.102|     1102077207|       83220349|
|192.168.202.110|    20757064973|       71199927|
| 192.168.202.79|      126189570|       56391990|
| 192.168.202.81|         295350|       42247132|
|192.168.202.118|       21311882|       20245778|
|192.168.202.140|    18726772783|       16533636|
| 192.168.202.96|       12865378|       14671460|
|192.168.202.138|     3039254789|       14461799|
+---------------+---------------+---------------+
only showing top 10 rows



In [15]:
# Let's look at high volume resp_bytes hosts
conn_df.groupby('id_orig_h') \
    .agg(F.sum('resp_bytes'), F.sum('orig_bytes')) \
    .sort('sum(resp_bytes)', ascending=False).show(10)

+---------------+---------------+---------------+
|      id_orig_h|sum(resp_bytes)|sum(orig_bytes)|
+---------------+---------------+---------------+
|192.168.202.110|    20757064973|       71199927|
|192.168.202.140|    18726772783|       16533636|
|192.168.202.138|     3039254789|       14461799|
|192.168.202.102|     1102077207|       83220349|
| 192.168.204.60|      222120000|          27301|
| 192.168.203.63|      199089308|      381871343|
| 192.168.202.80|      143503940|         937179|
| 192.168.202.79|      126189570|       56391990|
|192.168.202.125|       84322637|        4307083|
| 192.168.202.76|       67647471|        4342070|
+---------------+---------------+---------------+
only showing top 10 rows



In [16]:
# First filter out unknown services
conn_df = conn_df.filter(conn_df['service'] != '-')

# More meaningful aggregation based on IP, port, and service
conn_df.groupby('id_orig_h', 'id_resp_p', 'service') \
    .agg(F.sum('resp_bytes'), F.sum('orig_bytes')) \
    .sort('sum(orig_bytes)', ascending=False).show(10)

+---------------+---------+-------+---------------+---------------+
|      id_orig_h|id_resp_p|service|sum(resp_bytes)|sum(orig_bytes)|
+---------------+---------+-------+---------------+---------------+
| 192.168.202.88|       22|    ssh|        1416968|      491259422|
| 192.168.203.63|       80|   http|      199083266|      381506783|
|192.168.202.102|       80|   http|     1096048524|       80956460|
| 192.168.202.79|       80|   http|      121620629|       54699732|
| 192.168.202.81|       22|    ssh|         294750|       42247132|
|192.168.202.110|       80|   http|    16614013448|       37952120|
|192.168.202.118|       80|   http|       20050632|       18731116|
|192.168.202.110|      443|    ssl|     2363800673|       17883212|
| 192.168.202.95|       22|    ssh|       19539221|       13947240|
| 192.168.202.96|       80|   http|       11374049|       11871726|
+---------------+---------+-------+---------------+---------------+
only showing top 10 rows



In [23]:
# Let's look at traffic from 192.168.202.88
high_df = conn_df.filter(conn_df['id_orig_h'] == '192.168.202.88')
high_df = high_df.select(['uid', 'id_orig_h', 'id_resp_h', 'id_resp_p', 'service', 'orig_bytes', 'resp_bytes'])
high_df.groupby('id_orig_h', 'id_resp_h', 'id_resp_p', 'service') \
    .agg(F.sum('resp_bytes'), F.sum('orig_bytes')) \
    .sort('sum(orig_bytes)', ascending=False).show(10)

+--------------+---------------+---------+-------+---------------+---------------+
|     id_orig_h|      id_resp_h|id_resp_p|service|sum(resp_bytes)|sum(orig_bytes)|
+--------------+---------------+---------+-------+---------------+---------------+
|192.168.202.88| 192.168.27.253|       22|    ssh|        1416968|      491259422|
|192.168.202.88|192.168.202.255|      137|    dns|              0|         112850|
|192.168.202.88| 192.168.206.44|       53|    dns|           6468|           6461|
|192.168.202.88|  192.168.202.1|       53|    dns|              0|           3662|
|192.168.202.88| 192.168.203.45|     8080|   http|              0|           1370|
|192.168.202.88|  192.168.207.4|       53|    dns|            321|            137|
|192.168.202.88|  192.168.202.1|       67|   dhcp|           3300|              0|
+--------------+---------------+---------+-------+---------------+---------------+



In [24]:
# Drill down on traffic going to 192.168.27.253
high_df.filter(high_df['id_resp_h'] == '192.168.27.253').show()

+------------------+--------------+--------------+---------+-------+----------+----------+
|               uid|     id_orig_h|     id_resp_h|id_resp_p|service|orig_bytes|resp_bytes|
+------------------+--------------+--------------+---------+-------+----------+----------+
|Cm18c03kPWYKeYkNvg|192.168.202.88|192.168.27.253|       22|    ssh|      1397|      1924|
| CTJ7E9TNub0Ko307l|192.168.202.88|192.168.27.253|       22|    ssh|      1253|      1764|
|CEMSUi3w61aTr92zl6|192.168.202.88|192.168.27.253|       22|    ssh| 245618677|    638068|
|CdLkTT3XEOX0gmHbGf|192.168.202.88|192.168.27.253|       22|    ssh|     13365|     82004|
|CPOwKp2B1PKhDsOBD5|192.168.202.88|192.168.27.253|       22|    ssh| 245617989|    637124|
|CU8JUQ2EJfH2r45Ar6|192.168.202.88|192.168.27.253|       22|    ssh|      6741|     56084|
+------------------+--------------+--------------+---------+-------+----------+----------+



<div style="float: right; margin: 50px 0px 0px 20px"><img src="images/deep_dive.jpeg" width="350px"></div>

# Let's go for a deeper dive
In this notebook we've explored how to load data into a Spark DataFrame and utilize Spark SQL commands. In the next notebook we'll take a deeper dive and investigate clustering using the Spark MLlib module.
<div style="float: left; margin: 10px 10px -10px 10px"><img src="images/spark_sql.jpg" width="150px"></div>
<div style="float: left; margin: -10px 50px -10px 10px"><img src="images/mllib.png" width="150px"></div>

<br><br><br><br><br>
### Spark Modules
- Spark Streaming: processing real-time data streams
- Spark SQL, Datasets, and DataFrames: support for structured data and relational queries
- MLlib: built-in machine learning library
- GraphX: Spark’s new API for graph processing

### Spark Clustering Notebook
- [Zeek Spark Clustering](https://nbviewer.jupyter.org/github/SuperCowPowers/zat/blob/master/notebooks/Spark_Clustering.ipynb)

