### Import Statements

In [1]:
import json
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType

### CLI Commands for Data Pipeline

**Start up the cluster**

`docker-compose up -d`

**Create kafka topic 'events'**

`docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181`

**Bring up flask server**

`docker-compose exec mids env FLASK_APP=/w205/project-3-philpapapolyzos/game_api.py flask run --host 0.0.0.0`

**Shutdown cluster**

`docker-compose down`

### Munging and extracting events 

In [2]:
@udf('string')
def munge_event(event_as_json):
    event = json.loads(event_as_json)
    event['Host'] = "moe"
    event['Cache-Control'] = "no-cache"
    return json.dumps(event)

In [3]:
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [4]:
raw_events.show()

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2020-12-10 12:07:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2020-12-10 12:07:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     2|2020-12-10 12:07:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     3|2020-12-10 12:07:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     4|2020-12-10 12:07:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     5|2020-12-10 12:07:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     6|2020-12-10 12:07:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     7|2020-12-10 12:07:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0

In [5]:
munged_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .withColumn('munged', munge_event('raw'))

In [6]:
munged_events.show()

+--------------------+--------------------+--------------------+
|                 raw|           timestamp|              munged|
+--------------------+--------------------+--------------------+
|{"Host": "user1.c...|2020-12-10 12:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 12:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 12:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 12:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 12:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 12:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 12:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 12:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 12:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 12:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 12:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 12:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|202

In [7]:
extracted_events = munged_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.munged))) \
        .toDF()

In [8]:
extracted_events.show()

Py4JJavaError: An error occurred while calling o79.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, localhost, executor driver): java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 6 fields are required while 7 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:138)
	at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:728)
	at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:728)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:235)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 6 fields are required while 7 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:138)
	at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:728)
	at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:728)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:235)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [9]:
sword_purchases = extracted_events \
        .filter(extracted_events.event_type == 'purchase_sword')

In [10]:
sword_purchases.show()

Py4JJavaError: An error occurred while calling o124.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, localhost, executor driver): java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 6 fields are required while 7 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:138)
	at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:728)
	at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:728)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 6 fields are required while 7 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:138)
	at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:728)
	at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:728)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


### Generating events with Apache Bench

**Generating events as user 1**

`docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/buy_a_sword
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_guild`

**Generating events as user 2**

`docker-compose exec mids ab -n 10 -H "Host: user2.att.com” http://localhost:5000/
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/buy_a_sword
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_guild`


**User Defined Functions for event filtering**

In [11]:
@udf('boolean')
def is_purchase(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

In [12]:
@udf('boolean')
def is_guild(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'join_guild':
        return True
    return False

**Get raw events from kafka**

In [13]:
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

**Filter using UDFs and extract to dataframe format**

In [14]:
guild_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_guild('raw'))

In [15]:
purchase_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase('raw'))

In [16]:
extracted_guild_events = guild_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()

In [17]:
extracted_purchase_events = purchase_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()

**Show schemas for events**

In [18]:
extracted_purchase_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sword_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [19]:
extracted_guild_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- guild_name: string (nullable = true)
 |-- timestamp: string (nullable = true)



### Write to parquet format

In [20]:
extracted_purchase_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchases')

In [21]:
extracted_guild_events.write.mode('overwrite').parquet('/tmp/guilds')

### PySpark SQL Queries

**Read from HDFS**

In [22]:
guilds = spark.read.parquet('/tmp/guilds')

In [23]:
purchases = spark.read.parquet('/tmp/purchases')

**Register temp tables**

In [24]:
guilds.registerTempTable('guilds')

In [25]:
purchases.registerTempTable('purchases')

### Do SQL queries on data

**Purchases Data**

In [69]:
purchase_query = spark.sql("select Host, sword_type, timestamp from purchases")

In [70]:
purchase_query.show()

+-----------------+----------+--------------------+
|             Host|sword_type|           timestamp|
+-----------------+----------+--------------------+
|user1.comcast.com|   cutlass|2020-12-10 12:07:...|
|user1.comcast.com|   cutlass|2020-12-10 12:07:...|
|user1.comcast.com|   cutlass|2020-12-10 12:07:...|
|user1.comcast.com|   cutlass|2020-12-10 12:07:...|
|user1.comcast.com|   cutlass|2020-12-10 12:07:...|
|user1.comcast.com|   cutlass|2020-12-10 12:07:...|
|user1.comcast.com|   cutlass|2020-12-10 12:07:...|
|user1.comcast.com|   cutlass|2020-12-10 12:07:...|
|user1.comcast.com|   cutlass|2020-12-10 12:07:...|
|user1.comcast.com|   cutlass|2020-12-10 12:07:...|
|    user2.att.com|   cutlass|2020-12-10 12:09:...|
|    user2.att.com|   cutlass|2020-12-10 12:09:...|
|    user2.att.com|   cutlass|2020-12-10 12:09:...|
|    user2.att.com|   cutlass|2020-12-10 12:09:...|
|    user2.att.com|   cutlass|2020-12-10 12:09:...|
|    user2.att.com|   cutlass|2020-12-10 12:09:...|
|    user2.a

In [68]:
import pandas as pd

In [83]:
purchase_query_df = purchase_query.toPandas()

An interesting insight would be to see how many sword purchases per minute there are in the game.

In [85]:
purchase_query_df['minute'] = pd.to_datetime(purchase_query_df['timestamp']).dt.minute
purchase_query_df.head()

Unnamed: 0,Host,sword_type,timestamp,minute
0,user1.comcast.com,cutlass,2020-12-10 12:07:35.52,7
1,user1.comcast.com,cutlass,2020-12-10 12:07:35.524,7
2,user1.comcast.com,cutlass,2020-12-10 12:07:35.528,7
3,user1.comcast.com,cutlass,2020-12-10 12:07:35.532,7
4,user1.comcast.com,cutlass,2020-12-10 12:07:35.537,7


In [97]:
purchase_query_df[['timestamp','minute']].groupby(['minute']).agg(['count'])

Unnamed: 0_level_0,timestamp
Unnamed: 0_level_1,count
minute,Unnamed: 1_level_2
7,10
9,10
11,40
12,110
13,110
14,100
15,110
16,110
17,100
18,90


From the dataframe above, it looks like for the first minutes, there were around 10 purchases per minute but then they started increasing, reaching 110 on the 12th minute of the hour and then falling to 90 on the 18th minute.

In [99]:
purchase_query_df.set_index(["Host", "timestamp"]).count(level="Host")

Unnamed: 0_level_0,sword_type,minute
Host,Unnamed: 1_level_1,Unnamed: 2_level_1
user1.comcast.com,780,780
user2.att.com,10,10


Furthermore, it looks like these requests came from 2 users. User 1 purchased 780 swords and user 2 purchased only 10.

** Guild Data**

In [100]:
guild_query = spark.sql('select Host, guild_name, timestamp from guilds')

In [101]:
guild_query.show()

+-----------------+--------------+--------------------+
|             Host|    guild_name|           timestamp|
+-----------------+--------------+--------------------+
|user1.comcast.com|the liberators|2020-12-10 12:08:...|
|user1.comcast.com|the liberators|2020-12-10 12:08:...|
|user1.comcast.com|the liberators|2020-12-10 12:08:...|
|user1.comcast.com|the liberators|2020-12-10 12:08:...|
|user1.comcast.com|the liberators|2020-12-10 12:08:...|
|user1.comcast.com|the liberators|2020-12-10 12:08:...|
|user1.comcast.com|the liberators|2020-12-10 12:08:...|
|user1.comcast.com|the liberators|2020-12-10 12:08:...|
|user1.comcast.com|the liberators|2020-12-10 12:08:...|
|user1.comcast.com|the liberators|2020-12-10 12:08:...|
|    user2.att.com|the liberators|2020-12-10 12:09:...|
|    user2.att.com|the liberators|2020-12-10 12:09:...|
|    user2.att.com|the liberators|2020-12-10 12:09:...|
|    user2.att.com|the liberators|2020-12-10 12:09:...|
|    user2.att.com|the liberators|2020-12-10 12:

In [102]:
guild_query_df = guild_query.toPandas()

In [106]:
guild_query_df['hour'] = pd.to_datetime(guild_query_df['timestamp']).dt.hour
guild_query_df.head()

Unnamed: 0,Host,guild_name,timestamp,hour
0,user1.comcast.com,the liberators,2020-12-10 12:08:04.397,12
1,user1.comcast.com,the liberators,2020-12-10 12:08:04.402,12
2,user1.comcast.com,the liberators,2020-12-10 12:08:04.406,12
3,user1.comcast.com,the liberators,2020-12-10 12:08:04.411,12
4,user1.comcast.com,the liberators,2020-12-10 12:08:04.415,12


In [112]:
guild_query_df[['guild_name','hour']].groupby(['hour']).agg(['count'])

Unnamed: 0_level_0,guild_name
Unnamed: 0_level_1,count
hour,Unnamed: 1_level_2
12,310


In [114]:
guild_query_df[['guild_name','hour']].groupby(['guild_name']).agg(['count'])

Unnamed: 0_level_0,hour
Unnamed: 0_level_1,count
guild_name,Unnamed: 1_level_2
the liberators,310


There were 310 requests to join a guild at 12 o'clock and there is only one guild, "The Liberators".

### **Infinite Loops for Apache Bench**

**Loop 1: User1, `purchase_a_sword`**

`while true; do docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword; sleep 5; done
`
________________________
**Loop 2: User2, `join_guild`**

`while true; do docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_guild; sleep 10; done`

In [123]:
def purchase_sword_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
    ])

In [124]:
def guild_join_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- guild_name: string (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("guild_name", StringType(), True),
    ])

In [125]:
@udf('boolean')
def is_sword_purchase(event_as_json):
    """udf for filtering events
    """
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

In [126]:
@udf('boolean')
def is_guild_join(event_as_json):
    """udf for filtering events
    """
    event = json.loads(event_as_json)
    if event['event_type'] == 'join_guild':
        return True
    return False

In [127]:
raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()

In [128]:
sword_purchases = raw_events \
        .filter(is_sword_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          purchase_sword_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')

In [129]:
guild_joins = raw_events \
        .filter(is_guild_join(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          guild_join_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')

In [130]:
sink = sword_purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases") \
        .option("path", "/tmp/sword_purchases") \
        .trigger(processingTime="10 seconds") \
        .start()

In [131]:
sink_guild = guild_joins \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_guild_joins") \
        .option("path", "/tmp/guild_joins") \
        .trigger(processingTime="15 seconds") \
        .start()

In [132]:
sink.stop()
sink_guild.stop()

### Hive command to create external table

`docker-compose exec cloudera hive`

`create external table if not exists default.sword_purchases (Accept string, Host string, User_Agent string, event_type string, timestamp string) stored as parquet location '/tmp/purchases'  tblproperties ("parquet.compress"="SNAPPY");`

`create external table if not exists default.guild_joins (Accept string, Host string, User_Agent string, event_type string, guild_name string, timestamp string) stored as parquet location '/tmp/guilds'  tblproperties ("parquet.compress"="SNAPPY");`

`exit;`
________________________
### Presto Query for External Tables

**To bring up Presto**

`docker-compose exec presto presto --server presto:8080 --catalog hive --schema default`
________________________
**To show tables**

`show tables;`

      Table  
`-----------------`

 `guild_joins  `   
` sword_purchases` 
`(2 rows)`

________________________
**To describe tables**

`describe guild_joins`

`
   Column   |  Type   | Comment 
------------+---------+---------
 accept     | varchar |         
 host       | varchar |         
 user_agent | varchar |         
 event_type | varchar |         
 guild_name | varchar |         
 timestamp  | varchar |         
(6 rows)`

`describe sword_purchases`

`
   Column   |  Type   | Comment 
------------+---------+---------
 accept     | varchar |         
 host       | varchar |         
 user_agent | varchar |         
 event_type | varchar |         
 timestamp  | varchar |         
(5 rows)`
________________________
**SQL**

`select guild_name from guild_joins group by guild_name;`

   `guild_name`   
`----------------`

` the liberators` 
`(1 row)`