# Project 2 Notebook
### Class: W205.5 - Data Engineering
    Student: Blair Jones

    Date: 23 Oct 2020

## Context
This project is an end-to-end simulation of an event streaming analytics solution.  A fictional game, Placebo, generates events as players interact with the system.  The system processes events as they are received and prepares them for analysis by data scientists working for the game company.

## Objective / Problem Statement
- Create an event processing pipeline to receive event data from game clients
- Transform game events into a form suitable for usage by data scientists
- Save event data into storage that will be accessible to the data scientists
- Demonstrate sample business queries

### Sample business queries include

#### For Game Designers

* Descriptive: What is the mean time between when a player logs in for the first time and when they purchase an item (weapon, spell, etc.)?
* Descriptive: What is the mean time between when a player logs in for the first time and when they join an org (guild, team, etc.)?
* Explanatory Analysis:  Which tactics (special offers, lower prices, social shares, etc.) are most effective at reducing the mean time for a player to purchase an item?
* Predictive:  How likely is it that the average player will be able to defeat a new AI character?  (i.e., is the AI too smart or too tough?)

#### For Player Dashboard

* Descriptive Analysis:  What is the distribution of player wealth? (i.e., how many players have what amount of wealth?)
* Descriptive Analysis:  What is the distribution of player wealth by time spent playing the game? (i.e., do players just accumulate wealth or do they spend it?)
* Explanatory Analysis:  What are the highest contributing factors to player wealth (e.g., character attributes, length of play, # guilds joined, # items in inventory)?
* Predictive Analysis:  What amount of wealth is a player likely to accumulate as they play the game?


## Solution Approach
- The architecture is intended to receive data from game clients (apps, web pages, consoles), then transform and publish it in a form that data scientists can analyze.
- The data may also be used to create dashboards for players.


## Notebook setup

In [11]:
# No specific Python libraries were used for this project.

## Solution Overview

### Logical component view

<div style='background-color: gray'>
    <img src='./images/pipeline-overall.svg' />
</div>

## Technology Choices (for in-scope components)

#### Driving principles
The solution requires low latency, scalability and portability.  It must also be flexible enough to adapt as new functional requirements are identified.


#### Technologies
- Data Sources (event logs)
    - Game client is not in scope.
    - A Game API will be created and used to generate synthetic business events.  Flask will be used to create the API.
    
    
- Streaming Context
    - Topic Queues: Kafka
    - Data Transformation: Spark
    
    
- Distributed Analytics Storage
    - Files:  Hadoop (HDFS)


- Player Session Data
    - Key-value store: Redis


- Runtime Platform
    - Containers: Docker, Docker-Compose.  All other technologies will be run inside Docker containers.  The use of Docker ensures portability to different hosting options.
    - Hosting: Google Cloud Platform (GCP).  All components will be deployed to GCP for development and testing.  The choice of production environment can be made later.  


- Query Tools
    - Presto
    
    
- Test Data Generation
    - curl (for unit test cases)
    - Apache Bench (for bulk data generation)
    

## Solution Design

### Game Events

The Game API module receives events as routed urls that contain arguments.

* GET methods are used by some routes to return the requested information, for example to display a user profile.

* POST methods are used by some routes to receive parameters for specific actions, such as "buy a sword".  In this case the arguments will specify the type of sword and its cost.

Game events implemented in this project (arguments shown in parentheses):
- Register player (name, password, wealth:optional)
- Login (username, password)
- Buy a sword (type, cost)
- Join a guild (org, level, cost)
- Buy a spell (type, cost)
- Join a team (org, level)
- View user profile (username)

Events are implemented with Flask as routes.  Each route generates a unique ID for the event, stores key metadata associated with the event, extracts expected parameters from the http request, applies relevant business logic, writes an event out to a Kafka topic, then returns a suitable response to the invoking client.
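
The route steps described above can be sketched in plain Python.  This is a minimal illustration, not the project's actual Flask code: the function names `build_event` and `publish`, the `buy_sword` event type, the header values, and the in-memory list standing in for a Kafka producer are all assumptions made for the example.

```python
import json
import uuid
from datetime import datetime

# Stand-in for a Kafka producer (assumption): the real API writes to a Kafka topic.
published = []


def build_event(event_type, headers, params):
    """Assemble one event record from request metadata and parameters."""
    event = {
        "event_id": str(uuid.uuid4()),      # unique ID for this event
        "timestamp": str(datetime.now()),   # when the API handled the request
        "event_type": event_type,
    }
    event.update(headers)  # request metadata, e.g. Host, User-Agent, Accept
    event.update(params)   # expected parameters, e.g. type and cost
    return event


def publish(topic, event):
    """Hypothetical publish step: serialize to JSON and append to the list."""
    published.append((topic, json.dumps(event)))


event = build_event(
    "buy_sword",  # hypothetical event type name
    {"Host": "localhost:5000", "User-Agent": "curl", "Accept": "*/*"},
    {"user_id": "987611", "type": "cutlass", "cost": 5000},
)
publish("placebo_all_events", event)
print(published[0][0], event["event_type"], event["type"], event["cost"])
```

In a Flask route, the headers and parameters would come from the incoming request object, and the route would return its response to the client after the publish step.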

For the purpose of this project, the simulated response to the client is either a simple HTML-formatted response or a simple text message.  In future versions, a more formal JSON data structure would be worth considering.


### Player Session

A player's session data is stored in Redis while the platform is running.  The basic data structure is a single value for each player's state saved under the key of the player's id.  The system is intended to generate a unique ID for each player.

Specific player attributes implemented for the purpose of this project:
- ID (string of UUID)
- Name (string)
- Password (encrypted string)
- Wealth (int)
- Health (int)
- Date created (string of timestamp)
- Inventory (list of strings)
- Affiliations (list of dictionary items)

Inventory represents a list containing individual items in the player's inventory.  Each item is a string specifying the item.  Future versions could implement this as a list of JSON objects where each object contains additional data about the item, such as the number of items of the same type, availability, etc.

Affiliations represents a list containing individual organizations a player belongs to.  Each organization is a dictionary specifying the name of the organization, the level of the player, and the timestamp when joined.  An organization can be a guild, or a team created amongst players.
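
As a rough sketch of this structure, the example below stores each player's state as one JSON string under the player's ID.  A plain dictionary stands in for Redis here (an assumption made to keep the example self-contained), and the helper names are hypothetical.  The field names follow the user profile output shown later in this notebook.

```python
import json
import uuid
from datetime import datetime

# Stand-in for Redis (assumption): one JSON string per player, keyed by player ID.
store = {}


def new_player(name, password_hash, wealth=0):
    """Create a player state with the attributes listed above."""
    return {
        "user_id": str(uuid.uuid4()),
        "user_name": name,
        "user_pwd": password_hash,
        "wealth": wealth,
        "health": 100,
        "created": str(datetime.now()),
        "inventory": [],      # list of strings
        "affiliations": [],   # list of dictionaries
    }


def save_player(player):
    """Serialize the whole state to a single value under the player's ID."""
    store[player["user_id"]] = json.dumps(player)


def load_player(user_id):
    """Read the single value back and rebuild the state."""
    return json.loads(store[user_id])


player = new_player("dirk", "<hashed>", wealth=12345)
player["inventory"].append("cutlass")
player["affiliations"].append(
    {"org": "soldiers", "level": "masterofarms", "joined_ts": str(datetime.now())}
)
save_player(player)
restored = load_player(player["user_id"])
print(restored["inventory"], restored["affiliations"][0]["org"])
```

With a Redis client, the dictionary assignment and lookup would typically become `set` and `get` calls on the same key.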

For the purpose of this project, the fictional game client software is responsible for consulting the latest version of the player's state (health, wealth, inventory, etc.).  For this reason, this simulation does not return modified data to the client in the API response.


### Event Topics 

The Topic queues are the front-end for the event streaming and analytics platform.  They receive relevant business events from the game client/API.  Transformation logic is applied to wrangle the data into a form suitable for use by data scientists.

The inbound Topics for this project are:

* all_events: Every game event, regardless of type, is written to all_events.  This can then be filtered for any type of event or multiple types of events to analyze player behaviors.
* buy_events: All commercial events, such as purchases of swords and spells.
* social_events: Events related to joining or leaving guilds or teams, advancing in level, attaining achievements, etc.  Intended for use in specialized machine learning.


Outbound Topics identified are:

* player_notifications: Messages intended to be displayed to players at next-login or during game play.  Can also be used for analytics.  This topic is not implemented in this project.
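
The routing of events to the inbound Topics listed above could look like the sketch below.  It assumes event types are named with a `buy_` or `join_` prefix; `join_guild` and `join_team` appear in the query output later in this notebook, while the `buy_` names and the function `topics_for` are hypothetical.

```python
def topics_for(event_type):
    """Return the inbound topics an event of this type should be written to."""
    topics = ["all_events"]            # every event goes to all_events
    if event_type.startswith("buy_"):
        topics.append("buy_events")    # commercial events
    elif event_type.startswith("join_"):
        topics.append("social_events")  # guild and team events
    return topics


for name in ("buy_sword", "join_guild", "login"):
    print(name, topics_for(name))
```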


### Data Transformation

The data extraction and transformation logic is relatively simple for this project due to the fairly streamlined definition of objects in the game API.

The key requirement is to flatten nested objects, whose inner structures may not be consistent from one record to the next, so that they can be stored in dataframes and queried easily.

For example, the Player Session object contains lists for both Inventory and Affiliations.  Each of these lists may contain none, one or multiple items.  Each item may be a simple string or a json object.  To simplify queries, these structures need to be flattened in PySpark for storage into Hadoop.
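
To illustrate the idea, here is a plain-Python sketch of one possible flattening, in which each nested list is reduced to a count.  The project performs this step in PySpark; the function name `flatten_player` and the choice to keep only counts are assumptions, suggested by the `player_num_items` and `player_num_affiliations` columns of the Hive tables defined later.

```python
def flatten_player(player):
    """Reduce a nested player state to a flat row of scalar columns."""
    inventory = player.get("inventory") or []        # tolerate missing or None
    affiliations = player.get("affiliations") or []
    return {
        "user_id": player["user_id"],
        "player_created": player["created"],
        "player_wealth": float(player["wealth"]),
        "player_num_items": len(inventory),
        "player_num_affiliations": len(affiliations),
    }


player = {
    "user_id": "987611",
    "wealth": -2654.0,
    "created": "2020-11-23 14:58:44.356273",
    "affiliations": [
        {"joined_ts": "2020-11-23 14:58:44.738252", "org": "soldiers", "level": "masterofarms"}
    ],
    "health": 100,
    "inventory": ["cutlass"],
    "user_name": "dirk",
}
row = flatten_player(player)
print(row)
```

Every value in the resulting row is a string or a number, so the row maps directly onto a dataframe column set.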


### Data Storage

Storage in Hadoop is simplified after the transformations achieved with Spark.  We simply save the dataframe generated through the transformations into an appropriately named Hadoop fileset.


### Queries

Queries are built in Presto.



## Solution Implementation

#### 0) Environment Launch

In [None]:
!docker-compose up -d

#### 1) Game API Setup

**Instance Setup**

Redis is used in the Game API for managing persistent player session data.  We must install Redis before launching the Game API.  In the future this should be added to the MIDS image.

In [1]:
!docker-compose exec mids pip install redis

The directory '/w205/.cache/pip/http' or its parent directory is not owned by the current user and the cache has been disabled. Please check the permissions and owner of that directory. If executing pip with sudo, you may want sudo's -H flag.
The directory '/w205/.cache/pip' or its parent directory is not owned by the current user and caching wheels has been disabled. check the permissions and owner of that directory. If executing pip with sudo, you may want sudo's -H flag.
You are using pip version 8.1.1, however version 20.2.4 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.


**API Launch**

In [None]:
!docker-compose exec mids env FLASK_APP=/w205/project-3-bjonesneu/game_api.py flask run --host 0.0.0.0

#### 2) Kafka Topic Setup

In [3]:
!docker-compose exec kafka kafka-topics --create --topic placebo_all_events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
!docker-compose exec kafka kafka-topics --create --topic placebo_buy_events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
!docker-compose exec kafka kafka-topics --create --topic placebo_social_events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181



In [4]:
!docker-compose exec kafka kafka-topics --describe --topic placebo_all_events --zookeeper zookeeper:32181
!docker-compose exec kafka kafka-topics --describe --topic placebo_buy_events --zookeeper zookeeper:32181
!docker-compose exec kafka kafka-topics --describe --topic placebo_social_events --zookeeper zookeeper:32181

Topic: placebo_all_events	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: placebo_all_events	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
Topic: placebo_buy_events	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: placebo_buy_events	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
Topic: placebo_social_events	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: placebo_social_events	Partition: 0	Leader: 1	Replicas: 1	Isr: 1


#### 3) Test Data Generation

**Simple calls to Game API to create events in Kafka Topics for development and testing.  These statements were executed multiple times to generate a full set of test data.**

In [9]:
!curl http://localhost:5000/
!curl -d 'userid=987611&userpwd=blahdh&username=dirk&wealth=12345' http://localhost:5000/create/user
!curl -d 'userid=987611&type=cutlass&cost=5000' http://localhost:5000/buy/sword
!curl -d 'userid=987611&type=longsword&cost=10000' http://localhost:5000/buy/sword
!curl -d 'userid=987611&org=soldiers&level=masterofarms&cost=9999' http://localhost:5000/join/guild
!curl -d 'userid=1234&userpwd=yahbadaba&username=merly&wealth=98712' http://localhost:5000/create/user
!curl -d 'userid=1234&type=charm&cost=4999' http://localhost:5000/buy/spell
!curl -d 'userid=1234&type=charm&cost=4999' http://localhost:5000/buy/spell
!curl -d 'userid=1234&org=magicians&level=accolyte&cost=99' http://localhost:5000/join/guild
!curl http://localhost:5000/data/user?userid=987611

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<title>Redirecting...</title>
<h1>Redirecting...</h1>
<p>You should be redirected automatically to target URL: <a href="static/register.html">static/register.html</a>.  If not click the link.
Created user +:)
Sword purchased [:)
You do not have sufficient funds [:(
Joined #guild
Created user +:)
Spell purchased <:)
Spell purchased <:)
Joined #guild
User data: 

{"user_id": "987611", "wealth": -2654.0, "created": "2020-11-23 14:58:44.356273", "user_pwd": "****", "affiliations": [{"joined_ts": "2020-11-23 14:58:44.738252", "org": "soldiers", "level": "masterofarms"}], "health": 100, "inventory": ["cutlass"], "user_name": "dirk"}



**Verify events created in Kafka Topics**

In [10]:
!docker-compose exec mids bash -c "kafkacat -C -b kafka:29092 -t placebo_all_events -o beginning -e" | wc -l

345


#### 4) Hadoop Storage Setup

**Verify Status**

In [2]:
!docker-compose exec cloudera hadoop fs -ls /tmp/

Found 7 items
drwxr-xr-x   - root   supergroup          0 2020-11-23 14:36 /tmp/buys
drwxrwxrwt   - root   supergroup          0 2020-11-22 21:33 /tmp/checkpoints_for_buys
drwxrwxrwt   - root   supergroup          0 2020-11-22 21:33 /tmp/checkpoints_for_social
drwxrwxrwt   - mapred mapred              0 2016-04-06 02:26 /tmp/hadoop-yarn
drwx-wx-wx   - root   supergroup          0 2020-11-22 21:33 /tmp/hive
drwxrwxrwt   - mapred hadoop              0 2016-04-06 02:28 /tmp/logs
drwxr-xr-x   - root   supergroup          0 2020-11-23 14:36 /tmp/social


#### 5) Spark Stream Setup

This section is executed using the CLI.  The code used is copied here for reference.

In a dedicated terminal session:
```code
CLI:   docker-compose exec spark spark-submit /w205/project-3-bjonesneu/process_buy_stream.py
```

In a dedicated terminal session:
```code
CLI:   docker-compose exec spark spark-submit /w205/project-3-bjonesneu/process_social_stream.py
```
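
The stream scripts themselves are not reproduced in this notebook.  As a hedged sketch of the kind of filter such a script might apply, the functions below decide whether a raw JSON event belongs to the buy or the social stream.  The event type sets and function names are assumptions (`join_guild` and `join_team` appear in the query output later; the `buy_` names are hypothetical), and in PySpark a function like this would typically be wrapped as a boolean UDF.

```python
import json

BUY_EVENT_TYPES = {"buy_sword", "buy_spell"}      # hypothetical names
SOCIAL_EVENT_TYPES = {"join_guild", "join_team"}  # seen in the social table


def _event_type(raw_event):
    """Parse a raw JSON string and return its event_type, or None if unusable."""
    try:
        event = json.loads(raw_event)
    except (TypeError, ValueError):
        return None  # malformed or missing payloads are skipped, not fatal
    if not isinstance(event, dict):
        return None
    return event.get("event_type")


def is_buy_event(raw_event):
    return _event_type(raw_event) in BUY_EVENT_TYPES


def is_social_event(raw_event):
    return _event_type(raw_event) in SOCIAL_EVENT_TYPES


print(is_buy_event('{"event_type": "buy_sword", "cost": 999}'))
print(is_social_event('{"event_type": "join_guild", "org": "soldiers"}'))
print(is_buy_event("not json"))
```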

#### 6) Presto Query Setup

#### Setup Hive table(s)

6a) Launch Hive in a dedicated terminal session:
```code
CLI:  docker-compose exec cloudera hive
```

6b) Within the Hive console, run:
```code
create external table if not exists default.buys ( 
    raw_event string,
    spk_ts string,
    accept string,
    host string,
    user_agent string,
    timestamp string,
    event_type string,
    user_id string,
    status string,
    type string,
    cost double,
    player_created string,
    player_wealth double,
    player_num_items int
)
stored as parquet
location '/tmp/buys'
tblproperties ("parquet.compress"="SNAPPY");

create external table if not exists default.social ( 
    raw_event string,
    spk_ts string,
    accept string,
    host string,
    user_agent string,
    timestamp string,
    event_type string,
    user_id string,
    status string,
    org string,
    level string,
    cost double,
    player_created string,
    player_wealth double,
    player_num_affiliations int

)
stored as parquet
location '/tmp/social'
tblproperties ("parquet.compress"="SNAPPY");
```

6c) Launch Presto in a dedicated terminal session:
```code
CLI:  docker-compose exec presto presto --server presto:8080 --catalog hive --schema default
```

6d) Run queries within the Presto console:
```code
show tables;
```
        Output:
             Table  
            --------
             buys   
             social 
            (2 rows)

```code
describe buys;
```
        Output:
                  Column      |  Type   | Comment 
            ------------------+---------+---------
             raw_event        | varchar |         
             spk_ts           | varchar |         
             accept           | varchar |         
             host             | varchar |         
             user_agent       | varchar |         
             timestamp        | varchar |         
             event_type       | varchar |         
             user_id          | varchar |         
             status           | varchar |         
             type             | varchar |         
             cost             | double  |         
             player_created   | varchar |         
             player_wealth    | double  |         
             player_num_items | integer |         
            (14 rows)

```code
select timestamp, status, type, cost from buys;
```
    Output:
                 timestamp          |      status       |  type   | cost  
        ----------------------------+-------------------+---------+-------
         2020-11-22 22:00:15.706369 | approved_purchase | saber   | 999.0 
         2020-11-22 22:00:15.712180 | approved_purchase | saber   | 999.0 
         2020-11-22 22:00:15.725177 | approved_purchase | saber   | 999.0 
         2020-11-22 22:00:15.729610 | approved_purchase | saber   | 999.0 
         2020-11-22 22:00:15.735913 | approved_purchase | saber   | 999.0 
         2020-11-22 22:00:15.739464 | approved_purchase | saber   | 999.0 
         2020-11-22 22:00:15.743363 | approved_purchase | saber   | 999.0 
         2020-11-22 22:00:15.753620 | approved_purchase | saber   | 999.0 
         2020-11-22 22:00:16.357225 | approved_purchase | potion  |  50.0 
         2020-11-22 22:00:16.362849 | approved_purchase | potion  |  50.0 
         2020-11-22 22:00:16.374195 | approved_purchase | potion  |  50.0 
         2020-11-22 22:00:16.381072 | approved_purchase | potion  |  50.0 
         2020-11-22 22:00:16.388540 | approved_purchase | potion  |  50.0 
         2020-11-22 22:00:16.398242 | approved_purchase | potion  |  50.0 
         2020-11-22 22:00:16.407975 | approved_purchase | potion  |  50.0 
         2020-11-22 22:00:17.001598 | approved_purchase | cutlass | 777.0 
         2020-11-22 22:00:17.010693 | approved_purchase | cutlass | 777.0 
         2020-11-22 22:00:17.017223 | approved_purchase | cutlass | 777.0 
         2020-11-22 22:00:17.026247 | approved_purchase | cutlass | 777.0



## Solution Demonstration

**Setup base data for stream**

In [None]:
!curl -d 'userid=987611&userpwd=blahdh&username=dirk&wealth=1234500' http://localhost:5000/create/user
!curl -d 'userid=123456&userpwd=yahbadaba&username=merly&wealth=9871200' http://localhost:5000/create/user

**Generate continuous event stream**

In a dedicated terminal session:
```code
while true; do

  # ab rejects "-n 0", so send between 1 and 10 requests per call
  docker-compose exec mids ab -n $(( RANDOM % 10 + 1 )) \
  -p /w205/project-3-bjonesneu/test/post-buy$(($RANDOM%5 )).txt \
  -T "application/json" -H "Host: user1.comcast.com" http://localhost:5000/buy/spell
  
  docker-compose exec mids ab -n $(( RANDOM % 10 + 1 )) \
  -p /w205/project-3-bjonesneu/test/post-buy$(($RANDOM%5 )).txt \
  -T "application/json" -H "Host: user1.comcast.com" http://localhost:5000/buy/sword

  docker-compose exec mids ab -n $(( RANDOM % 10 + 1 )) \
  -p /w205/project-3-bjonesneu/test/post-buy$(($RANDOM%5 )).txt \
  -T "application/json" -H "Host: user1.comcast.com" http://localhost:5000/buy/spell

  docker-compose exec mids ab -n $(( RANDOM % 10 + 1 )) \
  -p /w205/project-3-bjonesneu/test/post-join$(($RANDOM%5 )).txt \
  -T "application/json" -H "Host: user1.comcast.com" http://localhost:5000/join/guild

  docker-compose exec mids ab -n $(( RANDOM % 10 + 1 )) \
  -p /w205/project-3-bjonesneu/test/post-join$(($RANDOM%5 )).txt \
  -T "application/json" -H "Host: user1.comcast.com" http://localhost:5000/join/team

  # pause between bursts
  sleep 5
done
```

### Business Queries

Here are sample queries that may be helpful for future data science team users of this platform.

NOTE:  All these queries are executed within the dedicated Presto session.


### Query - How many items of each type were purchased?

```sql
select status, type, count(type) as num_items from buys group by status, type order by status;
```
        Output:
                   status       |   type    | num_items 
            --------------------+-----------+----------
             approved_purchase  | saber     |   106 
             approved_purchase  | potion    |    44 
             approved_purchase  | dagger    |    36 
             approved_purchase  | cutlass   |    39 
             approved_purchase  | charm     |     2 
             insufficient_funds | longsword |     1 
            (6 rows)


### Query - What is the average price paid for each type of item?

```sql
select type, round(avg(cost), 2) as avg_cost from buys where status='approved_purchase' group by type order by type;
```
        Output:
                type    | avg_cost 
            ------------+----------
             broadsword |   1034.0 
             charm      |   4999.0 
             cutlass    |    777.0 
             dagger     |     55.0 
             potion     |     50.0 
             saber      |    999.0 
            (6 rows)

### Query - Which team or guild is most popular?

```sql
select event_type, org, count(org) as num_joins from social group by event_type, org order by num_joins desc; 
```
        Output:
             event_type |      org       | num_joins 
            ------------+----------------+-----------
             join_team  | Magician Guild |       121 
             join_team  | team ABCD      |       100 
             join_guild | Magician Guild |        94 
             join_guild | Golds Gym      |        85 
             join_guild | team BLAH      |        83 
             join_guild | santas shop    |        80 
             join_team  | Golds Gym      |        80 
             join_team  | santas shop    |        77 
             join_guild | team ABCD      |        70 
             join_team  | team BLAH      |        61 
            (10 rows)

### Query - What is the average wealth of players joining each organization?

```sql
select org, round(avg(player_wealth), 2) as avg_wealth from social group by org order by org; 
```
        Output:
                  org       | avg_wealth 
            ----------------+------------
             Golds Gym      | 1202045.52 
             Magician Guild | 9617308.02 
             santas shop    | 1204290.71 
             team ABCD      | 9615629.38 
             team BLAH      | 9521707.42 
            (5 rows)

## Optional Features

A number of optional features were implemented in this project:

- Several types of events were generated from the Game API and filtered in the PySpark stream.  These events had different API signatures and generated different data structures.

- The API used two http verbs for different purposes:
    * `GET` for data-request style functions (lookup user profile)
    * `POST` for data-update style functions (buy an item, join an org)


- The API accepted different *parameters* for each function.

- The Redis database was used by the Game API to track player state during gameplay.  This maintained the player's inventory of items, wealth after purchases, and list of affiliations.  Other attributes were configured but not used for this project.
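
The wealth and inventory tracking described in the last item might follow logic like the sketch below.  This is an assumption about how the check could work, not the project's code: the function name `apply_purchase` is hypothetical, while the two status values are the ones that appear in the `buys` query output.

```python
def apply_purchase(player, item_type, cost):
    """Deduct the cost and add the item, or reject if funds are insufficient."""
    if cost > player["wealth"]:
        return "insufficient_funds"   # state is left unchanged
    player["wealth"] -= cost
    player["inventory"].append(item_type)
    return "approved_purchase"


player = {"user_id": "987611", "wealth": 12345, "inventory": []}
first = apply_purchase(player, "cutlass", 5000)
second = apply_purchase(player, "longsword", 10000)
print(first, second, player["wealth"], player["inventory"])
```

With the starting wealth of 12345 used in the test data, the cutlass purchase is approved and the longsword purchase is rejected, which matches the responses shown in the Test Data Generation output.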


## Closing Thoughts

This project demonstrates the implementation of an end-to-end event streaming and analytics pipeline using a number of open-source technologies:
* Simulated clients invoke an API
* The API generates events into queues
* Events are incrementally read from queues, filtered and written to relevant event datastores
* Queries are used to analyze the data streaming into the event datastores

This is a simple demonstration of a configurable, resilient and scalable platform typically used for streaming analytics work.
