# Understanding User Behavior

- As a data scientist at a game development company, our latest mobile game has two events that I'm interested in tracking: `buy a
  sword` & `join guild`. 

- Each has metadata characterstic of such events (i.e., sword type, guild name,
  etc). 


### Tasks

- Instrument my API server to log events to Kafka

- Assemble a data pipeline to catch these events: use Spark streaming to filter
  select event types from Kafka, land them into HDFS/parquet to make them
  available for analysis using Presto

- Use Apache Bench to generate test data for my pipeline

- Produce an analytics report where I provide a description of my pipeline
  and some basic analysis of the events

Use a notebook to present my queries and findings. 

It is understood that events in this pipeline are _generated_ events which make
them hard to connect to _actual_ business decisions.  However, I would like to
demonstrate an ability to plumb this pipeline end-to-end, which
includes initially generating test data as well as submitting a notebook-based
report of at least simple event analytics.


## Linux Commands

### Bring up cluster and ensure that it is running
``` 
docker-compose up -d
docker-compose ps 
docker ps -a
```

### Create the topic
```
docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```

### Start up the flask server 

```
docker-compose exec mids env FLASK_APP=/w205/project-3-hgchoi/game_api.py flask run --host 0.0.0.0
``` 


### Add individual apache bench commands 
```
docker-compose exec mids curl http://localhost:5000/
docker-compose exec mids curl http://localhost:5000/purchase_a_sword
docker-compose exec mids curl http://localhost:5000/buy_a_sword
docker-compose exec mids curl http://localhost:5000/join_guild
```

### Add apache bench commands for the new events added
``` 
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/buy_a_sword
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_guild


docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/buy_a_sword
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_guild
```

### Read the topic with kafkacat to make sure everything is on it

```
docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning -e
```

### Hive command to create schema on read
```
docker-compose exec cloudera hive

create external table if not exists default.sword_purchases (Accept string, Host string, User_Agent string, event_type string, timestamp string) stored as parquet location '/tmp/sword_purchases'  tblproperties ("parquet.compress"="SNAPPY");
```

### Run Presto
```
docker-compose exec presto presto --server presto:8080 --catalog hive --schema default

select * from sword_purchases;

select count(*) from sword_purchases; 
```

### Shutdown the cluster
```
docker-compose down
docker-compose ps
docker ps -a

```


## Running Spark Jobs

In [1]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf

In [2]:
@udf('string')
def munge_event(event_as_json):
    event = json.loads(event_as_json)
    event['Host'] = "moe"
    event['Cache-Control'] = "no-cache"
    return json.dumps(event)

In [3]:
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [4]:
raw_events.show()

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2020-12-08 03:15:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2020-12-08 03:15:...|            0|
|null|[7B 22 41 63 63 6...|events|        0|     2|2020-12-08 03:15:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     3|2020-12-08 03:15:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     4|2020-12-08 03:15:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     5|2020-12-08 03:15:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     6|2020-12-08 03:15:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     7|2020-12-08 03:15:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0

In [5]:
munged_events = raw_events \
    .select(raw_events.value.cast('string').alias('raw'),
            raw_events.timestamp.cast('string')) \
    .withColumn('munged', munge_event('raw'))


In [6]:
munged_events.show()

+--------------------+--------------------+--------------------+
|                 raw|           timestamp|              munged|
+--------------------+--------------------+--------------------+
|{"Host": "localho...|2020-12-08 03:15:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-12-08 03:15:...|{"Host": "moe", "...|
|{"Accept": "*/*",...|2020-12-08 03:15:...|{"Accept": "*/*",...|
|{"Host": "localho...|2020-12-08 03:15:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-08 03:15:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-08 03:15:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-08 03:15:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-08 03:15:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-08 03:15:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-08 03:15:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-08 03:15:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-08 03:15:...|{"Host": "moe", "...|
|{"Host": "user1.c...|202

In [7]:
extracted_events = munged_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.munged))) \
        .toDF()

####  The game_api.py has varying schemas, which is why the following error occurs: 

In [8]:
extracted_events.show()

Py4JJavaError: An error occurred while calling o79.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, localhost, executor driver): java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 6 fields are required while 7 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:138)
	at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:728)
	at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:728)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:235)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 6 fields are required while 7 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:138)
	at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:728)
	at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:728)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:235)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [9]:
sword_purchases = extracted_events \
        .filter(extracted_events.event_type == 'purchase_sword')

####  The game_api.py has varying schemas, which is why the following error occurs: 

In [10]:
sword_purchases.show()
    # sword_purchases \
        # .write \
        # .mode("overwrite") \
        # .parquet("/tmp/sword_purchases")

Py4JJavaError: An error occurred while calling o124.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, localhost, executor driver): java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 6 fields are required while 7 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:138)
	at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:728)
	at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:728)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 6 fields are required while 7 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:138)
	at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:728)
	at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:728)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
default_hits = extracted_events \
        .filter(extracted_events.event_type == 'default')

####  The game_api.py has varying schemas, which is why the following error occurs: 

In [11]:
default_hits.show()
    # default_hits \
        # .write \
        # .mode("overwrite") \
        # .parquet("/tmp/default_hits")

NameError: name 'default_hits' is not defined

## Flask-Kafka-Spark-Hadoop-Presto Part I

### Code from filtered_writes.py

In [12]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf


In [13]:
@udf('boolean')
def is_purchase(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

In [14]:
 raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [15]:
purchase_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase('raw'))

In [16]:
extracted_purchase_events = purchase_events \
    .rdd \
    .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
    .toDF()
extracted_purchase_events.printSchema()
extracted_purchase_events.show()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sword_type: string (nullable = true)
 |-- timestamp: string (nullable = true)

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|   localhost:5000|    curl/7.47.0|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheB

In [17]:
extracted_purchase_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchases')

### Pyspark code

In [18]:
purchases = spark.read.parquet('/tmp/purchases')


In [19]:
purchases.show()


+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|   localhost:5000|    curl/7.47.0|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|us

In [20]:
purchases.registerTempTable('purchases')


In [21]:
purchases_by_example2 = spark.sql("select * from purchases where Host = 'user1.comcast.com'")


In [22]:
purchases_by_example2.show()


+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-08 03:15:...|
|   */*|us

In [23]:
df = purchases_by_example2.toPandas()


In [24]:
df.describe()

Unnamed: 0,Accept,Host,User-Agent,event_type,sword_type,timestamp
count,330,330,330,330,330,330
unique,1,1,1,1,1,330
top,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,knights,2020-12-08 03:17:46.693
freq,330,330,330,330,330,1


### Simple analytics using Spark SQL on the Spark Dataframe in Memory

##### 1) How many purchases were made by the host user1.comcast.com?

In [25]:
host_comcast_count = spark.sql("select count(Host) from purchases where Host = 'user1.comcast.com'")
host_comcast_count.show()

+-----------+
|count(Host)|
+-----------+
|        330|
+-----------+



##### 2) How many purchases were made by the host user2.att.com?

In [26]:
host_att_count = spark.sql("select count(Host) from purchases where Host = 'user2.att.com'")
host_att_count.show()

+-----------+
|count(Host)|
+-----------+
|         10|
+-----------+



##### 3) How many purchases were made at the time "2020-12-03 21:05:11.215"? 

In [27]:
timestamp = spark.sql("select * from purchases where timestamp = '2020-12-03 21:05:11.215' ").show()

+------+----+----------+----------+----------+---------+
|Accept|Host|User-Agent|event_type|sword_type|timestamp|
+------+----+----------+----------+----------+---------+
+------+----+----------+----------+----------+---------+



### Flask-Kafka-Spark-Hadoop-Presto Part II


In [28]:
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType


In [29]:
def purchase_sword_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
    ])


In [30]:
@udf('boolean')
def is_sword_purchase(event_as_json):
    """udf for filtering events
    """
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

In [31]:
 raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()

In [32]:
sword_purchases = raw_events \
        .filter(is_sword_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          purchase_sword_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')


In [33]:
sink = sword_purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases") \
        .option("path", "/tmp/sword_purchases") \
        .trigger(processingTime="10 seconds") \
        .start()

In [34]:
sink.stop()
