# KENL: Checking Spark and Graphframe Integrations
----------------------------------------------------------------------------
## Goals:
* Test if Jupyter can talk to Spark and GraphFrames
* Test if Spark and GraphFrames can pull data from Elasticsearch (ES)
* Show the basics of the KENL integrations with advanced analytics libraries

## Check the Spark Session via the variable spark

In [1]:
# Evaluate the `spark` name so the notebook displays the object bound to it.
# A NameError here would mean the kernel did not provide a Spark session.
spark

## Create a Spark RDD from Elasticsearch Index (logs-endpoint-winevent-sysmon-*)

In [2]:
# Read the Sysmon index pattern through the ES-Hadoop MapReduce input format.
# "es.resource" selects the index pattern and document type; "es.nodes" names
# the Elasticsearch host to connect to.
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.resource": "logs-endpoint-winevent-sysmon-*/doc",
        "es.nodes": "kenl-elasticsearch",
    },
)

# Fetch a single record, a (key, document-fields) pair, as a connectivity check.
es_rdd.first()

('611341183',
 {'@timestamp': '2018-05-02T21:57:12.196Z',
  '@version': '1',
  'action': 'networkconnect',
  'beat': {'hostname': 'DESKTOP-WARDOG',
   'name': 'DESKTOP-WARDOG',
   'version': '6.1.2'},
  'event_id': 3,
  'geoip': {},
  'host_dst_name': 'DESKTOP-WARDOG',
  'host_name': 'DESKTOP-WARDOG',
  'host_src_name': 'DESKTOP-WARDOG',
  'ip_dst': '0:0:0:0:0:0:0:1',
  'ip_dst_isipv6': 'true',
  'ip_src': '0:0:0:0:0:0:0:1',
  'ip_src_isipv6': 'true',
  'level': 'Information',
  'log_name': 'Microsoft-Windows-Sysmon/Operational',
  'network_initiated': True,
  'network_protocol': 'udp',
  'opcode': 'Info',
  'port_dst_number': 50917,
  'port_src_number': 50917,
  'process_guid': 'A98268C1-9E9A-5AE6-0000-00102B6B1300',
  'process_id': 3880,
  'process_name': 'svchost.exe',
  'process_path': 'C:\\Windows\\System32\\svchost.exe',
  'provider_guid': '5770385F-C22A-43E0-BF4C-06F5698FFBD9',
  'record_number': '13712475',
  'source_name': 'Microsoft-Windows-Sysmon',
  'tags': ('_geoip_lookup_

## Create a Spark RDD from Elasticsearch Index (logs-endpoint-winevent-security-*)

In [3]:
# Read the Windows Security index pattern through the ES-Hadoop MapReduce
# input format. Note that this rebinds `es_rdd`, replacing whatever RDD the
# name referred to before this cell ran.
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={
        "es.resource": "logs-endpoint-winevent-security-*/doc",
        "es.nodes": "kenl-elasticsearch",
    },
)

# Fetch a single record, a (key, document-fields) pair, as a connectivity check.
es_rdd.first()

('972328025',
 {'@timestamp': '2018-05-03T07:03:11.309Z',
  '@version': '1',
  'beat': {'hostname': 'DESKTOP-WARDOG',
   'name': 'DESKTOP-WARDOG',
   'version': '6.1.2'},
  'event_data': {},
  'event_id': 4689,
  'host_name': 'DESKTOP-WARDOG',
  'keywords': ('Audit Success',),
  'level': 'Information',
  'log_name': 'Security',
  'message': 'A process has exited.\n\nSubject:\n\tSecurity ID:\t\tS-1-5-20\n\tAccount Name:\t\tDESKTOP-WARDOG$\n\tAccount Domain:\t\tWORKGROUP\n\tLogon ID:\t\t0x3E4\n\nProcess Information:\n\tProcess ID:\t0x10e0\n\tProcess Name:\tC:\\Windows\\System32\\msdtc.exe\n\tExit Status:\t0x0',
  'opcode': 'Info',
  'process_id': 4,
  'process_name': 'msdtc.exe',
  'process_parent_id': 0,
  'process_path': 'C:\\Windows\\System32\\msdtc.exe',
  'process_status': '0x0',
  'provider_guid': '{54849625-5478-4994-A5BA-3E3B0328C30D}',
  'record_number': '34554',
  'source_name': 'Microsoft-Windows-Security-Auditing',
  'task': 'Process Termination',
  'thread_id': 4020,
  'type

## Import the GraphFrames package

In [4]:
# Import the GraphFrame class by name rather than with a wildcard, so it is
# obvious where the name used to build graphs in this notebook comes from.
from graphframes import GraphFrame

In [5]:
# Create a Vertex DataFrame with unique ID column "id"
v = sqlContext.createDataFrame(
    [
        ("a", "Alice", 34),
        ("b", "Bob", 36),
        ("c", "Charlie", 30),
    ],
    ["id", "name", "age"],
)

# Create an Edge DataFrame with "src" and "dst" columns
e = sqlContext.createDataFrame(
    [
        ("a", "b", "friend"),
        ("b", "c", "follow"),
        ("c", "b", "follow"),
    ],
    ["src", "dst", "relationship"],
)

# Create a GraphFrame from the vertices and edges. GraphFrame is provided by
# the notebook's dedicated import cell, so no re-import is needed here.
g = GraphFrame(v, e)

# Query: Get in-degree of each vertex.
g.inDegrees.show()

# Query: Count the number of "follow" connections in the graph.
# Left as the last expression so the notebook displays the count.
g.edges.filter("relationship = 'follow'").count()

+---+--------+
| id|inDegree|
+---+--------+
|  c|       1|
|  b|       2|
+---+--------+



2

## Create a basic SparkSession

In [6]:
# Build the "KENL" session, or obtain the existing one via getOrCreate(), and
# pass two ES-Hadoop settings along with it:
#   - es.read.field.as.array.include: lists `tags` as a field to read as an array
#   - es.nodes: the Elasticsearch host and port to connect to
spark = (
    SparkSession.builder
    .appName("KENL")
    .config("es.read.field.as.array.include", "tags")
    .config("es.nodes", "kenl-elasticsearch:9200")
    .getOrCreate()
)

## Spark SQL Basic Query (logs-endpoint-winevent-security-* as source)

In [7]:
# Load the Windows Security index pattern as a DataFrame through the
# Elasticsearch Spark SQL data source.
df = (
    spark.read
    .format("org.elasticsearch.spark.sql")
    .load("logs-endpoint-winevent-security-*/doc")
)

In [8]:
# Print the DataFrame's schema as a tree to see which fields (and nested
# structs) were loaded from the index.
df.printSchema()

root
 |-- @date_new_time: timestamp (nullable = true)
 |-- @date_previous_time: timestamp (nullable = true)
 |-- @timestamp: timestamp (nullable = true)
 |-- @version: string (nullable = true)
 |-- activity_id: string (nullable = true)
 |-- beat: struct (nullable = true)
 |    |-- hostname: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- version: string (nullable = true)
 |-- event_data: struct (nullable = true)
 |    |-- AdvancedOptions: string (nullable = true)
 |    |-- ConfigAccessPolicy: string (nullable = true)
 |    |-- DisableIntegrityChecks: string (nullable = true)
 |    |-- FlightSigning: string (nullable = true)
 |    |-- HypervisorDebug: string (nullable = true)
 |    |-- HypervisorLaunchType: string (nullable = true)
 |    |-- HypervisorLoadOptions: string (nullable = true)
 |    |-- KernelDebug: string (nullable = true)
 |    |-- LoadOptions: string (nullable = true)
 |    |-- PackageName: string (nullable = true)
 |    |-- RemoteEventLogging:

In [9]:
# Project only the "task" column and print it; show() with no arguments
# prints just the first 20 rows.
df.select("task").show()

+--------------------+
|                task|
+--------------------+
|    Process Creation|
| Process Termination|
|Other Policy Chan...|
|    Process Creation|
|    Process Creation|
|               Logon|
|               Logon|
|               Logon|
|               Logon|
|    Process Creation|
|    Process Creation|
|    Process Creation|
|    Process Creation|
|    Process Creation|
|    Process Creation|
|    Process Creation|
|    Process Creation|
|    Process Creation|
|    Process Creation|
|    Process Creation|
+--------------------+
only showing top 20 rows

