# Project 3: Understanding User Behavior - End to End Pipeline
#### Carlos Moreno
#### W205 - Fall 2021



#### **This project is organized in the following sections:**

#### I. Project Description
#### II. Data Pipeline Architecture
#### III. Steps for Implementing End-to-End Pipeline
#### IV. Business Questions
#### V. Appendix

-------------------------------------------------------------------------------------------

## I. Project Description
You are a data scientist at a game development company. Your latest mobile game generates several events that the company is interested in tracking:

1. `buy a sword` 
2. `buy a knife`
3. `buy a shield`
4. `join guild - Game_of_Thrones, Castle_of_Rock and Knights_of_the_Round_Table`
5. `fight - users may engage in a fight, this event captures if the user won or lost the fight`

- Each event has the following metadata:

(1) `buy a sword`;  
- userid: provided by the user generating the event.
- event_type: buy a sword
- name: excalibur
- strength: 1000 points
- number of purchases = provided by the user (integer)
- price = 2000 credits

(2) `buy a knife`;  
- userid: provided by the user generating the event.
- event_type: buy a knife
- name: kukri
- strength: 500 points
- number of purchases = provided by the user (integer)
- price = 1000 credits

(3) `buy a shield`;  
- userid: provided by the user generating the event.
- event_type: buy a shield
- name: parma
- strength: 800 points
- number of purchases = provided by the user (integer)
- price = 1500 credits

(4) `join guild`;  
- userid: provided by the user generating the event.
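- event_type: join guild
- name: one of three guilds: Game_of_Thrones, Castle_of_Rock, or Knights_of_the_Round_Table
- strength: depends on the guild (Game_of_Thrones: 1500 points, Castle_of_Rock: 1200 points, Knights_of_the_Round_Table: 5000 points)
- number of purchases = always 1
- price = depends on the guild (Game_of_Thrones: 2000 credits, Castle_of_Rock: 1000 credits, Knights_of_the_Round_Table: 3000 credits)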

(5) `fight event`;  
- userid: provided by the user generating the event.
- event_type: fight event
- score: 0 points if the user lost, 100 points if the user won
- win_status = won or lost

## Purpose
The purpose of this project is to build an end-to-end data pipeline. A mobile device runs a game that generates events. Each event is published to a Kafka topic, and a Spark streaming application consumes the topic and writes the events to files. Once the events are stored, we read the files as tables, analyze the data, and answer business questions about the game.

Note: The data is synthetic. It is generated with Apache Bench (or curl) by the script "data_generator.sh".

With this pipeline in place, we can answer game-specific questions such as the following by querying the tables with Presto.

1. How many swords, shields, and knives were purchased?
2. Who are the top three players by money spent?
3. Which item is the most popular (sword, shield, or knife)?
4. Which players have at least one sword, one knife, and one shield?
5. Which player has the most strength, as measured by the strength of purchased items?
6. How many players have joined at least one guild?


## Deliverables

- docker-compose.yml file
- data_generator.sh
- game_api.py file
- write_events_stream.py file
- Jupyter Notebook (this notebook)

# II. Data Pipeline Architecture

The following diagram presents the structure of the pipeline being implemented, which includes four steps:

![Conceptual%20Flow%20Diagram%20Project%203.PNG](attachment:Conceptual%20Flow%20Diagram%20Project%203.PNG)

> **Step 1** Use Apache Bench to generate events (purchase, guild, and fight events). The generated events hit endpoints in a Flask application (game_api.py).

> **Step 2** Events hitting the game API are published to Kafka: as requests reach "game_api.py", they are logged to a Kafka topic (events).

> **Step 3** Spark Streaming filters select event types from Kafka and land them on HDFS (write_events_stream.py)

> **Step 4** Querying HDFS tables using Presto query engine

The following sections present the specific steps and commands used to implement the pipeline.

# III. Steps for Implementing End-to-End Pipeline

## Step 1: Use Apache Bench to generate events (purchase, guild & fight events). 

The generated events hit endpoints in a Flask application (game_api.py).

#### **(1). Activate Docker Compose Cluster:**

```bash
> docker-compose up -d
```

#### **(2). Run Game Application:**

```bash
> docker-compose exec mids env FLASK_APP=/w205/project-3-cmorenoUCB2021/game_api.py flask run --host 0.0.0.0
```

#### **(3). Set up to Watch Kafka:** 
To observe how messages are being captured, open a new terminal and run the following command (run it twice; the first run creates the topic):

```bash
> docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning
```
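As a quick sanity check on what the topic contains, the kafkacat output (one JSON message per line) can be summarized by event type. The sketch below is a minimal illustration, not part of the project files: `count_event_types` is a hypothetical helper, and the sample lines are made up. In practice the lines would come from the kafkacat output saved to a file or piped into the script.

```python
import json
from collections import Counter


def count_event_types(lines):
    """Count messages per event_type, skipping lines that are not JSON objects."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except ValueError:
            # Ignore any non-JSON status lines mixed into the output.
            continue
        if isinstance(event, dict):
            counts[event.get("event_type", "unknown")] += 1
    return counts


# Illustrative sample only; real input would be the kafkacat output.
sample = [
    '{"userid": "user-001", "event_type": "purchase_sword", "n_purchased": 2}',
    '{"userid": "user-002", "event_type": "join_guild", "name": "Castle_of_Rock"}',
    '{"userid": "user-001", "event_type": "purchase_sword", "n_purchased": 1}',
    "this line is not JSON",
]
print(count_event_types(sample))
```

Comparing these counts with the number of requests sent by the data generator is a simple way to confirm that no events were dropped before they reached Kafka.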

#### **(4). Use Apache Bench to generate data:**  

To generate events, we created a shell script called "**data_generator.sh**". The full script is in the Appendix section of this document. It takes three required arguments and one optional flag. The required arguments are:
- **u** = Number of users (integer > 1). Defines how many users the game has.
- **e** = Number of endpoints (always set to 5). The game has five endpoints: purchase_a_sword, purchase_a_knife, purchase_a_shield, join_guild, and fight_event.
- **n** = Total number of requests (integer > 1). Defines how many events are generated.

The script accepts one optional flag (**-b**). When **-b** is included, the script calls the game API using **Apache Bench**. When it is omitted, the script calls the game API using **curl**.

The following lines present two examples of calls to the "data_generator.sh" script:

> **(a) Example of a call including option for Apache Bench:**
```bash
> bash data_generator.sh -u 10 -e 5 -n 100 -b
```

> **(b) Example of call without option for Apache Bench:**
```bash
> bash data_generator.sh -u 10 -e 5 -n 100
```
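To make the role of the arguments concrete, the following Python sketch shows one way random requests for the five endpoints could be assembled. It is an illustration only and not a translation of data_generator.sh: the base URL, the `user-NNN` id format, the quantity range, and the name `build_request_url` are all assumptions. The query parameters (`userid`, `n`, `guild_name`, `win_status`) are the ones read by game_api.py.

```python
import random
from urllib.parse import urlencode

BASE_URL = "http://localhost:5000"  # assumption: Flask port published by the mids container
GUILDS = ["Game_of_Thrones", "Castle_of_Rock", "Knights_of_the_Round_Table"]
ENDPOINTS = ["purchase_a_sword", "purchase_a_knife", "purchase_a_shield",
             "join_guild", "fight_event"]


def build_request_url(rng, n_users):
    """Return one random request URL for the game API."""
    # The user id format is an assumption made for this sketch.
    params = {"userid": "user-%03d" % rng.randint(1, n_users)}
    endpoint = rng.choice(ENDPOINTS)
    if endpoint.startswith("purchase"):
        params["n"] = rng.randint(1, 5)  # arbitrary quantity range
    elif endpoint == "join_guild":
        # game_api.py compares against values wrapped in single quotes.
        params["guild_name"] = "'%s'" % rng.choice(GUILDS)
    else:
        params["win_status"] = "'%s'" % rng.choice(["won", "lost"])
    return "%s/%s/?%s" % (BASE_URL, endpoint, urlencode(params))


rng = random.Random(205)  # fixed seed so the sketch is repeatable
urls = [build_request_url(rng, n_users=10) for _ in range(5)]
for url in urls:
    print(url)
```

Each URL could then be requested with curl or Apache Bench, which is the choice the **-b** flag controls in the actual script.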

## Step 2: Consume events using Kafka

Events hitting the game API are published to Kafka: as requests reach "game_api.py", they are logged to a Kafka topic (events).

The following sections present the game_api.py application, which receives the requests produced by the data generator and logs each event to a Kafka topic. The code is structured as follows:

**(a)** Import the required libraries, and set-up the variables.
```python
#!/usr/bin/env python
import json
from kafka import KafkaProducer
from flask import Flask, request

app = Flask(__name__)
producer = KafkaProducer(bootstrap_servers='kafka:29092')

```

**(b)** Define the routine that logs events to Kafka (to the topic called events)
```python
def log_to_kafka(topic, event):
    event.update(request.headers)
    producer.send(topic, json.dumps(event).encode())
    
```
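Because `log_to_kafka` merges the request headers into the event before serializing it, each Kafka message carries header fields such as `Host` and `User-Agent` next to the game fields. The sketch below reproduces that merge with plain dictionaries so it runs without Flask or Kafka. `build_message` is a hypothetical helper and the header values are illustrative.

```python
import json


def build_message(event, headers):
    """Merge request headers into the event and serialize it as UTF-8 JSON bytes."""
    payload = dict(event)    # copy so the caller's dict is left untouched
    payload.update(headers)  # header names become top-level keys
    return json.dumps(payload).encode()


# Illustrative values; the header names match those referenced by the Spark schemas.
event = {"userid": "user-001", "event_type": "purchase_sword", "n_purchased": 2}
headers = {"Host": "localhost:5000", "User-Agent": "ApacheBench/2.3", "Accept": "*/*"}

message = build_message(event, headers)
decoded = json.loads(message.decode())
print(sorted(decoded))
```

This is why the Spark schemas in Step 3 declare `Accept`, `Host`, and `User-Agent` fields alongside the event attributes.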

**(c)** Define the routines (functions) that dictate the behavior for each API route being hit. The API endpoints are the following:
- default_response
- purchase_a_sword
- purchase_a_knife
- purchase_a_shield
- join_guild
- fight_event

The following code presents an example for the **fight_event** API call: 

```python    
@app.route("/fight_event/", methods=['POST','GET'])
def fight_event():
    """
    @function: This function generates a fight event from a user mobile device request or Apache Bench
    @param: User Request (via URL endpoint) 
    @return: Returns string of User Id and Event 
    """
    userid = request.args.get('userid', default='001', type=str)
    win_status = request.args.get('win_status', default="'won'", type=str)
    n = request.args.get('n',default=1,type=int)

    if win_status == "'lost'":
        score = "0"
    else: 
        score = "100"

    win_status = win_status.replace("'", '')
        
    fight_event = {'userid': userid,
                   'event_type': 'fight_event',
                   'win_status': win_status,
                   'score': score}
    log_to_kafka('events', fight_event)
    return "fight event - you:" +" "+ win_status +" "+ "Score: " + score+ "\n"
```
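One detail of this handler is easy to miss: the comparison is against the literal string `'lost'`, single quotes included, so the client has to send the value quoted. The sketch below isolates that branch in a standalone function (`fight_score` is a hypothetical name, not part of game_api.py) to show what happens when the quotes are omitted.

```python
def fight_score(win_status):
    """Mirror the branch in fight_event: only the quoted string 'lost' scores 0."""
    score = "0" if win_status == "'lost'" else "100"
    return win_status.replace("'", ""), score


print(fight_score("'lost'"))  # ('lost', '0')   quoted, as the handler expects
print(fight_score("'won'"))   # ('won', '100')
print(fight_score("lost"))    # ('lost', '100') unquoted value falls into the else branch
```

If unquoted values are possible, stripping the quotes before the comparison would avoid recording a loss with a score of 100. That is a suggestion rather than a description of the current code.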

## Step 3: Spark Streaming/Save filtered data

Spark Streaming filters select event types from Kafka and land them on HDFS (write_events_stream.py)

#### **(5). Run Application to Read Messages from Kafka and write them to hdfs:** 
In a separate terminal, run the streaming application, which reads the messages for the key game events from Kafka:

```
> docker-compose exec spark spark-submit /w205/project-3-cmorenoUCB2021/write_events_stream.py
```

The streaming application (**write_events_stream.py**) has the following structure:

**(a) Define the schema for the events:** the example below shows purchase_events_schema.

```python
def purchase_events_schema():
    """
    @function: This function provides the table schema for purchase events (knife, sword, shield)
    @param: None 
    @return: Returns the table schema for purchase events
    """  
    return StructType(
    [
        StructField('Accept', StringType(), True),
        StructField('Host', StringType(), True),
        StructField('User-Agent', StringType(), True),
        StructField('price', StringType(), True),
        StructField('n_purchased', LongType(), True),
        StructField('strength', StringType(), True),
        StructField('name', StringType(), True),
        StructField('event_type', StringType(), True),
        StructField('userid', StringType(), True)
    ]
)
```
**(b) Create a SparkSession:**   

```python
    spark = SparkSession \
        .builder \
        .appName("ExtractEventsJob") \
        .getOrCreate()

```
**(c) Read the Raw Events (from Kafka topic):**   

```python
    raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()
```

**(d) Filter events of a specific type:** the following code filters the events related to item purchases. 
```python
    purchases = raw_events \
        .filter(is_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          purchase_events_schema()).alias('json')) \
        .select('timestamp', 'json.*') \
        .select( \
#                   F.from_utc_timestamp(F.col('timestamp'),'GMT').alias('event_ts') \
                  F.col('timestamp').alias('event_ts') \
                 ,F.col('userid') \
                 ,F.col('Host') \
                 ,F.col('event_type') \
                 ,F.col('name') \
                 ,F.col('strength') \
                 ,F.col('n_purchased') \
                 ,F.col('price') \
                ) \
        .distinct()
    
# FUNCTION USED TO FILTER EVENTS RELATED TO PURCHASE OF OBJECTS (SWORD, KNIFE, SHIELD)
@udf('boolean')
def is_purchase(event_as_json):
    """
    @function: This function uses a json to filter out records by purchase event type (knife, sword, shield)
    @param: Takes in extracted json data as a string
    @return: Returns a boolean value
    """    
    event = json.loads(event_as_json)
    if 'purchase' in event['event_type']:
        return True
    return False
```
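The filter above assumes every message is valid JSON with an `event_type` key; a message without that key would raise a `KeyError` inside the UDF. The plain-Python sketch below shows a more defensive variant of the same predicate. It is an assumption about desired behavior, not the project's code, and `is_purchase_event` is a hypothetical name. It runs without Spark so the logic can be checked in isolation.

```python
import json


def is_purchase_event(event_as_json):
    """Return True only for JSON objects whose event_type mentions 'purchase'."""
    try:
        event = json.loads(event_as_json)
    except (TypeError, ValueError):
        return False  # None or malformed JSON
    if not isinstance(event, dict):
        return False
    return "purchase" in str(event.get("event_type", ""))


samples = [
    '{"event_type": "purchase_sword", "userid": "user-001"}',
    '{"event_type": "join_guild", "userid": "user-002"}',
    '{"userid": "user-003"}',
    "not json",
]
print([is_purchase_event(s) for s in samples])  # [True, False, False, False]
```

The same body could be wrapped with `@udf('boolean')` in the streaming job, so that one malformed message does not fail the whole micro-batch.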

**(e) Write filtered events to file:** the following code writes the filtered purchase events as Parquet files under **"/tmp/purchase_events"** in HDFS. 

```python
    sink_purchases = purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_purchase_events") \
        .option("path", "/tmp/purchase_events") \
        .trigger(processingTime="10 seconds") \
        .start()

```





#### **(6). Check what was written in Hadoop:** 

The following commands list what was written for each event type (purchase, guild, and fight events). 
Note: check the files for every event type as a sanity check.
```
> docker-compose exec cloudera hadoop fs -ls /tmp/purchase_events
> docker-compose exec cloudera hadoop fs -ls /tmp/guild_events
> docker-compose exec cloudera hadoop fs -ls /tmp/fight_events
```


## Step 4: Query HDFS tables using the Presto query engine

#### **(7). Create table schemas within Hive:**

**(a).** Run Hive in the Hadoop container:

```bash
> docker-compose exec cloudera hive
```

**(b).** Create the tables within Hive:

**purchase_events**

```bash
create external table if not exists default.purchase_events (
    event_ts string,
    userid string,
    Host string,
    event_type string,
    name string,
    strength int,
    n_purchased int,
    price int
  )
  stored as parquet 
  location '/tmp/purchase_events'
  tblproperties ("parquet.compress"="SNAPPY");
```
```bash
create external table if not exists default.purchase_events (event_ts string, userid string, Host string, event_type string, name string, strength int, n_purchased int, price int) stored as parquet location '/tmp/purchase_events' tblproperties ("parquet.compress"="SNAPPY");

```

**guild_events**  
```bash
create external table if not exists default.guild_events (
    event_ts string,
    userid string,
    Host string,
    event_type string,
    name string,
    strength int,
    n_purchased int,
    price int
  )
  stored as parquet 
  location '/tmp/guild_events'
  tblproperties ("parquet.compress"="SNAPPY");
```
```bash
create external table if not exists default.guild_events (event_ts string, userid string, Host string, event_type string, name string, strength int, n_purchased int, price int) stored as parquet location '/tmp/guild_events' tblproperties ("parquet.compress"="SNAPPY");
```

**fight_events**  
```bash
create external table if not exists default.fight_events (
    event_ts string,
    userid string,
    Host string,
    event_type string,
    score int,
    win_status string
  )
  stored as parquet 
  location '/tmp/fight_events'
  tblproperties ("parquet.compress"="SNAPPY");
```
```bash
create external table if not exists default.fight_events (event_ts string, userid string, Host string, event_type string, score int, win_status string) stored as parquet location '/tmp/fight_events' tblproperties ("parquet.compress"="SNAPPY");
```

**Note:** `ctrl-D` to exit the hive shell.
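The three statements above share one pattern and differ only in table name, column list, and location. As an optional convenience, the following sketch builds the single-line form of a statement from a column list. `hive_ddl` is a hypothetical helper written for this illustration, and the output still has to be pasted into the Hive shell by hand.

```python
def hive_ddl(table, columns, location):
    """Build a single-line CREATE EXTERNAL TABLE statement for a Parquet directory."""
    cols = ", ".join("%s %s" % (name, hive_type) for name, hive_type in columns)
    return (
        "create external table if not exists default.%s (%s) "
        "stored as parquet location '%s' "
        "tblproperties (\"parquet.compress\"=\"SNAPPY\");" % (table, cols, location)
    )


fight_columns = [
    ("event_ts", "string"), ("userid", "string"), ("Host", "string"),
    ("event_type", "string"), ("score", "int"), ("win_status", "string"),
]
print(hive_ddl("fight_events", fight_columns, "/tmp/fight_events"))
```

Keeping the column lists in one place makes it easier to keep the Hive tables in step with the columns selected by the streaming job.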

#### **(8). Query Tables with Presto:**  

&nbsp;&nbsp;&nbsp;**(a). Run Presto:**

```bash
> docker-compose exec presto presto --server presto:8080 --catalog hive --schema default
```

&nbsp;&nbsp;&nbsp;**(b). Examples of Queries with Presto:**

&nbsp;&nbsp;&nbsp;&nbsp;-What tables are there in Presto?

```bash
> presto:default> show tables;
```

&nbsp;&nbsp;&nbsp;&nbsp;-Describe each of the tables:
```bash
> presto:default> describe purchase_events;
> presto:default> describe guild_events;
> presto:default> describe fight_events;
```

&nbsp;&nbsp;&nbsp;&nbsp;-Query the `purchase_events` and `guild_events` tables:  
```bash
> presto:default> select * from purchase_events;
> presto:default> select * from guild_events;
```
&nbsp;&nbsp;&nbsp;&nbsp;-Count the number of events in the `purchase_events` and `guild_events` tables:  
```bash
> presto:default> select count(*) from purchase_events;
> presto:default> select count(*) from guild_events;
```

#### **(9). HOW TO RUN QUERIES USING PYSPARK?**
- **(a) Create a symbolic link in the Spark container so that the /w205 directory is reachable from the Jupyter Notebook working directory:**

```bash
> docker-compose exec spark ln -s /w205 w205
```

- **(b) Run Jupyter Notebook in Google Cloud to read kafka topic and explore data:**

```bash
> docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 7000 --ip 0.0.0.0 --allow-root' pyspark
```

- **(c) Get the token and include the address for your notebook instance in Google Cloud:** For example:

```bash
> http://0.0.0.0:7000/?token=9d42832da7a128cbc081a3eb4a5f30b70c853987d8666395
```
Replace 0.0.0.0 with the external IP address of your Google Cloud instance (for example, 34.139.108.62).
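Once the notebook is open, the Parquet directories written by the streaming job can be read back and queried with Spark SQL. The sketch below is one possible way to do this, not the notebook's actual code: `register_event_tables` and `EVENT_PATHS` are hypothetical names, and the function takes the session as an argument so that it contains no Spark import of its own. In a notebook started through `pyspark`, the `spark` session object is already defined.

```python
def register_event_tables(spark, paths):
    """Load each Parquet directory and expose it to Spark SQL under the given name."""
    for table_name, path in paths.items():
        spark.read.parquet(path).createOrReplaceTempView(table_name)


# Paths are the sink locations used by write_events_stream.py.
EVENT_PATHS = {
    "purchase_events": "/tmp/purchase_events",
    "guild_events": "/tmp/guild_events",
    "fight_events": "/tmp/fight_events",
}

# In the notebook, where `spark` already exists:
# register_event_tables(spark, EVENT_PATHS)
# spark.sql("select name, count(*) as members from guild_events group by name").show()
```

The two commented lines show the intended use. They are left commented out because they only run inside a live Spark session.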

# IV. Business Questions

**(1) What is the most popular guild (by number of members)?**  

**Presto Query:**  
```sql
> select name, count(*) as members from guild_events group by name order by members desc;
```
> Output:

```
            name            | members 
----------------------------+---------
 Knights_of_the_Round_Table |      39 
 Castle_of_Rock             |      38 
 Game_of_Thrones            |      31 
(3 rows)
```

```sql
> select name, count(*) as members from guild_events where name = 'Game_of_Thrones' group by name order by members desc;
```


**(2) Which users won the most times?** Users with more than 100 wins will get special treatment.

**Presto Query:**
```sql
> select userid, win_status, count(*) as wins from fight_events where win_status='won' group by userid, win_status order by wins desc;
```
```
  userid   | win_status | wins 
-----------+------------+------
 user-004  | won        |    6 
 user-003  | won        |    5 
 user-005  | won        |    4 
 user-0010 | won        |    4 
 user-008  | won        |    4 
 user-002  | won        |    2 
 user-007  | won        |    2 
 user-006  | won        |    2 
 user-009  | won        |    1 
(9 rows)
```

**(2.a) Which users lost the most times?**

```sql
> select userid, win_status, count(*) as losts from fight_events where win_status='lost' group by userid, win_status order by losts desc;
```
```
  userid   | win_status | losts 
-----------+------------+-------
 user-005  | lost       |     7 
 user-009  | lost       |     6 
 user-008  | lost       |     6 
 user-003  | lost       |     5 
 user-002  | lost       |     5 
 user-001  | lost       |     4 
 user-006  | lost       |     4 
 user-007  | lost       |     4 
 user-0010 | lost       |     3 
 user-004  | lost       |     1 
(10 rows)
```

**(3) What are the least purchased items?** 

**Presto Query:**
```sql
> select event_type, count(*) as number from purchase_events group by event_type order by number asc;
```

```
   event_type    | number 
-----------------+--------
 purchase_shield |    171 
 purchase_sword  |    175 
 purchase_knife  |    227 
(3 rows)
```

**(4) What item generated the most sales (tokens)?** 

**Presto Query:**
```sql
> select event_type, count(*) as number, sum(price) as revenue from purchase_events group by event_type order by revenue desc;
```

```
   event_type    | number | revenue 
-----------------+--------+---------
 purchase_sword  |    175 |  350000 
 purchase_shield |    171 |  256500 
 purchase_knife  |    227 |  227000 
(3 rows)
```
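The revenue column can be cross-checked by hand: the query sums `price` once per event row, so each total should equal the event count multiplied by the fixed item price from game_api.py. The short sketch below repeats that arithmetic with the numbers from the output above.

```python
# Fixed prices from game_api.py and event counts from the query output above.
PRICES = {"purchase_sword": 2000, "purchase_shield": 1500, "purchase_knife": 1000}
EVENT_COUNTS = {"purchase_sword": 175, "purchase_shield": 171, "purchase_knife": 227}

revenue = {item: EVENT_COUNTS[item] * PRICES[item] for item in PRICES}
for item, total in sorted(revenue.items(), key=lambda kv: kv[1], reverse=True):
    print(item, total)
# purchase_sword 350000
# purchase_shield 256500
# purchase_knife 227000
```

Note that this total counts one unit per event. If the quantity in `n_purchased` should be reflected, a query along the lines of `sum(price * n_purchased)` would presumably be needed instead. That variant was not run here.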

**(5) Which players have the highest score after fighting?** 

**Presto Query:**
```sql
> select userid, sum(score) as total_score from fight_events group by userid order by total_score desc;
```
```
  userid   | total_score 
-----------+-------------
 user-009  |         400 
 user-003  |         300 
 user-001  |         300 
 user-005  |         300 
 user-007  |         300 
 user-0010 |         200 
 user-004  |         100 
 user-002  |         100 
 user-006  |         100 
 user-008  |         100 
(10 rows)
```



# V. Appendix

### Appendix A: Docker Compose Content

```yaml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
    expose:
      - "2181"
      - "2888"
      - "32181"
      - "3888"
    extra_hosts:
      - "moby:127.0.0.1"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    expose:
      - "9092"
      - "29092"
    extra_hosts:
      - "moby:127.0.0.1"

  cloudera:
    image: midsw205/hadoop:0.0.2
    hostname: cloudera
    expose:
      - "8020" # nn
      - "8888" # hue
      - "9083" # hive thrift
      - "10000" # hive jdbc
      - "50070" # nn http
    ports:
      - "8888:8888"
    extra_hosts:
      - "moby:127.0.0.1"

  spark:
    image: midsw205/spark-python:0.0.6
    stdin_open: true
    tty: true
    volumes:
      - ~/w205:/w205
    expose:
      - "8888"
    ports:
      - "8889:8888" # 8888 conflicts with hue
    depends_on:
      - cloudera
    environment:
      HADOOP_NAMENODE: cloudera
      HIVE_THRIFTSERVER: cloudera:9083
    extra_hosts:
      - "moby:127.0.0.1"
    command: bash

  presto:
    image: midsw205/presto:0.0.1
    hostname: presto
    volumes:
      - ~/w205:/w205
    expose:
      - "8080"
    environment:
      HIVE_THRIFTSERVER: cloudera:9083
    extra_hosts:
      - "moby:127.0.0.1"

  mids:
    image: midsw205/base:0.1.9
    stdin_open: true
    tty: true
    volumes:
      - ~/w205:/w205
    expose:
      - "5000"
    ports:
      - "5000:5000"
    extra_hosts:
      - "moby:127.0.0.1"
```

### Appendix B: Game Application - game_api.py

```python
#!/usr/bin/env python
import json
from kafka import KafkaProducer
from flask import Flask, request

app = Flask(__name__)
producer = KafkaProducer(bootstrap_servers='kafka:29092')

def log_to_kafka(topic, event):
    event.update(request.headers)
    producer.send(topic, json.dumps(event).encode())
    

@app.route("/")
def default_response():
    default_event = {'event_type': 'default',
                     'name': 'doing_nothing',
                     'strength':'NA',
                     'price': 'NA'}
    log_to_kafka('events', default_event)
    return "What are you waiting for?\n"


@app.route("/purchase_a_sword/", methods=['POST','GET'])
def purchase_a_sword():
    """
    @function: This function generates a Purchase a Sword event from a user mobile device request or Apache Bench
    @param: User Request (via URL endpoint) 
    @return: Returns string of User Id and Event 
    """

    userid = request.args.get('userid', default='001', type=str)
    n = request.args.get('n',default=1,type=int)
    purchase_sword_event = {'userid':userid,
                            'event_type': 'purchase_sword',
                            'name': 'excalibur',
                            'strength': '1000',
                            'n_purchased': n,
                            'price': '2000'}
    log_to_kafka('events', purchase_sword_event)
    return "USER " + userid + ": "+ str(n)+" "+ " Sword(s) Purchased!\n"


@app.route("/join_guild/", methods=['POST','GET'])
def join_guild():
    """
    @function: This function generates a Join Guild event from a user mobile device request or Apache Bench
    @param: User Request (via URL endpoint) 
    @return: Returns string of User Id and Event 
    """
    userid = request.args.get('userid', default='001', type=str)
    guild_name = request.args.get('guild_name', default="'Knights_of_the_Round_Table'", type=str)
    n = request.args.get('n',default=1,type=int)

    if guild_name == "'Game_of_Thrones'":
        price = "2000"
        strength = "1500"
    elif guild_name == "'Castle_of_Rock'":
        price = "1000"
        strength = "1200"
    else: 
        price = "3000"
        strength = "5000"
        
    guild_name = guild_name.replace("'", '')
    join_guild_event = {'userid': userid,
                        'event_type': 'join_guild',
                        'name': guild_name,
                        'strength': strength,
                        'n_purchased': 1,
                        'price': price}
    log_to_kafka('events', join_guild_event)
    return "Joined" +" "+ guild_name +" "+ "Guild!\n"


@app.route("/purchase_a_knife/", methods=['POST','GET'])
def purchase_a_knife():
    userid = request.args.get('userid', default='001', type=str)
    n = request.args.get('n',default=1,type=int)
    purchase_knife_event = {'userid': userid,
                            'event_type': 'purchase_knife',
                            'name': 'kukri',
                            'strength': '500',
                            'n_purchased': n,
                            'price': '1000'}
    log_to_kafka('events', purchase_knife_event)
    return "USER " + userid + ": "+ str(n)+" "+ " Knife(s) Purchased!\n"


@app.route("/purchase_a_shield/", methods=['POST','GET'])
def purchase_a_shield():
    
    userid = request.args.get('userid', default='001', type=str)
    n = request.args.get('n',default=1,type=int)
    purchase_shield_event = {'userid': userid,
                             'event_type': 'purchase_shield',
                             'name': 'parma',
                             'strength': '800',
                             'n_purchased': n,
                             'price': '1500'}
    log_to_kafka('events', purchase_shield_event)
    return "USER " + userid + ": "+ str(n)+" "+ " Shield(s) Purchased!\n"

@app.route("/fight_event/", methods=['POST','GET'])
def fight_event():
    """
    @function: This function generates a fight event from a user mobile device request or Apache Bench
    @param: User Request (via URL endpoint) 
    @return: Returns string of User Id and Event 
    """
    userid = request.args.get('userid', default='001', type=str)
    win_status = request.args.get('win_status', default="'won'", type=str)
    n = request.args.get('n',default=1,type=int)

    if win_status == "'lost'":
        score = "0"
    else: 
        score = "100"

    win_status = win_status.replace("'", '')
        
    fight_event = {'userid': userid,
                   'event_type': 'fight_event',
                   'win_status': win_status,
                   'score': score}
    log_to_kafka('events', fight_event)
    return "fight event - you:" +" "+ win_status +" "+ "Score: " + score+ "\n"
```

### Appendix C: Applications to Extract events from kafka and write them to hdfs

```python
#!/usr/bin/env python
"""Extract events from kafka and write them to hdfs
"""
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

import pyspark.sql.functions as F
from datetime import datetime
from datetime import date


def purchase_events_schema():
    """
    @function: This function provides the table schema for purchase events (knife, sword, shield)
    @param: None 
    @return: Returns the table schema for purchase events
    """  
    return StructType(
    [
        StructField('Accept', StringType(), True),
        StructField('Host', StringType(), True),
        StructField('User-Agent', StringType(), True),
        StructField('price', StringType(), True),
        StructField('n_purchased', LongType(), True),
        StructField('strength', StringType(), True),
        StructField('name', StringType(), True),
        StructField('event_type', StringType(), True),
        StructField('userid', StringType(), True)
    ]
)

# LongType()

def guild_events_schema():
    """
    @function: This function provides the table schema for  guild events
    @param: None 
    @return: Returns the table schema for guild events 
    """  
    return StructType(
    [
        StructField('Accept', StringType(), True),
        StructField('Host', StringType(), True),
        StructField('User-Agent', StringType(), True),
        StructField('price', StringType(), True),
        StructField('n_purchased', LongType(), True),
        StructField('strength', StringType(), True),
        StructField('name', StringType(), True),
        StructField('event_type', StringType(), True),
        StructField('userid', StringType(), True)
    ]
)

def fight_events_schema():
    """
    @function: This function provides the table schema for  fight events
    @param: None 
    @return: Returns the table schema for fight events 
    """  
    return StructType(
    [
        StructField('Accept', StringType(), True),
        StructField('Host', StringType(), True),
        StructField('User-Agent', StringType(), True),
        StructField('score', StringType(), True),
        StructField('win_status', StringType(), True),
        StructField('event_type', StringType(), True),
        StructField('userid', StringType(), True)
    ]
)


@udf('boolean')
def is_purchase(event_as_json):
    """
    @function: This function uses a json to filter out records by purchase event type (knife, sword, shield)
    @param: Takes in extracted json data as a string
    @return: Returns a boolean value
    """    
    event = json.loads(event_as_json)
    if 'purchase' in event['event_type']:
        return True
    return False

@udf('boolean')
def is_join_guild(event_as_json):
    """
    @function: This function uses a json to filter out records by guild event type
    @param: Takes in extracted json data as a string
    @return: Returns a boolean value
    """   
    event = json.loads(event_as_json)
    if event['event_type'] == 'join_guild':
        return True
    return False

@udf('boolean')
def is_fight_event(event_as_json):
    """
    @function: This function uses a json to filter out records by fight event type
    @param: Takes in extracted json data as a string
    @return: Returns a boolean value
    """   
    event = json.loads(event_as_json)
    if event['event_type'] == 'fight_event':
        return True
    return False

def main():

    """
    @main function: This is a main function that executes a spark job - extracting string, parsing json using a provided schema, etc.
    @param: none, uses previously defined functions
    @return: none, lands tables via streaming on HDFS
    """ 
    
    spark = SparkSession \
        .builder \
        .appName("ExtractEventsJob") \
        .getOrCreate()

    raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()

    purchases = raw_events \
        .filter(is_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          purchase_events_schema()).alias('json')) \
        .select('timestamp', 'json.*') \
        .select( \
#                   F.from_utc_timestamp(F.col('timestamp'),'GMT').alias('event_ts') \
                  F.col('timestamp').alias('event_ts') \
                 ,F.col('userid') \
                 ,F.col('Host') \
                 ,F.col('event_type') \
                 ,F.col('name') \
                 ,F.col('strength') \
                 ,F.col('n_purchased') \
                 ,F.col('price') \
                ) \
        .distinct()
    
    guild = raw_events \
        .filter(is_join_guild(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          guild_events_schema()).alias('json')) \
        .select('timestamp', 'json.*') \
        .select( \
#                   F.from_utc_timestamp(F.col('timestamp'),'GMT').alias('event_ts') \
                  F.col('timestamp').alias('event_ts') \
                 ,F.col('userid') \
                 ,F.col('Host') \
                 ,F.col('event_type') \
                 ,F.col('name') \
                 ,F.col('strength') \
                 ,F.col('n_purchased') \
                 ,F.col('price') \
                ) \
        .distinct()

    fight = raw_events \
        .filter(is_fight_event(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          fight_events_schema()).alias('json')) \
        .select('timestamp', 'json.*') \
        .select( \
                  F.col('timestamp').alias('event_ts') \
                 ,F.col('userid') \
                 ,F.col('Host') \
                 ,F.col('event_type') \
                 ,F.col('score') \
                 ,F.col('win_status') \
                ) \
        .distinct()
    
    
    sink_purchases = purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_purchase_events") \
        .option("path", "/tmp/purchase_events") \
        .trigger(processingTime="10 seconds") \
        .start()
    
    sink_guild = guild \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_guild_events") \
        .option("path", "/tmp/guild_events") \
        .trigger(processingTime="10 seconds") \
        .start()

    sink_fight = fight \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_fight_events") \
        .option("path", "/tmp/fight_events") \
        .trigger(processingTime="10 seconds") \
        .start()
        
    sink_purchases.awaitTermination()
    sink_guild.awaitTermination()
    sink_fight.awaitTermination()


if __name__ == "__main__":
    main()
```
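
The three streaming branches above share one pattern: filter the raw Kafka values by event type, parse the JSON, and drop duplicate rows. The sketch below mimics that routing in plain Python, without Spark, so the logic can be checked locally. The predicate bodies and the `event_type` strings they match are assumptions for illustration; the real `is_purchase`, `is_join_guild`, and `is_fight_event` functions are defined earlier in the script and may match different values.

```python
import json


def event_type(value):
    """Return the lower-cased event_type of a raw JSON event string."""
    return json.loads(value).get("event_type", "").lower()


# Hypothetical predicates; the matched strings are assumptions.
def is_purchase(value):
    return event_type(value).startswith("buy")


def is_join_guild(value):
    return event_type(value) == "join guild"


def is_fight_event(value):
    return event_type(value) == "fight event"


def route(raw_events):
    """Split (timestamp, value) pairs into per-type row lists, skipping exact duplicates."""
    predicates = {
        "purchases": is_purchase,
        "guild": is_join_guild,
        "fight": is_fight_event,
    }
    routed = {name: [] for name in predicates}
    seen = set()
    for timestamp, value in raw_events:
        if (timestamp, value) in seen:  # plays the role of .distinct()
            continue
        seen.add((timestamp, value))
        for name, predicate in predicates.items():
            if predicate(value):
                row = {"event_ts": timestamp}
                row.update(json.loads(value))
                routed[name].append(row)
    return routed


# Made-up sample events, including one exact duplicate.
sample = [
    ("2021-12-01 10:00:00", '{"userid": "user-001", "event_type": "buy a sword", "n_purchased": 2}'),
    ("2021-12-01 10:00:00", '{"userid": "user-001", "event_type": "buy a sword", "n_purchased": 2}'),
    ("2021-12-01 10:00:05", '{"userid": "user-002", "event_type": "fight event", "win_status": "won"}'),
]
routed = route(sample)
print({name: len(rows) for name, rows in routed.items()})
```

In the Spark job each branch is written to its own Parquet sink; here the three lists in `routed` stand in for those sinks.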

### Appendix D: Data Generator Script (data_generator.sh)

```bash
#! /usr/bin/bash

# usage: ./data_generator.sh -u 15 -e 5 -n 20 -b

helpFunction()
{
   echo ""
   echo "Welcome CARLOS 2020 to mids 205 Project 3 synthetic data generator"
   echo "Usage: $0 -u NOOFUSERS -e ENDPOINTS -n GENERATEREQS [-b]"
   echo -e "\t-u Number of users"
   echo -e "\t-e Number of endpoints"
   echo -e "\t-n Number of total requests"
   echo -e "\t-b (Optional) use Apache Bench to send the requests to flask, uses no args just the flag"
   exit 1 # Exit script after printing help
}

while getopts "u:e:n:b" opt
do
   case "$opt" in
      u ) NOOFUSERS="$OPTARG" ;;
      e ) ENDPOINTS="$OPTARG" ;;
      n ) GENERATEREQS="$OPTARG" ;;
      b ) ABFLAG="SET" ;;
      ? ) helpFunction ;; # Print helpFunction in case parameter is non-existent
   esac
done

# Print helpFunction in case parameters are empty
if [ -z "$NOOFUSERS" ] || [ -z "$ENDPOINTS" ] || [ -z "$GENERATEREQS" ]
then
   echo "Some or all of the parameters are empty";
   helpFunction
fi


## ** Set limits to per user items
REQS=0
MAXNOOFSWORDS=10 # Max swords purchased at a time per user
MAXNOOFSHIELDS=10 # max shields purchased at a time per user
MAXNOOFKNIFES=10 # max knives purchased at a time per user
MAXGUILDS=3 # max number of guilds joined at a time per user
CONCURRENTUSERS=1 #max users accessing the flask API (* Cannot use concurrency level greater than total number of requests [CONCURRENTUSERS < GENERATEREQS] )
RANDVAR=2

## Each request is randomly assigned an event type between 1 and ENDPOINTS (at most 5).
# 1) Purchase a Sword
# 2) Purchase a Shield
# 3) Purchase a Knife
# 4) Join a Guild - randomly picks one of three guilds: Game_of_Thrones, Castle_of_Rock, or Knights_of_the_Round_Table
# 5) fight_event

## ** Check if apache bench optional param is passed 
if [ "$ABFLAG" ]
then
    echo "Apache Bench flag is $ABFLAG";
    # docker exec -it project-3-elizkhan_mids_1 ab -n 2 -H "Host: liz.comcast.com" 'http://localhost:5000/purchase_a_potion/?userid=002&n=10' ## works
    # docker-compose exec mids ab -n 2 -H "Host: liz.comcast.com" http://localhost:5000/ #does not work in windows
    # Loop exactly GENERATEREQS times (-gt would run one extra iteration)
    until [ "$REQS" -ge "$GENERATEREQS" ]; do
        ID=$(( ( RANDOM % $NOOFUSERS )  + 1 ))
        EP=$(( ( RANDOM % $ENDPOINTS )  + 1 ))
        NOOFSWORDS=$(( ( RANDOM % $MAXNOOFSWORDS )  + 1 ))
        NOOFSHIELDS=$(( ( RANDOM % $MAXNOOFSHIELDS )  + 1 ))
        NOOFKNIFES=$(( ( RANDOM % $MAXNOOFKNIFES )  + 1 ))
        case $EP in
            1)
            docker-compose exec mids ab -n 5 -H "Host: user-00$ID.comcast.com" "http://localhost:5000/purchase_a_sword/?userid=%27user-00$ID%27&n=$NOOFSWORDS"
            ;;
        esac
        case $EP in
            2)
            docker-compose exec mids ab -n 4 -H "Host: user-00$ID.comcast.com" "http://localhost:5000/purchase_a_shield/?userid=%27user-00$ID%27&n=$NOOFSHIELDS"
            ;;
        esac
        case $EP in
            3)
            docker-compose exec mids ab -n 4 -H "Host: user-00$ID.comcast.com" "http://localhost:5000/purchase_a_knife/?userid=%27user-00$ID%27&n=$NOOFKNIFES"
            ;;
        esac    
        case $EP in
            4)
              GUILDID=$(( ( RANDOM % $MAXGUILDS )  + 1 ))
              case $GUILDID in
                1)
                docker-compose exec mids ab -n 2 -H "Host: user-00$ID.comcast.com" "http://localhost:5000/join_guild/?userid=%27user-00$ID%27"
                ;;
                2)
                docker-compose exec mids ab -n 2 -H "Host: user-00$ID.comcast.com" "http://localhost:5000/join_guild/?userid=%27user-00$ID%27&guild_name=%27Game_of_Thrones%27"
                ;;
                3)
                docker-compose exec mids ab -n 2 -H "Host: user-00$ID.comcast.com" "http://localhost:5000/join_guild/?userid=%27user-00$ID%27&guild_name=%27Castle_of_Rock%27"
                ;;
              esac
        esac
        case $EP in
                5)
                WINFLAG=$(( ( RANDOM % $RANDVAR ) + 1))
                case $WINFLAG in
                   1)
                   docker-compose exec mids ab -n 2 -H "Host: user-00$ID.comcast.com" "http://localhost:5000/fight_event/?userid=%27user-00$ID%27&win_status=%27lost%27"
                   ;;
                   2)
                   docker-compose exec mids ab -n 2 -H "Host: user-00$ID.comcast.com" "http://localhost:5000/fight_event/?userid=%27user-00$ID%27&win_status=%27won%27"               
                   ;;
                 esac
        esac
        let REQS=REQS+1
    done
else
    # Loop exactly GENERATEREQS times (-gt would run one extra iteration)
    until [ "$REQS" -ge "$GENERATEREQS" ]; do
        ID=$(( ( RANDOM % $NOOFUSERS )  + 1 ))
        EP=$(( ( RANDOM % $ENDPOINTS )  + 1 ))
        NOOFSWORDS=$(( ( RANDOM % $MAXNOOFSWORDS )  + 1 ))
        NOOFSHIELDS=$(( ( RANDOM % $MAXNOOFSHIELDS )  + 1 ))
        NOOFKNIFES=$(( ( RANDOM % $MAXNOOFKNIFES )  + 1 ))
        case $EP in
            1)
            docker-compose exec mids curl "http://localhost:5000/purchase_a_sword/?userid=%27user-00$ID%27&n="$NOOFSWORDS
            ;;
        esac
        case $EP in
            2)
            docker-compose exec mids curl "http://localhost:5000/purchase_a_shield/?userid=%27user-00$ID%27&n="$NOOFSHIELDS
            ;;
        esac
        case $EP in
            3)
            docker-compose exec mids curl "http://localhost:5000/purchase_a_knife/?userid=%27user-00$ID%27&n="$NOOFKNIFES
            ;;
        esac
        case $EP in
            4)
            GUILDID=$(( ( RANDOM % $MAXGUILDS )  + 1 ))
#             docker-compose exec mids curl "Host: user-00$ID.comcast.com" "http://localhost:5000/join_guild/?userid=%27user-00$ID%27&n=$GUILDID"
              case $GUILDID in
                1)
                # -H is required; without it curl treats the "Host: ..." string as a second URL
                docker-compose exec mids curl -H "Host: user-00$ID.comcast.com" "http://localhost:5000/join_guild/?userid=%27user-00$ID%27"
                ;;
                2)
                docker-compose exec mids curl -H "Host: user-00$ID.comcast.com" "http://localhost:5000/join_guild/?userid=%27user-00$ID%27&guild_name=%27Game_of_Thrones%27"
                ;;
                3)
                docker-compose exec mids curl -H "Host: user-00$ID.comcast.com" "http://localhost:5000/join_guild/?userid=%27user-00$ID%27&guild_name=%27Castle_of_Rock%27"
                ;;
              esac
        esac
        case $EP in
            5)
             WINFLAG=$(( ( RANDOM % $RANDVAR ) + 1))
             case $WINFLAG in
               1)
               # -H is required; without it curl treats the "Host: ..." string as a second URL
               docker-compose exec mids curl -H "Host: user-00$ID.comcast.com" "http://localhost:5000/fight_event/?userid=%27user-00$ID%27&win_status=%27lost%27"
               ;;
               2)
               docker-compose exec mids curl -H "Host: user-00$ID.comcast.com" "http://localhost:5000/fight_event/?userid=%27user-00$ID%27&win_status=%27won%27"
               ;;
             esac
        esac
        let REQS=REQS+1
    done
fi
```
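
The request-building logic of the generator can also be sketched in Python, which makes the URL format easier to inspect before anything is sent to Flask. This is a minimal sketch, not a replacement for the script: it only builds the `Host` header value and URL for each request and does not call `curl` or `ab`. The names `build_url`, `random_request`, and the `GUILDS` list are hypothetical helpers introduced here; the endpoint paths, the quoted `userid` format, and the `user-00` prefix are copied from the bash script.

```python
import random
from urllib.parse import urlencode

BASE_URL = "http://localhost:5000"
# Same order as the event-type numbering in the bash script (1 to 5).
ENDPOINTS = ["purchase_a_sword", "purchase_a_shield", "purchase_a_knife",
             "join_guild", "fight_event"]
GUILDS = [None, "Game_of_Thrones", "Castle_of_Rock"]  # None: no guild_name parameter
MAX_ITEMS = 10  # mirrors MAXNOOFSWORDS / MAXNOOFSHIELDS / MAXNOOFKNIFES


def build_url(endpoint, user_id, **params):
    """Return the request URL; quoted values are percent-encoded (' becomes %27)."""
    query = {"userid": f"'user-00{user_id}'"}
    query.update(params)
    return f"{BASE_URL}/{endpoint}/?{urlencode(query)}"


def random_request(rng, n_users, n_endpoints):
    """Pick a random user and endpoint; return (host_header_value, url).

    n_endpoints must not exceed len(ENDPOINTS).
    """
    user_id = rng.randint(1, n_users)
    endpoint = ENDPOINTS[rng.randint(1, n_endpoints) - 1]
    params = {}
    if endpoint.startswith("purchase_"):
        params["n"] = rng.randint(1, MAX_ITEMS)
    elif endpoint == "join_guild":
        guild = rng.choice(GUILDS)
        if guild is not None:
            params["guild_name"] = f"'{guild}'"
    else:  # fight_event
        params["win_status"] = f"'{rng.choice(['lost', 'won'])}'"
    return f"user-00{user_id}.comcast.com", build_url(endpoint, user_id, **params)


rng = random.Random(205)  # fixed seed so a run can be repeated
requests_to_send = [random_request(rng, n_users=15, n_endpoints=5) for _ in range(20)]
for host, url in requests_to_send[:3]:
    print(host, url)
```

Like the bash script, this keeps the fixed `user-00` prefix, so user IDs of 10 and above come out as `user-0010` rather than a zero-padded three-digit ID.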

#### Useful References
- https://towardsdatascience.com/jupyter-magics-with-sql-921370099589

**Packages to install for working with Presto:**
> pip install pandas

> pip install sqlalchemy # ORM for databases

> pip install ipython-sql # SQL magic function