# Understanding User Behavior For Mobile Gaming

Author: David Trinidad  
MIDS W205 Data Engineering  
Project 3  


### Part I: Summary

The intention of this project is to demonstrate a basic pipeline for collecting user metadata within the mobile-gaming space. We step through a process in which a Flask web server publishes event data into Kafka, Spark ingests the messages from Kafka, and the data are finally saved to the Hadoop Distributed File System (HDFS), where basic analytical queries can be run to answer business questions.

**Introduction**

As data scientists at a gaming company, we consider the scenario of tracking two events: "purchasing a sword" and "joining a guild". We assume these events are generated by a web interface of some sort. The tasks below are highlighted during the step-by-step implementation process in Part III.

**Tasks**  
1. Instrument the API server to log events to Kafka: the web server logs events in which users buy a sword or join a guild as JSON messages.
2. Assemble a data pipeline to catch these events: use Spark streaming to filter selected event types from Kafka and land them in HDFS/Parquet, making them available for analysis using Presto.
3. Use Apache Bench to generate test data for the pipeline.
4. Produce an analytics report providing a description of the pipeline and some basic analysis of the events. Explaining the pipeline is key for this project!

**Files**

- **Report.md**: This file is the writeup with step-by-step command annotations.
- **docker-compose.yml**: Contains the cluster configuration
- **mobileGame_api.py**: Python API for the Web Server.
- **separate_events.py**: PySpark script to separate game event messages.
- **event_filtering.py**: PySpark script to filter game event messages.
- **filtered_writes.py**: PySpark script to save game event messages to HDFS after filtering.
- **JupyterQuery.ipynb**: Jupyter notebook demonstrating queries on the saved data.


### Part II: Data Pipeline Architecture

![](gaming_architecture.png)

### Part III: Step By Step Implementation Process 

**Step 1: Create the project directory**

```bash
# Make the project directory and move into it
mkdir -p w205/project_3
cd w205/project_3

# Copy the docker-compose file from the week 13 W205 course content
cp ~/w205/course-content/13-Querying-Data/docker-compose.yml .
cat docker-compose.yml
```

**Step 2: Spin up the Docker cluster**

Services within the Docker cluster include Kafka, ZooKeeper, Spark, mids, Cloudera (Hadoop), and Presto.

```bash
# Spin up the Docker cluster in the background
docker-compose up -d
```

Checking the container status with `docker-compose ps`:

```text
jupyter@python-20210907-215615:~/w205/w205/project_3$ docker-compose ps
        Name                       Command                State                      Ports
------------------------------------------------------------------------------------------------------------
project_3_cloudera_1    /usr/bin/docker-entrypoint ...   Exit 139
project_3_kafka_1       /etc/confluent/docker/run        Up         29092/tcp, 9092/tcp
project_3_mids_1        /bin/bash                        Up         8888/tcp
project_3_presto_1      /usr/bin/docker-entrypoint ...   Up         8080/tcp
project_3_spark_1       docker-entrypoint.sh bash        Up         0.0.0.0:8889->8888/tcp,:::8889->8888/tcp
project_3_zookeeper_1   /etc/confluent/docker/run        Up         2181/tcp, 2888/tcp, 32181/tcp, 3888/tcp
jupyter@python-20210907-215615:~/w205/w205/project_3$
```

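**Step 3: Monitor the Hadoop and Kafka logs**

```bash
# Follow the Hadoop (cloudera) log; -f keeps following, so run it in its own terminal
docker-compose logs -f cloudera

# Follow the Kafka log in another terminal
docker-compose logs -f kafka
```

Output of the Hadoop log:

```text
Attaching to project_3_cloudera_1
project_3_cloudera_1 exited with code 139
```

Exit code 139 (128 + signal 11) means the cloudera container's process was terminated by a segmentation fault, which matches the `Exit 139` state in the `docker-compose ps` output. The container has to be running again before the `hadoop fs` commands in Step 13 can be executed.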

**Step 4: Create the Kafka topic "events"**

```bash
docker-compose exec kafka
```

**Step 5: Create the Web API (mobileGame_api.py)**

The mobileGame_api.py script, modified from the week 13 async example, defines two Web API calls: `purchase_a_sword` and `join_guild`. Both log their events to Kafka as JSON. Metadata from the user's request (such as the `Host` header) is also added to each logged event. Finally, I enhanced the `purchase_a_sword` call to accept parameters.
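
The event-logging pattern described above can be sketched in plain Python. This is a minimal sketch, not the contents of mobileGame_api.py: the helper names `make_event` and `log_to_kafka` and the `sword_type` parameter are hypothetical. It assumes the Flask view would pass `dict(request.headers)` as `headers` and `request.args.to_dict()` as `params`, and that `send` would be a Kafka producer's send method (for example kafka-python's `KafkaProducer(bootstrap_servers="kafka:29092").send`). Injecting `send` keeps the logic testable without a running broker.

```python
import json
from typing import Callable, Dict, Mapping, Optional

# A Kafka "send" function, e.g. KafkaProducer(...).send (assumed; any callable works)
SendFn = Callable[[str, bytes], object]


def make_event(event_type: str,
               headers: Mapping[str, str],
               params: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build one event: its type, any request parameters, then the request headers."""
    event = {"event_type": event_type}
    if params:
        event.update(params)   # e.g. hypothetical sword attributes from the query string
    event.update(headers)      # request metadata such as Host and User-Agent
    return event


def log_to_kafka(send: SendFn, topic: str, event: Mapping[str, str]) -> None:
    """Serialize the event as UTF-8 encoded JSON and pass it to the send function."""
    send(topic, json.dumps(event).encode("utf-8"))


if __name__ == "__main__":
    sent = []  # stand-in for Kafka: collects (topic, value) pairs
    event = make_event("purchase_sword", {"Host": "user2.att.com"}, {"sword_type": "long"})
    log_to_kafka(lambda topic, value: sent.append((topic, value)), "events", event)
    print(sent[0])
```

Merging the headers last means every logged event carries the request metadata that the later Spark SQL queries filter on (the `Host` column).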

**Step 6: Run Flask**

Launch the Flask web server with mobileGame_api.py:

```bash
# Start the Flask server in the mids container, listening on all interfaces (Flask's default port is 5000)
docker-compose exec mids env FLASK_APP=/w205/project_3/mobileGame_api.py flask run --host 0.0.0.0
```

**Step 7: Generate data with Apache Bench**

In a separate terminal, the Apache Bench commands below generate `join_guild` and `purchase_a_sword` Web API calls. Each command sends 10 requests (`-n 10`) and sets the `Host` header (`-H`) to simulate a different user. Note that the output from the Flask server matches the Apache Bench commands.

```bash
# 10 join_guild calls from user1.comcast.com
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_guild

# 10 purchase_a_sword calls from user2.att.com
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword

# 10 join_guild calls from user2.att.com
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_guild
```
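
Because each Apache Bench run above sends a known number of requests, the pipeline's output can be checked against an expected tally. The sketch below assumes every request succeeds and logs exactly one event, and it assumes the endpoint-to-`event_type` mapping in the hypothetical `ENDPOINT_TO_EVENT` table (`purchase_a_sword` logging `purchase_sword`, as the Spark filter expects, and `join_guild` logging `join_guild`); `AB_RUNS` simply restates the three commands.

```python
from collections import Counter
from typing import Iterable, Tuple

# The three Apache Bench runs above: (Host header, endpoint, number of requests from -n)
AB_RUNS = [
    ("user1.comcast.com", "join_guild", 10),
    ("user2.att.com", "purchase_a_sword", 10),
    ("user2.att.com", "join_guild", 10),
]

# Assumed mapping from API endpoint to the event_type the server logs
ENDPOINT_TO_EVENT = {"purchase_a_sword": "purchase_sword", "join_guild": "join_guild"}


def expected_counts(runs: Iterable[Tuple[str, str, int]]) -> Counter:
    """Tally how many events each (host, event_type) pair should add to Kafka."""
    counts: Counter = Counter()
    for host, endpoint, n_requests in runs:
        counts[(host, ENDPOINT_TO_EVENT[endpoint])] += n_requests
    return counts


if __name__ == "__main__":
    for (host, event_type), n in sorted(expected_counts(AB_RUNS).items()):
        print(f"{host:20} {event_type:15} {n}")
```

These totals can be compared with what kafkacat or the Spark jobs report. Since both read the topic from the earliest offset, events from any earlier runs will be included as well.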

**Step 8: Use kafkacat to monitor the messages streaming into Kafka**

```bash
# Consume the "events" topic from the beginning via the broker at kafka:29092
docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning
```
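
To summarize what kafkacat prints rather than reading it by eye, the consumer's output can be saved to a file (adding kafkacat's `-e` flag makes it exit once it reaches the end of the topic) and tallied by `event_type`. This is a minimal sketch assuming one JSON message per line, as kafkacat prints them above; the function name `tally_event_types` is hypothetical, and the script reads the saved output from standard input.

```python
import json
import sys
from collections import Counter
from typing import Iterable


def tally_event_types(lines: Iterable[str]) -> Counter:
    """Count messages per event_type in kafkacat output (one JSON message per line)."""
    counts: Counter = Counter()
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):
            continue  # skip blank lines and any non-JSON status text
        counts[json.loads(line).get("event_type", "unknown")] += 1
    return counts


if __name__ == "__main__":
    print(tally_event_types(sys.stdin))
```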

**Step 9: Filter Kafka events**

- Below is the event_filtering.py script, which reads the Kafka events and filters them for `purchase_sword` events. The same pattern is applied to `join_guild` events in filtered_writes.py (Step 11).

```python
#!/usr/bin/env python
"""Python Spark Script for filtering event messages"""

import json

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf


@udf('boolean')
def is_purchase(event_as_json):
    """True if the Kafka message is a sword purchase event."""
    event = json.loads(event_as_json)
    return event['event_type'] == 'purchase_sword'


def main():
    """main"""
    spark = SparkSession \
        .builder \
        .appName("ExtractEventsJob") \
        .getOrCreate()

    # Batch-read every message currently in the "events" topic
    events_raw = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

    # Keep the message body as a JSON string plus the Kafka timestamp,
    # then drop everything that is not a sword purchase
    event_purchases = events_raw \
        .select(events_raw.value.cast('string').alias('raw'),
                events_raw.timestamp.cast('string')) \
        .filter(is_purchase('raw'))

    # Flatten each JSON message into a Row of its fields and build a DataFrame
    event_purchases_extracted = event_purchases \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()
    event_purchases_extracted.printSchema()
    event_purchases_extracted.show()


if __name__ == "__main__":
    main()
```
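
The logic the Spark job distributes can be checked locally in plain Python. The sketch below re-implements the `is_purchase` test for any event type, plus the `Row(timestamp=..., **json.loads(raw))` flattening, on a list of `(raw, timestamp)` pairs standing in for the selected Kafka columns. The function names are hypothetical, and unlike the UDF, `is_event_type` returns `False` for a message without an `event_type` key instead of raising `KeyError`.

```python
import json
from typing import Dict, Iterable, List, Tuple


def is_event_type(event_as_json: str, wanted: str) -> bool:
    """Return True when the JSON message's event_type equals `wanted`."""
    return json.loads(event_as_json).get("event_type") == wanted


def extract(messages: Iterable[Tuple[str, str]], wanted: str) -> List[Dict[str, str]]:
    """Filter (raw, timestamp) pairs by event type and flatten each match into
    one record holding the Kafka timestamp plus the event's JSON fields."""
    return [
        {"timestamp": timestamp, **json.loads(raw)}
        for raw, timestamp in messages
        if is_event_type(raw, wanted)
    ]


if __name__ == "__main__":
    sample = [
        ('{"event_type": "purchase_sword", "Host": "user2.att.com"}', "t1"),
        ('{"event_type": "join_guild", "Host": "user1.comcast.com"}', "t2"),
    ]
    print(extract(sample, "purchase_sword"))
```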


**Step 10: Run event_filtering.py in the Spark container**

- For readability, portions of the output were condensed or left out.

```bash
# Submit the filtering job; it prints the schema and the extracted sword purchases
docker-compose exec spark spark-submit /w205/project_3/event_filtering.py
```


**Step 11: Write filtered events from Kafka into HDFS**

- Below is the filtered_writes.py script, which reads the events from Kafka, filters them for `purchase_sword` and `join_guild`, and writes each event type to its own Parquet directory in HDFS (`/tmp/purchases` and `/tmp/joinguilds`).
- Note: the script was slightly modified from the example in week 13.


```python
#!/usr/bin/env python
"""Python spark script for saving filtered events"""

import json

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf


@udf('boolean')
def is_purchase(event_as_json):
    """True if the Kafka message is a sword purchase event."""
    event = json.loads(event_as_json)
    return event['event_type'] == 'purchase_sword'


@udf('boolean')
def is_joinguild(event_as_json):
    """True if the Kafka message is a join-guild event."""
    event = json.loads(event_as_json)
    return event['event_type'] == 'join_guild'


def main():
    """main"""
    spark = SparkSession \
        .builder \
        .appName("ExtractEventsJob") \
        .getOrCreate()

    # Batch-read every message currently in the "events" topic
    events_raw = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

    # Keep the JSON body and Kafka timestamp, one filter per event type
    event_purchases = events_raw \
        .select(events_raw.value.cast('string').alias('raw'),
                events_raw.timestamp.cast('string')) \
        .filter(is_purchase('raw'))

    event_joinguild = events_raw \
        .select(events_raw.value.cast('string').alias('raw'),
                events_raw.timestamp.cast('string')) \
        .filter(is_joinguild('raw'))

    # Flatten each JSON message into a Row of its fields
    event_purchases_extracted = event_purchases \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()
    event_purchases_extracted.printSchema()
    event_purchases_extracted.show()

    event_joinguild_extracted = event_joinguild \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()
    event_joinguild_extracted.printSchema()
    event_joinguild_extracted.show()

    # Write each event type to its own Parquet directory in HDFS
    event_purchases_extracted \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchases')

    event_joinguild_extracted \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/joinguilds')


if __name__ == "__main__":
    main()
```
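
The routing that filtered_writes.py performs, one output directory per event type, can be sketched without Spark. This minimal sketch assumes already-flattened events (dicts) and writes JSON Lines to a local directory as a stand-in for Parquet in HDFS. The `OUTPUT_DIRS` table mirrors the `/tmp/purchases` and `/tmp/joinguilds` paths, and the function name and `part-00000.jsonl` file name are hypothetical. Events of any other type are dropped, as with the Spark filters.

```python
import json
import tempfile
from collections import defaultdict
from pathlib import Path
from typing import Dict, Iterable, List, Mapping

# event_type -> output directory name, mirroring /tmp/purchases and /tmp/joinguilds
OUTPUT_DIRS = {"purchase_sword": "purchases", "join_guild": "joinguilds"}


def write_by_type(events: Iterable[Mapping[str, str]], root: Path) -> Dict[str, int]:
    """Group events by type and (over)write one JSON Lines file per type under root.
    Returns the number of events written to each directory."""
    groups: Dict[str, List[Mapping[str, str]]] = defaultdict(list)
    for event in events:
        name = OUTPUT_DIRS.get(event.get("event_type"))
        if name is not None:  # other event types are filtered out
            groups[name].append(event)

    written = {}
    for name, rows in groups.items():
        out_dir = root / name
        out_dir.mkdir(parents=True, exist_ok=True)
        with open(out_dir / "part-00000.jsonl", "w", encoding="utf-8") as f:
            for row in rows:
                f.write(json.dumps(row) + "\n")
        written[name] = len(rows)
    return written


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        counts = write_by_type(
            [{"event_type": "purchase_sword", "Host": "user2.att.com"},
             {"event_type": "join_guild", "Host": "user1.comcast.com"},
             {"event_type": "default", "Host": "user1.comcast.com"}],
            Path(tmp),
        )
        print(counts)
```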


**Step 12: Execute the filtered_writes.py script with Spark**

```bash
# Submit the write job (same /w205/project_3 path as the filtering script in Step 10)
docker-compose exec spark spark-submit /w205/project_3/filtered_writes.py
```


**Step 13: Check the Parquet files in Hadoop**

```bash
# List everything under /tmp in HDFS
docker-compose exec cloudera hadoop fs -ls /tmp/

# Check that sword purchases were written to HDFS
docker-compose exec cloudera hadoop fs -ls /tmp/purchases

# Check that join-guild events were written to HDFS
docker-compose exec cloudera hadoop fs -ls /tmp/joinguilds
```
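
Besides listing the directories, a completed Spark write can be recognized by its layout: with Spark's default Hadoop output committer, a successful job leaves a `_SUCCESS` marker file next to its `part-*` data files. The sketch below checks that layout on a local copy of an output directory (for example one copied out of HDFS with `hadoop fs -get`); the function name is hypothetical, and checking HDFS directly still goes through `hadoop fs`.

```python
import tempfile
from pathlib import Path


def spark_write_complete(out_dir: Path) -> bool:
    """True if out_dir has a _SUCCESS marker and at least one part-* data file."""
    if not (out_dir / "_SUCCESS").is_file():
        return False
    return any(p.is_file() for p in out_dir.glob("part-*"))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        out = Path(tmp) / "purchases"
        out.mkdir()
        # Imitate the files a finished Parquet write leaves behind
        (out / "part-00000.snappy.parquet").touch()
        print(spark_write_complete(out))  # no _SUCCESS marker yet
        (out / "_SUCCESS").touch()
        print(spark_write_complete(out))  # marker and data file present
```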

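**Step 14: Spin up a Jupyter Notebook and run queries**

Port 8888 in the spark container is published on host port 8889 (see the `docker-compose ps` output in Step 2), so the notebook is reached through port 8889 on the host.

```bash
# Launch PySpark with Jupyter Notebook as the driver front end, listening on port 8888 in the container
docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark
```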

**Step 15: Query data from Spark using Jupyter Notebook**

- Query for sword purchases and guild joins, using the lesson from week 12.
- **(The following cells were copied over from the working Jupyter notebook.)**


```python
purchases = spark.read.parquet('/tmp/purchases')
purchases.show()
```

```python
# Register the DataFrame as a temporary view so Spark SQL can query it
purchases.createOrReplaceTempView('purchases')
purchases_by_example2 = spark.sql("select * from purchases where Host = 'user1.comcast.com'")
purchases_by_example2.show()
```

```python
newdf1 = purchases_by_example2.toPandas()
newdf1.describe()
```


![](purchases.png)

```python
join_guilds = spark.read.parquet('/tmp/joinguilds')
join_guilds.show()
```


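```python
# The joinguilds view must be registered before Spark SQL can query it
join_guilds.createOrReplaceTempView('joinguilds')
join_guilds_example = spark.sql("select * from joinguilds where Host = 'user2.att.com'")
join_guilds_example.show()
```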


```python
newdf2 = join_guilds_example.toPandas()
newdf2.describe()
```

![](guild.png)
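
For string columns, `describe()` reports only the count, the number of unique values, the most frequent value (`top`), and its frequency. A per-value count gives the full distribution, which is what a query such as `SELECT Host, COUNT(*) FROM joinguilds GROUP BY Host` returns in Spark SQL. The sketch below does the same count in plain Python over rows represented as dicts (for example from `toPandas().to_dict("records")`); the function name `count_by` is hypothetical.

```python
from collections import Counter
from typing import Iterable, List, Mapping, Optional, Tuple


def count_by(rows: Iterable[Mapping[str, str]], column: str) -> List[Tuple[Optional[str], int]]:
    """Count rows per value of `column`, most frequent first (ties keep first-seen order);
    the in-memory analogue of SELECT column, COUNT(*) ... GROUP BY column."""
    return Counter(row.get(column) for row in rows).most_common()


if __name__ == "__main__":
    rows = [{"Host": "user2.att.com"}, {"Host": "user2.att.com"}, {"Host": "user1.comcast.com"}]
    for host, n in count_by(rows, "Host"):
        print(host, n)
```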

### Part IV: Summary


In this project, we successfully demonstrated how to track two game events through a web server. With Apache Bench, we generated Web API calls whose events were published to Kafka, processed and filtered with Spark, and saved into Hadoop (HDFS). At this point, the saved data can be queried. Overall, we met all the target tasks listed in Part I.

