# Understanding User Behavior: Tracking Game Events

*Project 3 by Derek Topper, for MIDS W205, 8/2/2020*

##### Summary

In this project, I work as a data scientist for a gaming company that wants to understand the actions players take in its game, focusing on item purchases and on players joining a guild. The pipeline streams these events end to end so that they can be queried as they arrive. Specifically, I deploy a Docker cluster to demonstrate how events are captured by a Flask web server, published to a Kafka topic, landed in HDFS by Spark, and queried in Presto.

##### Introduction

As a data scientist at a game development company, I am tracking two kinds of events in the company's latest mobile game: buying a sword and joining a guild. Each event carries metadata characteristic of its type (e.g., the sword amount or the guild description).

##### Tasks

- Instrument your API server to log events to Kafka

- Assemble a data pipeline to catch these events: use Spark streaming to filter
  select event types from Kafka, land them into HDFS/parquet to make them
  available for analysis using Presto

- Use Apache Bench to generate test data for your pipeline

- Produce an analytics report where you provide a description of your pipeline
  and some basic analysis of the events

There are four files associated with this project:

* This file is the report with step-by-step command annotations and the queries.
* docker-compose.yml is the docker cluster configuration file.
* game_api.py is the Python API for the Web Server.
* write_stream.py is the PySpark streaming script that separates, filters, and writes the game event messages.

### Execution and Exhibition

##### Setup

I started by creating and navigating to my project directory, ~/w205/project-3-derektopper, and copying the files from my course's Week 14 materials.

```
mkdir -p ~/w205/project-3-derektopper
cd ~/w205/project-3-derektopper
cp ~/w205/course-content/14-Patterns-for-Data-Pipelines/docker-compose.yml .
cp ~/w205/course-content/14-Patterns-for-Data-Pipelines/*.py .
```

The docker-compose.yml file defines the containers in the cluster. The mids container exposes port 5000, which serves the game events web API. Zookeeper listens for clients on port 32181, which is how Kafka connects to Zookeeper, and Kafka listens for streaming data on port 29092. The cloudera container provides HDFS and the Hive metastore (Thrift on port 9083), the spark container runs the streaming job that lands events in HDFS, and the Presto container lets us query the data.

Below is the docker-compose.yml file.

```
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
    expose:
      - "2181"
      - "2888"
      - "32181"
      - "3888"
    extra_hosts:
      - "moby:127.0.0.1"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    expose:
      - "9092"
      - "29092"
    extra_hosts:
      - "moby:127.0.0.1"

  cloudera:
    image: midsw205/hadoop:0.0.2
    hostname: cloudera
    expose:
      - "8020" # nn
      - "8888" # hue
      - "9083" # hive thrift
      - "10000" # hive jdbc
      - "50070" # nn http
    ports:
      - "8888:8888"
    extra_hosts:
      - "moby:127.0.0.1"

  spark:
    image: midsw205/spark-python:0.0.6
    stdin_open: true
    tty: true
    volumes:
      - ~/w205:/w205
    expose:
      - "8888"
    ports:
      - "8889:8888" # 8888 conflicts with hue
    depends_on:
      - cloudera
    environment:
      HADOOP_NAMENODE: cloudera
      HIVE_THRIFTSERVER: cloudera:9083
    extra_hosts:
      - "moby:127.0.0.1"
    command: bash

  presto:
    image: midsw205/presto:0.0.1
    hostname: presto
    volumes:
      - ~/w205:/w205
    expose:
      - "8080"
    environment:
      HIVE_THRIFTSERVER: cloudera:9083
    extra_hosts:
      - "moby:127.0.0.1"

  mids:
    image: midsw205/base:0.1.9
    stdin_open: true
    tty: true
    volumes:
      - ~/w205:/w205
    expose:
      - "5000"
    ports:
      - "5000:5000"
    extra_hosts:
      - "moby:127.0.0.1"
```
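To make the port wiring described above concrete, here is a minimal sketch that cross-checks it. It does not parse the YAML (to avoid assuming PyYAML is installed); instead `SERVICES` is a hand-copied subset of the file above, and `parse_endpoint` and `check_wiring` are hypothetical helpers written for this report. The check flags any `host:port` value in a service's environment that points at a service or port the file does not expose.

```python
# Hand-copied subset of docker-compose.yml (assumption: only the keys needed here)
SERVICES = {
    "zookeeper": {"expose": ["2181", "2888", "32181", "3888"],
                  "env": {"ZOOKEEPER_CLIENT_PORT": "32181"}},
    "kafka": {"expose": ["9092", "29092"],
              "env": {"KAFKA_ZOOKEEPER_CONNECT": "zookeeper:32181",
                      "KAFKA_ADVERTISED_LISTENERS": "PLAINTEXT://kafka:29092"}},
    "cloudera": {"expose": ["8020", "8888", "9083", "10000", "50070"], "env": {}},
    "spark": {"expose": ["8888"], "env": {"HIVE_THRIFTSERVER": "cloudera:9083"}},
    "presto": {"expose": ["8080"], "env": {"HIVE_THRIFTSERVER": "cloudera:9083"}},
    "mids": {"expose": ["5000"], "env": {}},
}


def parse_endpoint(value):
    """Return (host, port) from 'host:port' or 'SCHEME://host:port'."""
    if "://" in value:
        value = value.split("://", 1)[1]
    host, port = value.rsplit(":", 1)
    return host, port


def check_wiring(services):
    """List every host:port reference whose target does not expose that port."""
    problems = []
    for name, spec in services.items():
        for key, value in spec["env"].items():
            if ":" not in value:
                continue  # plain values such as ZOOKEEPER_CLIENT_PORT
            host, port = parse_endpoint(value)
            target = services.get(host)
            if target is None or port not in target["expose"]:
                problems.append(f"{name}.{key} -> {host}:{port}")
    return problems


print(check_wiring(SERVICES))  # [] when every reference resolves
```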

Next, I edited game_api.py to add more events to the game so that we can track a wider range of user purchases. The original file only logged sword purchases; I added endpoints for purchasing a knife, armor, a bow, and arrows, as well as for joining a guild. Each endpoint logs its event to the Kafka `events` topic, and each event carries extra fields (a description and, for purchases, an amount) in addition to the request headers.

`vim game_api.py`

The new `game_api.py` file:

```
#!/usr/bin/env python
import json
from kafka import KafkaProducer
from flask import Flask, request

app = Flask(__name__)
producer = KafkaProducer(bootstrap_servers='kafka:29092')


def log_to_kafka(topic, event):
    event.update(request.headers)
    producer.send(topic, json.dumps(event).encode())


@app.route("/")
def default_response():
    default_event = {'event_type': 'default'}
    log_to_kafka('events', default_event)
    return "This is the default response!\n"


@app.route("/purchase_a_sword")
def purchase_a_sword():
    purchase_sword_event = {'event_type': 'purchase_sword',
                           'description': 'upgrade sword strength',
                           'amount': '1'}
    log_to_kafka('events', purchase_sword_event)
    return "Sword Purchased!\n"

@app.route("/purchase_a_knife")
def purchase_a_knife():
    purchase_knife_event = {'event_type': 'purchase_knife',
                           'description': 'upgrade knife strength',
                           'amount': '1'}
    log_to_kafka('events', purchase_knife_event)
    return "Knife Purchased!\n"

@app.route("/purchase_armor")
def purchase_armor():
    purchase_armor_event = {'event_type': 'purchase_armor',
                           'description': 'upgrade armor strength',
                           'amount': '1'}
    log_to_kafka('events', purchase_armor_event)
    return "Armor Purchased!\n"

@app.route("/purchase_bow")
def purchase_bow():
    purchase_bow_event = {'event_type': 'purchase_bow',
                           'description': 'upgrade bow strength',
                           'amount': '1'}
    log_to_kafka('events', purchase_bow_event)
    return "Bow Purchased!\n"

@app.route("/purchase_arrow")
def purchase_arrow():
    purchase_arrow_event = {'event_type': 'purchase_arrow',
                           'description': 'upgrade arrow strength',
                           'amount': '1'}
    log_to_kafka('events', purchase_arrow_event)
    return "Arrow Purchased!\n"


@app.route("/join_guild")
def join_guild():
    join_guild_event = {'event_type': 'join_guild',
                        'description': 'joining a guild'}
    log_to_kafka('events', join_guild_event)
    return "Joined Guild!\n"
```
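The core of the API is `log_to_kafka`: it merges the HTTP request headers into the event dictionary and sends the JSON-encoded result to the `events` topic. The sketch below reproduces that logic without Flask or Kafka so the resulting message can be inspected. `RecordingProducer` is a hypothetical stand-in for `KafkaProducer` that only records what would be sent, `log_event` is a hypothetical name, and the header values are illustrative. Unlike the original, the sketch copies the event before updating it, so the caller's dictionary is left unchanged.

```python
import json


class RecordingProducer:
    """Hypothetical stand-in for KafkaProducer that records instead of sending."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))


def log_event(producer, topic, event, headers):
    """Same steps as log_to_kafka: merge headers into the event, send JSON bytes."""
    payload = dict(event)    # copy, so the caller's dict is not modified
    payload.update(headers)  # e.g. Host, User-Agent, Accept
    producer.send(topic, json.dumps(payload).encode())
    return payload


producer = RecordingProducer()
bow_event = {"event_type": "purchase_bow",
             "description": "upgrade bow strength",
             "amount": "1"}
# Illustrative header values
log_event(producer, "events", bow_event,
          {"Host": "user2.att.com", "User-Agent": "ApacheBench/2.3", "Accept": "*/*"})
topic, value = producer.sent[0]
print(topic, json.loads(value)["Host"])  # events user2.att.com
```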

Next, to make this data queryable, I used PySpark Structured Streaming to read the events from Kafka and write them to HDFS. The job keeps each extracted event whole (in the `raw_event` column) but routes the events into two separate parquet outputs: one for all user purchases, such as buying a sword, and one for guild membership, such as a user joining a guild. Because both outputs are streaming queries with a 120-second trigger, new events land in HDFS continuously rather than in a one-off batch.
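The routing described above comes down to two independent filters applied to every Kafka message. The following plain-Python sketch mirrors the tests in the `is_purchase` and `is_join_guild` UDFs in the script; `route` is a hypothetical helper, and unlike Spark it processes an in-memory list rather than a stream. It also uses `.get` with an empty default, whereas the UDFs would raise on a message without `event_type`. Events that match neither test, such as `default`, are dropped.

```python
import json


def route(messages):
    """Split raw JSON messages into (purchases, guild) lists."""
    purchases, guild = [], []
    for raw in messages:
        event_type = json.loads(raw).get("event_type", "")
        if "purchase" in event_type:      # same test as is_purchase
            purchases.append(raw)
        if event_type == "join_guild":    # same test as is_join_guild
            guild.append(raw)
    return purchases, guild


sample = [
    '{"event_type": "default"}',
    '{"event_type": "purchase_sword", "amount": "1"}',
    '{"event_type": "join_guild", "description": "joining a guild"}',
]
purchases, guild = route(sample)
print(len(purchases), len(guild))  # 1 1
```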


The `write_stream.py` file now:

```
#!/usr/bin/env python
"""Extract events from kafka and write them to hdfs
"""
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType

def transaction_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- description: string (nullable = true)
    |-- amount: string (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("amount", StringType(), True),
        StructField("description", StringType(), True),
    ])



def join_guild_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
    ])

@udf('boolean')
def is_purchase(event_as_json):
    """udf for filtering events
    """
    event = json.loads(event_as_json)
    if 'purchase' in event['event_type']:
        return True
    return False

@udf('boolean')
def is_join_guild(event_as_json):
    """udf for filtering events
    """
    event = json.loads(event_as_json)
    if event['event_type'] == 'join_guild':
        return True
    return False


def main():
    """main
    """
    spark = SparkSession \
        .builder \
        .appName("ExtractEventsJob") \
        .getOrCreate()

    raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()

    purchases = raw_events \
        .filter(is_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          transaction_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')
    
    guild_membership = raw_events \
        .filter(is_join_guild(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          join_guild_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')

    sink_purchases = purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_purchases") \
        .option("path", "/tmp/purchases") \
        .trigger(processingTime="120 seconds") \
        .start()
    
    sink_guild_membership = guild_membership \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_guild_membership") \
        .option("path", "/tmp/guild_membership") \
        .trigger(processingTime="120 seconds") \
        .start()

    # Both streaming queries are already running; block until either one
    # stops, so that a failure in either query is surfaced
    spark.streams.awaitAnyTermination()

if __name__ == "__main__":
    main()
```
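`from_json` with a fixed schema is what turns each JSON string into columns: keys listed in the schema become columns, keys that are missing become nulls, and keys outside the schema are dropped (they survive only inside `raw_event`). The sketch below approximates that behavior in plain Python; `project` is a hypothetical helper, the field lists are copied from the two schema functions above, and the timestamp is a made-up value. One difference: `json.loads` raises on malformed input, whereas Spark's `from_json` in its default permissive mode produces nulls rather than failing.

```python
import json

# Field lists copied from transaction_event_schema() and join_guild_event_schema()
PURCHASE_FIELDS = ["Accept", "Host", "User-Agent", "event_type", "amount", "description"]
GUILD_FIELDS = ["Accept", "Host", "User-Agent", "event_type"]


def project(raw_event, kafka_timestamp, fields):
    """Build one output row: raw_event, timestamp, then one column per schema field."""
    parsed = json.loads(raw_event)
    row = {"raw_event": raw_event, "timestamp": kafka_timestamp}
    for name in fields:
        row[name] = parsed.get(name)  # missing key -> None, like a nullable field
    return row


guild_raw = ('{"event_type": "join_guild", "description": "joining a guild", '
             '"Host": "user3.spectrum.com"}')
row = project(guild_raw, "2020-08-02 12:00:00", GUILD_FIELDS)
print(row["Host"], row["Accept"], "description" in row)  # user3.spectrum.com None False
```

This is also why the guild_membership table later has no `amount` column: its schema never asked for one.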

##### Docker

Next, I spin up the Docker cluster, which contains the zookeeper, kafka, cloudera, spark, presto, and mids containers. Before starting it, I check for stray containers, list the networks, and remove any network not used by at least one container. Then I start the cluster in detached mode.

* `docker ps -a`
* `docker network ls`
* `docker network prune`
* `docker-compose up -d`

Next, I created a Kafka topic called events.


```docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181```

Next, I ran Flask with the game_api.py I created so that I could generate game events. In a new terminal, I ran the following command, which launches a Flask web server listening on port 5000 on all network interfaces (0.0.0.0).

```docker-compose exec mids env FLASK_APP=/w205/project-3-derektopper/game_api.py flask run --host 0.0.0.0```

To watch every message we publish to Kafka, I ran kafkacat as a consumer in another new terminal. It reads the `events` topic from the beginning and keeps printing new messages as they arrive.

`docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning`
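Besides eyeballing the kafkacat output, one can tally it. The sketch below assumes the messages were saved one per line, for example by adding kafkacat's `-e` flag so that it exits at the end of the topic and redirecting its output to a file; `tally_event_types` is a hypothetical helper and the captured lines are illustrative. It skips any line that is not a JSON object, in case status messages or blank lines end up in the captured text.

```python
import json
from collections import Counter


def tally_event_types(lines):
    """Count messages per event_type in kafkacat-style output (one JSON message per line)."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):
            continue  # not a message, e.g. a status line or a blank line
        counts[json.loads(line).get("event_type")] += 1
    return counts


captured = [
    '{"Host": "user1.comcast.com", "event_type": "default"}',
    '{"Host": "user1.comcast.com", "event_type": "purchase_sword", "amount": "1"}',
    "% (illustrative kafkacat status line)",
    "",
]
print(tally_event_types(captured))
```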

Then, in a new terminal, I submitted `write_stream.py` to Spark. This starts the two streaming queries that read from Kafka and write purchase and guild membership events to HDFS.

```docker-compose exec spark spark-submit /w205/project-3-derektopper/write_stream.py```

Then, in a new terminal, I generated test data for the pipeline with Apache Bench. The following commands simulate 3 users (user1.comcast.com, user2.att.com, and user3.spectrum.com), each sending 5, 10, or 15 requests to each endpoint (default, purchase_a_sword, purchase_a_knife, join_guild, purchase_armor, purchase_arrow, and purchase_bow).

* `docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/`
* `docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/`
* `docker-compose exec mids ab -n 10 -H "Host: user3.spectrum.com" http://localhost:5000/`

* `docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword`
* `docker-compose exec mids ab -n 15 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword`
* `docker-compose exec mids ab -n 15 -H "Host: user3.spectrum.com" http://localhost:5000/purchase_a_sword`

* `docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_knife`
* `docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_knife`
* `docker-compose exec mids ab -n 10 -H "Host: user3.spectrum.com" http://localhost:5000/purchase_a_knife`

* `docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_guild`
* `docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_guild`
* `docker-compose exec mids ab -n 5 -H "Host: user3.spectrum.com" http://localhost:5000/join_guild`

* `docker-compose exec mids ab -n 15 -H "Host: user1.comcast.com" http://localhost:5000/purchase_armor`
* `docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_armor`
* `docker-compose exec mids ab -n 10 -H "Host: user3.spectrum.com" http://localhost:5000/purchase_armor`

* `docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_arrow`
* `docker-compose exec mids ab -n 15 -H "Host: user2.att.com" http://localhost:5000/purchase_bow`
* `docker-compose exec mids ab -n 5 -H "Host: user3.spectrum.com" http://localhost:5000/purchase_arrow`

* `docker-compose exec mids ab -n 5 -H "Host: user1.comcast.com" http://localhost:5000/purchase_bow`
* `docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_arrow`
* `docker-compose exec mids ab -n 15 -H "Host: user3.spectrum.com" http://localhost:5000/purchase_bow`
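Summing the `-n` values above gives the row counts the pipeline should produce, which is a useful check against the Presto results later. The sketch below assumes every Apache Bench request succeeded and was logged while the Spark job was running; `AB_PLAN` is a hand-copied list of the calls above, keyed by the event type each endpoint emits, and `expected_counts` is a hypothetical helper. Default events appear in the plan but are excluded from both tables, just as the streaming filters drop them.

```python
from collections import Counter

# (Host header, event_type emitted by the endpoint, ab -n), copied from the calls above
AB_PLAN = [
    ("user1.comcast.com", "default", 10),
    ("user2.att.com", "default", 10),
    ("user3.spectrum.com", "default", 10),
    ("user1.comcast.com", "purchase_sword", 10),
    ("user2.att.com", "purchase_sword", 15),
    ("user3.spectrum.com", "purchase_sword", 15),
    ("user1.comcast.com", "purchase_knife", 10),
    ("user2.att.com", "purchase_knife", 10),
    ("user3.spectrum.com", "purchase_knife", 10),
    ("user1.comcast.com", "join_guild", 10),
    ("user2.att.com", "join_guild", 10),
    ("user3.spectrum.com", "join_guild", 5),
    ("user1.comcast.com", "purchase_armor", 15),
    ("user2.att.com", "purchase_armor", 10),
    ("user3.spectrum.com", "purchase_armor", 10),
    ("user1.comcast.com", "purchase_arrow", 10),
    ("user2.att.com", "purchase_bow", 15),
    ("user3.spectrum.com", "purchase_arrow", 5),
    ("user1.comcast.com", "purchase_bow", 5),
    ("user2.att.com", "purchase_arrow", 10),
    ("user3.spectrum.com", "purchase_bow", 15),
]


def expected_counts(plan):
    """Return (purchases per host, purchases per event type, guild rows)."""
    by_host, by_event = Counter(), Counter()
    guild = 0
    for host, event_type, n in plan:
        if "purchase" in event_type:
            by_host[host] += n
            by_event[event_type] += n
        elif event_type == "join_guild":
            guild += n
    return by_host, by_event, guild


by_host, by_event, guild = expected_counts(AB_PLAN)
print(sum(by_host.values()), guild)  # 165 25
```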


Next, in a new terminal, I opened Hive in the cloudera container and used it to create the two external tables I want to query: one for all purchases and one for guild membership.

`docker-compose exec cloudera hive`

`create external table if not exists default.purchases (Accept string, Host string, User_Agent string, event_type string, timestamp string, raw_event string, amount string) stored as parquet location '/tmp/purchases'  tblproperties ("parquet.compress"="SNAPPY");`

`create external table if not exists default.guild_membership (Accept string, Host string, User_Agent string, event_type string, timestamp string, raw_event string) stored as parquet location '/tmp/guild_membership'  tblproperties ("parquet.compress"="SNAPPY");`

##### Queries

Then, I used Presto to run queries and check that the data stream is being saved.

`docker-compose exec presto presto --server presto:8080 --catalog hive --schema default`

Inside the Presto shell, I ran `show tables;`, `describe purchases;`, and `select * from purchases;`.

The first command, `show tables`, lists the two tables we created in Hive.

`presto:default> show tables;`

```
      Table       
------------------
 guild_membership 
 purchases        
(2 rows)
```

Next, `describe purchases` shows the columns and types of the purchases table, one column for each field declared in the Hive table definition.

`presto:default> describe purchases;`

```
   Column   |  Type   | Comment 
------------+---------+---------
 accept     | varchar |         
 host       | varchar |         
 user_agent | varchar |         
 event_type | varchar |         
 timestamp  | varchar |         
 raw_event  | varchar |         
 amount     | varchar |         
(7 rows)
``` 

Next, we count the rows in the purchase data. Apache Bench sent 165 purchase requests in total, so the purchases table contains 165 item purchases.

`presto:default> select count(amount) from purchases;`

```
 _col0 
-------
   165 
(1 row)
```

We can also describe the guild_membership table. It has only 6 columns, because guild events have no amount column.

`presto:default> describe guild_membership;`

```
   Column   |  Type   | Comment 
------------+---------+---------
 accept     | varchar |         
 host       | varchar |         
 user_agent | varchar |         
 event_type | varchar |         
 timestamp  | varchar |         
 raw_event  | varchar |         
(6 rows)
```

Counting its rows shows that the guild_membership table holds 25 join requests (10 + 10 + 5 from the three users).

`presto:default> select count(*) from guild_membership;`

```
 _col0 
-------
    25 
(1 row)
```

Next, we can use this data to answer questions about user behavior. For example, counting each user's purchases shows which users are most invested in the game. Grouping the purchases by host, we can see that `user2.att.com` made the most purchases, with 60, while `user1.comcast.com` made the fewest, with 50.

`presto:default> select host, count(*) from purchases group by host;`

```
        host        | _col1 
--------------------+-------
 user3.spectrum.com |    55 
 user1.comcast.com  |    50 
 user2.att.com      |    60 
(3 rows)
```

Next, we may want to see which items are purchased most often. This tells us how users interact with the game's options and which mechanics they use most. Here, arrow purchases are the least common, with 25, while sword purchases are the most common, with 40. Users also bought more swords (40) than knives (30), which could suggest focusing future expansions on the sword mechanic, such as different sword customization options or purchasable sword sheaths, rather than on knives.

`presto:default> select event_type , count(amount) as count from purchases group by event_type  order by count;`

```
   event_type   | count 
----------------+-------
 purchase_arrow |    25 
 purchase_knife |    30 
 purchase_bow   |    35 
 purchase_armor |    35 
 purchase_sword |    40 
(5 rows)
```
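Raw counts are easier to compare as shares of all purchases. The sketch below converts the query result above into percentages; `EVENT_COUNTS` is copied from that output and `purchase_shares` is a hypothetical helper.

```python
# Counts copied from the query result above
EVENT_COUNTS = {
    "purchase_arrow": 25,
    "purchase_knife": 30,
    "purchase_bow": 35,
    "purchase_armor": 35,
    "purchase_sword": 40,
}


def purchase_shares(counts):
    """Percentage of all purchases for each event type, rounded to one decimal."""
    total = sum(counts.values())
    return {event: round(100 * n / total, 1) for event, n in counts.items()}


print(purchase_shares(EVENT_COUNTS))
```

With 165 purchases in total, swords make up about 24% of purchases and arrows about 15%.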


We may also want to evaluate an individual user. Here we look at the purchase history of `user3.spectrum.com`, who bought three times as many bows (15) as arrows (5). Since one would normally expect players to buy more arrows than bows, this behavior may be worth investigating further.

`presto:default> select event_type , count(*) as count from purchases where Host = 'user3.spectrum.com' group by event_type  order by count;`

```
   event_type   | count 
----------------+-------
 purchase_arrow |     5 
 purchase_knife |    10 
 purchase_armor |    10 
 purchase_sword |    15 
 purchase_bow   |    15 
(5 rows)
```

We can also break down purchases of a specific item by user, as shown below for bows and arrows. This kind of information could help us offer discounts or deals to users who buy certain combinations of items.

`presto:default> select host, count(amount) as count from purchases where event_type = 'purchase_bow' group by host order by count;`
```
        host        | count 
--------------------+-------
 user1.comcast.com  |     5 
 user2.att.com      |    15 
 user3.spectrum.com |    15 
(3 rows)
```

`presto:default> select host, count(amount) as count from purchases where event_type = 'purchase_arrow' group by host order by count;`
```
        host        | count 
--------------------+-------
 user3.spectrum.com |     5 
 user1.comcast.com  |    10 
 user2.att.com      |    10 
(3 rows)
```
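Putting the two breakdowns side by side, a simple arrows-per-bow ratio per user makes the pattern explicit. This is a sketch over the counts copied from the two query results above; `arrows_per_bow` is a hypothetical helper, and the threshold of one arrow per bow used to flag users is an arbitrary assumption for illustration, not a business rule from this report.

```python
# Counts copied from the bow and arrow query results above
BOWS = {"user1.comcast.com": 5, "user2.att.com": 15, "user3.spectrum.com": 15}
ARROWS = {"user1.comcast.com": 10, "user2.att.com": 10, "user3.spectrum.com": 5}


def arrows_per_bow(bows, arrows):
    """Arrows bought per bow bought, for every user with at least one bow."""
    return {host: arrows.get(host, 0) / n for host, n in bows.items() if n > 0}


ratios = arrows_per_bow(BOWS, ARROWS)
# Hypothetical threshold: fewer than one arrow per bow
few_arrows = sorted(host for host, r in ratios.items() if r < 1)
print(few_arrows)  # ['user2.att.com', 'user3.spectrum.com']
```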

Finally, we can look at guild membership and see which users sent join-guild requests. Every user in our data requested to join a guild. This 100% rate obviously reflects our tiny test population of three users, but on real data the same query would show how widely guilds are used and could inform game design decisions.

`presto:default> select distinct host from guild_membership;`

```
        host        
--------------------
 user1.comcast.com  
 user2.att.com      
 user3.spectrum.com 
(3 rows)
```
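The join rate mentioned above is the share of active users who appear in guild_membership. A minimal sketch, assuming the set of active users is the set of hosts in the purchases table; `guild_join_rate` is a hypothetical helper and the host lists are copied from the query results above.

```python
def guild_join_rate(active_hosts, guild_hosts):
    """Fraction of active hosts that sent at least one join_guild request."""
    active = set(active_hosts)
    if not active:
        return 0.0
    return len(active & set(guild_hosts)) / len(active)


purchasing = ["user1.comcast.com", "user2.att.com", "user3.spectrum.com"]
joined = ["user1.comcast.com", "user2.att.com", "user3.spectrum.com"]
print(guild_join_rate(purchasing, joined))  # 1.0
```

Using sets means repeated join requests from the same user count once, which matches the `distinct` in the query.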

To finish, I took down the Docker cluster and confirmed that it shut down properly.

`docker-compose down`

`docker-compose ps`

`docker ps -a`

In summary, in this assignment I created a Docker cluster and showed that a Flask web server can capture multiple kinds of game events and publish them to Kafka. I used Spark Streaming to filter selected event types from Kafka and land them in HDFS as parquet, making them available for analysis in Presto. I used Apache Bench to call the web API and generate test data for the pipeline, ran a series of Presto queries, and produced this analytics report describing the pipeline and some basic analysis of the events.
