# SIEM Data Exploration with Spark

## Data Formatting
<hr>

### Data Source
The data used in this example comes from a market-leading SIEM.

### File Names
Individual CSV files are converted to Parquet (see `architecture.pdf` for more info), then saved by hour using the name format `YYYY-MM-DD-HH`.
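
As a rough illustration of the naming scheme, the sketch below lists the hourly names for a single day. The helper name `hourly_file_names` is hypothetical, and the sketch assumes a zero-padded 24-hour `HH` and no file extension, neither of which is confirmed here.

```python
from datetime import datetime, timedelta

def hourly_file_names(day):
    """Return the 24 hourly names (YYYY-MM-DD-HH) for a 'YYYY-MM-DD' date string.

    Assumption: hours are zero-padded and run from 00 to 23.
    """
    start = datetime.strptime(day, "%Y-%m-%d")
    return [(start + timedelta(hours=h)).strftime("%Y-%m-%d-%H") for h in range(24)]

names = hourly_file_names("2016-06-01")
print(names[0], names[-1])
```

Under these assumptions, a prefix glob such as `2016-06-01*` covers all hourly files for that day.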

### Field Names
Field names match the header row of the original CSV files.

## Config Parameters
<hr>
Set these variables to connect to your HDFS cluster.

In [None]:
# HDFS config parameters
hdfsNameNode = "10.0.0.1"
hdfsPort = "8020"
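
The examples in this notebook build the same `hdfs://` URI by string concatenation each time. One possible way to centralize that is a small helper; `hdfs_path` and its parameter names are hypothetical, and the defaults simply mirror the values above and the `/data` directory used in the later cells.

```python
def hdfs_path(pattern, name_node="10.0.0.1", port="8020", base_dir="/data"):
    """Build an HDFS URI for a file name or glob pattern under base_dir.

    Assumption: all Parquet files live directly under base_dir.
    """
    return "hdfs://{}:{}{}/{}".format(name_node, port, base_dir, pattern)

one_day = hdfs_path("2016-06-01*")
print(one_day)
```

The resulting string could then be passed to `sq.read.parquet(...)` in place of the concatenated expression.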

## Import Spark Libraries
<hr>

In [None]:
# Import libraries for PySpark/SparkSQL
from pyspark import SQLContext
from pyspark.sql.functions import *
# Create a SQLContext to use for SQL queries
# (`sc` is the SparkContext provided by the PySpark notebook kernel)
sq = SQLContext(sc)

## Example 1
<hr>

### Network communication lookup, from source subnet to multiple destinations

#### SQL Example: 
```WHERE sourceAddress CONTAINS "55.54.53." AND  ( ( destinationAddress = "10.0.0.50" )  OR  ( destinationAddress = "10.0.0.51" )  OR  ( destinationAddress = "10.0.0.52" ) )```

In [None]:
%%time
### One Day
data1 = sq.read.parquet("hdfs://"+hdfsNameNode+":"+hdfsPort+"/data/2016-06-01*")
pdf1 = data1.filter(data1.sourceAddress.startswith("55.54.53.")) \
    .filter("destinationAddress = '10.0.0.50' OR destinationAddress = '10.0.0.51' OR destinationAddress = '10.0.0.52'") \
    .toPandas()
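
The chain of `OR` conditions grows quickly as destinations are added. A minimal sketch of one alternative is to generate an equivalent `IN` predicate from a list. The helper `in_predicate` is hypothetical, and it does no escaping, so it is only suitable for trusted, hard-coded values.

```python
def in_predicate(column, values):
    """Build a SQL 'column IN (...)' predicate string from trusted string values."""
    quoted = ", ".join("'{}'".format(v) for v in values)
    return "{} IN ({})".format(column, quoted)

destinations = ["10.0.0.50", "10.0.0.51", "10.0.0.52"]
predicate = in_predicate("destinationAddress", destinations)
print(predicate)
```

The resulting string can be passed to `.filter(...)` in the same way as the hand-written `OR` expression.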

In [None]:
### Number of results
len(pdf1)

In [None]:
### Display the first 10 results
pdf1.head(10)

In [None]:
%%time
### One Week
data2 = sq.read.parquet("hdfs://"+hdfsNameNode+":"+hdfsPort+"/data/2016-06-0[1-7]*")
pdf2 = data2.filter(data2.sourceAddress.startswith("55.54.53.")) \
    .filter("destinationAddress = '10.0.0.50' OR destinationAddress = '10.0.0.51' OR destinationAddress = '10.0.0.52'") \
    .toPandas()
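
The one-week read relies on the glob `2016-06-0[1-7]*`. The sketch below checks which hourly names such a pattern selects, using Python's `fnmatch` purely as a stand-in for HDFS glob matching (the two behave alike for simple `*` and `[1-7]` patterns, but that equivalence is an assumption here). The candidate names are made up for illustration.

```python
from fnmatch import fnmatchcase

week_pattern = "2016-06-0[1-7]*"

# Hypothetical hourly file names, following the YYYY-MM-DD-HH format
candidates = ["2016-06-01-00", "2016-06-07-23", "2016-06-08-00", "2016-06-10-05"]

# Keep only the names the pattern would select
matched = [name for name in candidates if fnmatchcase(name, week_pattern)]
print(matched)
```

Only the names dated June 1 through June 7 are selected; June 8 and June 10 fall outside the bracket range.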

In [None]:
### Number of results
len(pdf2)

In [None]:
### Display the first 10 results
pdf2.head(10)

## Example 2
<hr>

### Account failed logon attempts lookup, using startswith keyword

#### SQL Example:
```WHERE destinationUserName startswith "ads." AND categoryOutcome = "/Failure"```

In [None]:
%%time
### One Day
data5 = sq.read.parquet("hdfs://"+hdfsNameNode+":"+hdfsPort+"/data/2016-06-01*")
pdf5 = data5.filter(data5.destinationUserName.startswith("ads.")) \
    .filter(data5.categoryOutcome == "/Failure") \
    .toPandas()

In [None]:
### Number of results
len(pdf5)

In [None]:
### Display the first 10 results
pdf5.head(10)

In [None]:
%%time
### One Week
data6 = sq.read.parquet("hdfs://"+hdfsNameNode+":"+hdfsPort+"/data/2016-06-0[1-7]*")
pdf6 = data6.filter(data6.destinationUserName.startswith("ads.")) \
    .filter(data6.categoryOutcome == "/Failure") \
    .toPandas()

In [None]:
### Number of results
len(pdf6)

In [None]:
### Display the first 10 results
pdf6.head(10)

## Example 3
<hr>

### Malware infection lookup, particular keyword in message field

#### SQL Example:
```WHERE deviceVendor="Symantec" AND message contains "exe"```

In [None]:
%%time
### One Day
data3 = sq.read.parquet("hdfs://"+hdfsNameNode+":"+hdfsPort+"/data/2016-06-01*")
pdf3 = data3.filter(data3.deviceVendor == "Symantec") \
    .filter(data3.message.like("%exe%")) \
    .toPandas()
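
Note that `like("%exe%")` is a case-sensitive substring match, so it also matches words such as "executed" and misses upper-case extensions. The sketch below compares the substring test with a stricter, case-insensitive regular expression in plain Python. The sample messages are invented for illustration. In Spark, a similar pattern could be tried with `Column.rlike`, though Java regex behavior should be verified separately.

```python
import re

# Invented sample messages, for illustration only
messages = [
    "Blocked file C:\\Temp\\setup.exe",
    "Scan executed successfully",
    "Quarantined invoice.EXE",
]

# Equivalent of LIKE '%exe%': case-sensitive substring match
substring_hits = [m for m in messages if "exe" in m]

# Stricter: a literal ".exe" ending at a word boundary, ignoring case
exe_pattern = re.compile(r"\.exe\b", re.IGNORECASE)
regex_hits = [m for m in messages if exe_pattern.search(m)]

print(substring_hits)
print(regex_hits)
```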

In [None]:
### Number of results
len(pdf3)

In [None]:
### Display the first 10 results
pdf3.head(10)

In [None]:
%%time
### One Week
data4 = sq.read.parquet("hdfs://"+hdfsNameNode+":"+hdfsPort+"/data/2016-06-0[1-7]*")
pdf4 = data4.filter(data4.deviceVendor == "Symantec") \
    .filter(data4.message.like("%exe%")) \
    .toPandas()

In [None]:
### Number of results
len(pdf4)

In [None]:
### Display the first 10 results
pdf4.head(10)