### Gabriel Ohaike

### Introduction:
As a data scientist at a game development company, I am interested in tracking two events from my latest mobile game: buying a sword and joining a guild. Each event carries metadata characteristic of that kind of event.

### Tasks:
        
**In order to do this**, 

  1. I will instrument my API server to log events to Kafka
  
  2. Assemble a data pipeline that uses Spark Streaming to read events from Kafka, filter the selected event types, and land them in HDFS as Parquet, making them available for analysis with Presto
  
  3. Use Apache Bench to generate test data for my pipeline.
 

**Create a Docker Compose file:**

The first thing to do is create a Docker Compose file that defines all the containers needed to run the event tracking. The cluster is made up of the **zookeeper, kafka, cloudera, spark, presto and mids** containers. For the full service definitions and port mappings, please refer to `docker-compose.yml`.

**Here is an example of one of the service definitions (Presto):**

In [None]:
presto:
    image: midsw205/presto:0.0.1
    hostname: presto
    volumes:
      - ~/w205:/w205
    expose:
      - "8080"
    environment:
      HIVE_THRIFTSERVER: cloudera:9083
    extra_hosts:
      - "moby:127.0.0.1"

**Spin up cluster:**

In [None]:
docker-compose up -d

This spins up the cluster. `docker-compose up` creates and starts the containers for every service defined in `docker-compose.yml`. In the foreground it aggregates the output of each container, while `-d` (detached mode) starts the containers in the background and leaves them running.

**Create a topic:**

In [None]:
docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181

This command creates a Kafka topic. `docker-compose exec kafka` runs a command inside the already-running `kafka` container, and `kafka-topics` is Kafka's command-line tool for managing topics. `--create --topic events` creates a topic called `events`. Partitions let a topic be parallelized by splitting its data across multiple brokers. As per the project requirements, we only need one, so we use `--partitions 1`. The replication factor sets how many copies of each partition are kept on different brokers. Because our cluster has a single Kafka broker, it is set to 1 (`--replication-factor 1`). `--if-not-exists` creates the topic only if it does not already exist, which avoids errors/warnings when the command is re-run. Finally, `--zookeeper zookeeper:32181` tells the tool to connect to ZooKeeper at host `zookeeper` on port 32181.
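To make the flags explicit, here is a small Python sketch that assembles the same `kafka-topics` command from parameters. `create_topic_cmd` is a hypothetical helper, not part of the project. `brokers=1` assumes the single `kafka` service in this `docker-compose.yml`. The guard reflects Kafka's rule that a partition cannot have more replicas than there are brokers.

```python
import shlex


def create_topic_cmd(topic, partitions=1, replication_factor=1,
                     zookeeper="zookeeper:32181", brokers=1):
    """Hypothetical helper: build the docker-compose/kafka-topics argv list."""
    # Kafka cannot place more replicas of a partition than there are brokers,
    # so reject a replication factor larger than the (assumed) broker count.
    if replication_factor > brokers:
        raise ValueError(
            f"replication factor {replication_factor} exceeds {brokers} broker(s)")
    return [
        "docker-compose", "exec", "kafka", "kafka-topics",
        "--create", "--topic", topic,
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
        "--if-not-exists",
        "--zookeeper", zookeeper,
    ]


cmd = create_topic_cmd("events")
print(shlex.join(cmd))  # the same command as the cell above
```

You could pass the list to `subprocess.run(cmd, check=True)` to execute it from the host, assuming `docker-compose` is on the `PATH`.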

**Create a web-based application:**

In [None]:
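import json
from kafka import KafkaProducer
from flask import Flask, request

app = Flask(__name__)
# Producer that publishes messages to the Kafka broker inside the cluster
producer = KafkaProducer(bootstrap_servers='kafka:29092')


def log_to_kafka(topic, event):
    # Add the HTTP request headers (Host, User-Agent, Accept, ...) to the event
    event.update(request.headers)
    # Serialize the event to JSON and send it to Kafka as UTF-8 bytes
    producer.send(topic, json.dumps(event).encode())


@app.route("/purchase_a_sword")
def purchase_a_sword():
    # Log a purchase_sword event to the 'events' topic, then reply to the client
    purchase_sword_event = {'event_type': 'purchase_sword'}
    log_to_kafka('events', purchase_sword_event)
    return "Sword Purchased!\n"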

The web app is `game_api.py` in the project folder, and the code above shows part of it. `game_api.py` handles three main events. The mobile app makes an API call to the web-based API server with one of the following calls:

   1. `default response:`
        Returns the default response, "This is the default response".
    
   2. `purchase_a_sword:`
        Called when the user wants to purchase a sword. It returns "Sword Purchased!"
    
   3. `join_a_guild:`
        Called when the user wants to join a guild. It returns "Joined a Guild"
   
In creating our web-based application, we import the `Flask` class and create an instance of it, `app = Flask(__name__)`. We also import `KafkaProducer` and create a producer that writes to Kafka, using the `bootstrap_servers` setting to connect to the broker at `kafka:29092`. Next, we define `log_to_kafka`. This function adds the HTTP request headers to the event, serializes the event to JSON with `json.dumps`, encodes it to UTF-8 bytes with `encode()`, and sends it to the given Kafka topic with the producer's `send` method. The `route()` decorator tells Flask which URL should trigger each function.
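The following sketch shows what `log_to_kafka` puts on the wire. To make it run anywhere, Flask and Kafka are swapped out for plain Python. `FakeProducer` is a hypothetical stand-in for `KafkaProducer`. The headers are passed explicitly instead of being read from Flask's `request`. The header values are also hypothetical, since a real request carries whatever headers the client sends.

```python
import json


class FakeProducer:
    """Hypothetical stand-in for KafkaProducer that records what is sent."""

    def __init__(self):
        self.sent = []  # (topic, bytes) pairs, in send order

    def send(self, topic, value):
        self.sent.append((topic, value))


def log_to_kafka(producer, topic, event, headers):
    event.update(headers)                             # merge headers into the event
    producer.send(topic, json.dumps(event).encode())  # JSON text -> UTF-8 bytes


producer = FakeProducer()
sample_headers = {                   # hypothetical header values
    "Host": "user1.comcast.com",     # what ab's -H option would set
    "User-Agent": "ApacheBench/2.3",
    "Accept": "*/*",
}
log_to_kafka(producer, "events", {"event_type": "purchase_sword"}, sample_headers)

topic, payload = producer.sent[0]
print(topic, payload.decode())
```

In the real app, `request.headers` contains every header the client sent, so the set of keys can vary from request to request. This is one reason to declare an explicit schema for only the columns we need when the events are parsed in Spark.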

### Streaming Setup

The first thing we need to do is get our setup ready to stream events. To do this, we use two files:

   _1. ab.sh_
   
   _2. guild_sword_stream.py_

### ab.sh:

The `ab.sh` script uses Apache Bench (`ab`) inside the `mids` container to generate test data. In the example below, `ab` sends `150` sword-purchase requests and `10` join-guild requests to the Flask server at `localhost:5000`. The `Host` header is set to `user1.comcast.com` to simulate that user. The script controls how events are generated during streaming.

In [None]:
docker-compose exec mids ab -n 150 -H "Host:user1.comcast.com" http://localhost:5000/purchase_a_sword
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_guild

### guild_sword_stream.py

`guild_sword_stream.py` is the Spark Streaming job. It defines the event schema, reads events from Kafka, keeps only the events of interest, parses their JSON payload, and writes the result to HDFS. Let's look at it in detail to understand how each piece contributes to the overall streaming process.

#### Define events schema:

The code below defines our schema. A `StructType` built from `StructField`s declares the columns `Accept`, `Host`, `User-Agent` and `event_type`. Each column is a string and is flagged as nullable (`True`). This schema is used to parse each event's JSON payload into dataframe columns.

In [None]:
from pyspark.sql.types import StructType, StructField, StringType


def sword_guild_schema():
    """Schema of the JSON event payload:
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)

    The timestamp column comes from the Kafka record, not from this schema.
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
    ])

As requests come in, the extracted events are processed in streaming mode. Because our client is only interested in sword purchases and guild joins, only these two event types are kept. The Kafka message value is `cast` to a string before it is parsed. The `CAST()` function converts a value of one type into a specified data type. Finally, we `write` the streamed events to HDFS with a processing time of `10 seconds`.

#### UDF 

`@udf('boolean')` registers a Python function as a Spark user-defined function (UDF) whose return type is boolean. The `purchase_guild` function takes an event as a JSON string and parses it. It returns `True` only when the event's `event_type` is `purchase_sword` or `join_guild`. Every other event returns `False` and is filtered out.

In [None]:
import json
from pyspark.sql.functions import udf


@udf('boolean')
def purchase_guild(event_as_json):
    """udf for filtering events: keep only sword purchases and guild joins
    """
    event = json.loads(event_as_json)
    if event['event_type'] in ('purchase_sword','join_guild'):
        return True
    return False
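A UDF body is ordinary Python, so its logic can be checked without starting Spark. The sketch below is a plain-function version of `purchase_guild`, under the hypothetical name `is_event_of_interest`. It uses `event.get("event_type")`, so an event without that key is dropped instead of raising `KeyError`. This is a deliberate deviation from the UDF above, where an exception inside the UDF would fail the Spark task. The `"default"` event type in the samples is also hypothetical.

```python
import json

EVENTS_OF_INTEREST = ("purchase_sword", "join_guild")


def is_event_of_interest(event_as_json):
    """Plain-Python version of the purchase_guild UDF body."""
    event = json.loads(event_as_json)
    # .get() returns None for a missing key, which is not in the tuple
    return event.get("event_type") in EVENTS_OF_INTEREST


samples = [
    '{"event_type": "purchase_sword"}',
    '{"event_type": "join_guild"}',
    '{"event_type": "default"}',      # hypothetical event from the default route
    '{"Host": "user1.comcast.com"}',  # no event_type at all
]
results = [is_event_of_interest(s) for s in samples]
print(results)
```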

#### Event Stream

Streaming starts from a `SparkSession`. `SparkSession.builder ... getOrCreate()` returns the existing session or creates a new one named `ExtractEventsJob`. The session manages the Spark context that runs the job in the background. `spark.readStream` with the `kafka` source then subscribes to the `events` topic on `kafka:29092` and reads new records as they arrive in Kafka.

In [None]:
    spark = SparkSession \
        .builder \
        .appName("ExtractEventsJob") \
        .getOrCreate()

    raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()

#### Filtering events

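The raw Kafka records are passed through the UDF above so that only `purchase_sword` and `join_guild` events are kept. We then select three things:

- the raw event (the Kafka `value` cast to a string)
- the Kafka `timestamp` (also cast to a string)
- the JSON payload, parsed by `from_json` with the schema defined earlier

Finally, we flatten the parsed fields into columns with `json.*`.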

In [None]:
    # Keep only sword purchases and guild joins, then parse the JSON payload
    sword_guild = raw_events \
        .filter(purchase_guild(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          sword_guild_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')
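The same chain of operations can be traced on plain Python data. In this sketch, each Kafka record is hypothetical: a dict with only the `value` (bytes) and `timestamp` fields the query uses. The steps mirror `cast('string')`, the UDF filter, `from_json` with the four-column schema, and `json.*` flattening. It is an illustration, not Spark's implementation. Spark's string form of a timestamp differs from Python's `str()`, and malformed JSON is not handled here.

```python
import json
from datetime import datetime

SCHEMA_FIELDS = ["Accept", "Host", "User-Agent", "event_type"]
EVENTS_OF_INTEREST = ("purchase_sword", "join_guild")

# Hypothetical Kafka records carrying only the columns the query reads.
records = [
    {"value": b'{"event_type": "purchase_sword", "Host": "user1.comcast.com"}',
     "timestamp": datetime(2020, 8, 1, 1, 17, 18, 513000)},
    {"value": b'{"event_type": "default", "Host": "user1.comcast.com"}',
     "timestamp": datetime(2020, 8, 1, 1, 17, 18, 520000)},
]


def transform(records):
    rows = []
    for rec in records:
        raw_event = rec["value"].decode("utf-8")            # value.cast('string')
        event = json.loads(raw_event)
        if event.get("event_type") not in EVENTS_OF_INTEREST:
            continue                                        # .filter(purchase_guild(...))
        row = {"raw_event": raw_event,
               "timestamp": str(rec["timestamp"])}          # timestamp.cast('string')
        # from_json(..., schema) + select('json.*'): schema columns only,
        # None where the JSON lacks a field (every field is nullable)
        row.update({name: event.get(name) for name in SCHEMA_FIELDS})
        rows.append(row)
    return rows


for row in transform(records):
    print(row)
```

The resulting column order (`raw_event`, `timestamp`, then the schema fields) matches the column order of the Hive tables created later for querying.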

#### Sink to hdfs

The filtered events are then written to HDFS in Parquet format. `writeStream` with `trigger(processingTime="10 seconds")` starts a new micro-batch every 10 seconds and writes its output to `/tmp/sword_guild` in HDFS. `sink.awaitTermination()` then blocks until the streaming query is stopped, which keeps the job running. The processing time should be set to a reasonable value. If it is too short, the job writes many small files to HDFS. If it is too long, Spark holds more data in memory between writes. Checkpoints are saved in `/tmp/checkpoints_swords_guild`.

In [None]:
    sink = sword_guild \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_swords_guild") \
        .option("path", "/tmp/sword_guild") \
        .trigger(processingTime="10 seconds") \
        .start()

    sink.awaitTermination()
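The trade-off behind the `10 seconds` trigger can be put in numbers. With a processing-time trigger, Spark starts at most one micro-batch per interval, or fewer if a batch runs longer than the interval. Each micro-batch that receives data writes at least one new Parquet file. This hypothetical helper computes the resulting upper bound per hour.

```python
def micro_batches_per_hour(trigger_seconds):
    """Upper bound on micro-batches per hour for a processingTime trigger.

    If a batch takes longer than the interval, the next one starts as soon
    as it finishes, so the real number can be lower.
    """
    if trigger_seconds <= 0:
        raise ValueError("trigger interval must be positive")
    return 3600 // trigger_seconds


for seconds in (1, 10, 60):
    print(f"{seconds:>2} s trigger -> at most {micro_batches_per_hour(seconds)} batches/hour")
```

With a 10-second trigger and a single Kafka partition, a busy stream can add up to roughly 360 small files per hour. That volume is the small-files concern described above.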

### Running the program

Having looked at what each file does, let's see how to run the pipeline, store the events in HDFS, and query them with Presto.

After spinning up the cluster defined in `docker-compose.yml`, run the Flask app in the `mids` container. The `FLASK_APP` environment variable points Flask at `game_api.py`. The option `--host 0.0.0.0` makes Flask listen on all network interfaces, so that it accepts connections from external sources and not just from localhost.

In [None]:
docker-compose exec mids env FLASK_APP=/w205/project-3-GOhaike/full-stack/game_api.py flask run --host 0.0.0.0

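Flask keeps running in this terminal. In a separate terminal, we start `kafkacat` as a consumer (`-C`) of the `events` topic on the broker `kafka:29092`, reading from the beginning of the topic (`-o beginning`). Because no `-e` (exit at end) flag is given, it keeps running and prints each new event as it arrives in Kafka.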

In [None]:
docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning

While Kafka is listening, we run `ab.sh`, which uses Apache Bench to generate our streaming events. Each request hits the Flask API, which logs an event to Kafka, and `kafkacat` displays it. The `while` loop repeats the requests indefinitely, pausing 10 seconds (`sleep 10`) between rounds. In this script there are three users, and each of them both purchases swords and joins guilds.

In [None]:
while true
	do
	docker-compose exec mids ab -n 150 -H "Host:user1.comcast.com" http://localhost:5000/purchase_a_sword
	docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_guild
	docker-compose exec mids ab -n 100 -H "Host: user2.comcast.com" http://localhost:5000/purchase_a_sword
	docker-compose exec mids ab -n 15 -H "Host: user2.comcast.com" http://localhost:5000/join_guild
	docker-compose exec mids ab -n 210 -H "Host: user3.comcast.com" http://localhost:5000/purchase_a_sword
	docker-compose exec mids ab -n 7 -H "Host: user3.comcast.com" http://localhost:5000/join_guild
	sleep 10
done
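Every `ab` call in the loop follows the same pattern, so the request plan can be kept as data and the commands generated from it. With this approach, adding a user is a one-line change. This is a hypothetical Python sketch, not part of the project. It copies the plan from the loop above (normalizing the `Host:` header spacing) and totals the requests each round sends per endpoint. It assumes every request succeeds and produces exactly one Kafka event.

```python
# Request plan copied from ab.sh: (Host header, endpoint, number of requests).
PLAN = [
    ("user1.comcast.com", "purchase_a_sword", 150),
    ("user1.comcast.com", "join_guild", 10),
    ("user2.comcast.com", "purchase_a_sword", 100),
    ("user2.comcast.com", "join_guild", 15),
    ("user3.comcast.com", "purchase_a_sword", 210),
    ("user3.comcast.com", "join_guild", 7),
]


def ab_commands(plan):
    """Render each plan entry as the docker-compose/ab command used in ab.sh."""
    return [
        f'docker-compose exec mids ab -n {n} -H "Host: {host}" '
        f"http://localhost:5000/{endpoint}"
        for host, endpoint, n in plan
    ]


def requests_per_round(plan):
    """Total requests per endpoint in one pass through the while loop."""
    totals = {}
    for _, endpoint, n in plan:
        totals[endpoint] = totals.get(endpoint, 0) + n
    return totals


for command in ab_commands(PLAN):
    print(command)
print(requests_per_round(PLAN))
```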

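Finally, we submit the streaming job to Spark with `spark-submit`, so that it starts reading events from Kafka and writing them to HDFS:

In [None]: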
docker-compose exec spark spark-submit /w205/project-3-GOhaike/full-stack/guild_swords_stream.py

## Querying events:

To query and analyze the streamed events, we read them from HDFS with Presto, which finds tables through the Hive metastore. We therefore first register the events in Hive by running the commands below. Our streaming job handles two events of interest, sword purchases and guild joins. Each is filtered and stored in HDFS, and Hive makes these files available to Presto for querying and analysis.

First, let's start the Hive CLI in the `cloudera` container:

In [None]:
docker-compose exec cloudera hive

Next, create the external tables `sword_purchases` and `join_guilds`, pointing each one at its Parquet files in HDFS so that the data is available to Presto:

In [None]:
create external table if not exists default.sword_purchases (raw_event string,timestamp string,Accept string, Host string, User_Agent string, event_type string) stored as parquet location '/tmp/sword_purchase'  tblproperties ("parquet.compress"="SNAPPY");

In [None]:
create external table if not exists default.join_guilds (raw_event string,timestamp string,Accept string, Host string, User_Agent string, event_type string) stored as parquet location '/tmp/join_guild'  tblproperties ("parquet.compress"="SNAPPY");
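The two statements differ only in table name and HDFS location, so they can be generated from one template. The sketch below is a hypothetical helper that reproduces them, up to whitespace. It only builds the strings, which you would still paste into the Hive CLI.

```python
# Column names from the statements above; every column is typed as string.
COLUMNS = ["raw_event", "timestamp", "Accept", "Host", "User_Agent", "event_type"]


def external_parquet_table_ddl(table, location, columns=COLUMNS):
    """Build a Hive CREATE EXTERNAL TABLE statement over Parquet files."""
    cols = ", ".join(f"{c} string" for c in columns)
    return (
        f"create external table if not exists default.{table} ({cols}) "
        f"stored as parquet location '{location}' "
        'tblproperties ("parquet.compress"="SNAPPY");'
    )


print(external_parquet_table_ddl("sword_purchases", "/tmp/sword_purchase"))
print(external_parquet_table_ddl("join_guilds", "/tmp/join_guild"))
```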

Next, we start the Presto CLI, connecting to the server at `presto:8080` and using the `hive` catalog with the `default` schema:

In [None]:
docker-compose exec presto presto --server presto:8080 --catalog hive --schema default

**Now let's see which tables exist:**

In [None]:
show tables;

In [None]:
      Table      
-----------------
 join_guilds     
 sword_purchases 
(2 rows)

The result shows two tables, one for each event we want to track.

#### Let's see what the schemas of the sword purchases and join guilds tables look like

In [None]:
describe sword_purchases; 
    
   Column   |  Type   | Comment 
------------+---------+---------
 raw_event  | varchar |         
 timestamp  | varchar |         
 accept     | varchar |         
 host       | varchar |         
 user_agent | varchar |         
 event_type | varchar |         
(6 rows)

In [None]:
describe join_guilds;

   Column   |  Type   | Comment 
------------+---------+---------
 raw_event  | varchar |         
 timestamp  | varchar |         
 accept     | varchar |         
 host       | varchar |         
 user_agent | varchar |         
 event_type | varchar |

Both tables have the same structure, so we can move on.

Now let's generate our streaming events by running `ab.sh`:

In [None]:
./ab.sh

While `ab.sh` is running, new events keep arriving. The counts grow each time the streaming job writes a new micro-batch to HDFS (every 10 seconds). Let's count the sword purchases:

In [None]:
select count(*) as sword_purchases FROM sword_purchases;
    
    
 sword_purchases 
-----------------
           91080 

Let's see how many guild-join events were recorded during streaming:

In [None]:
select count(*) as join_guild FROM join_guilds;
    
    
 join_guild 
------------
       5344 

As seen above, there are `91080` sword purchase events and `5344` guild join events. This is expected, because `ab.sh` sends far more sword-purchase requests than join-guild requests.

#### Let's pull back a sample of the events

Select 10 rows from each of the sword purchases and join guilds tables. Without an `ORDER BY`, Presto may return any 10 rows.

In [None]:
select host, event_type, timestamp from sword_purchases limit 10;
     
    
       host        |   event_type   |        timestamp        
-------------------+----------------+-------------------------
 user1.comcast.com | purchase_sword | 2020-08-01 01:17:18.513 
 user1.comcast.com | purchase_sword | 2020-08-01 01:17:18.52  
 user1.comcast.com | purchase_sword | 2020-08-01 01:17:18.526 
 user1.comcast.com | purchase_sword | 2020-08-01 01:17:18.532 
 user1.comcast.com | purchase_sword | 2020-08-01 01:17:18.539 
 user1.comcast.com | purchase_sword | 2020-08-01 01:17:18.545 
 user1.comcast.com | purchase_sword | 2020-08-01 01:17:18.55  
 user1.comcast.com | purchase_sword | 2020-08-01 01:17:18.555 
 user1.comcast.com | purchase_sword | 2020-08-01 01:17:18.561 
 user1.comcast.com | purchase_sword | 2020-08-01 01:17:18.565 

In [None]:
select host, event_type, timestamp from join_guilds limit 10;
    
    
       host        | event_type |        timestamp        
-------------------+------------+-------------------------
 user1.comcast.com | join_guild | 2020-08-01 01:14:09.085 
 user1.comcast.com | join_guild | 2020-08-01 01:14:09.089 
 user1.comcast.com | join_guild | 2020-08-01 01:14:09.093 
 user1.comcast.com | join_guild | 2020-08-01 01:14:09.096 
 user1.comcast.com | join_guild | 2020-08-01 01:14:09.101 
 user1.comcast.com | join_guild | 2020-08-01 01:14:09.104 
 user1.comcast.com | join_guild | 2020-08-01 01:14:09.108 
 user1.comcast.com | join_guild | 2020-08-01 01:14:09.111 
 user1.comcast.com | join_guild | 2020-08-01 01:14:09.115 
 user1.comcast.com | join_guild | 2020-08-01 01:14:09.118 

How many users appear in our streaming events?

In [None]:
select count (DISTINCT host) as users FROM sword_purchases;
    
 users 
-------
     3 

In [None]:
select count (DISTINCT host) as users FROM join_guilds;
    
 users 
-------
     3 

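Both events come from three users (distinct `Host` values), matching the three users simulated in `ab.sh`. More users can be simulated by adding `ab` calls with other `Host` headers.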

#### Looking at the frequency of each host in the sword purchases and join guilds events

In [None]:
select host, count(*) as freq from sword_purchases GROUP BY host;

    
       host        | freq  
-------------------+-------
 user1.comcast.com | 29700 
 user2.comcast.com | 19800 
 user3.comcast.com | 41580 

In [None]:
select host, count(*) as freq from join_guilds GROUP BY host;

    
       host        | freq 
-------------------+------
 user3.comcast.com | 1169 
 user1.comcast.com | 1670 
 user2.comcast.com | 2505 

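The same three users appear in both events. Each user's count tracks the number of requests `ab.sh` sends for that user. For example, user3 sends the most sword-purchase requests per round (210), and user2 sends the most join-guild requests per round (15).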
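The per-host counts can be cross-checked against `ab.sh`. Dividing each host's count by the requests that host sends per round should give the same whole number of rounds for every host, provided each request became exactly one event. The numbers below are copied from the loop and from the query results above. The helper `implied_rounds` is hypothetical.

```python
# Requests per round, copied from the ab.sh loop.
PER_ROUND = {
    "sword_purchases": {"user1.comcast.com": 150, "user2.comcast.com": 100,
                        "user3.comcast.com": 210},
    "join_guilds": {"user1.comcast.com": 10, "user2.comcast.com": 15,
                    "user3.comcast.com": 7},
}
# Per-host counts, copied from the GROUP BY host queries above.
OBSERVED = {
    "sword_purchases": {"user1.comcast.com": 29700, "user2.comcast.com": 19800,
                        "user3.comcast.com": 41580},
    "join_guilds": {"user1.comcast.com": 1670, "user2.comcast.com": 2505,
                    "user3.comcast.com": 1169},
}


def implied_rounds(per_round, observed):
    """Return {table: {host: (rounds, leftover_events)}} via integer division."""
    return {
        table: {host: divmod(count, per_round[table][host])
                for host, count in hosts.items()}
        for table, hosts in observed.items()
    }


for table, by_host in implied_rounds(PER_ROUND, OBSERVED).items():
    print(table, by_host)
```

Every host divides evenly, giving 198 rounds for sword purchases and 167 for guild joins. Because the two tables imply different round counts, they were probably not counted at the same point in the stream. The queries above do not show this directly.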

### CONCLUSION:

We successfully tracked two events, `purchasing a sword` and `joining a guild`, from the moment a request was made through every stage of the pipeline. Using Apache Bench, we generated events in rounds 10 seconds apart. Each round sent more sword-purchase requests than join-guild requests, since more players are expected to buy swords than join guilds. The Flask API logged these events to Kafka. A Spark Streaming job read them from Kafka with `readStream`, filtered them, and continuously wrote them to HDFS as Parquet. To examine what was in HDFS, we registered the data as tables in the Hive metastore (the hard way, with explicit `create external table` statements) and queried them with Presto.

Findings:

_**There are two events :** sword purchases and join guilds_

_There are `three users` in our stream events, who generated `91080` sword purchase events and `5344` guild join events_

_In `purchase sword event`_

   user1.comcast.com made `29700` sword purchases
   user2.comcast.com made `19800` sword purchases
   user3.comcast.com made `41580` sword purchases


_In `join guild event`_

   user1.comcast.com joined `1670` times
   user2.comcast.com joined `2505` times
   user3.comcast.com joined `1169` times


This concludes my mobile game event tracking.
