# W205 Project 3
### Andy Cui, Ivan Fan, Miroslava Walekova

### Introduction

In this project, we work as data scientists for a gaming company. Our goal is to understand the revenue streams and player retention for our newest game. Players interact with the game financially in two main ways: they can (1) add money to their accounts and then (2) purchase in-game items. They can also take two main social actions: (1) join a guild to network with other players, and (2) cancel their guild membership if they are unhappy. We have set up a pipeline that streams event data from the game API. We can then query that data to better understand our users and drive business decisions.

### Tasks and Setup

Below is a list of commands that we ran to set up our data collection pipeline for our game. 

```
!cat walekova_annotations.md
```

```bash
# My annotations, Project 3

## New window (1) - Navigating to w205 directory
cd w205/

## Window (1) - Creating full-stack2 directory
mkdir full-stack2

## Window (1) - Next, I'm checking what's in my directory
ls

## Window (1) - Navigate to my new full-stack2 directory
cd full-stack2

## Window (1) - Looking at the docker-compose file to check content
cat docker-compose.yml

## Window (1) - Quick Sanity Check
docker-compose ps

## Window (1) - Spinning up my kafka, zookeeper, mids, cloudera/hadoop cluster.
docker-compose up -d

## Window (1) - Quick Sanity Check
docker-compose ps

## Window (1) - Check whether the binding succeeded
docker-compose logs zookeeper | grep -i binding

## Window (1) - Check that the right kafka components have started
docker-compose logs kafka | grep -i started

## Window (1) - Checking out Hadoop
docker-compose exec cloudera hadoop fs -ls /tmp/

## Window (1) - Bring Flask Up
docker-compose exec mids env FLASK_APP=/w205/full-stack2/game_api.py flask run
```

##### game_api.py
Our game API has changed substantially. We added a knife purchase alongside the sword purchase, along with a description field for both types of purchase. We also added a way for users to add money to their accounts by calling the API with the `amount` parameter set. Users can now join guilds and cancel their guild memberships. Finally, users can post messages to the shared message board.
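The `log_to_kafka` helper in the listing below merges the request headers into the event dictionary before JSON-encoding it. This is why every stored event carries `Accept`, `Host` and `User-Agent` columns. The add-money route itself is cut off in that listing, so what follows is only a stdlib sketch of the payload such a route would produce. It assumes a hypothetical `build_add_money_event` helper and uses a plain dict in place of Flask's `request.headers`.

```python
import json


def build_add_money_event(amount, headers):
    """Build the bytes log_to_kafka would send for an add_money event.

    Hypothetical helper: `amount` mimics the query parameter described in the
    text, and `headers` stands in for Flask's request.headers.
    """
    event = {
        "event_type": "add_money",
        "amount": str(amount),   # amounts travel as strings, like the purchase events
        "description": "money",  # assumed; matches the value seen in the add_money table
    }
    # Same step as log_to_kafka: header keys are merged into the event itself,
    # so a header with the same name as an event field would overwrite it.
    event.update(headers)
    # Kafka values are raw bytes, so the dict is serialized to UTF-8 JSON.
    return json.dumps(event).encode()


headers = {"Host": "oski.berkeley.edu", "User-Agent": "ApacheBench/2.3", "Accept": "*/*"}
payload = build_add_money_event(100, headers)
print(json.loads(payload))
```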

```
! cat game_api.py
```

```python
#!/usr/bin/env python
import json
from kafka import KafkaProducer
from flask import Flask, request

app = Flask(__name__)
producer = KafkaProducer(bootstrap_servers='kafka:29092')


def log_to_kafka(topic, event):
    event.update(request.headers)
    producer.send(topic, json.dumps(event).encode())


@app.route("/")
def default_response():
    default_event = {'event_type': 'default'}
    log_to_kafka('game_events', default_event)
    return "This is the default response!\n"

@app.route("/purchase_a_sword")
def purchase_a_sword():
    sword_type = str(request.args.get('sword_type'))
    purchase_sword_event = {'event_type': 'purchase_sword',
                            'description': sword_type,
                            'amount': '-10'}
    log_to_kafka('game_events', purchase_sword_event)
    return "Sword Purchased!\n"

@app.route("/purchase_a_knife")
def purchase_a_knife():
    knife_type = str(request.args.get('knife_type'))
    purchase_knife_event = {'event_type': 'purchase_
```

##### write_stream.py
This is our main Spark streaming job. Every two minutes, it extracts all purchase, add-money, guild-related and messaging events. It decorates them with request header information such as the host name and writes them out to HDFS. Each type of event is written to a separate location because each has a different schema.
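The split described above can be sketched in plain Python as a lookup from `event_type` to the HDFS directories that the Analysis section reads. This illustrates the routing logic only, not the Spark job itself. The mapping is inferred from the paths and event types that appear later in this notebook, `route_event` is a hypothetical name, and dropping unlisted types such as `default` is an assumption. In a Structured Streaming job, each branch would typically be a filtered DataFrame with its own `writeStream` sink, and a two-minute batch interval corresponds to `trigger(processingTime="120 seconds")`.

```python
import json

# Hypothetical mapping, inferred from the parquet paths read in the Analysis section.
# Purchases and deposits also feed the combined transactions table.
SINKS = {
    "purchase_sword": ["/tmp/purchases", "/tmp/transactions"],
    "purchase_knife": ["/tmp/purchases", "/tmp/transactions"],
    "add_money": ["/tmp/add_money", "/tmp/transactions"],
    "join_guild": ["/tmp/guild_membership"],
    "cancel_membership": ["/tmp/cancel_membership"],
    "mensaje": ["/tmp/messages2"],
}


def route_event(raw_event):
    """Return the output directories a raw Kafka value should be written to.

    Event types not listed in SINKS (e.g. 'default') map to an empty list,
    i.e. they are filtered out.
    """
    event = json.loads(raw_event)
    return SINKS.get(event.get("event_type"), [])


print(route_event(b'{"event_type": "purchase_knife", "amount": "-10"}'))
# ['/tmp/purchases', '/tmp/transactions']
print(route_event(b'{"event_type": "default"}'))
# []
```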

```
! cat write_stream.py
```

```python
#!/usr/bin/env python
"""Extract events from kafka and write them to hdfs
"""
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType


def transaction_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- description: string (nullable = true)
    |-- amount: string (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("amount", StringType(), True),
        StructField("description", StringType(), True),
    ])


def purchase_event_schema():
    return transact
```

### Analysis

#### Transactions 

Two user actions are tracked: adding money and making individual purchases.

Combining the two gives a full list of transactions and the current balance, both across all users and for each individual user.


##### Purchases

```python
purchases = spark.read.parquet('/tmp/purchases')
purchases.createOrReplaceTempView('purchases')
```

```python
spark.sql('select * from purchases').toPandas().head(1000)
```

| | raw_event | timestamp | Accept | Host | User-Agent | event_type | amount | description |
|---|---|---|---|---|---|---|---|---|
| 0 | `{"event_type": "purchase_sword", "amount": "-1...` | 2019-08-09 20:49:21.542 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_sword | -10 | Great Sword |
| 1 | `{"event_type": "purchase_sword", "amount": "-1...` | 2019-08-09 20:49:21.546 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_sword | -10 | Great Sword |
| 2 | `{"event_type": "purchase_knife", "amount": "-1...` | 2019-08-09 20:49:29.191 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_knife | -10 | |
| 3 | `{"event_type": "purchase_knife", "amount": "-1...` | 2019-08-09 20:49:29.195 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_knife | -10 | |
| 4 | `{"event_type": "purchase_knife", "amount": "-1...` | 2019-08-09 20:49:29.2 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_knife | -10 | |
| 5 | `{"event_type": "purchase_knife", "amount": "-1...` | 2019-08-09 20:49:29.203 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_knife | -10 | |
| 6 | `{"event_type": "purchase_knife", "amount": "-1...` | 2019-08-09 20:49:29.213 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_knife | -10 | |
| 7 | `{"event_type": "purchase_knife", "amount": "-1...` | 2019-08-09 20:49:29.224 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_knife | -10 | |
| 8 | `{"event_type": "purchase_knife", "amount": "-1...` | 2019-08-09 20:49:29.226 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_knife | -10 | |
| 9 | `{"event_type": "purchase_knife", "amount": "-1...` | 2019-08-09 20:49:29.231 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_knife | -10 | |


Counting the purchases made by all players is straightforward. Knives (15 purchases) are far more popular than swords (2 purchases). Perhaps this means we should offer incentives for players to buy knives!

```python
spark.sql('select event_type, count(event_type) from purchases group by event_type').toPandas().head(1000)
```

| | event_type | count(event_type) |
|---|---|---|
| 0 | purchase_sword | 2 |
| 1 | purchase_knife | 15 |


The data contains host information, which we can use to analyse the buying behaviour of individual users. In the example below, the oski user bought all 15 knives and both swords, perhaps in preparation for a raid. This suggests that oski is a power spender who should be retained at all costs.

```python
spark.sql('select event_type, count(event_type) from purchases where host = "oski.berkeley.edu" group by event_type').toPandas().head(1000)
```

| | event_type | count(event_type) |
|---|---|---|
| 0 | purchase_sword | 2 |
| 1 | purchase_knife | 15 |


##### Add Money

All data about users adding money to their accounts is captured in the `add_money` table.

```python
import pyspark.sql.types as types
import pyspark.sql.functions as fn

money = spark.read.parquet('/tmp/add_money')
money = money.withColumn("amount_int", money["amount"].cast(types.IntegerType()))
money.createOrReplaceTempView('add_money')
```

```python
spark.sql('select * from add_money').toPandas().head(1000)
```

| | raw_event | timestamp | Accept | Host | User-Agent | event_type | amount | description | amount_int |
|---|---|---|---|---|---|---|---|---|---|
| 0 | `{"event_type": "add_money", "amount": "5", "Ac...` | 2019-08-09 20:48:57.599 | `*/*` | localhost | ApacheBench/2.3 | add_money | 5 | money | 5 |
| 1 | `{"event_type": "add_money", "amount": "5", "Ac...` | 2019-08-09 20:48:57.606 | `*/*` | localhost | ApacheBench/2.3 | add_money | 5 | money | 5 |
| 2 | `{"event_type": "add_money", "amount": "5", "Ac...` | 2019-08-09 20:48:57.615 | `*/*` | localhost | ApacheBench/2.3 | add_money | 5 | money | 5 |
| 3 | `{"event_type": "add_money", "amount": "5", "Ac...` | 2019-08-09 20:48:57.631 | `*/*` | localhost | ApacheBench/2.3 | add_money | 5 | money | 5 |
| 4 | `{"event_type": "add_money", "amount": "5", "Ac...` | 2019-08-09 20:48:57.647 | `*/*` | localhost | ApacheBench/2.3 | add_money | 5 | money | 5 |
| 5 | `{"event_type": "add_money", "amount": "5", "Ac...` | 2019-08-09 20:48:57.649 | `*/*` | localhost | ApacheBench/2.3 | add_money | 5 | money | 5 |
| 6 | `{"event_type": "add_money", "amount": "5", "Ac...` | 2019-08-09 20:48:57.654 | `*/*` | localhost | ApacheBench/2.3 | add_money | 5 | money | 5 |
| 7 | `{"event_type": "add_money", "amount": "5", "Ac...` | 2019-08-09 20:48:57.656 | `*/*` | localhost | ApacheBench/2.3 | add_money | 5 | money | 5 |
| 8 | `{"event_type": "add_money", "amount": "5", "Ac...` | 2019-08-09 20:48:57.66 | `*/*` | localhost | ApacheBench/2.3 | add_money | 5 | money | 5 |
| 9 | `{"event_type": "add_money", "amount": "5", "Ac...` | 2019-08-09 20:48:57.662 | `*/*` | localhost | ApacheBench/2.3 | add_money | 5 | money | 5 |


Breaking deposits down per user, we immediately see that oski alone has added \$10,000 to their account, in 100 deposits of \$100 each.

This further reinforces the idea that oski is an invested player that we should keep. 
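The `amount` field arrives as a string because the schema declares it `StringType`, so the notebook casts it to an integer before aggregating. With ANSI mode off, Spark's cast returns null for strings it cannot parse. SQL `sum`, `avg` and `count(column)` then skip those nulls. Below is a minimal pure-Python sketch of that behaviour. `to_int` and `summarize` are hypothetical helpers, and the malformed row is made up to show the null handling. The other rows match the oski and tree.stanfurd.edu totals reported below.

```python
def to_int(value):
    """Roughly mimic a non-ANSI Spark cast to IntegerType: unparseable strings become None (null)."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def summarize(rows):
    """Per-host (sum, avg, count) of (host, amount) rows, skipping nulls like SQL aggregates."""
    per_host = {}
    for host, amount in rows:
        values = per_host.setdefault(host, [])  # host is listed even if all its amounts are null
        value = to_int(amount)
        if value is not None:  # sum/avg/count(col) ignore nulls
            values.append(value)
    summary = {}
    for host, values in per_host.items():
        count = len(values)
        total = sum(values) if count else None  # SUM over only nulls is NULL
        mean = total / count if count else None
        summary[host] = (total, mean, count)
    return summary


# 100 deposits of "100" for oski and one of "1" for tree.stanfurd.edu,
# plus one hypothetical malformed amount to show the null handling.
rows = [("oski.berkeley.edu", "100")] * 100 + [
    ("tree.stanfurd.edu", "1"),
    ("tree.stanfurd.edu", "abc"),
]
print(summarize(rows))
# {'oski.berkeley.edu': (10000, 100.0, 100), 'tree.stanfurd.edu': (1, 1.0, 1)}
```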

```python
spark.sql('select Host, sum(amount_int), avg(amount_int), count(amount_int) from add_money group by Host').toPandas()
```

| | Host | sum(amount_int) | avg(amount_int) | count(amount_int) |
|---|---|---|---|---|
| 0 | localhost | 150 | 7.5 | 20 |
| 1 | oski.berkeley.edu | 10000 | 100.0 | 100 |
| 2 | tree.stanfurd.edu | 1 | 1.0 | 1 |


##### Transactions

All data related to adding money and making purchases is combined in the `transactions` table. This makes it easy to analyse each user's transactions and current balance.
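Purchases are stored with negative amounts (`-10`) and deposits with positive ones, so a user's balance is simply the sum of their transaction amounts. For oski, that is 100 deposits of 100 minus 2 swords and 15 knives at 10 each: 10000 - 170 = 9830. This matches the figure the balance query below returns. Here is a stdlib sketch of the same computation, with `balances` as a hypothetical helper.

```python
from collections import Counter


def balances(transactions):
    """Sum signed amounts per host: deposits are positive, purchases negative."""
    totals = Counter()
    for host, amount in transactions:
        totals[host] += int(amount)  # amounts are stored as strings such as "-10"
    return dict(totals)


oski = "oski.berkeley.edu"
transactions = (
    [(oski, "100")] * 100   # add_money events
    + [(oski, "-10")] * 2   # purchase_sword events
    + [(oski, "-10")] * 15  # purchase_knife events
)
print(balances(transactions))
# {'oski.berkeley.edu': 9830}
```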

```python
transactions = spark.read.parquet('/tmp/transactions')
transactions = transactions.withColumn("amount_int", transactions["amount"].cast(types.IntegerType()))
transactions.createOrReplaceTempView('transactions')
```

```python
spark.sql('select * from transactions where Host = "oski.berkeley.edu"').toPandas().head(1000)
```

| | raw_event | timestamp | Accept | Host | User-Agent | event_type | amount | description | amount_int |
|---|---|---|---|---|---|---|---|---|---|
| 0 | `{"event_type": "purchase_sword", "amount": "-1...` | 2019-08-09 20:49:21.542 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_sword | -10 | Great Sword | -10 |
| 1 | `{"event_type": "purchase_sword", "amount": "-1...` | 2019-08-09 20:49:21.546 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_sword | -10 | Great Sword | -10 |
| 2 | `{"event_type": "purchase_knife", "amount": "-1...` | 2019-08-09 20:49:29.191 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_knife | -10 | | -10 |
| 3 | `{"event_type": "purchase_knife", "amount": "-1...` | 2019-08-09 20:49:29.195 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_knife | -10 | | -10 |
| 4 | `{"event_type": "purchase_knife", "amount": "-1...` | 2019-08-09 20:49:29.2 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_knife | -10 | | -10 |
| 5 | `{"event_type": "purchase_knife", "amount": "-1...` | 2019-08-09 20:49:29.203 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_knife | -10 | | -10 |
| 6 | `{"event_type": "purchase_knife", "amount": "-1...` | 2019-08-09 20:49:29.213 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_knife | -10 | | -10 |
| 7 | `{"event_type": "purchase_knife", "amount": "-1...` | 2019-08-09 20:49:29.224 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_knife | -10 | | -10 |
| 8 | `{"event_type": "purchase_knife", "amount": "-1...` | 2019-08-09 20:49:29.226 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_knife | -10 | | -10 |
| 9 | `{"event_type": "purchase_knife", "amount": "-1...` | 2019-08-09 20:49:29.231 | `*/*` | oski.berkeley.edu | ApacheBench/2.3 | purchase_knife | -10 | | -10 |


Example: the current balance of each user

```python
spark.sql('select Host, sum(amount_int) from transactions group by Host').toPandas().head(1000)
```

| | Host | sum(amount_int) |
|---|---|---|
| 0 | localhost | 150 |
| 1 | oski.berkeley.edu | 9830 |
| 2 | tree.stanfurd.edu | 1 |


#### Memberships and cancellations

```python
guild = spark.read.parquet('/tmp/guild_membership')
guild.createOrReplaceTempView('guild_membership')
cancels = spark.read.parquet('/tmp/cancel_membership')
cancels.createOrReplaceTempView('cancel_membership')
```

#### Memberships

We can see that the `tree.stanford.edu` user joined the `stanford` guild and the `berkeley.edu` user joined the `berkeley` guild. This shows which users are connected, how, and when. Such insights may help us attract new users, build stronger connections between current players, and increase retention.

```python
spark.sql('select * from guild_membership').toPandas()
```

| | raw_event | timestamp | Accept | Host | User-Agent | event_type | guild_name |
|---|---|---|---|---|---|---|---|
| 0 | `{"Host": "tree.stanford.edu", "User-Agent": "A...` | 2019-08-09 20:50:04.701 | `*/*` | tree.stanford.edu | ApacheBench/2.3 | join_guild | stanford |
| 1 | `{"Host": "berkeley.edu", "User-Agent": "Apache...` | 2019-08-09 20:50:11.355 | `*/*` | berkeley.edu | ApacheBench/2.3 | join_guild | berkeley |


#### Cancellations

We can already see churn in action: the `tree.stanford.edu` host canceled their membership about 25 seconds after joining the `stanford` guild. Were they dissatisfied? Perhaps the berkeley guild beat the stanford guild and the Stanford player is changing sides? The `cancel_reason` field is empty, so we do not know, but this kind of user feedback is extremely important.
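Pairing each host's `join_guild` and `cancel_membership` timestamps shows how long a membership lasted, which is a simple retention signal. The sketch below uses the timestamps from the join and cancel tables in this section. `membership_durations` is a hypothetical helper. It assumes at most one join and one cancel per host, and it assumes the timestamp format that pandas displays here.

```python
from datetime import datetime

# Timestamp format as displayed in the join and cancel tables.
FMT = "%Y-%m-%d %H:%M:%S.%f"


def membership_durations(joins, cancels):
    """Seconds between each host's join and its cancellation.

    `joins` and `cancels` map Host -> timestamp string; this assumes at most
    one join and one cancel per host. Hosts that never cancelled are skipped.
    """
    durations = {}
    for host, cancelled_at in cancels.items():
        if host in joins:
            start = datetime.strptime(joins[host], FMT)
            end = datetime.strptime(cancelled_at, FMT)
            durations[host] = (end - start).total_seconds()
    return durations


joins = {
    "tree.stanford.edu": "2019-08-09 20:50:04.701",
    "berkeley.edu": "2019-08-09 20:50:11.355",
}
cancels = {"tree.stanford.edu": "2019-08-09 20:50:29.541"}

print(membership_durations(joins, cancels))  # {'tree.stanford.edu': 24.84}
print(sorted(set(joins) - set(cancels)))     # ['berkeley.edu'] (still members)
```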

```python
spark.sql('select * from cancel_membership').toPandas()
```

| | raw_event | timestamp | Accept | Host | User-Agent | event_type | cancel_reason |
|---|---|---|---|---|---|---|---|
| 0 | `{"Host": "tree.stanford.edu", "cancel_reason":...` | 2019-08-09 20:50:29.541 | `*/*` | tree.stanford.edu | ApacheBench/2.3 | cancel_membership | |


#### Messaging (GET & POST)

Finally, we can see all the messages posted to the message board. The table currently holds two test messages sent with curl to `localhost:5000`: one sent with a GET request and one with a POST request. Activity on this message board is also an important gauge of user involvement.
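The two rows below show the message text reaching the API in different places. The GET message arrives in the query string, while the POST event carries a `Content-Length` header of 45. That is exactly the length of a form-encoded body `message_post=this is a POST message - booyah!`. In the Flask app, these would typically be read from `request.args` and `request.form` respectively. The stdlib sketch below extracts `message_post` from either place. The field name comes from the table; the helper `extract_message` and the exact request encoding are assumptions.

```python
from urllib.parse import parse_qs


def extract_message(query_string="", body=""):
    """Return message_post from a form-encoded body if present, else from the query string.

    Hypothetical helper; parse_qs decodes '+' as a space and also accepts literal spaces.
    """
    for source in (body, query_string):
        values = parse_qs(source).get("message_post")
        if values:
            return values[0]
    return None


get_message = extract_message(query_string="message_post=this+is+a+GET+message")
post_body = "message_post=this is a POST message - booyah!"
post_message = extract_message(body=post_body)
print(get_message)              # this is a GET message
print(post_message)             # this is a POST message - booyah!
print(len(post_body.encode()))  # 45
```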

```python
messages2 = spark.read.parquet('/tmp/messages2')
messages2.createOrReplaceTempView('messages2')
```

```python
spark.sql('select * from messages2').toPandas()
```

| | raw_event | timestamp | Accept | Host | User-Agent | event_type | message_post |
|---|---|---|---|---|---|---|---|
| 0 | `{"message_post": "this is a GET message", "Hos...` | 2019-08-09 22:11:13.056 | `*/*` | localhost:5000 | curl/7.47.0 | mensaje | this is a GET message |
| 1 | `{"Content-Length": "45", "event_type": "mensaj...` | 2019-08-09 22:11:17.059 | `*/*` | localhost:5000 | curl/7.47.0 | mensaje | this is a POST message - booyah! |
