# **Project 3 - Building a Game**

## **Pipeline Details**
***Flow overview: The Flask app server generates events $\rightarrow$ Kafka queue $\rightarrow$ Spark processes the events in Kafka as a stream $\rightarrow$ land the processed events in HDFS and register them as Hive tables $\rightarrow$ query the tables with Presto***

### docker-compose.yml Content Overview

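The setup runs each service in the pipeline (Zookeeper, Kafka, Cloudera for HDFS and Hive, Spark, Presto, and the mids container) in its own container.
In addition to the setup in project 2 ...: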
- Cloudera now listens on more ports so that other services, such as HUE and Hive, can connect to it. HUE provides a web GUI for browsing information about HDFS, and the port mapping "8888:8888" under this container makes it reachable. Cloudera also listens on port "9083" for the Hive metastore.
- Spark's container port "8888" is exposed on host port "8889", because host port "8888" is already taken by Cloudera's HUE and a host port can be bound by only one container. Spark now also connects to Hive; that connection is specified by ```HIVE_THRIFTSERVER``` under ```environment```. The Hive metastore, part of the Hadoop ecosystem, stores table definitions (schemas and HDFS locations) and gives query tools like Presto access to those tables. Like Zookeeper, it acts as a "phonebook": it tells you what tables exist and where their data lives.
- Presto is our query engine. It scales to large datasets and lets us run SQL queries to derive business insights. The service uses version 0.0.1 of the Presto image, and ```HIVE_THRIFTSERVER: cloudera:9083``` under ```environment``` connects Presto to the Hive metastore. Presto looks up table definitions in Hive and then reads the underlying files from HDFS, so we can query Hive tables from Presto.
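
The one-container-per-host-port rule above can be checked mechanically. Below is a minimal, hypothetical check in plain Python: it assumes port mappings written as "HOST:CONTAINER" strings (the short form used in a docker-compose `ports:` list), and `SERVICE_PORTS` only repeats the two mappings described above, not the full docker-compose.yml.

```python
from collections import defaultdict

# Hypothetical excerpt: only the two mappings described above, as "HOST:CONTAINER".
# The real docker-compose.yml may list more ports per service.
SERVICE_PORTS = {
    "cloudera": ["8888:8888"],  # HUE
    "spark": ["8889:8888"],     # container port 8888 moved to host port 8889
}


def host_port_conflicts(service_ports):
    """Return {host_port: [services]} for host ports claimed by more than one service."""
    owners = defaultdict(list)
    for service, mappings in service_ports.items():
        for mapping in mappings:
            # assumes the simple "HOST:CONTAINER" form (no IP prefix, no port ranges)
            host_port, _container_port = mapping.split(":")
            owners[host_port].append(service)
    return {port: services for port, services in owners.items() if len(services) > 1}


print(host_port_conflicts(SERVICE_PORTS))  # {}: no host port is claimed twice
```

If Spark also used "8888:8888", the function would report `{"8888": ["cloudera", "spark"]}`, which is exactly the clash the 8889 mapping avoids.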

### Creating in-game events and running the Flask server
*Imitating how an app server captures user events in the real world*

- **Step 1: Add "join guild," "slay a dragon," and "catch a butterfly" events to game_api.py, the Flask Python script that serves as our web app server.**
    - Example code w/ explanation in comments:
    ```python
    import json

    from flask import Flask, request
    from kafka import KafkaProducer

    app = Flask(__name__)  # create the Flask app
    producer = KafkaProducer(bootstrap_servers='kafka:29092')  # connect to Kafka on port 29092 so we can send events to a topic


    def log_to_kafka(topic, event):
        """Send an event plus its meta-information (e.g. the user's Host header) to a Kafka topic."""
        event.update(request.headers)  # attach the HTTP request headers (i.e. meta-info) to the event itself
        producer.send(topic, json.dumps(event).encode())  # send the event as JSON bytes to the topic named by `topic`


    @app.route("/slay_a_dragon")  # register this function as the handler for the /slay_a_dragon URL path
    def slay_a_dragon():
        slay_dragon_event = {'event_type': 'slay_dragon'}  # event_type tells us later which kind of event we read in
        log_to_kafka('events', slay_dragon_event)
        return "Dragon Slayed!\n"


    @app.route("/catch_a_butterfly")  # use a different path for each event
    def catch_a_butterfly():
        catch_butterfly_event = {'event_type': 'catch_butterfly'}  # use a different event_type for each event
        log_to_kafka('events', catch_butterfly_event)  # log to Kafka as a butterfly-catching event
        return "Butterfly Captured!\n"
    ```


- **Step 2: ```docker-compose up -d```**
    - Spin up all the containers and run them in the background.
    
    
- **Step 3: ```docker-compose exec mids env FLASK_APP=/w205/project-3-MeerWu/game_api.py flask run --host 0.0.0.0```**
    - Run the Flask server using the ```game_api``` Python script in my project 3 folder. This is our web app server.
    - Note: In practice, I created the Kafka topic (Step 8) before running this command, because the Flask server occupies the terminal.
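
To see what `log_to_kafka` puts on the topic without running Flask or Kafka, here is a hedged, stdlib-only sketch. `FakeProducer` is a hypothetical stand-in for `KafkaProducer` that only records `send` calls, the header values are illustrative rather than captured from a real request, and this variant of `log_to_kafka` takes the headers as an argument instead of reading Flask's `request` object.

```python
import json


class FakeProducer:
    """Hypothetical stand-in for KafkaProducer: records (topic, value) instead of sending."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))


def log_to_kafka(producer, topic, event, headers):
    """Merge request headers into the event and send it to `topic` as UTF-8 JSON bytes."""
    event = dict(event)     # copy so the caller's dict is not modified
    event.update(headers)   # e.g. Host, User-Agent, Accept
    producer.send(topic, json.dumps(event).encode())


producer = FakeProducer()
log_to_kafka(
    producer,
    "events",
    {"event_type": "slay_dragon"},
    {"Host": "user1.datafun.com", "User-Agent": "ApacheBench/2.3", "Accept": "*/*"},  # illustrative values
)
topic, payload = producer.sent[0]
print(topic, json.loads(payload))
```

Copying the event before merging is a small departure from the original, which updates the caller's dict in place. Either way, the header keys (`Host`, `User-Agent`, `Accept`) become the extra columns that show up in the Hive tables.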

### Organize events into tables in Hive
*Filter events and format the dataframe schema with Spark, register the dataframes as tables in Hive, and write the data to HDFS as a stream*

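**Note: we run this in a terminal before generating simulated user events with Apache Bench so that all data are captured and organized from the very first event. By default, a Spark streaming query that reads from Kafka starts at the latest offsets, so events sent before the job starts would be skipped.**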

- **Step 4: Read the data as a stream with Spark, then filter and format the dataframes.**
    - Code w/ explanations commented:
    ```python
    import json

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import from_json, udf
    from pyspark.sql.types import StringType, StructField, StructType


    def purchase_sword_event_schema():
        """Explicit schema for the event JSON so nothing is inferred.

        All events share the same fields (the columns of the Hive tables in Step 5),
        so this one schema is reused for every event.
        """
        return StructType([
            StructField("Accept", StringType(), True),
            StructField("Host", StringType(), True),
            StructField("User-Agent", StringType(), True),
            StructField("event_type", StringType(), True),
        ])


    # Filter for one event type: True for "join_guild", False otherwise. Have one of these for every event.
    @udf('boolean')  # register as a Spark user-defined function that returns boolean values
    def is_join_guild(event_as_json):
        event = json.loads(event_as_json)  # parse the raw JSON string
        return event['event_type'] == 'join_guild'  # keep only the event type we want


    def main():
        """Read events from Kafka as a stream, then filter and format them."""
        # start a Spark session with Hive support so tables can be registered in Hive
        spark = SparkSession \
            .builder \
            .appName("ExtractEventsJob") \
            .enableHiveSupport() \
            .getOrCreate()

        # read the Kafka topic "events" as a stream: events are picked up as they come in
        raw_events = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "kafka:29092") \
            .option("subscribe", "events") \
            .load()

        # Filter and format one event type; have one of these for every event.
        # Kafka delivers `value` as binary, so cast it to a string to get the raw JSON text.
        join_guilds = raw_events \
            .filter(is_join_guild(raw_events.value.cast('string'))) \
            .select(raw_events.value.cast('string').alias('raw_event'),  # keep the raw JSON text
                    raw_events.timestamp.cast('string'),  # when the event arrived in Kafka, as a string
                    from_json(raw_events.value.cast('string'),  # parse the JSON with the explicit schema
                              purchase_sword_event_schema()).alias('json')) \
            .select('raw_event', 'timestamp', 'json.*')  # flatten the parsed fields into columns

        # Steps 5 and 6 continue inside main()
    ```


- **Step 5: Register the dataframes as tables in Hive.**
    - Code w/ explanations commented:
    
    ```python
    # Step 5 runs inside main(), after join_guilds is defined
    sql_strings = []
    # drop any stale definition from a previous run; dropping an external table leaves its HDFS data in place
    spark.sql("drop table if exists join_guilds")

    # Hive DDL for the "join_guild" table: the schema matches the dataframe columns,
    # and the data live as Parquet files under /tmp/join_guilds in HDFS
    sql_strings.append("""
        create external table if not exists join_guilds (
            raw_event string,
            timestamp string,
            Accept string,
            Host string,
            `User-Agent` string,
            event_type string
            )
            stored as parquet
            location '/tmp/join_guilds'
            tblproperties ("parquet.compression"="SNAPPY")
            """)
    spark.sql(sql_strings[0])
    ```
 

- **Step 6: Write data in stream as parquet files into HDFS.**
    - Code w/ explanations commented:
    
    ```python
    # Step 6 also runs inside main().
    # Continuously write join_guilds to HDFS as Parquet files in the Hive table's location;
    # a new micro-batch is triggered every 10 seconds.
    sink_guild = join_guilds \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_join_guilds") \
        .option("path", "/tmp/join_guilds") \
        .trigger(processingTime="10 seconds") \
        .start()

    # With 4 sinks (one per event), wait until any of them stops instead of awaiting a single sink.
    spark.streams.awaitAnyTermination()
    ```


- **Step 7: ```docker-compose exec spark spark-submit /w205/project-3-MeerWu/spark_stream_and_hive.py```**
    - Runs the Python script that reads and writes streams with Spark and registers the tables in Hive.
    - ```spark-submit```: launches the script as a Spark application in the spark container.
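
The comments in Steps 4 to 6 say to repeat the filter, the table definition, and the sink for every event. One way to avoid copy-paste is to generate all three from a single mapping, sketched below in plain Python so it runs without Spark. The `EVENT_TABLES` mapping and the `purchase_sword` event type are assumptions (only `join_guild`, `slay_dragon`, and `catch_butterfly` appear above), and the helper names are hypothetical.

```python
import json

# Hypothetical mapping: Hive table name -> event_type value sent by game_api.py.
# "purchase_sword" is an assumed event_type; the other three appear in the document.
EVENT_TABLES = {
    "join_guilds": "join_guild",
    "slay_dragons": "slay_dragon",
    "catch_butterflies": "catch_butterfly",
    "sword_purchases": "purchase_sword",
}


def make_filter(event_type):
    """Return a predicate like is_join_guild, but for any event type."""
    def matches(event_as_json):
        event = json.loads(event_as_json)
        # .get() returns None for events without an event_type instead of raising KeyError
        return event.get("event_type") == event_type
    return matches


def create_table_sql(table):
    """Hive DDL with the same columns as Step 5 (tblproperties left out for brevity)."""
    return f"""
        create external table if not exists {table} (
            raw_event string,
            timestamp string,
            Accept string,
            Host string,
            `User-Agent` string,
            event_type string
        )
        stored as parquet
        location '/tmp/{table}'
    """


def sink_options(table):
    """writeStream options for one table; every streaming query needs its own checkpoint."""
    return {
        "path": f"/tmp/{table}",
        "checkpointLocation": f"/tmp/checkpoints_for_{table}",
    }


for table, event_type in EVENT_TABLES.items():
    print(table, event_type, sink_options(table)["checkpointLocation"])
```

In the Spark job, each predicate could be wrapped with `pyspark.sql.functions.udf(make_filter(event_type), 'boolean')`, each DDL string passed to `spark.sql`, and each options dict passed to `.writeStream.options(**opts)`. Deriving the sink path and the table location from the same name keeps them from drifting apart.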

### Generating user events and logging user events into Kafka
*Simulate real-world user events using Apache Bench & write the events into Kafka (the writing is done in game_api.py)*

- **Step 8: ```docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181```**
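    - Create the Kafka topic ```events```, which receives all game events (1 partition, replication factor 1, and no error if the topic already exists).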
 
 
- **Step 9: ```docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning```**
    - Start kafkacat before generating events with Apache Bench, so it consumes test data and simulated user events from the beginning of the topic as they arrive.
    - There is no ```-e``` for signaling when to stop reading since we're continuously reading in data.
    - As a result, this line holds up a terminal.


- **Step 10: Generate artificial user events with Apache Bench; game_api.py writes them into Kafka.**

   ```while true; do for ((i=1; i<=3; i++)); do docker-compose exec mids ab -n 2 -H "Host: user$i.datafun.com" http://localhost:5000/purchase_a_sword; done; sleep 10; done```
    
   ```while true; do for ((i=1; i<=3; i+=2)); do docker-compose exec mids ab -n 2 -H "Host: user$i.datafun.com" http://localhost:5000/join_guild; done; sleep 10; done```
  
   ```while true; do for ((i=1; i<=3; i++)); do docker-compose exec mids ab -n 1 -H "Host: user$i.datafun.com" http://localhost:5000/slay_a_dragon; done; sleep 10; done```
  
   ```while true; do for ((i=2; i<=3; i++)); do docker-compose exec mids ab -n 2 -H "Host: user$i.datafun.com" http://localhost:5000/catch_a_butterfly; done; sleep 10; done```
    - For example, the first command sends 2 purchase_a_sword requests for each of 3 users, then waits 10 seconds before repeating.
        - This is done by having a for loop that loops through each "user" in a while loop that runs forever.
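        - Just for fun, only odd-numbered users (users 1 and 3) join guilds, and the butterfly loop starts at user 2, so only users 2 and 3 catch butterflies. All users purchase swords and slay dragons. Each user sends 1 slay_a_dragon request per loop versus 2 for the other events, so users slay dragons half as often as they purchase swords or catch butterflies.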
        - ```ab```: runs Apache Bench inside the mids container.
        - ```-n```: sets how many requests Apache Bench sends (here, 1 or 2 per user per loop).
        - ```-H```: adds a custom header to the request (meta-information about the request). Setting a different ```Host``` header for each ```$i``` lets us simulate many different users.
        - ```http://localhost:5000/purchase_a_sword```: URL path for the sword purchase event, defined in game_api.py (the game API Python script).
            - Changing the last part of the path from "purchase_a_sword" to another event path, such as "join_guild", generates a different event, like joining a guild.
    - Each command runs forever and occupies its own terminal, so running all four needs 4 terminals.
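
The Step 10 loops determine how many events each table should receive. Here is a small plain-Python sketch of that arithmetic, with the user ranges and `-n` values copied from the four commands; it assumes every `ab` request succeeds and produces exactly one event.

```python
# (user ids from the for loop, -n value) for each command in Step 10
LOOPS = {
    "purchase_a_sword": (range(1, 4), 2),      # i = 1, 2, 3; ab -n 2
    "join_guild": (range(1, 4, 2), 2),         # i = 1, 3;    ab -n 2
    "slay_a_dragon": (range(1, 4), 1),         # i = 1, 2, 3; ab -n 1
    "catch_a_butterfly": (range(2, 4), 2),     # i = 2, 3;    ab -n 2
}


def requests_per_cycle(loops):
    """Requests each command sends per pass of its while loop (before the 10 s sleep)."""
    return {path: len(users) * n for path, (users, n) in loops.items()}


def per_user_average(loops, path, cycles):
    """Events per participating user after `cycles` passes, i.e. count(*) / count(distinct host)."""
    users, n = loops[path]
    total_events = len(users) * n * cycles
    return total_events / len(users)


print(requests_per_cycle(LOOPS))
# {'purchase_a_sword': 6, 'join_guild': 4, 'slay_a_dragon': 3, 'catch_a_butterfly': 4}
print(per_user_average(LOOPS, "catch_a_butterfly", 22), per_user_average(LOOPS, "slay_a_dragon", 22))
# 44.0 22.0
```

Per participating user, butterfly catches grow twice as fast as dragon slays. That matches the 44 vs. 22 averages in Game Data Analysis if both loops completed about the same number of passes, although the four loops run independently, so their pass counts need not match exactly.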

### Query the Data using Presto
*See the next section, **Game Data Analysis**, for the queries that answer business questions*



- **Step 11: ```docker-compose exec presto presto --server presto:8080 --catalog hive --schema default```**
    - Launch Presto interactive interface in terminal.
    - ```--server presto:8080```: the address of the Presto server (host ```presto```, port 8080). The CLI runs inside the presto container, so this points it at the Presto server in that container.
    - ```--catalog hive```: uses the Hive catalog as the data source, which lets us query Hive tables from Presto.
    - ```--schema default```: uses the ```default``` schema (database) in Hive, which is where our tables were registered.


- **Step 12: In Presto, show all the tables that are currently registered in Hive.**
   
   ```
    presto:default> show tables;
       Table       
    -------------------
     catch_butterflies 
     join_guilds       
     slay_dragons      
     sword_purchases   
    (4 rows)
   ```
    - Helps make sure that the tables I want to create are actually created in Hive.


- **Step 13: In Presto, show the columns of each table.**

    ```
    presto:default> describe catch_butterflies;
       Column   |  Type   | Comment 
    ------------+---------+---------
     raw_event  | varchar |         
     timestamp  | varchar |         
     accept     | varchar |         
     host       | varchar |         
     user-agent | varchar |         
     event_type | varchar |         
    (6 rows)
    
    presto:default> describe join_guilds;
       Column   |  Type   | Comment 
    ------------+---------+---------
     raw_event  | varchar |         
     timestamp  | varchar |         
     accept     | varchar |         
     host       | varchar |         
     user-agent | varchar |         
     event_type | varchar |         
    (6 rows)

    presto:default> describe slay_dragons;
       Column   |  Type   | Comment 
    ------------+---------+---------
     raw_event  | varchar |         
     timestamp  | varchar |         
     accept     | varchar |         
     host       | varchar |         
     user-agent | varchar |         
     event_type | varchar |         
    (6 rows)
   ```
   - Verifies that the schemas defined in the Python script carried over to the Hive tables (Presto displays column names in lowercase).


- **Step 14: In Presto, check that the tables are updated as new data come in.**

    ```
    presto:default> select count(*) from sword_purchases;
     _col0 
    -------
       198 
    (1 row)

    Query 20210812_174322_00013_3bdrf, FINISHED, 1 node
    Splits: 45 total, 39 done (86.67%)
    0:04 [174 rows, 81.9KB] [39 rows/s, 18.6KB/s]

    presto:default> select count(*) from sword_purchases;
     _col0 
    -------
       222 
    (1 row)

    Query 20210812_174428_00014_3bdrf, FINISHED, 1 node
    Splits: 51 total, 51 done (100.00%)
    0:02 [222 rows, 103KB] [115 rows/s, 53.2KB/s]
    ```
    - The total number of rows in sword_purchases is increasing, indicating that the table is being updated as data are being read into it in stream.
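
The two Step 14 readings can also be turned into a rough ingest rate. This sketch assumes that a Presto query ID begins with the query's creation time in `YYYYMMDD_HHMMSS` form (UTC). That is when the query was submitted, not the exact instant the count was taken, so the result is approximate.

```python
from datetime import datetime


def query_time(query_id):
    """Parse the leading YYYYMMDD_HHMMSS part of a Presto query ID (assumed creation time)."""
    return datetime.strptime(query_id[:15], "%Y%m%d_%H%M%S")


# The two readings from Step 14: (query ID, count(*) from sword_purchases)
readings = [
    ("20210812_174322_00013_3bdrf", 198),
    ("20210812_174428_00014_3bdrf", 222),
]

(first_id, first_count), (second_id, second_count) = readings
elapsed = (query_time(second_id) - query_time(first_id)).total_seconds()
new_rows = second_count - first_count
print(f"{new_rows} new rows in {elapsed:.0f} s ({new_rows / elapsed:.2f} rows/s)")
# 24 new rows in 66 s (0.36 rows/s)
```

With 6 `purchase_a_sword` requests per pass of the Step 10 loop, 24 new rows in about 66 seconds corresponds to roughly 4 passes, or one every 16 to 17 seconds. That is plausible given the 10-second sleep plus three `docker-compose exec` calls per pass, but it is a sanity check, not a measurement.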

## **Game Data Analysis**
*Derive business insights with Presto from the data we generated*

#### **What percentage of users join guilds?**

```
    presto:default> select count(distinct host) * 100 / 3 as percent_users from join_guilds;
     percent_users 
    ---------------
                66 
    (1 row)
    
    Query 20210812_173940_00006_3bdrf, FINISHED, 1 node
    Splits: 23 total, 13 done (56.52%)
    0:04 [40 rows, 24.4KB] [9 rows/s, 5.65KB/s]
```

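**Insight: About two-thirds of users (2 of 3) join guilds. The query returns 66 rather than 66.7 because Presto's integer division truncates.**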
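
Both queries in this section divide integers, and integer division truncates, which is why the guild query returns 66 instead of 66.7. The plain-Python sketch below shows the difference and derives the denominator from the set of known hosts instead of hard-coding 3. The host names follow the `user$i.datafun.com` pattern from Step 10, and the guild membership (users 1 and 3) is taken from the Step 10 commands rather than queried from Presto. In Presto itself, casting first, for example `cast(count(distinct host) as double) * 100 / 3`, should avoid the truncation.

```python
def percent_of_users(event_hosts, all_hosts):
    """Share of all known users who appear in an event table, as a float percentage."""
    all_hosts = set(all_hosts)
    return 100.0 * len(set(event_hosts) & all_hosts) / len(all_hosts)


all_hosts = [f"user{i}.datafun.com" for i in range(1, 4)]
guild_hosts = ["user1.datafun.com", "user3.datafun.com"]  # odd-numbered users, per Step 10

print(2 * 100 // 3)                                        # 66: integer division drops the fraction
print(round(percent_of_users(guild_hosts, all_hosts), 1))  # 66.7
```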

#### **On average, do users catch butterflies more often than they slay dragons?**

```
    presto:default> select count(*) / count(distinct host) as avg_catch from catch_butterflies;
     avg_catch 
    -----------
            44 
    (1 row)

    Query 20210812_174059_00007_3bdrf, FINISHED, 1 node
    Splits: 31 total, 28 done (90.32%)
    0:07 [84 rows, 58.7KB] [12 rows/s, 8.68KB/s]

    presto:default> select count(*) / count(distinct host) as avg_slays from slay_dragons;
     avg_slays 
    -----------
            22 
    (1 row)

    Query 20210812_174109_00008_3bdrf, FINISHED, 1 node
    Splits: 32 total, 28 done (87.50%)
    0:03 [62 rows, 54.2KB] [18 rows/s, 16.3KB/s]
```

**Insight: On average, users catch butterflies twice as often as they slay dragons.**

### Recommendations:
- Users like in-game communities, so expanding the social side of our game is worth pursuing.
- We can add more in-game actions that build on catching butterflies (e.g., a butterfly collection book) rather than actions related to slaying dragons. For example, we can create missions for collecting butterflies instead of missions for slaying a number of dragons.