# Understanding User Behavior
Author: Allison Schlissel

As a Data Scientist at a game development company, I am interested in better understanding two important game features: buying a sword and joining a guild. 

### Questions
1. How many distinct time stamps are there here?
2. What are the unique event types present?
3. How many sword sizes are there?

## Commands

### Week 12

#### change directories
cd w205/project-3-apschlissel/

#### list the files in the project directory
ls -lh

#### check the git branch & make sure the working directory is clean
git status

#### create branch & go to branch
git checkout -b assignment

#### go up a directory
cd ..

#### copy the yml
cp ~/w205/course-content/12-Querying-Data-II/docker-compose.yml .

#### bring up the container
docker-compose up -d

#### check everything looks good & no stray containers
docker ps -a <br>
docker network ls

#### create topic events
docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181

#### copy the Python files (e.g. game_api.py) & edit them
cp ~/w205/course-content/12-Querying-Data-II/*.py .

#### start the flask server
docker-compose exec mids env FLASK_APP=/w205/project-3-apschlissel/game_api.py flask run --host 0.0.0.0
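The source of game_api.py is not included here. The following is a hypothetical sketch, inferred only from the columns in the extracted schema (Accept, Host and User-Agent alongside event_type and sword_size). It shows how an endpoint such as /purchase_a_sword might build its event by merging the request headers into the event fields and then serializing the result as JSON bytes for Kafka. `build_event` and `encode_event` are illustrative names, not functions from the project.

```python
import json

def build_event(event_type, headers, **fields):
    """Hypothetical: assemble an event from its own fields plus the request headers."""
    event = {"event_type": event_type, **fields}
    # Headers such as Host and User-Agent become top-level keys of the event
    event.update(headers)
    return event

def encode_event(event):
    """Hypothetical: Kafka message values are bytes, so serialize as UTF-8 JSON."""
    return json.dumps(event).encode("utf-8")

# Header values copied from the table output in this notebook
headers = {"Host": "user1.comcast.com", "User-Agent": "ApacheBench/2.3", "Accept": "*/*"}
event = build_event("purchase_sword", headers, sword_size="large")
payload = encode_event(event)
print(payload)
```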

#### start spark
docker-compose exec spark bash

#### create symbolic link, then exit the spark shell
ln -s /w205 w205
exit

#### get the notebook going
docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark

#### copy ip address into link
http://34.83.94.83:8888/?token=7cdf712791379617ad5c10876f941c9a413a13b1891f6aeb

#### run apache bench to generate events (default, purchase a sword, buy a sword & join guild)
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/

docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword

docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/

docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword

docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/buy_a_sword

docker-compose exec mids ab -n 10 -H "Host: user1.att.com" http://localhost:5000/buy_a_sword

docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_guild

docker-compose exec mids ab -n 10 -H "Host: user1.att.com" http://localhost:5000/join_guild

#### shut down cluster
docker-compose down

### Week 13

#### copy week 13 yml
cp ~/w205/course-content/13-Understanding-Data/docker-compose.yml .

#### check files/directories 
ls -lh

#### check docker containers
docker ps -a

#### bring up cluster
docker-compose up -d

#### check cluster is up
docker-compose ps
docker ps -a
docker network ls

#### create topic events
docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181

#### start up flask
docker-compose exec mids env FLASK_APP=/w205/project-3-apschlissel/game_api.py flask run --host 0.0.0.0

#### go into spark shell to create symbolic link
docker-compose exec spark bash
ln -s /w205 w205
exit

#### create jupyter notebook w/in pyspark kernel
docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark

#### copy url, get ip address, open in an incognito tab
http://34.83.94.83:8888/?token=67d58e32a81b86cec7c7571dfa5d91f5aecf2eee887e7329

#### stream events continuously (10 sword purchases, then a 5-second pause, repeated)
while true; do docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword; sleep 5; done

#### run hive
docker-compose exec cloudera hive

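#### create an external table over the streamed parquet files; make sure the column list matches the data (the stream writes raw_event, timestamp, Accept, Host, User-Agent, event_type, sword_size)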
create external table if not exists default.sword_purchases (Accept string, Host string, `User-Agent` string, event_type string, sword_size string, timestamp string) stored as parquet location '/tmp/sword_purchases'  tblproperties ("parquet.compress"="SNAPPY");

#### print the table using hive
select * from sword_purchases limit 5;

#### exit hive
exit;

#### start up presto
docker-compose exec presto presto --server presto:8080 --catalog hive --schema default

#### explore the table in presto
show tables;
select * from sword_purchases;
select host from sword_purchases;
show tables like 'sword_purchases';
(desc is shorthand for describe)
desc sword_purchases;

#### stop the streaming sink (run in the Jupyter notebook)
sink.stop()

#### tear down cluster
docker-compose down


#### business questions:
###### As mentioned in the Slack channel & video, the columns are jumbled. I chose to use the wrongly named columns as they are, so each query below targets the column that actually holds the data being asked about.
1) How many distinct time stamps are there here? <br>
select count(distinct(host)) from sword_purchases;
- 3790 unique time stamps at this point

2) What are the unique event types present? <br>
select distinct(sword_size) from sword_purchases;
- Only purchase_sword

3) How many sword sizes are there? <br>
select distinct(timestamp) from sword_purchases;
- Only large

In [50]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType

In [2]:
spark

In [22]:
sc

## Batch Mode
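Week 12 Content <br>
In batch mode, a bounded collection of data points is grouped together and processed at one point in time. Here, Spark reads every event in the Kafka topic, from the earliest to the latest offset, in a single read.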

In [52]:
#create udf

@udf('boolean')
def is_purchase(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False
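Because the UDF body is ordinary Python, its logic can be checked without a Spark session. The following minimal sketch uses the hypothetical name `is_purchase_plain`. It also swaps the `event['event_type']` lookup for `dict.get`, so an event without that key is filtered out instead of raising `KeyError` inside the job.

```python
import json

def is_purchase_plain(event_as_json):
    """Plain-Python version of the is_purchase UDF body (hypothetical helper)."""
    event = json.loads(event_as_json)
    # .get returns None for a missing key, so such events compare as False
    return event.get("event_type") == "purchase_sword"

print(is_purchase_plain('{"event_type": "purchase_sword", "sword_size": "large"}'))  # True
print(is_purchase_plain('{"event_type": "some_other_event"}'))  # False
print(is_purchase_plain('{"Host": "user1.comcast.com"}'))  # False
```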

In [53]:
#create raw events df

raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [54]:
#print raw_events

raw_events.show()

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2021-08-03 06:48:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2021-08-03 06:48:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     2|2021-08-03 06:48:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     3|2021-08-03 06:48:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     4|2021-08-03 06:48:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     5|2021-08-03 06:48:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     6|2021-08-03 06:48:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     7|2021-08-03 06:48:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0
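The `value` column holds raw bytes, which `show()` prints in hex. Decoding the first few bytes from the output above shows they are the beginning of a JSON object, presumably a `Host` key. This is why the next cell casts `value` to a string before filtering.

```python
# The first bytes shown in the value column of raw_events.show()
value_prefix = bytes.fromhex("7B 22 48 6F 73")

# Decoded as UTF-8, they are the opening characters of a JSON object
print(value_prefix.decode("utf-8"))  # {"Hos
```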

In [25]:
#create df purchase_events

purchase_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase('raw'))

In [26]:
#extract purchase events to dataframe

extracted_purchase_events = purchase_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()

In [27]:
#print df schema

extracted_purchase_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sword_size: string (nullable = true)
 |-- timestamp: string (nullable = true)
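The columns above come out in alphabetical order rather than in the order they appear in the JSON. In Spark versions before 3.0, a `Row` built from keyword arguments sorts its field names. Python's `sorted()` uses the same ordering, which puts uppercase names before lowercase ones. The following plain-dict sketch reproduces the extraction step. The JSON layout and the timestamp value are assumptions, since the real ones are truncated in the output.

```python
import json

# Assumed raw event; values taken from the table output, key order assumed
raw = ('{"Host": "user1.comcast.com", "User-Agent": "ApacheBench/2.3", '
       '"Accept": "*/*", "event_type": "purchase_sword", "sword_size": "large"}')
timestamp = "2021-08-03 06:48:00.000"  # placeholder; the real value is truncated above

# Plain-dict equivalent of Row(timestamp=r.timestamp, **json.loads(r.raw))
record = {"timestamp": timestamp, **json.loads(raw)}

# Spark 2.x sorted keyword-argument Row fields by name; sorted() gives that order
columns = sorted(record)
print(columns)
```

One consequence of `Row(timestamp=r.timestamp, **json.loads(r.raw))` is worth noting. If an event's JSON ever carried its own `timestamp` key, the call would fail with a duplicate keyword argument. The dict merge above would instead silently keep the JSON value.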



In [28]:
#print some rows of extracted_purchase_events

extracted_purchase_events.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_size|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|us

In [29]:
#write purchases to parquet

extracted_purchase_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchases')

In [30]:
#read purchases parquet

purchases = spark.read.parquet('/tmp/purchases')

In [31]:
#print purchases

purchases.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_size|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|us

In [32]:
#register purchases as a temporary view

purchases.createOrReplaceTempView('purchases')

In [33]:
#query using spark sql

purchases_by_example2 = spark.sql("select * from purchases where Host = 'user1.comcast.com'")
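To show what the Spark SQL query does without a cluster, the following sketch runs the same `WHERE Host = ...` filter with Python's built-in `sqlite3` as a stand-in (not Spark). The table uses three hypothetical rows and only a subset of the columns.

```python
import sqlite3

# In-memory stand-in for the registered purchases view (hypothetical rows)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE purchases (Host TEXT, event_type TEXT, sword_size TEXT)")
conn.executemany(
    "INSERT INTO purchases VALUES (?, ?, ?)",
    [
        ("user1.comcast.com", "purchase_sword", "large"),
        ("user1.comcast.com", "purchase_sword", "large"),
        ("user2.att.com", "purchase_sword", "large"),
    ],
)

# Same filter as the Spark SQL query above
rows = conn.execute(
    "SELECT * FROM purchases WHERE Host = 'user1.comcast.com'"
).fetchall()
conn.close()
print(len(rows))  # 2
```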

In [34]:
#print the result

purchases_by_example2.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_size|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-08-03 06:48:...|
|   */*|us

In [35]:
#export to pandas dataframe

df = purchases_by_example2.toPandas()

In [36]:
#dataframe summary

df.describe()

| | Accept | Host | User-Agent | event_type | sword_size | timestamp |
|---|---|---|---|---|---|---|
| count | 5440 | 5440 | 5440 | 5440 | 5440 | 5440 |
| unique | 1 | 1 | 1 | 1 | 1 | 5440 |
| top | \*/\* | user1.comcast.com | ApacheBench/2.3 | purchase_sword | large | 2021-08-03 07:00:28.013 |
| freq | 5440 | 5440 | 5440 | 5440 | 5440 | 1 |
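Because every column in `df` holds strings, `describe()` reports four statistics per column: count, unique, top (the most frequent value) and freq (that value's count). Read that way, the table says that each column except `timestamp` has a single value across all 5440 rows, while every timestamp is distinct. The following sketch computes the same four statistics with `collections.Counter`. It assumes there are no missing values, because pandas excludes nulls from count. The sample inputs are hypothetical.

```python
from collections import Counter

def describe_column(values):
    """Return the count / unique / top / freq statistics that pandas'
    describe() reports for a column of strings (assumes no missing values)."""
    counts = Counter(values)
    top, freq = counts.most_common(1)[0]
    return {"count": len(values), "unique": len(counts), "top": top, "freq": freq}

# Hypothetical samples: one repeated value vs. all-distinct values
sizes = describe_column(["large", "large", "large"])
stamps = describe_column(["07:00:01", "07:00:02", "07:00:03"])
print(sizes)   # {'count': 3, 'unique': 1, 'top': 'large', 'freq': 3}
print(stamps["unique"], stamps["freq"])  # 3 1
```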


## Streaming Mode
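Week 13 content <br>
In streaming mode, data is processed continuously as it arrives. Here, Spark subscribes to the Kafka topic with readStream and writes new sword purchase events to Parquet in micro-batches, triggered every 10 seconds.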
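The following is a simplified model of how the `trigger(processingTime="10 seconds")` setting in the sink cell groups incoming events into micro-batches. It assumes triggers fire exactly on schedule and ignores both processing time and the run time of `ab`. The arrival pattern mimics the Week 13 loop, which sends 10 sword purchases and then sleeps 5 seconds. `micro_batches` is a hypothetical helper, not a Spark API.

```python
def micro_batches(arrival_times, interval):
    """Group arrival times (seconds since the query started) by the trigger
    interval in which they arrive; each group is handled by one micro-batch."""
    batches = {}
    for t in arrival_times:
        batches.setdefault(int(t // interval), []).append(t)
    return batches

# Hypothetical arrivals: a burst of 10 requests every 5 seconds for 30 seconds
arrivals = [burst * 5.0 for burst in range(6) for _ in range(10)]
batches = micro_batches(arrivals, interval=10)
print({k: len(v) for k, v in batches.items()})  # {0: 20, 1: 20, 2: 20}
```

Under these assumptions each 10-second micro-batch picks up two bursts, or about 20 purchase events.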

In [42]:
#create event schema, add sword size

def purchase_sword_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- sword_size: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("sword_size", StringType(), True)
    ])

In [43]:
#same filter logic as is_purchase above, redefined for the streaming job

@udf('boolean')
def is_sword_purchase(event_as_json):
    """udf for filtering events
    """
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

In [47]:
#subscribe to the topic with readStream (streaming read, no ending offset)

raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()

In [48]:
#keep sword purchase events and parse the JSON into columns using the schema

sword_purchases = raw_events \
    .filter(is_sword_purchase(raw_events.value.cast('string'))) \
    .select(raw_events.value.cast('string').alias('raw_event'),
            raw_events.timestamp.cast('string'),
            from_json(raw_events.value.cast('string'),
                      purchase_sword_event_schema()).alias('json')) \
    .select('raw_event', 'timestamp', 'json.*')
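`from_json` parses each value against `purchase_sword_event_schema()`, and `json.*` expands the result into one column per schema field, next to `raw_event` and `timestamp`. The following is a plain-Python approximation of that parsing step; the function name and the sample event are hypothetical. Fields named in the schema become columns, and missing ones become null (`None` here). Keys outside the schema are dropped.

```python
import json

# Field names from purchase_sword_event_schema()
SCHEMA_FIELDS = ("Accept", "Host", "User-Agent", "event_type", "sword_size")

def parse_with_schema(raw_event, fields=SCHEMA_FIELDS):
    """Approximate from_json(...) followed by json.*: one entry per schema
    field, None for fields missing from the event, extra keys ignored."""
    event = json.loads(raw_event)
    return {name: event.get(name) for name in fields}

# Hypothetical event: no Accept/User-Agent/sword_size, plus a key not in the schema
raw = '{"Host": "user1.comcast.com", "event_type": "purchase_sword", "extra": 1}'
print(parse_with_schema(raw))
```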

In [49]:
#calling start() runs the streaming query in the background until sink.stop(); we skip sink.awaitTermination() so the notebook stays free for other cells

sink = sword_purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases") \
        .option("path", "/tmp/sword_purchases") \
        .trigger(processingTime="10 seconds") \
        .start()

In [55]:
#stop the sink

sink.stop()