# W205 Project 3
## Priscilla Burity


#### Pipeline

In this project we have a mobile app game (served by a Flask application). The user interacts with it by sending the following requests to the server: (i) purchase a sword and (ii) join the guild. Each interaction is an event.

Each event triggers an API call to the web server, and the API server then handles that request. 

We want the API server to log these events, that is, to publish them to the Kafka pipeline.

From there we use Spark streaming to filter/select data from Kafka as the events arrive. We can do some initial analysis and even transform the events. We also want to capture metadata for each event, that is, information beyond the event itself (such as the request headers).

Next we bring the data into a format that can be stored as parquet files in HDFS, which we use as our storage file system.

We then use Presto to query the data back out of HDFS. In order to use Presto, we need the Hive metastore, which is used here as a registry of table names and schemas.

Finally, we produce an analytics report to present our queries and findings.

To summarize, **the Project 3 pipeline consists of the following steps**:

**Step 1** - Spin up the cluster and create a topic on Kafka

**Step 2** - Build a mobile app game and send requests into the Kafka pipeline

**Step 3** - Use Apache Bench to generate test data

**Step 4** - Extract events from Kafka and write them to HDFS (using Spark streaming)

**Step 5** - Use Presto to query data back out from HDFS & show basic analysis of the events



#### Files

The content of these files is discussed in detail below. For now, here are the file names and a short description of each file's role in the pipeline.

**docker-compose.yml** - Configuration file for Docker Compose, the tool for defining and running multi-container Docker applications (as it was part of Project 2, I won't go into details of this file)

**game_api.py** - Our Flask application

**ab.sh** - Shell script with the mock requests we use to generate data.

**spark.py** - Script that (1) extracts events from Kafka, (2) filters them, and (3) writes them to HDFS. The script also (4) defines our own schema and (5) registers the schema in Hive.

#### Commands flow

We start by moving to the project's directory.

````
cd ~/w205/project-3-burityp

````

##### **Step 1 - Spin up the cluster and create a topic on Kafka**

`docker-compose up` starts the containers. `-d` for detached mode, i.e., starts the containers in the background.

```
docker-compose up -d
```

Now we need to create a topic in Kafka. We use `exec` to run a command inside a service, `kafka` because the topic lives in the Kafka container, and `kafka-topics` to `--create` a `--topic` named `events` with the usual options (`--partitions 1`, `--replication-factor 1`); `--if-not-exists` makes the command safe to run again. Finally, we point the command at Zookeeper with the appropriate port number.

```
docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```


##### **Step 2 - Build a mobile app game and send requests into the Kafka pipeline**

Our Flask application is the following:

```
vim game_api.py
```

Output:

```
#!/usr/bin/env python
import json
from kafka import KafkaProducer
from flask import Flask, request

app = Flask(__name__)
producer = KafkaProducer(bootstrap_servers='kafka:29092')


def log_to_kafka(topic, event):
    event.update(request.headers)
    producer.send(topic, json.dumps(event).encode())


@app.route("/")
def default_response():
    default_event = {'event_type': 'default'}
    log_to_kafka('events', default_event)
    return "This is the default response!\n"


@app.route("/purchase_a_sword")
def purchase_a_sword():
    purchase_sword_event = {'event_type': 'purchase_sword'}
    log_to_kafka('events', purchase_sword_event)
    return "You Purchased a Sword!\n"


@app.route("/join_the_guild")
def join_the_guild():
    join_the_guild_event = {'event_type': 'join_guild'}
    log_to_kafka('events', join_the_guild_event)
    return "You Joined the Guild!\n"

```


In this API, we define `producer` to connect to Kafka, and in the `log_to_kafka` function we add the request headers to the event, so that we have the full information about it. This gives us better information about, for example, which browser was used and which host the request came from.
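
To make the effect of `event.update(request.headers)` concrete, here is a minimal sketch that mimics `log_to_kafka` without Flask or Kafka. The `build_payload` helper and the `sample_headers` dictionary are hypothetical stand-ins for the producer call and for `request.headers`; the header values are made up for illustration.

```python
import json


def build_payload(event, headers):
    """Merge request headers into the event and serialize it as JSON bytes.

    Mirrors the body of log_to_kafka, except that it returns the bytes
    instead of sending them to Kafka.
    """
    merged = dict(event)      # copy so the caller's dict is not mutated
    merged.update(headers)    # header names become top-level keys
    return json.dumps(merged, sort_keys=True).encode()


# Hypothetical headers, similar in shape to what a test client might send.
sample_headers = {
    "Host": "user1.comcast.com",
    "User-Agent": "ApacheBench/2.3",
    "Accept": "*/*",
}

payload = build_payload({"event_type": "purchase_sword"}, sample_headers)
decoded = json.loads(payload.decode())
print(decoded["event_type"], decoded["Host"])
```

The point of the sketch is that the headers end up as ordinary top-level keys next to `event_type`, which is why they can later be read as columns.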

We then add the three routes the user can call in the game, namely `/` (the default), `/purchase_a_sword` and `/join_the_guild`.

Now we need to run it in the `mids` container. 

```
docker-compose exec mids env FLASK_APP=/w205/project-3-burityp/game_api.py flask run --host 0.0.0.0
```

Output:

```
 * Serving Flask app "game_api"
 * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit)
````

This means that the server is running.

Now, in a second terminal, we listen to Kafka to see what hits it.

We run kafkacat in consumer mode (`-C`) against the bootstrap server `kafka:29092`, listening to the topic (`-t`) `events`, starting from the offset (`-o`) `beginning` and without the `-e` (exit at end) option, so it runs continuously.

```
docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning
```
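
Each message printed by kafkacat is one JSON document. As a rough sketch of how such output could be inspected offline, the snippet below counts events by type. The `count_event_types` helper and the `sample_lines` list are hypothetical; the sample only imitates the shape of the messages produced by `game_api.py`.

```python
import json
from collections import Counter


def count_event_types(lines):
    """Count events per event_type, skipping lines that are not JSON objects."""
    counts = Counter()
    for line in lines:
        try:
            event = json.loads(line)
        except ValueError:
            continue  # not JSON, e.g. a status line printed by the consumer
        if isinstance(event, dict):
            counts[event.get("event_type", "unknown")] += 1
    return counts


# Hypothetical sample of consumer output (three events and one status line).
sample_lines = [
    '{"event_type": "default", "Host": "user1.comcast.com"}',
    '{"event_type": "purchase_sword", "Host": "user1.comcast.com"}',
    '{"event_type": "purchase_sword", "Host": "user2.att.com"}',
    '% Reached end of topic events [0] at offset 3',
]

counts = count_event_types(sample_lines)
print(counts)
```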

##### **Step 3 - Use Apache Bench to generate test data**

Now we need to create stream data. To do so, in a third terminal we will use Apache Bench, a tool for benchmarking HTTP servers. It is designed to give an impression of how a server performs, but it can also be used to mock up data, since each request generates an event. With `-H` we can add a custom header to the requests, and that's what we will be doing to simulate different hosts.

We use a shell script named `ab.sh` to send the mock requests. As the game API is running in the first terminal and we are listening to Kafka in the second terminal, we open a third terminal to check the content of `ab.sh`:

```
vim ab.sh
```
Output:

```

#!/bin/bash

while true
do
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/
docker-compose exec mids ab -n 20 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword
docker-compose exec mids ab -n 30 -H "Host: user1.comcast.com" http://localhost:5000/join_the_guild

docker-compose exec mids ab -n 15 -H "Host: user2.att.com" http://localhost:5000/
docker-compose exec mids ab -n 25 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword
docker-compose exec mids ab -n 35 -H "Host: user2.att.com" http://localhost:5000/join_the_guild

docker-compose exec mids ab -n 20 -H "Host: user3.gogov.com" http://localhost:5000/
docker-compose exec mids ab -n 30 -H "Host: user3.gogov.com" http://localhost:5000/purchase_a_sword
docker-compose exec mids ab -n 40 -H "Host: user3.gogov.com" http://localhost:5000/join_the_guild

done

```

Here we have three hosts (`user1.comcast.com`, `user2.att.com` and `user3.gogov.com`).

Each one of them sends the three types of requests: `http://localhost:5000/`, `http://localhost:5000/purchase_a_sword` and `http://localhost:5000/join_the_guild`.

In each loop iteration, each request is sent *n* times (*n* being the number from 10 to 40 right after `-n` in each line). The `while true` line at the beginning causes the requests to be sent continuously until we stop the script (by typing `Ctrl+C`). As we will use Spark streaming to filter/select data from Kafka as the events arrive, this design is useful because it lets us test whether the stored data is actually growing.
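
As a small worked example of the arithmetic, the sketch below adds up how many requests of each type one pass through the loop sends. The numbers are copied from the `-n` values in `ab.sh`; the dictionary layout and the `totals_by_event` helper are only illustrative.

```python
# Requests per loop iteration, copied from the -n values in ab.sh.
REQUESTS_PER_LOOP = {
    "user1.comcast.com": {"default": 10, "purchase_sword": 20, "join_guild": 30},
    "user2.att.com": {"default": 15, "purchase_sword": 25, "join_guild": 35},
    "user3.gogov.com": {"default": 20, "purchase_sword": 30, "join_guild": 40},
}


def totals_by_event(requests_per_loop):
    """Sum the per-host request counts for each event type."""
    totals = {}
    for per_event in requests_per_loop.values():
        for event_type, n in per_event.items():
            totals[event_type] = totals.get(event_type, 0) + n
    return totals


totals = totals_by_event(REQUESTS_PER_LOOP)
print(totals)
print(sum(totals.values()))
```

One pass therefore sends 45 default, 75 sword purchase and 105 guild requests, 225 in total.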



Now we run the script:

```
bash ab.sh
```

As output we get a report on each benchmarking run.

Recall that the game is running in the first terminal and the second terminal is listening to Kafka. Now that we are actually sending requests via `ab.sh`, the first terminal displays the incoming requests, while the second terminal displays the flow of messages hitting Kafka, with all the metadata we added in the game API.

##### **Step 4 - Extract events from Kafka and write them to HDFS (using Spark streaming)**

The next step is to read the data, filter it to get the events we are interested in, create the tables and finally write them to HDFS.

That's what the script `spark.py` does.

````
vim spark.py
````

Output:

````
#!/usr/bin/env python
"""Extract events from kafka and write them to hdfs
"""
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType


# Define schema for purchase_sword_event

def purchase_sword_event_schema():
    
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("timestamp", StringType(), True),

    ])

# Define schema for join_guild_event

def join_guild_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("timestamp", StringType(), True),
    ])


# Create a filter for event_type purchase_sword

@udf('boolean')
def is_purchase(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

# Create a filter for event_type join_guild

@udf('boolean')
def is_join(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'join_guild':
        return True
    return False

def main():
    """main
    """
    
    # Start a Spark session
    
    spark = SparkSession \
        .builder \
        .appName("ExtractEventsJob") \
        .enableHiveSupport() \
        .getOrCreate()

    # Read from kafka using Spark stream
    
    raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()

    # Filter event 'purchase_sword' from raw_events, register schema in Hive and write the table in HDFS
    
        # Filter/prepare data
    
    sword_purchases = raw_events \
        .filter(is_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          purchase_sword_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')

    
        # register schema in Hive 
    
    spark.sql("drop table if exists sword_purchases")
    sql_string = """
        create external table if not exists sword_purchases (
            raw_event string,
            timestamp string,
            Accept string,
            Host string,
            `User-Agent` string,
            event_type string
            )
            stored as parquet
            location '/tmp/sword_purchases'
            tblproperties ("parquet.compress"="SNAPPY")
            """
    
    spark.sql(sql_string)
    
    # write in HDFS
    
    sink = sword_purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases") \
        .option("path", "/tmp/sword_purchases") \
        .trigger(processingTime="10 seconds") \
        .start()

   
    
    # Filter event 'join_guild' from raw_events, register schema in Hive and write the table in HDFS
        
        # Filter/prepare data
    
    join_guilds = raw_events \
        .filter(is_join(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          join_guild_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')

    
        # register schema in Hive 
    
    spark.sql("drop table if exists join_guilds")
    sql_string2 = """
        create external table if not exists join_guilds (
            raw_event string,
            timestamp string,
            Accept string,
            Host string,
            `User-Agent` string,
            event_type string
            )
            stored as parquet
            location '/tmp/join_guilds'
            tblproperties ("parquet.compress"="SNAPPY")
            """
    spark.sql(sql_string2)
    
    # write in HDFS
    
    sink2 = join_guilds \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_join_guilds") \
        .option("path", "/tmp/join_guilds") \
        .trigger(processingTime="10 seconds") \
        .start()


    spark.streams.awaitAnyTermination()

        
if __name__ == "__main__":
    main()


````




The `spark.py` file starts by importing the necessary libraries as usual.

Next we define the schema for the events. This step is not strictly necessary here, as our events are generated in a simulated environment, but in real life it's useful. When the schema is inferred instead, it comes from the first message that hits Kafka, and all events coming after that are assumed to have the same schema. So if the first message has the wrong schema, fields of later messages that don't fit it will be lost. Defining our own schema is the safest way to make sure that we are not losing relevant information.
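
The effect of an explicit schema can be illustrated without Spark. The sketch below is a plain-Python analogy, not Spark's implementation: `project_to_schema` is a hypothetical helper that keeps only the declared fields and fills missing ones with `None`, which is roughly how `from_json` treats a message when it is given a fixed schema of nullable string fields.

```python
import json

# Field names taken from purchase_sword_event_schema() in spark.py.
SCHEMA_FIELDS = ["Accept", "Host", "User-Agent", "event_type", "timestamp"]


def project_to_schema(raw_event, fields=SCHEMA_FIELDS):
    """Parse a JSON string and keep only the declared fields (None if absent)."""
    event = json.loads(raw_event)
    return {name: event.get(name) for name in fields}


# Hypothetical message: one undeclared key ("Cookie"), several declared keys missing.
raw = '{"event_type": "purchase_sword", "Host": "user1.comcast.com", "Cookie": "abc"}'
row = project_to_schema(raw)
print(row)
```

Whatever the first message looks like, every row ends up with the same five fields, which is the property the explicit schema is meant to guarantee.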

Next in the script come two filters (`is_purchase` and `is_join`) that select the `event_type`s `purchase_sword` and `join_guild` respectively. This will be useful below to write different tables to HDFS for different `event_type`s.
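
Both UDFs assume that every message is valid JSON with an `event_type` key; a malformed message would raise an exception inside the UDF. A slightly more defensive variant is sketched below in plain Python, so it can be tried without a Spark session. The name `has_event_type` is hypothetical, and in `spark.py` such a function would still need to be wrapped with `udf`.

```python
import json


def has_event_type(event_as_json, expected):
    """Return True only if the message is a JSON object with the expected event_type."""
    try:
        event = json.loads(event_as_json)
    except (TypeError, ValueError):
        return False  # None, or a string that is not valid JSON
    return isinstance(event, dict) and event.get("event_type") == expected


checks = [
    has_event_type('{"event_type": "purchase_sword"}', "purchase_sword"),
    has_event_type('{"event_type": "join_guild"}', "purchase_sword"),
    has_event_type('{"Host": "user1.comcast.com"}', "purchase_sword"),
    has_event_type("not json", "purchase_sword"),
    has_event_type(None, "purchase_sword"),
]
print(checks)
```

Only the first check passes; the other four messages are rejected instead of stopping the job.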

Next in the script, we create `spark`, the Spark session, and then `raw_events` to read from Kafka. The `.readStream` call makes this a streaming read, so Spark picks up events as they arrive in Kafka. The other options are the usual ones: the bootstrap server and a subscription to the topic `events`. No `startingOffsets` option is set, so the stream falls back to the default for streaming queries (`latest`) and reads only the events that arrive after the job starts.

Then, for each `event_type`, we filter `raw_events` with the corresponding UDF, keep the message `value` as a string (aliased `raw_event`) together with the Kafka `timestamp`, and use `from_json` with the schema defined above to expand the JSON fields into separate columns. The result is the table for that event type.

In order to use Presto (Step 5 below), we need the Hive metastore, which is used here as a registry of table names and schemas. More details on that will be discussed in Step 5. For now we only need to register our schema in Hive.

To do so, we start by dropping the table if it exists, then create an external table and specify its columns. We tell Hive that the data is stored as parquet, where it is located in HDFS and how it is compressed. The call to `spark.sql` at the end of the block runs the statement and registers the table with Hive.
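
The two `create external table` statements differ only in the table name and the location, so they could be generated from one template. The sketch below shows one possible way; `external_table_ddl` is a hypothetical helper that is not part of `spark.py`, and its return value would be passed to `spark.sql`.

```python
def external_table_ddl(table_name, location):
    """Build the CREATE EXTERNAL TABLE statement used to register a table in Hive."""
    return """
        create external table if not exists {table} (
            raw_event string,
            timestamp string,
            Accept string,
            Host string,
            `User-Agent` string,
            event_type string
            )
            stored as parquet
            location '{location}'
            tblproperties ("parquet.compress"="SNAPPY")
        """.format(table=table_name, location=location)


ddl = external_table_ddl("join_guilds", "/tmp/join_guilds")
print(ddl)
```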

Finally, we write the tables to HDFS. For stream writing we need one `sink` per destination. We use `.writeStream` to write in streaming mode, in `parquet` format as usual. The checkpoint location lets Spark track which data has already been written, which keeps the stream and the sink in sync. We then set the location where the table is written in HDFS (`.option("path", "/tmp/...")`) and a trigger with a processing time of 10 seconds.
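
Because the two sinks differ only in the table name, the write logic could be factored into a helper, which avoids copy-and-paste slips between the two blocks. The sketch below is one possible shape: `start_parquet_sink` is a hypothetical function, and `RecordingWriter` and `FakeEvents` are stubs that stand in for Spark's stream writer and a streaming DataFrame so the snippet runs without a cluster.

```python
def start_parquet_sink(events, table_name, interval="10 seconds"):
    """Start a streaming parquet sink for `events` under /tmp/<table_name>."""
    return (
        events.writeStream
        .format("parquet")
        .option("checkpointLocation", "/tmp/checkpoints_for_" + table_name)
        .option("path", "/tmp/" + table_name)
        .trigger(processingTime=interval)
        .start()
    )


class RecordingWriter:
    """Stub for a stream writer: records each call and returns itself."""

    def __init__(self):
        self.calls = []

    def format(self, source):
        self.calls.append(("format", source))
        return self

    def option(self, key, value):
        self.calls.append(("option", key, value))
        return self

    def trigger(self, processingTime=None):
        self.calls.append(("trigger", processingTime))
        return self

    def start(self):
        self.calls.append(("start",))
        return self


class FakeEvents:
    """Stub for a streaming DataFrame; only exposes writeStream."""

    def __init__(self):
        self.writeStream = RecordingWriter()


sink = start_parquet_sink(FakeEvents(), "join_guilds")
for call in sink.calls:
    print(call)
```

In `spark.py` the helper would be called once per table, for example `start_parquet_sink(sword_purchases, "sword_purchases")`, with the real streaming DataFrames in place of the stub.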

This whole process runs continuously until we terminate it, so we end with `spark.streams.awaitAnyTermination()`, which blocks until one of the streaming queries stops.

If everything works, we will find the tables in the HDFS folders `/tmp/sword_purchases` and `/tmp/join_guilds` respectively.

Now, in a fourth terminal, we run it:
    
```
docker-compose exec spark spark-submit /w205/project-3-burityp/spark.py
```
The job keeps running until it is terminated. So, in a fifth terminal, we check whether the data folders are in HDFS, recalling that Hadoop lives in the `cloudera` container:

````
docker-compose exec cloudera hadoop fs -ls /tmp
````

Output:

````
Found 7 items
drwxrwxrwt   - root   supergroup          0 2020-12-05 16:54 /tmp/checkpoints_for_join_guilds
drwxrwxrwt   - root   supergroup          0 2020-12-05 16:54 /tmp/checkpoints_for_sword_purchases
drwxrwxrwt   - mapred mapred              0 2016-04-06 02:26 /tmp/hadoop-yarn
drwx-wx-wx   - hive   supergroup          0 2020-12-05 16:54 /tmp/hive
drwxrwxrwt   - root   supergroup          0 2020-12-05 16:55 /tmp/join_guilds
drwxrwxrwt   - mapred hadoop              0 2016-04-06 02:28 /tmp/logs
drwxrwxrwt   - root   supergroup          0 2020-12-05 16:55 /tmp/sword_purchases

````

Now we check each of the folders `/tmp/sword_purchases` and `/tmp/join_guilds`.

````
docker-compose exec cloudera hadoop fs -ls /tmp/sword_purchases
````
Output:

````
Found 13 items
drwxr-xr-x   - root supergroup          0 2020-12-05 16:56 /tmp/sword_purchases/_spark_metadata
-rw-r--r--   1 root supergroup       3509 2020-12-05 16:56 /tmp/sword_purchases/part-00000-00c74e49-2259-49cd-bcb9-8d7f47d45f90-c000.snappy.parquet
-rw-r--r--   1 root supergroup       2737 2020-12-05 16:55 /tmp/sword_purchases/part-00000-3be93744-539e-48ab-ae8e-a87dae01abf6-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3497 2020-12-05 16:56 /tmp/sword_purchases/part-00000-3f383361-0938-42e7-b928-78cbd74a7ee0-c000.snappy.parquet
-rw-r--r--   1 root supergroup       2787 2020-12-05 16:55 /tmp/sword_purchases/part-00000-496522d1-33bb-4c36-a72e-5965643296d3-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3122 2020-12-05 16:55 /tmp/sword_purchases/part-00000-546f15d8-af8c-433e-be07-35af749d9e00-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3553 2020-12-05 16:56 /tmp/sword_purchases/part-00000-5a771f99-0056-4c3d-8145-013e4bd2d5eb-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3966 2020-12-05 16:55 /tmp/sword_purchases/part-00000-60a93447-0f93-4cb6-b578-c2f0d62fad4e-c000.snappy.parquet
-rw-r--r--   1 root supergroup       2868 2020-12-05 16:56 /tmp/sword_purchases/part-00000-83a58f8e-ca15-4eb2-8ce4-3464df10b75e-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3533 2020-12-05 16:55 /tmp/sword_purchases/part-00000-85d0b252-c283-4196-878e-5f11c094ff22-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3229 2020-12-05 16:55 /tmp/sword_purchases/part-00000-93a694ea-7e4d-433f-945b-9b6eafaa12ca-c000.snappy.parquet
-rw-r--r--   1 root supergroup       3511 2020-12-05 16:56 /tmp/sword_purchases/part-00000-a1fa6627-588c-40f7-818b-b9e0c2f855aa-c000.snappy.parquet
-rw-r--r--   1 root supergroup       2880 2020-12-05 16:55 /tmp/sword_purchases/part-00000-c2b71b37-72f4-44a8-982f-386463a5cf31-c000.snappy.parquet
````

The output of `docker-compose exec cloudera hadoop fs -ls /tmp/join_guilds` is similar.

So everything seems to be OK: our data has landed in HDFS, so we should be able to read it back out.



##### **Step 5 - Use Presto to query data back out from HDFS & report results**

Now it's time to use Presto to query the data back out of HDFS.

As mentioned above, Presto needs the Hive metastore to query HDFS. Presto connects to Hive to look up the table names and their metadata, and then connects to the Hadoop file system to read the data that our SQL statements ask for.

We have already set up Hive in `spark.py`: we enabled Hive support in the Spark session (`.enableHiveSupport()`) and registered an external table for each event type.

Now in order to get to Presto, we run

```
docker-compose exec presto presto --server presto:8080 --catalog hive --schema default
```

This leads us to the Presto console, which we'll use to check some basic features of our tables and report them.



**Please see below some query results**:

1 - There are 2 tables in the dataset

````
presto:default> show tables;

      Table      
-----------------
 join_guilds     
 sword_purchases 
(2 rows)

Query 20201205_170551_00002_dbvdj, FINISHED, 1 node
Splits: 2 total, 1 done (50.00%)
0:02 [2 rows, 68B] [0 rows/s, 31B/s]

````

2 - The tables have the schema below (the same for both types of event): 

```
presto:default> describe sword_purchases;
   Column   |  Type   | Comment 
------------+---------+---------
 raw_event  | varchar |         
 timestamp  | varchar |         
 accept     | varchar |         
 host       | varchar |         
 user-agent | varchar |         
 event_type | varchar |         
(6 rows)

Query 20201205_170637_00003_dbvdj, FINISHED, 1 node
Splits: 2 total, 1 done (50.00%)
0:04 [0 rows, 0B] [0 rows/s, 0B/s]

```

```
presto:default> describe join_guilds;
   Column   |  Type   | Comment 
------------+---------+---------
 raw_event  | varchar |         
 timestamp  | varchar |         
 accept     | varchar |         
 host       | varchar |         
 user-agent | varchar |         
 event_type | varchar |         
(6 rows)

Query 20201205_170730_00004_dbvdj, FINISHED, 1 node
Splits: 2 total, 1 done (50.00%)
0:02 [6 rows, 422B] [2 rows/s, 179B/s]
```

3 - The tables are growing as new data is coming in.

We run `SELECT COUNT(*) FROM sword_purchases;` twice to confirm that the table is growing.

1st time (4891 rows):
````
presto:default> SELECT COUNT(*) FROM sword_purchases;
 _col0 
-------
  4891 
(1 row)

Query 20201205_170832_00005_dbvdj, FINISHED, 1 node
Splits: 84 total, 80 done (95.24%)
0:23 [4.7K rows, 255KB] [204 rows/s, 11.1KB/s]
````

2nd time (5096 rows):
````
presto:default> SELECT COUNT(*) FROM sword_purchases;
 _col0 
-------
  5096 
(1 row)

Query 20201205_171044_00006_dbvdj, FINISHED, 1 node
Splits: 96 total, 88 done (91.67%)
0:07 [4.83K rows, 274KB] [716 rows/s, 40.6KB/s]

````
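
As a rough worked example, the two counts can be turned into an approximate growth rate. The sketch assumes that the `YYYYMMDD_HHMMSS` prefix of each Presto query ID reflects when the query was submitted and that each count is a snapshot at about that time; `parse_query_time` is a hypothetical helper.

```python
from datetime import datetime


def parse_query_time(query_id):
    """Extract the timestamp prefix (YYYYMMDD_HHMMSS) from a Presto query ID."""
    return datetime.strptime(query_id[:15], "%Y%m%d_%H%M%S")


# Query IDs and row counts copied from the two outputs above.
first = {"query_id": "20201205_170832_00005_dbvdj", "rows": 4891}
second = {"query_id": "20201205_171044_00006_dbvdj", "rows": 5096}

new_rows = second["rows"] - first["rows"]
elapsed = (
    parse_query_time(second["query_id"]) - parse_query_time(first["query_id"])
).total_seconds()
rate = new_rows / elapsed

print(new_rows, elapsed, round(rate, 2))
```

Under these assumptions the table gained 205 rows in 132 seconds, or roughly 1.5 rows per second.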


4 - At a later moment, the three hosts had sent different numbers of requests for the `purchase_sword` event. For example, user2.att.com had sent 1725 requests of this type.

````
presto:default> SELECT host , COUNT(*) FROM sword_purchases GROUP BY host;
       host        | _col1 
-------------------+-------
 user2.att.com     |  1725 
 user3.gogov.com   |  2070 
 user1.comcast.com |  1376 
(3 rows)

Query 20201205_171157_00007_dbvdj, FINISHED, 1 node
Splits: 104 total, 102 done (98.08%)
0:16 [5.17K rows, 300KB] [329 rows/s, 19.1KB/s]

````

5 - Likewise, at a given moment the three hosts had sent different numbers of requests for the `join_guild` event. For example, user3.gogov.com had sent 2160 requests of this type.

````
presto:default> SELECT host , COUNT(*) FROM join_guilds GROUP BY host;
       host        | _col1 
-------------------+-------
 user2.att.com     |  1825 
 user3.gogov.com   |  2160 
 user1.comcast.com |  1457 
(3 rows)

Query 20201205_171730_00008_dbvdj, FINISHED, 1 node
Splits: 129 total, 127 done (98.45%)
0:25 [5.37K rows, 324KB] [214 rows/s, 13KB/s]

````
    
    

This ends the project :)
    
Thank you!    