# Project 3: Understanding User Behavior
## Alissa Stover
## Due April 14th, 2020

## Pipeline up to here

This notebook is one step in a larger pipeline that runs inside a Docker container. The earlier steps use:

- Kafka to stream an "events" topic
- Flask to instrument the API server
- Apache Bench to generate the test data analyzed below


## Spark Streaming 

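In this section, I use Spark to read events from Kafka, filter for selected event types, and land them in HDFS as Parquet files so that they are available for analysis with Presto. Note that the cells below read the topic as a bounded batch (`spark.read` with `startingOffsets` and `endingOffsets`) rather than as a continuous stream.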

In [1]:
# import modules and functions
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf

### Read in raw events

This section uses Spark to read in raw events from Kafka. 

In [2]:
# load raw events
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [3]:
# view raw events
raw_events.show()

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2020-04-13 01:25:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2020-04-13 01:25:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     2|2020-04-13 01:25:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     3|2020-04-13 01:25:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     4|2020-04-13 01:25:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     5|2020-04-13 01:25:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     6|2020-04-13 01:25:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     7|2020-04-13 01:25:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0
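
The `value` column in the output above is binary, and Spark prints its first bytes in hexadecimal. As a quick illustration in plain Python (not part of the pipeline), decoding the visible bytes shows the start of a JSON object, which is why the later cells cast `value` to a string before parsing it. The variable names here are illustrative.

```python
# Bytes visible in the truncated `value` column above (hex, space separated).
visible_hex = "7B 22 48 6F 73"

# bytes.fromhex ignores the spaces between byte pairs.
visible_bytes = bytes.fromhex(visible_hex)

# Kafka message values are raw bytes; these decode as UTF-8 text.
decoded_prefix = visible_bytes.decode("utf-8")
print(decoded_prefix)  # {"Hos
```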

### Filter to select "purchase_sword" events

In this section, I filter the Kafka events to keep only the purchase_sword events. 

In [4]:
# define predicate: boolean UDF that is True only for purchase_sword events
@udf('boolean')
def is_purchase(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False
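
The predicate above assumes that every message is valid JSON with an `event_type` key; a malformed message, or one without that key, would raise an error inside the UDF. Below is a minimal sketch of a more defensive check, written as a plain Python function so that it can be tried without Spark. The name `is_event_of_type` is hypothetical, and wrapping it with `udf(..., 'boolean')` is an assumption about how it would be plugged into this pipeline.

```python
import json


def is_event_of_type(event_as_json, event_type):
    """Return True only if the JSON payload has the given event_type."""
    try:
        event = json.loads(event_as_json)
    except (TypeError, ValueError):
        # None, non-string input, or malformed JSON: treat as no match.
        return False
    if not isinstance(event, dict):
        # Valid JSON that is not an object (e.g. a list) cannot match.
        return False
    # .get avoids a KeyError when the key is missing.
    return event.get('event_type') == event_type


print(is_event_of_type('{"event_type": "purchase_sword"}', 'purchase_sword'))  # True
print(is_event_of_type('{"Host": "user1.comcast.com"}', 'purchase_sword'))     # False
print(is_event_of_type('not json', 'purchase_sword'))                          # False
```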

In [5]:
# extract purchase from raw events
purchase_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase('raw'))

In [6]:
# view purchase events schema
purchase_events.printSchema()

root
 |-- raw: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [7]:
# view purchase events
purchase_events.show()

+--------------------+--------------------+
|                 raw|           timestamp|
+--------------------+--------------------+
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13

In [8]:
# parse each JSON payload and turn its keys into columns next to the timestamp
extracted_purchase_events = purchase_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()
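
To make the `map` step above concrete: `json.loads(r.raw)` turns the payload into a dict, and `**` spreads its keys into keyword arguments next to `timestamp`, so each JSON key becomes a column. The sketch below imitates that with a plain dict instead of a Spark `Row` (an assumption made so that it runs without Spark). The sample payload is illustrative and `flatten_event` is a hypothetical helper. Sorting the keys mimics how `Row(**kwargs)` ordered fields by name in PySpark versions before 3.0, which is consistent with the alphabetical column order in the schema printed below.

```python
import json


def flatten_event(raw, timestamp):
    """Merge the Kafka timestamp with the keys of the JSON payload."""
    fields = dict(timestamp=timestamp, **json.loads(raw))
    # Sort by field name to mimic Row(**kwargs) in PySpark < 3.0.
    return dict(sorted(fields.items()))


# Illustrative payload with the same keys as the events in this notebook.
sample_raw = ('{"Host": "user1.comcast.com", "event_type": "purchase_sword", '
              '"Accept": "*/*", "User-Agent": "ApacheBench/2.3"}')
record = flatten_event(sample_raw, '2020-04-13 01:25:25.149')
print(list(record))
# ['Accept', 'Host', 'User-Agent', 'event_type', 'timestamp']
```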


In [9]:
# view extracted purchases schema
extracted_purchase_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [10]:
# view extracted purchases
extracted_purchase_events.show()

+------+-----------------+---------------+--------------+--------------------+
|Accept|             Host|     User-Agent|    event_type|           timestamp|
+------+-----------------+---------------+--------------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_s

In [11]:
# write to parquet file
extracted_purchase_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchases')

In [12]:
# read in from parquet file
purchases = spark.read.parquet('/tmp/purchases')

In [13]:
# view purchases
purchases.show()

+------+-----------------+---------------+--------------+--------------------+
|Accept|             Host|     User-Agent|    event_type|           timestamp|
+------+-----------------+---------------+--------------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_s

In [14]:
# register temp view (registerTempTable is deprecated since Spark 2.0)
purchases.createOrReplaceTempView('purchases')

In [15]:
# query with spark sql
purchases_by_example2 = spark.sql("select * from purchases where Host = 'user1.comcast.com'")

In [16]:
# view purchases
purchases_by_example2.show()

+------+-----------------+---------------+--------------+--------------------+
|Accept|             Host|     User-Agent|    event_type|           timestamp|
+------+-----------------+---------------+--------------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_s

In [17]:
# convert to python pandas dataframe
df = purchases_by_example2.toPandas()

In [18]:
# view pandas dataframe
df

|  | Accept | Host | User-Agent | event_type | timestamp |
|---|---|---|---|---|---|
| 0 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2020-04-13 01:25:25.149 |
| 1 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2020-04-13 01:25:25.153 |
| 2 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2020-04-13 01:25:25.156 |
| 3 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2020-04-13 01:25:25.158 |
| 4 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2020-04-13 01:25:25.162 |
| 5 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2020-04-13 01:25:25.167 |
| 6 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2020-04-13 01:25:25.172 |
| 7 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2020-04-13 01:25:25.179 |
| 8 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2020-04-13 01:25:25.183 |
| 9 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2020-04-13 01:25:25.186 |


### Analysis

In this section, I run some basic analyses of the purchase_sword events from user1.comcast.com (the subset selected by the Spark SQL query above). The same code can describe other events produced by this pipeline. 

In [19]:
# import pandas for easier analysis of file
import pandas as pd

In [20]:
# view descriptive stats 
df.describe()

|  | Accept | Host | User-Agent | event_type | timestamp |
|---|---|---|---|---|---|
| count | 10 | 10 | 10 | 10 | 10 |
| unique | 1 | 1 | 1 | 1 | 10 |
| top | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | 2020-04-13 01:25:25.167 |
| freq | 10 | 10 | 10 | 10 | 1 |
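
For string columns, `describe()` reports four values: `count` (non-null entries), `unique` (distinct values), `top` (the most frequent value), and `freq` (how often `top` occurs). Below is a minimal standard-library sketch of the same summary. It uses a hypothetical helper, `describe_strings`, and an illustrative list rather than the dataframe above, and it assumes at least one non-null value.

```python
from collections import Counter


def describe_strings(values):
    """Summarise a list of strings the way describe() does for text columns."""
    # Ignore missing entries, as count only covers non-null values.
    present = [v for v in values if v is not None]
    counts = Counter(present)
    # most_common(1) yields the most frequent value and its frequency.
    top, freq = counts.most_common(1)[0]
    return {'count': len(present), 'unique': len(counts),
            'top': top, 'freq': freq}


hosts = ['user1.comcast.com'] * 10
print(describe_strings(hosts))
# {'count': 10, 'unique': 1, 'top': 'user1.comcast.com', 'freq': 10}
```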


#### How many unique users are represented in this dataframe? What are their host name(s)? 

In [21]:
df['Host'].unique()

array(['user1.comcast.com'], dtype=object)

#### How many unique event types are represented? What are they? 

In [22]:
df['event_type'].unique()

array(['purchase_sword'], dtype=object)

#### What program was used to generate this event?

In [23]:
df['User-Agent'].unique()

array(['ApacheBench/2.3'], dtype=object)

#### What day of the week did these events occur? 

In [24]:
# convert timestamp to datetime object
df['timestamp_datetime'] = pd.to_datetime(df['timestamp'])
# extract weekday name (weekday_name was removed in pandas 1.0; use .dt.day_name() there)
df['timestamp_datetime'].dt.weekday_name.unique()

array(['Monday'], dtype=object)

#### What hour did these events occur? 

In [25]:
# extract hour
df['timestamp_datetime'].dt.hour.unique()

array([1])

#### What month did these events occur? 

In [26]:
df['timestamp_datetime'].dt.month.unique()

array([4])
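
The weekday, hour, and month answers above can be cross-checked without pandas. Below is a minimal sketch using only the standard library. It assumes timestamps in the `YYYY-MM-DD HH:MM:SS.fff` form shown in this notebook, and `timestamp_parts` is a hypothetical helper name.

```python
from datetime import datetime

# datetime.weekday() returns 0 for Monday through 6 for Sunday.
WEEKDAY_NAMES = ['Monday', 'Tuesday', 'Wednesday', 'Thursday',
                 'Friday', 'Saturday', 'Sunday']


def timestamp_parts(timestamp):
    """Return (weekday name, hour, month) for one timestamp string."""
    # %f accepts one to six fractional-second digits.
    parsed = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f')
    return WEEKDAY_NAMES[parsed.weekday()], parsed.hour, parsed.month


print(timestamp_parts('2020-04-13 01:25:25.149'))
# ('Monday', 1, 4)
```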

### Filter to select "purchase_knife" events

In this section, I filter the Kafka events to keep only the purchase_knife events. 

In [27]:
# define predicate: boolean UDF that is True only for purchase_knife events
@udf('boolean')
def is_purchase_knife(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_knife':
        return True
    return False

In [28]:
# extract purchase knife from raw events
purchase_knife_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase_knife('raw'))

In [29]:
purchase_knife_events.show()

+--------------------+--------------------+
|                 raw|           timestamp|
+--------------------+--------------------+
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13

In [30]:
# parse each JSON payload and turn its keys into columns next to the timestamp
extracted_purchase_knife_events = purchase_knife_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()


In [31]:
# view extracted purchases schema
extracted_purchase_knife_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- description: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [32]:
# view extracted purchases
extracted_purchase_knife_events.show()

+------+-----------------+---------------+----------------+--------------+--------------------+
|Accept|             Host|     User-Agent|     description|    event_type|           timestamp|
+------+-----------------+---------------+----------------+--------------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|very sharp knife|purchase_knife|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|very sharp knife|purchase_knife|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|very sharp knife|purchase_knife|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|very sharp knife|purchase_knife|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|very sharp knife|purchase_knife|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|very sharp knife|purchase_knife|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|very sharp knife|purchase_knife|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.

In [33]:
# write to parquet file
extracted_purchase_knife_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchase_knives')

In [34]:
# read in from parquet file
purchase_knives = spark.read.parquet('/tmp/purchase_knives')

In [35]:
# view purchases of knives
purchase_knives.show()

+------+-----------------+---------------+----------------+--------------+--------------------+
|Accept|             Host|     User-Agent|     description|    event_type|           timestamp|
+------+-----------------+---------------+----------------+--------------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|very sharp knife|purchase_knife|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|very sharp knife|purchase_knife|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|very sharp knife|purchase_knife|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|very sharp knife|purchase_knife|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|very sharp knife|purchase_knife|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|very sharp knife|purchase_knife|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|very sharp knife|purchase_knife|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.

In [36]:
# register temp view (registerTempTable is deprecated since Spark 2.0)
purchase_knives.createOrReplaceTempView('purchase_knives')

In [37]:
# query with spark sql
purchase_knives = spark.sql("select * from purchase_knives")

In [38]:
# convert to python pandas dataframe
knives_df = purchase_knives.toPandas()

In [39]:
# view dataframe
knives_df.head()

|  | Accept | Host | User-Agent | description | event_type | timestamp |
|---|---|---|---|---|---|---|
| 0 | `*/*` | user1.comcast.com | ApacheBench/2.3 | very sharp knife | purchase_knife | 2020-04-13 01:25:33.184 |
| 1 | `*/*` | user1.comcast.com | ApacheBench/2.3 | very sharp knife | purchase_knife | 2020-04-13 01:25:33.188 |
| 2 | `*/*` | user1.comcast.com | ApacheBench/2.3 | very sharp knife | purchase_knife | 2020-04-13 01:25:33.192 |
| 3 | `*/*` | user1.comcast.com | ApacheBench/2.3 | very sharp knife | purchase_knife | 2020-04-13 01:25:33.197 |
| 4 | `*/*` | user1.comcast.com | ApacheBench/2.3 | very sharp knife | purchase_knife | 2020-04-13 01:25:33.2 |


### Analysis

In this section, I run some basic analyses of the purchase_knife events. The same code can describe other events produced by this pipeline. 

In [40]:
# view descriptive stats 
knives_df.describe()

|  | Accept | Host | User-Agent | description | event_type | timestamp |
|---|---|---|---|---|---|---|
| count | 20 | 20 | 20 | 20 | 20 | 20 |
| unique | 1 | 2 | 1 | 1 | 1 | 20 |
| top | `*/*` | user1.comcast.com | ApacheBench/2.3 | very sharp knife | purchase_knife | 2020-04-13 01:25:37.021 |
| freq | 20 | 10 | 20 | 20 | 20 | 1 |


#### How many unique users are represented in this dataframe? What are their host name(s)? 

In [41]:
knives_df['Host'].unique()

array(['user1.comcast.com', 'user2.att.com'], dtype=object)

#### How many unique event types are represented? What are they? 

In [42]:
knives_df['event_type'].unique()

array(['purchase_knife'], dtype=object)

#### What program was used to generate this event?

In [43]:
knives_df['User-Agent'].unique()

array(['ApacheBench/2.3'], dtype=object)

#### What day of the week did these events occur? 

In [44]:
# convert timestamp to datetime object
knives_df['timestamp_datetime'] = pd.to_datetime(knives_df['timestamp'])
# extract weekday name (weekday_name was removed in pandas 1.0; use .dt.day_name() there)
knives_df['timestamp_datetime'].dt.weekday_name.unique()

array(['Monday'], dtype=object)

#### What hour did these events occur? 

In [45]:
# extract hour
knives_df['timestamp_datetime'].dt.hour.unique()

array([1])

#### What month did these events occur? 

In [46]:
knives_df['timestamp_datetime'].dt.month.unique()

array([4])

#### What unique descriptions do we see of these knives?

In [47]:
knives_df['description'].unique()

array(['very sharp knife'], dtype=object)

### Filter to select any type of "declare" events

In this section, I filter the Kafka events to keep only those whose event type starts with "declare" (declare_fealty and declare_war). 

In [48]:
# define predicate: boolean UDF that is True for any event type starting with 'declare'
@udf('boolean')
def is_declare(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'].startswith('declare'):
        return True
    return False

In [49]:
# extract declarations from raw events
declare_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_declare('raw'))
        
declare_events.show()

+--------------------+--------------------+
|                 raw|           timestamp|
+--------------------+--------------------+
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user1.c...|2020-04-13 01:25:...|
|{"Host": "user2.a...|2020-04-13 01:26:...|
|{"Host": "user2.a...|2020-04-13 01:26:...|
|{"Host": "user2.a...|2020-04-13 01:26:...|
|{"Host": "user2.a...|2020-04-13 01:26:...|
|{"Host": "user2.a...|2020-04-13 01:26:...|
|{"Host": "user2.a...|2020-04-13 01:26:...|
|{"Host": "user2.a...|2020-04-13 01:26:...|
|{"Host": "user2.a...|2020-04-13 01:26:...|
|{"Host": "user2.a...|2020-04-13 01:26:...|
|{"Host": "user2.a...|2020-04-13

In [50]:
# parse each JSON payload and turn its keys into columns next to the timestamp
extracted_declare_events = declare_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()

# view schema
extracted_declare_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- description: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [51]:
# view extracted events
extracted_declare_events.show()

+------+-----------------+---------------+------------+--------------+--------------------+
|Accept|             Host|     User-Agent| description|    event_type|           timestamp|
+------+-----------------+---------------+------------+--------------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|great leader|declare_fealty|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|great leader|declare_fealty|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|great leader|declare_fealty|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|great leader|declare_fealty|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|great leader|declare_fealty|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|great leader|declare_fealty|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|great leader|declare_fealty|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|great leader|declare_fealty|2020-04-13

In [52]:
# write to parquet file
extracted_declare_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/declare')
        
# read in from parquet file
declare = spark.read.parquet('/tmp/declare')

# view 
declare.show()

+------+-----------------+---------------+------------+--------------+--------------------+
|Accept|             Host|     User-Agent| description|    event_type|           timestamp|
+------+-----------------+---------------+------------+--------------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|great leader|declare_fealty|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|great leader|declare_fealty|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|great leader|declare_fealty|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|great leader|declare_fealty|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|great leader|declare_fealty|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|great leader|declare_fealty|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|great leader|declare_fealty|2020-04-13 01:25:...|
|   */*|user1.comcast.com|ApacheBench/2.3|great leader|declare_fealty|2020-04-13

In [53]:
# register temp view (registerTempTable is deprecated since Spark 2.0)
declare.createOrReplaceTempView('declare')

# query with spark sql
declare = spark.sql("select * from declare")

In [54]:
# convert to python pandas dataframe
declare_df = declare.toPandas()

# view first rows
declare_df.head()

|  | Accept | Host | User-Agent | description | event_type | timestamp |
|---|---|---|---|---|---|---|
| 0 | `*/*` | user1.comcast.com | ApacheBench/2.3 | great leader | declare_fealty | 2020-04-13 01:25:57.512 |
| 1 | `*/*` | user1.comcast.com | ApacheBench/2.3 | great leader | declare_fealty | 2020-04-13 01:25:57.517 |
| 2 | `*/*` | user1.comcast.com | ApacheBench/2.3 | great leader | declare_fealty | 2020-04-13 01:25:57.52 |
| 3 | `*/*` | user1.comcast.com | ApacheBench/2.3 | great leader | declare_fealty | 2020-04-13 01:25:57.524 |
| 4 | `*/*` | user1.comcast.com | ApacheBench/2.3 | great leader | declare_fealty | 2020-04-13 01:25:57.529 |


In [56]:
# view last rows
declare_df.tail()

|  | Accept | Host | User-Agent | description | event_type | timestamp |
|---|---|---|---|---|---|---|
| 35 | `*/*` | user2.att.com | ApacheBench/2.3 | bloody war | declare_war | 2020-04-13 01:26:09.738 |
| 36 | `*/*` | user2.att.com | ApacheBench/2.3 | bloody war | declare_war | 2020-04-13 01:26:09.741 |
| 37 | `*/*` | user2.att.com | ApacheBench/2.3 | bloody war | declare_war | 2020-04-13 01:26:09.745 |
| 38 | `*/*` | user2.att.com | ApacheBench/2.3 | bloody war | declare_war | 2020-04-13 01:26:09.754 |
| 39 | `*/*` | user2.att.com | ApacheBench/2.3 | bloody war | declare_war | 2020-04-13 01:26:09.758 |


### Analysis

In this section, I generate one simple table of descriptive statistics. You can use code from the analysis sections above to dig deeper into the declare event data.  

In [55]:
# view descriptive stats 
declare_df.describe()

|  | Accept | Host | User-Agent | description | event_type | timestamp |
|---|---|---|---|---|---|---|
| count | 40 | 40 | 40 | 40 | 40 | 40 |
| unique | 1 | 2 | 1 | 2 | 2 | 40 |
| top | `*/*` | user1.comcast.com | ApacheBench/2.3 | great leader | declare_war | 2020-04-13 01:26:09.754 |
| freq | 40 | 20 | 40 | 20 | 20 | 1 |


### Other events 

The previous sections filtered events of the following types:

- purchase_sword
- purchase_knife
- declare_fealty
- declare_war

The API can emit two additional event types. To pull them from the Kafka topic, change the event type that the filtering predicate checks for: 

- purchase_shield
- join_guild
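
One way to avoid writing a new predicate for each event type is a small factory that builds the check from a list of exact names and, optionally, a prefix (as in the declare filter above). This is a sketch in plain Python. The name `make_event_filter` is hypothetical, and registering its result as a Spark UDF with `udf(..., 'boolean')` is an assumption about how it would be wired into this pipeline.

```python
import json


def make_event_filter(event_types=(), prefix=None):
    """Build a predicate matching exact event types or a type prefix."""
    wanted = set(event_types)

    def matches(event_as_json):
        event_type = json.loads(event_as_json).get('event_type')
        if not isinstance(event_type, str):
            # Missing or non-string event_type never matches.
            return False
        if event_type in wanted:
            return True
        return prefix is not None and event_type.startswith(prefix)

    return matches


# Predicates for the two remaining event types and for all declarations.
is_other_event = make_event_filter(['purchase_shield', 'join_guild'])
is_declare_event = make_event_filter(prefix='declare')

print(is_other_event('{"event_type": "join_guild"}'))      # True
print(is_other_event('{"event_type": "purchase_sword"}'))  # False
print(is_declare_event('{"event_type": "declare_war"}'))   # True
```

In the notebook, a wrapper such as `udf(is_other_event, 'boolean')` could then stand in for a hand-written predicate in the `.filter(...)` call.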