# **Introduction to Elasticsearch and Spark SQL via PySpark**
----------------------------------------------------------------------------
## Goals:
* Practice Spark SQL via PySpark skills
* Ensure JupyterLab Server, Spark Cluster & Elasticsearch are communicating
* Learn to read from HELK Elasticsearch indices

## Import SparkSession Class

In [1]:
from pyspark.sql import SparkSession

## Create a SparkSession instance
* Define a **spark** variable
* Pass values to the **appName** and **master** methods
    * For the master method, we are going to use the HELK's Spark Master container (helk-spark-master)
* This time, add the **config()** method to set the Elasticsearch information needed to read from it

[**config(key=None, value=None, conf=None)**](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession.Builder.config)
* Sets a config option.
* Options set using this method are automatically propagated to both SparkConf and SparkSession’s own configuration.

In [2]:
spark = SparkSession.builder \
    .appName("HELK Reader") \
    .master("spark://helk-spark-master:7077") \
    .config("es.read.field.as.array.include", "tags") \
    .config("es.nodes","helk-elasticsearch:9200") \
    .config("es.net.http.auth.user","elastic") \
    .config("es.net.http.auth.pass","elasticpassword") \
    .enableHiveSupport() \
    .getOrCreate()
# Please remember:
# If you are using an Elastic TRIAL license, you need the es.net.http.auth.pass value.
# If you are using an Elastic BASIC license, you can remove the es.net.http.auth.pass value.
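Because the password setting depends on the license, it can help to collect the Elasticsearch options in one place and apply them in a loop. The following is a minimal sketch, not part of HELK: `es_reader_options` and `apply_options` are hypothetical helper names, and the values are the ones used in the cell above.

```python
def es_reader_options(nodes, user=None, password=None, array_fields=()):
    """Collect elasticsearch-hadoop settings in a plain dict (hypothetical helper)."""
    options = {"es.nodes": nodes}
    if array_fields:
        # The connector expects a comma-separated list of field names.
        options["es.read.field.as.array.include"] = ",".join(array_fields)
    if user is not None:
        options["es.net.http.auth.user"] = user
    if password is not None:
        options["es.net.http.auth.pass"] = password
    return options


def apply_options(builder, options):
    """Call builder.config(key, value) once per option and return the builder."""
    for key, value in options.items():
        builder = builder.config(key, value)
    return builder


# TRIAL license: the password is included.
trial_options = es_reader_options(
    "helk-elasticsearch:9200",
    user="elastic",
    password="elasticpassword",
    array_fields=["tags"],
)

# BASIC license: same settings without es.net.http.auth.pass.
basic_options = es_reader_options(
    "helk-elasticsearch:9200", user="elastic", array_fields=["tags"]
)
```

With a real session, the builder from the cell above could be passed in, for example `apply_options(SparkSession.builder.appName("HELK Reader").master("spark://helk-spark-master:7077"), trial_options).getOrCreate()`.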

## Check the SparkSession variable

In [3]:
spark

## Read data from the HELK Elasticsearch via Spark SQL

### Using the DataFrame API to access an Elasticsearch index (Elasticsearch-Sysmon index)

* Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.
* Elasticsearch becomes a native source for Spark SQL, so data can be indexed and queried from Spark SQL transparently.
* Spark SQL works with structured data. In other words, all entries are expected to have the same structure (the same number of fields, with the same names and types).
* Unstructured data (documents with different structures) is not supported and will cause problems.
* Through the **org.elasticsearch.spark.sql** package, esDF methods are available on the SQLContext API.

Reference: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html
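To make the "same structure" requirement concrete, here is a toy check in plain Python, outside Spark. It is only an illustration: `field_signature`, `same_structure`, and the sample documents are made up for this sketch, and the connector itself derives the schema from the index mapping rather than from a check like this.

```python
def field_signature(doc):
    """Return the set of (field name, value type) pairs for one document."""
    return frozenset((name, type(value).__name__) for name, value in doc.items())


def same_structure(docs):
    """True when every document has the same field names and value types."""
    return len({field_signature(doc) for doc in docs}) <= 1


# Hypothetical documents, loosely modelled on the Sysmon fields used later.
structured = [
    {"process_guid": "guid-a", "action": "processcreate", "dst_port": 53},
    {"process_guid": "guid-b", "action": "networkconnect", "dst_port": 1900},
]
unstructured = [
    {"process_guid": "guid-a", "action": "processcreate"},
    {"process_guid": "guid-b", "dst_port": "1900"},  # different field set
]

print(same_structure(structured))    # True
print(same_structure(unstructured))  # False
```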

In [4]:
es_reader = (spark
          .read
          .format("org.elasticsearch.spark.sql")
          .option("inferSchema", "true")
)

[**load(path=None, format=None, schema=None, **options)**](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.load)
* Loads data from a data source and returns it as a `DataFrame`.

In [5]:
%%time
sysmon_df = es_reader.load("logs-endpoint-winevent-sysmon-*/doc")

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 1.35 s


## Filter Operation

Filter the data to keep only the events whose **action** is **"processcreate"**, which corresponds to Sysmon Event ID 1. The fields to display are chosen in the Select step that follows.

In [6]:
processcreate_df = sysmon_df.filter(sysmon_df.action == "processcreate")

## Select Operation

You can select a few columns from your dataframe with the **select** method.

In [7]:
processcreate_df = processcreate_df.select("process_guid","process_parent_name","process_parent_command_line","process_name","process_command_line","action","@timestamp")
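The filter and select steps can also be written as a SQL statement by registering the DataFrame as a temporary view. The function below is a sketch under assumptions: `processcreate_via_sql` and the view name `sysmon_events` are hypothetical, and it expects the `spark` session and the `sysmon_df` DataFrame created earlier to be passed in.

```python
def processcreate_via_sql(spark, sysmon_df, view_name="sysmon_events"):
    """Filter and project Sysmon process-creation events with a SQL query."""
    # Expose the DataFrame to the SQL engine under a temporary view name.
    sysmon_df.createOrReplaceTempView(view_name)
    # Backticks are needed because @timestamp is not a plain identifier.
    query = (
        "SELECT process_guid, process_parent_name, process_parent_command_line, "
        "process_name, process_command_line, action, `@timestamp` "
        f"FROM {view_name} "
        "WHERE action = 'processcreate'"
    )
    return spark.sql(query)
```

Calling `processcreate_via_sql(spark, sysmon_df).show(10)` should then list the same kind of rows as the DataFrame API version.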

In [8]:
%%time
processcreate_df.show(10)

+--------------------+-------------------+---------------------------+-------------+--------------------+-------------+--------------------+
|        process_guid|process_parent_name|process_parent_command_line| process_name|process_command_line|       action|          @timestamp|
+--------------------+-------------------+---------------------------+-------------+--------------------+-------------+--------------------+
|1C9FDC81-9806-5C6...|            cmd.exe|       c:\windows\system...|  conhost.exe|\??\c:\windows\sy...|processcreate|2019-02-22 06:34:...|
|1C9FDC81-9806-5C6...|        svchost.exe|       c:\windows\system...|taskhostw.exe|taskhostw.exe ngc...|processcreate|2019-02-22 06:34:...|
|1C9FDC81-9807-5C6...|        svchost.exe|       c:\windows\system...| wsqmcons.exe|c:\windows\system...|processcreate|2019-02-22 06:34:...|
|1C9FDC81-9809-5C6...|       gpupdate.exe|       gpupdate.exe /tar...|  conhost.exe|\??\c:\windows\sy...|processcreate|2019-02-22 06:34:...|
|1C9FDC81-980

## Create Dataframes from the original Sysmon Dataframe

* Filter the original **sysmon_df** dataframe
* Select specific columns
* Display results

### NetworkConnect Events

We are going to use the network events logged by Sysmon (Event ID 3)

In [9]:
networkconnect_df = sysmon_df.filter(sysmon_df.action == "networkconnect")

In [10]:
networkconnect_df = networkconnect_df.select("process_guid","dst_ip_addr","dst_port","dst_host_name","action","@timestamp")

In [11]:
networkconnect_df.show(10,truncate=False)

+------------------------------------+---------------+--------+-------------------------------+--------------+-----------------------+
|process_guid                        |dst_ip_addr    |dst_port|dst_host_name                  |action        |@timestamp             |
+------------------------------------+---------------+--------+-------------------------------+--------------+-----------------------+
|1C9FDC81-84E5-5C6D-0000-001060530400|239.255.255.250|1900    |null                           |networkconnect|2019-02-22 06:34:47.078|
|1C9FDC81-84E5-5C6D-0000-001060530400|127.0.0.1      |56783   |desktop-lfd11qp.rivendell.local|networkconnect|2019-02-22 06:34:47.078|
|1C9FDC81-84E5-5C6D-0000-001060530400|null           |56781   |desktop-lfd11qp.rivendell.local|networkconnect|2019-02-22 06:34:47.484|
|1C9FDC81-84CA-5C6D-0000-0010262D0100|null           |53      |null                           |networkconnect|2019-02-22 06:34:49.839|
|1C9FDC81-84CA-5C6D-0000-0010262D0100|null           |5

### FileCreate Events

In [12]:
filecreate_df = sysmon_df.filter(sysmon_df.action == "filecreate")

In [13]:
filecreate_df = filecreate_df.select("process_guid","file_name","action","@timestamp")

In [14]:
%%time
filecreate_df.show(10,truncate=False)

+------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+-----------------------+
|process_guid                        |file_name                                                                                                                                                                            |action    |@timestamp             |
+------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+-----------------------+
|1C9FDC81-980B-5C6F-0000-00109B5CD100|c:\programdata\regid.1991-06.com.microsoft\regid.1991-06.com.microsoft_windows-10-pro.swidtag                                                                                        |filecreate|2

## Spark SQL JOINs & Sysmon Data

[**join(other, on=None, how=None)**](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.join)

Joins with another DataFrame, using the given join expression.

Parameters:	
* **other** – Right side of the join
* **on** – a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.
* **how** – str, default inner. Must be one of: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.
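As a rough mental model of the `how` parameter, the sketch below re-implements two join types on plain Python lists of dicts. It is not how Spark executes a join, and the rows and helper names (`inner_join`, `left_anti_join`) are hypothetical. Note that an inner join emits one row per matching pair, so a process with several network connections appears several times in the result.

```python
from collections import defaultdict


def inner_join(left_rows, right_rows, key):
    """Equi-join: one output row for every (left, right) pair sharing `key`."""
    by_key = defaultdict(list)
    for row in right_rows:
        by_key[row[key]].append(row)
    joined = []
    for left in left_rows:
        for right in by_key.get(left[key], []):
            # The sample rows share no column besides `key`, so nothing is overwritten.
            joined.append({**left, **right})
    return joined


def left_anti_join(left_rows, right_rows, key):
    """Keep only the left rows that have no match on the right."""
    right_keys = {row[key] for row in right_rows}
    return [row for row in left_rows if row[key] not in right_keys]


# Hypothetical rows; real process_guid values are much longer.
processes = [
    {"process_guid": "guid-a", "process_name": "backgroundtaskhost.exe"},
    {"process_guid": "guid-b", "process_name": "conhost.exe"},
]
connections = [
    {"process_guid": "guid-a", "dst_ip_addr": "204.79.197.200"},
    {"process_guid": "guid-a", "dst_ip_addr": "40.112.91.29"},
]

print(len(inner_join(processes, connections, "process_guid")))  # 2
print(left_anti_join(processes, connections, "process_guid"))
# [{'process_guid': 'guid-b', 'process_name': 'conhost.exe'}]
```

One difference from this toy version: when Spark joins on a column name such as `"process_guid"`, only that join column is merged into one. Other columns that exist on both sides keep both copies.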

### ProcessCreate -> NetworkConnect

In [15]:
process_network_df = processcreate_df.join(networkconnect_df, "process_guid", how="inner")

In [16]:
%%time
process_network_df.select("process_parent_name","process_name","dst_ip_addr").show(truncate=False)

+-------------------+----------------------+--------------+
|process_parent_name|process_name          |dst_ip_addr   |
+-------------------+----------------------+--------------+
|svchost.exe        |backgroundtaskhost.exe|204.79.197.200|
|svchost.exe        |backgroundtaskhost.exe|40.112.91.29  |
|svchost.exe        |backgroundtaskhost.exe|40.112.91.29  |
|svchost.exe        |backgroundtaskhost.exe|40.112.91.29  |
|svchost.exe        |backgroundtaskhost.exe|40.112.91.29  |
+-------------------+----------------------+--------------+

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 6.57 s


In [17]:
%%time
process_network_df.groupBy('process_parent_name').count().sort('count', ascending=False).show()

+-------------------+-----+
|process_parent_name|count|
+-------------------+-----+
|        svchost.exe|    5|
+-------------------+-----+

CPU times: user 4 ms, sys: 4 ms, total: 8 ms
Wall time: 7.38 s


In [18]:
%%time
(process_network_df
    .filter(process_network_df.process_parent_name == "svchost.exe")
    .select("process_parent_command_line", "process_name", "dst_ip_addr")
    .show(5, truncate=False)
)

+------------------------------------------------+----------------------+--------------+
|process_parent_command_line                     |process_name          |dst_ip_addr   |
+------------------------------------------------+----------------------+--------------+
|c:\windows\system32\svchost.exe -k dcomlaunch -p|backgroundtaskhost.exe|204.79.197.200|
|c:\windows\system32\svchost.exe -k dcomlaunch -p|backgroundtaskhost.exe|40.112.91.29  |
|c:\windows\system32\svchost.exe -k dcomlaunch -p|backgroundtaskhost.exe|40.112.91.29  |
|c:\windows\system32\svchost.exe -k dcomlaunch -p|backgroundtaskhost.exe|40.112.91.29  |
|c:\windows\system32\svchost.exe -k dcomlaunch -p|backgroundtaskhost.exe|40.112.91.29  |
+------------------------------------------------+----------------------+--------------+

CPU times: user 0 ns, sys: 12 ms, total: 12 ms
Wall time: 2.4 s


### ProcessCreate -> FileCreate

Let's focus now on the least frequent events. Sorting the counts in ascending order lists the rarest parent processes first.

In [19]:
process_file_df = processcreate_df.join(filecreate_df, "process_guid", how="inner")

In [20]:
%%time
process_file_df.groupBy('process_parent_name').count().sort('count').show()

+-------------------+-----+
|process_parent_name|count|
+-------------------+-----+
|       services.exe|    2|
|        svchost.exe|   19|
+-------------------+-----+

CPU times: user 4 ms, sys: 4 ms, total: 8 ms
Wall time: 4.44 s
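The `groupBy(...).count().sort('count')` chain is a frequency count sorted in ascending order. A plain-Python equivalent, shown only as a sketch, uses `collections.Counter`; the list of parent names below is hypothetical and was built to mirror the counts in the output above.

```python
from collections import Counter

# Hypothetical input: one parent process name per joined row.
parent_names = ["svchost.exe"] * 19 + ["services.exe"] * 2

# Count occurrences, then sort by count so the rarest parents come first.
counts = Counter(parent_names)
least_frequent_first = sorted(counts.items(), key=lambda item: item[1])

print(least_frequent_first)
# [('services.exe', 2), ('svchost.exe', 19)]
```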
