# Project 3: Understanding User Behavior

As a data scientist at my game development company, I have been interested in tracking the following events for my latest mobile game: 'buy a sword' and 'join guild'

Both events have metadata charateristics such as the sword type and guild name that will be logged as events to Kafka through the API server. In addition, I have assembled a data pipeline that will catch these events by using Spark streaming to filter select event types from Kafka and land them into HDFS/Parquet to make them available for analysis using Presto. I have utilized Apache Bench to generate test data for the data pipeline and have produced an analytics report below to provide a description of the pipeline along with some basic analysis of the events. The following analytics report presents my queries and findings on the generated events in order to produce business-related recommendations.

## The initial steps were taken prior to creating the analytics report on Jupyter Notebook:

### Startup the cluster:

- docker-compose up -d

### Check for stray containers and stray networks:

- docker-compose ps
- docker ps -a
- docker network ls

### Create the topic:

- docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181

### Copy in the game_api.py file:

- cp ~/w205/course-content/12-Querying-Data-II/game_api.py .

### Startup the flask server:

- docker-compose exec mids env FLASK_APP=/w205/project-3-azj0718/game_api.py flask run --host 0.0.0.0

### Create a Jupyter Notebook against a pySpark kernel by first creating the symbolic link so that the notebook is saved to the project 3 repo directory:

- docker-compose exec spark bash
- ln -s /w205 w205
- exit

### Spin up the Jupyter Notebook with my external IP through an incognito window:

- docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark

- Example of URL that was used for a given instance: http://34.145.115.201:8888/?token=e75b71c143a2902d6a8274f359785cedf58f1826e1516783


### Run the apache bench commands including the new events:

- docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/
- docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword
- docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/
- docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword
- docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/buy_a_sword
- docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_guild
- docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/buy_a_sword
- docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_guild

### Run bash shell infinite loop code:

- while true; do docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword; sleep 5; done

### Start up hive:

- docker-compose exec cloudera hive

### Create an external table with the following hive command to create an external table for the schema on read

- create external table if not exists default.sword_purchases (sword_size string, timestamp string, Accept string, Host string, `User-Agent` string, event_type string) stored as parquet location '/tmp/sword_purchases' tblproperties ("parquet.compress"="SNAPPY");

### Drop external hive table if needed:

- drop table default.sword_purchases;

### Startup Presto:

- docker-compose exec presto presto --server presto:8080 --catalog hive --schema default

### Query the external table using the schema on read created in Hive above through the following presto commands:

- desc sword_purchases;
- select * from sword_purchases limit 5;
- select count(*) from sword_purchases;
- select count(*) from sword_purchases where host = 'user1.comcast.com';

### Shutting down the cluster:

- docker-compose down

In [1]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType

In [2]:
spark

In [3]:
sc

### Read json data from Kafka in batch mode:

In [4]:
@udf('boolean')
def is_purchase(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

### Create a spark dataframe containing the extracted json data:

In [5]:
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [6]:
raw_events.show()

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2021-07-27 21:43:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2021-07-27 21:43:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     2|2021-07-27 21:43:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     3|2021-07-27 21:43:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     4|2021-07-27 21:43:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     5|2021-07-27 21:43:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     6|2021-07-27 21:43:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     7|2021-07-27 21:43:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0

### Filter out only the purchase sword extracted events:

In [7]:
purchase_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase('raw'))

In [8]:
purchase_events.show()

+--------------------+--------------------+
|                 raw|           timestamp|
+--------------------+--------------------+
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27 21:43:...|
|{"Host": "user1.c...|2021-07-27

In [9]:
extracted_purchase_events = purchase_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()

In [10]:
extracted_purchase_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sword_size: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [11]:
extracted_purchase_events.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_size|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|us

### Write the purchase sword extracted events to parquet files stored in hdfs:

In [12]:
extracted_purchase_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchases')

### Read the parquet files from hdfs into another spark dataframe:

In [13]:
purchases = spark.read.parquet('/tmp/purchases')

In [14]:
purchases.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_size|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|us

### Register the spark dataframe as a temporary table to allow in memory sql against the dataframe:

In [15]:
purchases.registerTempTable('purchases')

In [16]:
purchases_by_example2 = spark.sql("select * from purchases where Host = 'user1.comcast.com'")

In [17]:
purchases_by_example2.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_size|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-07-27 21:43:...|
|   */*|us

### Copy the dataframe to Pandas to demonstrate mixing MMP code with serial code:

In [18]:
df = purchases_by_example2.toPandas()

In [19]:
df

Unnamed: 0,Accept,Host,User-Agent,event_type,sword_size,timestamp
0,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,large,2021-07-27 21:43:45.568
1,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,large,2021-07-27 21:43:45.571
2,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,large,2021-07-27 21:43:45.572
3,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,large,2021-07-27 21:43:45.582
4,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,large,2021-07-27 21:43:45.586
5,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,large,2021-07-27 21:43:45.59
6,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,large,2021-07-27 21:43:45.593
7,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,large,2021-07-27 21:43:45.596
8,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,large,2021-07-27 21:43:45.599
9,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,large,2021-07-27 21:43:45.601


In [20]:
df.describe()

Unnamed: 0,Accept,Host,User-Agent,event_type,sword_size,timestamp
count,200,200,200,200,200,200
unique,1,1,1,1,1,200
top,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,large,2021-07-27 21:45:08.109
freq,200,200,200,200,200,1


### Read and submit live, steady stream of "purchase sword" events data on Kafka and write to Hadoop HDFS in parquet format:

In [21]:
def purchase_sword_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("sword_size", StringType(), True),
    ])

In [22]:
@udf('boolean')
def is_sword_purchase(event_as_json):
    """udf for filtering events
    """
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

In [23]:
#Different version of raw events for streaming mode that subscribes to the topic without a beginning and end point

raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()

In [24]:
sword_purchases = raw_events \
        .filter(is_sword_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          purchase_sword_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')

In [25]:
sink = sword_purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases") \
        .option("path", "/tmp/sword_purchases") \
        .trigger(processingTime="10 seconds") \
        .start()

In [None]:
#sink.stop()

### Basic Business Questions:

##### Question 1: What are the column names in the sword_purchases table?

##### Answer 1: After running the 'desc sword_purchases' command, we could see that the column names are sword_size, timestamp, accept, host, user-agent, and event_type.

```mysql
   Column   |  Type   | Comment 
------------+---------+---------
 sword_size | varchar |         
 timestamp  | varchar |         
 accept     | varchar |         
 host       | varchar |         
 user-agent | varchar |         
 event_type | varchar |         
(6 rows)
```

##### Question 2: What is the count of sword purchases that has a host from 'user1.comcast.com'?

##### Answer 2: After running the 'select count(*) from sword_purchases where host = 'user1.comcast.com';' command, we could see that the count would be 3270 when I ran the command but more sword_purchases would be continuously added unless we stopped the bash shell infinite loop code.

```mysql
 _col0 
-------
  3270 
(1 row)
```

##### Question 3: What are the first 5 hosts that purchased a sword?

##### Answer 3: After running the 'select host from sword_purchases limit 5;' command, we could see that "user1.comcast.com" showed up as the first 5 hosts that purchases a sword.

```mysql
       host        
-------------------
 user1.comcast.com 
 user1.comcast.com 
 user1.comcast.com 
 user1.comcast.com 
 user1.comcast.com 
(5 rows)
```

##### Question 4: What is the count of event types that purchased a sword?

##### Answer 4: After running the 'select count(*) from sword_purchases where event_type = 'purchase_sword';' command, we could see that there was a total count of 5420 but more swords would be continuously purchased unless we stopped the bash shell infinite loop code.

```mysql
 _col0 
-------
  5420 
(1 row)
```

### Summary:

So as you can see from the analysis, we have started tracking the 'buy a sword' and 'join guild' events along with the metadata characteristics for our latest mobile game. We were able to assemble a data pipeline that caught these events using spark streaming to filter the select event types from Kafka, land them into HDFS/Parquet format to make them available for analysis using Presto and used Apache Bench to generate test data for the pipeline. From the basic questions above, we were able to find out that our external sword_purchases table does indeed have 6 columns (sword_size, timestamp, accept, host, user-agent, and event_type) and have been able to find out that there were a count of 3270 from the sword_purchases external table that had a host from user1.comcast.comd along with a count of 5420 from the sword_purchases external table that had an event_type of purchase_sword. These counts were obviously increased continuously due to the following bash shell infinite loop code that was used during the time of analysis: 
- while true; do docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword; sleep 5; done

We hope that the analytics report provides some basic valuable findings on the generated events to produce some business-related recommendations.