# Single Event with Complex Analysis

## Part Three (Streaming Analysis)

#### Prerequisites
This notebook is designed to work with a Stroom server process running on `localhost`, into which data from the `EventGen` application has been ingested and indexed as described in `stroom-analytic-demo`.

You must set the environment variable `STROOM_API_KEY` to the API token associated with a suitably privileged Stroom user account before starting the Jupyter notebook server process.
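If the variable is missing, the problem would otherwise only show up later as an authentication failure from Stroom. A notebook cell along the following lines can check for it up front and fail with a clear message. This is a minimal sketch, and the helper name `require_env` is hypothetical.

```python
import os


def require_env(name: str) -> str:
    """Return the value of environment variable `name`, or raise a clear error if unset or empty."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name} is not set; "
            "set it before starting the Jupyter notebook server."
        )
    return value


# Hypothetical usage in the notebook:
# token = require_env("STROOM_API_KEY")
```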

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col, coalesce, unix_timestamp, lit, to_timestamp, hour, date_format, date_trunc, window
# OneHotEncoderEstimator exists in Spark 2.x only (Spark 3.0 renamed it to OneHotEncoder)
from pyspark.ml.feature import OneHotEncoderEstimator, VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml import Pipeline, PipelineModel
from IPython.display import display
import time, os

#### Schema Discovery
It is necessary to specify the structure of the JSON data arriving on the topic.  This structure can be determined at runtime.

As the same format of data is also available via an indexed search using the `stroom-spark-datasource`, one way to determine the JSON schema is by interrogating the data held in the `Sample Index` Stroom index.

The specified pipeline is a Stroom Search Extraction Pipeline that uses the `stroom:json` XSLT function to create a JSON representation of the entire event. This field is called `json` by default, but the name of the field that contains the JSON representation can optionally be changed with the `jsonField` parameter.

In this manner, all data is returned as a single JSON structure within the field **json**.
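Spark infers one schema covering every field seen across the sampled records. The sample (here, one day of `Sample Index` data) should therefore contain every event type the stream will carry, because `from_json` will drop any field that is missing from the schema. The pure-Python sketch below illustrates this union-of-fields idea on nested JSON. It is not Spark's implementation. The helper names `field_paths` and `union_of_fields` are hypothetical, and the two records are made up.

```python
import json
from typing import Iterable, Set


def field_paths(obj, prefix: str = "") -> Set[str]:
    """Collect dotted paths to every leaf field in a parsed JSON object."""
    paths: Set[str] = set()
    if isinstance(obj, dict):
        for key, value in obj.items():
            paths |= field_paths(value, f"{prefix}{key}.")
    else:
        # Leaf value (string, number, list, ...): record its path without the trailing dot
        paths.add(prefix.rstrip("."))
    return paths


def union_of_fields(json_records: Iterable[str]) -> Set[str]:
    """Union the leaf field paths seen across all sampled JSON records."""
    result: Set[str] = set()
    for record in json_records:
        result |= field_paths(json.loads(record))
    return result


# Two made-up records with different event details
sample = [
    '{"EventId": "1", "EventDetail": {"TypeId": "Authentication Failure"}}',
    '{"EventId": "2", "EventDetail": {"Process": {"Command": "ls"}}}',
]
print(sorted(union_of_fields(sample)))
# ['EventDetail.Process.Command', 'EventDetail.TypeId', 'EventId']
```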

In [2]:
spark = SparkSession \
    .builder \
    .appName("MyTestApp") \
    .getOrCreate()

schemaDf = spark.read.format('stroom.spark.datasource.StroomDataSource').load(
        token=os.environ['STROOM_API_KEY'],host='localhost',protocol='http',
        uri='api/stroom-index/v2',traceLevel="0",
        index='32dfd401-ee11-49b9-84c9-88c3d3f68dc2',
        pipeline='13143179-b494-4146-ac4b-9a6010cada89',
        maxResults='50000').filter((col('idxEventTime') > '2018-01-01T00:00:00.000Z')
            & (col('idxEventTime') < '2018-01-02T00:00:00.000Z'))

print ('Using ', schemaDf.count(), ' records for schema discovery.')
json_schema = spark.read.json(schemaDf.rdd.map(lambda row: row.json)).schema

json_schema

Using  2851  records for schema discovery.


StructType(List(StructField(EventDetail,StructType(List(StructField(Authenticate,StructType(List(StructField(Action,StringType,true),StructField(Outcome,StructType(List(StructField(Permitted,StringType,true),StructField(Reason,StringType,true),StructField(Success,StringType,true))),true),StructField(User,StructType(List(StructField(Id,StringType,true))),true))),true),StructField(Process,StructType(List(StructField(Action,StringType,true),StructField(Command,StringType,true),StructField(Type,StringType,true))),true),StructField(TypeId,StringType,true))),true),StructField(EventId,StringType,true),StructField(EventSource,StructType(List(StructField(Device,StructType(List(StructField(HostName,StringType,true))),true),StructField(Generator,StringType,true),StructField(System,StructType(List(StructField(Environment,StringType,true),StructField(Name,StringType,true))),true),StructField(User,StructType(List(StructField(Id,StringType,true))),true))),true),StructField(EventTime,StructType(List(S

## Spark DataFrame for streaming analysis
Create a streaming Spark DataFrame that reads from the Kafka topic `ANALYTIC-DEMO-UEBA`.

### Kafka Headers
These are not supported before Spark v3.0. However, earlier versions do not baulk when the `includeHeaders` option is provided.
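If the notebook needs to run on both Spark 2.x and 3.x, you can add `includeHeaders` only when the running version supports it, rather than relying on older versions ignoring the option. The sketch below builds the reader options as a plain dict. `kafka_reader_options` is a hypothetical helper. In the notebook, its result could be passed to `DataStreamReader.options(**opts)` together with `spark.version`.

```python
from typing import Dict


def kafka_reader_options(spark_version: str,
                         servers: str = "localhost:9092",
                         topic: str = "ANALYTIC-DEMO-UEBA") -> Dict[str, str]:
    """Build Kafka source options, adding includeHeaders only on Spark 3.0 or later."""
    opts = {
        "kafka.bootstrap.servers": servers,
        "subscribe": topic,
        "startingOffsets": "latest",
    }
    major = int(spark_version.split(".")[0])
    if major >= 3:
        opts["includeHeaders"] = "true"
    return opts


# In the notebook (assumes an active SparkSession called `spark`):
# df = spark.readStream.format("kafka") \
#     .options(**kafka_reader_options(spark.version)).load()
```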

In [3]:
# includeHeaders is only supported from Spark 3.0 onwards; earlier versions ignore the option
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "ANALYTIC-DEMO-UEBA") \
    .option("startingOffsets", "latest") \
    .option("includeHeaders", "true") \
    .load()

## Function for each batch of data
Although Spark automatically creates the aggregations (in this case, the hourly counts), it is still necessary to define a function that assesses them and, if necessary, creates an alert to send to Stroom. The version below only displays the rows that would trigger an alert.

In [4]:
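def process_batch(df, epoch_id):
    """Called by foreachBatch for each micro-batch of scored hourly counts."""
    # Flag windows whose count is more than twice the model's prediction and also above 10;
    # for now the matching rows are only displayed, not sent to Stroom
    df.filter((col('count') > col('prediction') * 2) &
              (col('count') > 10)).show()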
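The filter in `process_batch` flags an hourly window when its observed count is more than twice the model's prediction and also greater than 10. The second condition suppresses alerts for very quiet hours. The same rule is written below as a plain Python predicate, with a few example values, so the thresholds are easy to check. `is_anomalous` is a hypothetical name, and the defaults are the constants used in the cell above.

```python
def is_anomalous(count: int, prediction: float,
                 ratio: float = 2.0, min_count: int = 10) -> bool:
    """Mirror the process_batch filter: count > prediction * ratio and count > min_count."""
    return count > prediction * ratio and count > min_count


# A few illustrative (count, prediction) pairs
for count, prediction in [(25, 10.0), (20, 10.0), (9, 2.0), (11, 3.0)]:
    print(count, prediction, is_anomalous(count, prediction))
# 25 10.0 True
# 20 10.0 False
# 9 2.0 False
# 11 3.0 True
```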
  

# Streaming Analysis
This analytic is based on code used within the second notebook of this series (Part2-MLTraining).  Hourly counts can be created using Spark Structured Streaming aggregations over time windows. 
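Conceptually, the aggregation assigns each event to the one-hour window containing its timestamp, adds the day name (`date_format(..., 'EEEE')`) and the hour of day, and counts the events in each group. The pure-Python sketch below reproduces that grouping on a few made-up timestamps to show the shape of the result. It is only an illustration, not how Spark executes the query. `hourly_counts` is a hypothetical helper.

```python
from collections import Counter
from datetime import datetime
from typing import Iterable


def hourly_counts(timestamps: Iterable[datetime]) -> Counter:
    """Count events per (window start, day name, hour), like groupBy(window(..., '1 hour'), day, hour)."""
    counts: Counter = Counter()
    for ts in timestamps:
        window_start = ts.replace(minute=0, second=0, microsecond=0)
        day = ts.strftime("%A")  # full day name, comparable to Spark's 'EEEE' pattern
        counts[(window_start, day, ts.hour)] += 1
    return counts


events = [datetime(2018, 1, 1, 9, 5), datetime(2018, 1, 1, 9, 40), datetime(2018, 1, 1, 10, 15)]
for (start, day, hr), n in sorted(hourly_counts(events).items()):
    print(start, day, hr, n)
```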

### Two ways to work with windows
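In batch Spark, windowed counts can be created either with window functions (via the column function `over`) or with aggregate functions grouped by a time window. Non-time-based window functions are not supported on streaming DataFrames, however, so only the aggregate approach applies here.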

The aggregate function approach follows.

The window definition used is `window("timestamp", "1 hour")`, which produces non-overlapping (tumbling) hourly windows. Sliding windows are also possible: `window("timestamp", "1 hour", "10 minutes")` starts a new one-hour window every 10 minutes, so each event falls into six overlapping windows. This could allow an analytic to report more rapidly (i.e. without waiting for an hour to elapse). However, it makes multiple alerts for the same data more likely, so some deduplication of alerts would need to be performed downstream.
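With a window length of 1 hour and a slide of 10 minutes, window start times fall on every 10-minute boundary, so each event belongs to 60 / 10 = 6 overlapping windows. The sketch below enumerates those windows for one timestamp. It assumes that windows are aligned to the Unix epoch, as with Spark's default `startTime` of zero. `sliding_windows` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple


def sliding_windows(ts: datetime, length: timedelta,
                    slide: timedelta) -> List[Tuple[datetime, datetime]]:
    """Return every [start, end) window of `length`, starting on `slide` boundaries, that contains ts."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # Latest window start at or before ts, aligned to the epoch
    start = ts - (ts - epoch) % slide
    windows = []
    # Step back one slide at a time while the window still covers ts
    while start + length > ts:
        windows.append((start, start + length))
        start -= slide
    return sorted(windows)


t = datetime(2018, 1, 1, 9, 25, tzinfo=timezone.utc)
for start, end in sliding_windows(t, timedelta(hours=1), timedelta(minutes=10)):
    print(start.strftime("%H:%M"), "-", end.strftime("%H:%M"))
# 08:30 - 09:30
# 08:40 - 09:40
# 08:50 - 09:50
# 09:00 - 10:00
# 09:10 - 10:10
# 09:20 - 10:20
```

Setting the slide equal to the length yields a single window per event, which corresponds to the tumbling `window("timestamp", "1 hour")` used in this notebook.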

### Kafka Headers
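Spark v3.0 supports Kafka headers. When the `includeHeaders` option is set, the Kafka source adds a `headers` column. This column is an array of key/value structs (string keys, binary values) rather than a map, so it cannot simply be cast to `MapType(String,String)`. It can, however, be converted into a map (for example with `map_from_entries`, casting the values to strings). Individual headers can then be looked up by key, much like a Python dict.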
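Each Kafka header is a key/value pair whose value is raw bytes, and the same key may occur more than once. The pure-Python sketch below turns such a list into a dict of strings. It assumes UTF-8 header values and keeps the last value for a repeated key. Both are choices made here, not anything Kafka or Spark prescribes. `headers_to_dict` and the header names in the example are hypothetical.

```python
from typing import Dict, Iterable, Optional, Tuple


def headers_to_dict(
        headers: Optional[Iterable[Tuple[str, Optional[bytes]]]]) -> Dict[str, Optional[str]]:
    """Convert (key, bytes) header pairs into a dict of decoded strings.

    Assumes UTF-8 values; if a key repeats, the last value wins.
    """
    result: Dict[str, Optional[str]] = {}
    for key, value in headers or []:
        result[key] = value.decode("utf-8") if value is not None else None
    return result


hdrs = [("Feed", b"DEMO-FEED"), ("System", b"EventGen"), ("Feed", b"OTHER")]
print(headers_to_dict(hdrs))
# {'Feed': 'OTHER', 'System': 'EventGen'}
```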

In [8]:
#col('headers') is only available on the DataFrame from Spark 3.0, when includeHeaders is set.
#It is an array of key/value structs, so building a map of header strings could look like the
#line below (requires: from pyspark.sql.functions import expr), added to the DataFrame definition:
#withColumn('headerMap', expr("map_from_entries(transform(headers, h -> struct(h.key, cast(h.value as string))))")).\
# Note: without withWatermark(), Spark keeps the deduplication and aggregation state indefinitely
wideDf = df.withColumn('json', col('value').cast('string')).\
    withColumn('evt', from_json(col('json'), json_schema)).\
    withColumn('timestamp', to_timestamp(col('evt.EventTime.TimeCreated'))).\
    withColumn('operation', col('evt.EventDetail.TypeId')).\
    filter(col('operation') == 'Authentication Failure').\
    withColumn('streamid', col('evt.StreamId')).\
    withColumn('eventid', col('evt.EventId')).\
    dropDuplicates(["eventid", "streamid"]).\
    groupBy(window("timestamp", "1 hour"),
            date_format('timestamp', 'EEEE').alias("day"),
            hour("timestamp").alias("hour")).count()

# Load the saved feature pipeline that produces the hourVec and dayVec columns
pipelineModel = PipelineModel.load("models/inputVecPipelineModel")
featuresDf = pipelineModel.transform(wideDf)

# Combine the encoded hour and day vectors into a single feature vector
vectorAssembler = VectorAssembler(inputCols=['hourVec', 'dayVec'], outputCol='features')
fullDf = vectorAssembler.transform(featuresDf).select('window', 'features', 'count')

# Score each hourly window with the saved logistic regression model
lrModel = LogisticRegressionModel.load("models/logisticRegressionAuthFailuresModel")
lrDf = lrModel.transform(fullDf)

# outputMode can be append, complete or update; update emits only the windows whose counts
# changed in each micro-batch (append would require a watermark for this aggregation)
query = lrDf.writeStream.\
    outputMode("update").\
    foreachBatch(process_batch).\
    start()

query.awaitTermination()

 


+------+--------+-----+-------------+-----------+----------+
|window|features|count|rawPrediction|probability|prediction|
+------+--------+-----+-------------+-----------+----------+
+------+--------+-----+-------------+-----------+----------+


KeyboardInterrupt: 