# Stroom Spark DataSource - Basic

### How to obtain a DataFrame for further processing or analysis within Spark.

#### Prerequisites
This notebook is designed to work with a Stroom server process running on `localhost`, into which the example data has been loaded (e.g. by running the gradle task `setupSampleData`).

You must set the environment variable `STROOM_API_KEY` to the API token associated with a suitably privileged Stroom user account before starting the Jupyter notebook server process.
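A missing token otherwise surfaces only later as a `KeyError` from `os.environ[...]`. The sketch below checks for it up front; the helper name `require_env` is an assumption made for illustration and is not part of Stroom or the data source.

```python
import os

def require_env(name):
    """Return the value of environment variable `name`, or raise a clear error if it is unset or empty."""
    value = os.environ.get(name, "")
    if not value:
        raise RuntimeError(f"{name} is not set; export it before starting the Jupyter notebook server")
    return value

# Usage (commented out so the sketch also runs when the variable is unset):
# token = require_env("STROOM_API_KEY")
```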

#### Setup
Import standard utility classes and functions, including those for JSON handling.

In [124]:
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col
from IPython.display import display
import os
import time

#### Basic Usage
Create the most basic kind of DataFrame to pull data from Stroom, and view the first records.

All the fields in the index can be searched using the field name **idxFieldName**, where FieldName is the name of the field within the Stroom index.
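The naming convention can be captured in a one-line helper. This is only an illustrative sketch; `idx_column` is a hypothetical name, not something the data source provides.

```python
def idx_column(field_name):
    """Name of the DataFrame column that exposes the Stroom index field `field_name`."""
    return "idx" + field_name

print(idx_column("UserId"))     # idxUserId
print(idx_column("EventTime"))  # idxEventTime
```

Such a name can then be used wherever a column is referenced, for example `basicDf[idx_column('UserId')]`.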

The specified pipeline is a Stroom Search Extraction Pipeline that uses the `stroom:json` XSLT function to create a JSON representation of the entire event. The field that holds this representation is called "Json" by default, but its name can optionally be changed with the `jsonField` parameter.

In this manner, all data is returned as a single JSON structure within the field **json**.

In [125]:
basicSchema = StructType([StructField("streamId", StringType(), True,
    metadata={"get": "StreamId"}),
    StructField("eventId", StringType(), True,
    metadata={"get": "EventId"})])

basicDf = spark.read.format('stroom.spark.datasource.StroomDataSource').load(
    token=os.environ['STROOM_API_KEY'],
    timestampField='EventTime', 
    host='localhost', 
    protocol='http', 
    index='57a35b9a-083c-4a93-a813-fc3ddfe1ff44', 
    pipeline='13143179-b494-4146-ac4b-9a6010cada89', 
    schema=basicSchema)

basicDf.groupBy(basicDf['streamId']).count().sort(['count'], ascending=False).show()

+--------+-----+
|streamId|count|
+--------+-----+
|      86|  200|
|      99|  114|
|      98|  111|
|      90|   57|
|      89|   56|
|      57|   37|
|      96|   20|
|     110|    7|
|     162|    7|
|     122|    7|
|     158|    6|
|     143|    6|
|     153|    6|
|     121|    6|
|     129|    6|
|     103|    6|
|     144|    5|
|     130|    5|
|     150|    5|
|     136|    5|
+--------+-----+
only showing top 20 rows



In [126]:
basicDf = spark.read.format('stroom.spark.datasource.StroomDataSource').load(
        token=os.environ['STROOM_API_KEY'],host='localhost',protocol='http',
        timestampField='EventTime',
        index='57a35b9a-083c-4a93-a813-fc3ddfe1ff44',pipeline='13143179-b494-4146-ac4b-9a6010cada89')
basicDf.count()

696

In [127]:
display(basicDf.limit(5).toPandas().head()) 

Unnamed: 0,json,idxStreamId,idxEventId,idxFeed,idxFeed (Keyword),idxAction,idxEventTime,idxUserId,idxSystem,idxEnvironment,...,idxannotation:CreatedOn,idxannotation:CreatedBy,idxannotation:UpdatedOn,idxannotation:UpdatedBy,idxannotation:Title,idxannotation:Subject,idxannotation:Status,idxannotation:AssignedTo,idxannotation:Comment,idxannotation:History
0,"[{""StreamId"":""57"",""EventId"":""2"",""EventTime"":{""...",,,,,,,,,,...,,,,,,,,,,
1,"[{""StreamId"":""71"",""EventId"":""1"",""EventTime"":{""...",,,,,,,,,,...,,,,,,,,,,
2,"[{""StreamId"":""85"",""EventId"":""1"",""EventTime"":{""...",,,,,,,,,,...,,,,,,,,,,
3,"[{""StreamId"":""85"",""EventId"":""2"",""EventTime"":{""...",,,,,,,,,,...,,,,,,,,,,
4,"[{""StreamId"":""85"",""EventId"":""3"",""EventTime"":{""...",,,,,,,,,,...,,,,,,,,,,


#### Working with JSON

The JSON is much easier to work with when parsed into a structure by Spark.

First it is necessary to perform some schema discovery. Here the schema is made available as a variable called `json_schema`, which can be inspected in the normal way.

In [128]:
json_schema = spark.read.json(basicDf.rdd.map(lambda row: row.json)).schema

json_schema

StructType(List(StructField(EventDetail,StructType(List(StructField(Authenticate,StructType(List(StructField(Action,StringType,true),StructField(LogonType,StringType,true),StructField(User,StructType(List(StructField(Id,StringType,true))),true))),true),StructField(Create,StructType(List(StructField(Object,StructType(List(StructField(Id,StringType,true),StructField(Name,StringType,true),StructField(Type,StringType,true))),true))),true),StructField(Description,StringType,true),StructField(Search,StructType(List(StructField(Query,StructType(List(StructField(Raw,StringType,true))),true),StructField(ResultPage,StructType(List(StructField(From,StringType,true),StructField(To,StringType,true))),true))),true),StructField(TypeId,StringType,true),StructField(Unknown,StructType(List(StructField(Data,ArrayType(StructType(List(StructField(Name,StringType,true),StructField(Value,StringType,true))),true),true))),true),StructField(View,StructType(List(StructField(Object,StructType(List(StructField(Dat

#### Working with JSON (cont)

Now a new DataFrame can be constructed using that schema as an additional (complex) column called evt.

For further ease of use, alias columns can be created from this. Scroll right to see these columns.
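The dotted names used for the alias columns (such as `evt.EventTime.TimeCreated`) are paths into the nested event structure. The plain-Python sketch below shows the same idea without Spark. The sample event is hypothetical and heavily trimmed, and `get_path` is an illustrative helper rather than a Spark or Stroom function.

```python
import json

# Hypothetical, heavily trimmed event; real events contain many more fields.
SAMPLE_JSON = '{"EventTime": {"TimeCreated": "2021-06-14T12:44:57.729Z"}, "EventDetail": {"TypeId": "GET"}}'

def get_path(record, dotted_path):
    """Follow a dotted path through nested dicts, returning None if any step is missing."""
    current = record
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current

evt = json.loads(SAMPLE_JSON)
print(get_path(evt, "EventTime.TimeCreated"))
print(get_path(evt, "EventDetail.TypeId"))
print(get_path(evt, "EventSource.User.Id"))  # None: absent from this sample
```

A path that is absent from an event resolves to nothing rather than raising an error, which is broadly how an event without a user ends up with an empty `user` column.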

In [129]:
wideDf = basicDf.withColumn('evt', from_json(col('json'), json_schema)).\
    withColumn('timestamp', col('evt.EventTime.TimeCreated')).\
    withColumn('user', col('evt.EventSource.User.Id')).\
    withColumn('operation', col('evt.EventDetail.TypeId'))

In [130]:
wideDf.count()

696

In [131]:
display(wideDf.limit(5).toPandas().head())

Unnamed: 0,json,idxStreamId,idxEventId,idxFeed,idxFeed (Keyword),idxAction,idxEventTime,idxUserId,idxSystem,idxEnvironment,...,idxannotation:Title,idxannotation:Subject,idxannotation:Status,idxannotation:AssignedTo,idxannotation:Comment,idxannotation:History,evt,timestamp,user,operation
0,"[{""StreamId"":""71"",""EventId"":""1"",""EventTime"":{""...",,,,,,,,,,...,,,,,,,"((None, None, None, None, GET, None, (None, Ro...",2021-06-14T12:44:57.729Z,,GET
1,"[{""StreamId"":""57"",""EventId"":""2"",""EventTime"":{""...",,,,,,,,,,...,,,,,,,"((None, None, None, None, POST, None, (None, R...",2021-06-14T12:45:05.614Z,"CN=A Test Client (testuser),O=Test Organizatio...",POST
2,"[{""StreamId"":""57"",""EventId"":""3"",""EventTime"":{""...",,,,,,,,,,...,,,,,,,"((None, None, None, None, POST, None, (None, R...",2021-06-14T12:45:10.507Z,"CN=A Test Client (testuser),O=Test Organizatio...",POST
3,"[{""StreamId"":""57"",""EventId"":""4"",""EventTime"":{""...",,,,,,,,,,...,,,,,,,"((None, None, None, None, POST, None, (None, R...",2021-06-14T12:45:10.593Z,"CN=A Test Client (testuser),O=Test Organizatio...",POST
4,"[{""StreamId"":""57"",""EventId"":""5"",""EventTime"":{""...",,,,,,,,,,...,,,,,,,"((None, None, None, None, POST, None, (None, R...",2021-06-14T12:45:10.691Z,"CN=A Test Client (testuser),O=Test Organizatio...",POST


#### Using Search Extraction Pipeline Created Fields
Any field created by the Stroom Search Extraction Pipeline can be accessed by adding a field to the schema whose metadata names the corresponding Stroom field under the "get" key.
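When several fields are wanted, writing each `StructField` by hand becomes repetitive. The sketch below builds the JSON form of a Spark schema from a mapping of column name to Stroom field name. The helper `stroom_schema_json` is hypothetical, and it assumes every column is a nullable string, as in this notebook.

```python
def stroom_schema_json(columns):
    """Build the JSON (dict) form of a Spark StructType in which each nullable
    string column carries "get" metadata naming its Stroom field."""
    return {
        "type": "struct",
        "fields": [
            {"name": column, "type": "string", "nullable": True,
             "metadata": {"get": stroom_field}}
            for column, stroom_field in columns.items()
        ],
    }

schema_json = stroom_schema_json({"user": "UserId"})
print(schema_json)
```

With PySpark available, `StructType.fromJson(schema_json)` should turn this dict into a schema object that can be passed as the `schema` option.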

In [132]:
mySchema = StructType([StructField("user", StringType(), True, 
                                   metadata={"get": "UserId"})])
dfWithExtractedFields = spark.read.format('stroom.spark.datasource.StroomDataSource').load(
        token=os.environ['STROOM_API_KEY'],host='localhost',protocol='http',
        uri='api/stroom-index/v2',
        index='57a35b9a-083c-4a93-a813-fc3ddfe1ff44',pipeline='e5ecdf93-d433-45ac-b14a-1f77f16ae4f7',
        schema=mySchema)
display(dfWithExtractedFields.limit(5).toPandas().head())

Unnamed: 0,user,json,idxStreamId,idxEventId,idxFeed,idxFeed (Keyword),idxAction,idxEventTime,idxUserId,idxSystem,...,idxannotation:CreatedOn,idxannotation:CreatedBy,idxannotation:UpdatedOn,idxannotation:UpdatedBy,idxannotation:Title,idxannotation:Subject,idxannotation:Status,idxannotation:AssignedTo,idxannotation:Comment,idxannotation:History
0,,,,,,,,,,,...,,,,,,,,,,
1,"CN=A Test Client (testuser),O=Test Organizatio...",,,,,,,,,,...,,,,,,,,,,
2,"CN=A Test Client (testuser),O=Test Organizatio...",,,,,,,,,,...,,,,,,,,,,
3,"CN=A Test Client (testuser),O=Test Organizatio...",,,,,,,,,,...,,,,,,,,,,
4,"CN=A Test Client (testuser),O=Test Organizatio...",,,,,,,,,,...,,,,,,,,,,


#### Using XPath Expression fields

If you are running a version of Stroom that supports `XPathExtractionOutputFilter`, it is possible to use XPaths to specify the data that you wish to extract into the columns of the Spark DataFrame.

When the Extraction Pipeline uses an `XPathExtractionOutputFilter`, Stroom extracts the XPath given under each field's "get" metadata key and returns the result in that field.
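To see what such a path selects, the sketch below evaluates two relative paths against a small event using Python's standard `xml.etree.ElementTree`. This runs locally and is not how Stroom performs the extraction. The sample event is hypothetical and simplified (real Stroom events carry a namespace and many more elements), and `extract_fields` is an illustrative helper.

```python
import xml.etree.ElementTree as ET

# Hypothetical, simplified event without a namespace.
SAMPLE_EVENT = """
<Event>
  <EventSource><User><Id>admin</Id></User></EventSource>
  <EventDetail><TypeId>POST</TypeId></EventDetail>
</Event>
""".strip()

# Column name -> path relative to the Event element, as given under the "get" key.
PATHS = {"myUser": "EventSource/User/Id", "myOperation": "EventDetail/TypeId"}

def extract_fields(event_xml, paths):
    """Return {column: text of the first element matching the path, or None}."""
    root = ET.fromstring(event_xml)
    return {column: root.findtext(path) for column, path in paths.items()}

row = extract_fields(SAMPLE_EVENT, PATHS)
print(row)
```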

In [133]:
xpathSchema = StructType([StructField("myUser", StringType(), True, 
                                   metadata={"get": "EventSource/User/Id"}), 
                       StructField("myOperation", StringType(), True, 
                                   metadata={"get": "EventDetail/TypeId"})])

In [134]:
dfWithXPaths = spark.read.format('stroom.spark.datasource.StroomDataSource').load(
        token=os.environ['STROOM_API_KEY'],host='localhost',protocol='http',
        uri='api/stroom-index/v2',
        index='57a35b9a-083c-4a93-a813-fc3ddfe1ff44',pipeline='26ed1000-255e-4182-b69b-00266be891ee',
        schema=xpathSchema)
dfWithXPaths.count()

696

In [135]:
display(dfWithXPaths.limit(5).toPandas().head()) 

Unnamed: 0,myUser,myOperation,json,idxStreamId,idxEventId,idxFeed,idxFeed (Keyword),idxAction,idxEventTime,idxUserId,...,idxannotation:CreatedOn,idxannotation:CreatedBy,idxannotation:UpdatedOn,idxannotation:UpdatedBy,idxannotation:Title,idxannotation:Subject,idxannotation:Status,idxannotation:AssignedTo,idxannotation:Comment,idxannotation:History
0,"CN=A Test Client (testuser),O=Test Organizatio...",POST,,,,,,,,,...,,,,,,,,,,
1,,GET,,,,,,,,,...,,,,,,,,,,
2,"CN=A Test Client (testuser),O=Test Organizatio...",POST,,,,,,,,,...,,,,,,,,,,
3,"CN=A Test Client (testuser),O=Test Organizatio...",POST,,,,,,,,,...,,,,,,,,,,
4,"CN=A Test Client (testuser),O=Test Organizatio...",POST,,,,,,,,,...,,,,,,,,,,


## Comparison
There are three or four different ways to access the data, and all of them can (eventually) reach the same point. The best approach will vary depending on the situation.

In order to get the same count from each approach, it is necessary to prevent Stroom from adding to the index. This can be achieved by disabling the Data Processor job (use the Jobs screen in the Stroom UI).
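Each measurement in this comparison follows the same start/stop pattern, which could be factored into a small helper. This is a minimal sketch: `timed` is a hypothetical name, and it uses `time.perf_counter`, which suits interval timing better than `time.time`.

```python
import time

def timed(action):
    """Run a zero-argument callable and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    return result, time.perf_counter() - start

# Stand-in workload; in the notebook the callable would wrap a count such as
# lambda: basicDf.filter(basicDf['idxUserId'] == 'admin').count()
total, elapsed = timed(lambda: sum(range(1000)))
print(total, elapsed)
```

A single run includes connection and warm-up overhead, so timing each approach more than once would give a fairer comparison.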


In [136]:
start = time.time()
indexCount = basicDf.filter(basicDf['idxUserId'] == 'admin').count()
end = time.time()
indexTime = end - start

In [137]:
start = time.time()
sparkCount = wideDf.filter(wideDf['user'] == 'admin').count()
end = time.time()
sparkTime = end - start

In [138]:
start = time.time()
stroomExtractionCount = dfWithExtractedFields.filter(dfWithExtractedFields['user'] == 'admin').count()
end = time.time()
stroomExtractionTime = end - start

In [139]:
start = time.time()
stroomXPathCount = dfWithXPaths.filter(dfWithXPaths['myUser'] == 'admin').count()
end = time.time()
stroomXPathTime = end - start

In [140]:
if indexCount == sparkCount == stroomExtractionCount == stroomXPathCount:
    print("All counts are the same (as expected):", sparkCount)
else:
    print("Counts Differ! - Was Stroom loading/indexing data during execution?")
    print("Stroom using indexes: ", indexCount)
    print("Stroom using xpath: ", stroomXPathCount)
    print("Stroom using extracted field: ", stroomExtractionCount)
    print("Spark: ", sparkCount)

print()
print("Times as follows")
print("Stroom using indexes: ", indexTime)
print("Stroom using xpath: ", stroomXPathTime)
print("Stroom using extracted field: ", stroomExtractionTime)
print("Spark: ", sparkTime)

Counts Differ! - Was Stroom loading/indexing data during execution?
Stroom using indexes:  226
Stroom using xpath:  225
Stroom using extracted field:  226
Spark:  225

Times as follows
Stroom using indexes:  1.0856473445892334
Stroom using xpath:  1.9339103698730469
Stroom using extracted field:  1.935575246810913
Spark:  2.068563222885132
