# Joe Mirza - Project 3 - W205

<br>  
### **My project has the following files:**  
**1. Project_3_Joe_Mirza_w205.ipynb:** This notebook. It narrates the pipeline steps and runs a few presto queries once the data has landed in hadoop.   <br>  
**2. ab.sh:** A bash script that streams events into the pipeline using apache bench. I run it repeatedly from a while loop on the command line.   <br>  
**3. game_api.py:** Receives the events from the previous step in flask and routes them into kafka. It maps each incoming event to an event dictionary, appends the request's header information, and serializes the result as JSON before sending it to kafka.    <br>  
**4. stream_and_hive.py:** Reads the event objects from kafka. Uses pyspark to filter the 4 event types I'm supporting (default, purchase_sword, purchase_armor and join_a_guild) into 4 separate tables, which are registered in hive and written to hadoop.     <br>  
**5. docker-compose.yml:** Spins up the cluster and maps/coordinates the interaction between the services. I'll pull a few pieces of that file inline into this notebook to demonstrate I understand how they work.   <br>  

I'll spin up the cluster with: `docker-compose up -d`

### **Services spun up:**
- zookeeper
- kafka
- cloudera
- spark
- presto
- mids
<br>  
The only new service here, relative to what we did in Projects 1 and 2, is presto. Spark is used in this project to filter, transform and ultimately write events to hadoop. We'll use presto in this notebook to run sql queries.  


### **Start up kafkacat to 'consume' or receive events from apache bench:**

#### `docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning`

I followed the approach laid out in unit 13a, which doesn't create the 'events' topic first but instead relies on kafka creating it automatically if it doesn't already exist. That requires entering the command above twice, which seems odd, but it works. 

Per the above command and the .yml, kafka is listening on port 29092
    ```
      kafka:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        expose:
          - "9092"
          - "29092"
    ```

### **Before running flask, we'll discuss what game_api.py does and how I modified it. I'll only show the methods and flask decorators I created or modified so you don't have to wade through the whole thing. If you need to see all of game_api.py, it's in the repository.**

1. flask is imported to read and, well, 'route' the apache bench requests it receives.  <br>    
2. KafkaProducer is imported to send the events transformed here into the kafka queue.   <br>  
3. request is imported to read the headers of each incoming request, which are then appended to the transformed events.    <br>  
4. Each of the 4 event types I'm supporting has its own flask decorator and associated python method. The one I created is join_a_guild. When flask's @app.route 'sees' an incoming request for `/join_a_guild/<guild_name>`, it creates a join_guild_event dictionary and passes it to the log_to_kafka method. There the header is appended and the whole thing is sent to the `events` topic as JSON by the KafkaProducer. <br>  
5. Sword purchases and guild joins each carry metadata. For sword purchases, it's the type of sword (broadsword, longsword or scimitar); for guild joins, it's the guild (brewers, masons or assassins).
 

    ```
    import json
    from kafka import KafkaProducer
    from flask import Flask, request

    app = Flask(__name__)
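    producer = KafkaProducer(bootstrap_servers='kafka:29092')

    # log_to_kafka(topic, event) appends the request headers to the event and sends it
    # to kafka as JSON; it is defined elsewhere in game_api.py (see the repository).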

    @app.route("/purchase_a_sword/<sword_name>")
    def purchase_a_sword(sword_name):
        purchase_sword_event = {'event_type': 'purchase_sword', 'sword_type': sword_name}
        log_to_kafka('events', purchase_sword_event)
        return "Sword Purchased!\n"

    @app.route("/join_a_guild/<guild_name>")
    def join_a_guild(guild_name):
        join_guild_event = {'event_type': 'join_a_guild','guild_type': guild_name}
        log_to_kafka('events', join_guild_event)
        return "Guild Joined!\n"
    ```
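
The `log_to_kafka` step described in item 4 can be sketched in plain Python as below. This is a minimal sketch, not the code in game_api.py: `build_kafka_payload`, `log_to_kafka_sketch` and `RecordingProducer` are hypothetical names, and `RecordingProducer` stands in for kafka-python's `KafkaProducer` so the example runs without a broker. In game_api.py the headers would come from flask's `request.headers`.

```python
import json


def build_kafka_payload(event, headers):
    """Merge request headers into an event dict and encode it as JSON bytes.

    Hypothetical helper: with no value_serializer configured, KafkaProducer
    expects the message value as bytes, hence the .encode().
    """
    payload = dict(event)      # copy so the caller's dict is not mutated
    payload.update(headers)    # append header fields such as Host and User-Agent
    return json.dumps(payload).encode("utf-8")


class RecordingProducer:
    """Stand-in for kafka.KafkaProducer that just records what was sent."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))


def log_to_kafka_sketch(producer, topic, event, headers):
    """Hypothetical equivalent of log_to_kafka: append headers, send as JSON."""
    producer.send(topic, build_kafka_payload(event, headers))


producer = RecordingProducer()
log_to_kafka_sketch(
    producer,
    "events",
    {"event_type": "join_a_guild", "guild_type": "brewers"},
    {"Host": "user2.att.com", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"},
)
topic, value = producer.sent[0]
print(topic, json.loads(value))
```

Copying the event before merging keeps the route handler's dictionary unchanged, which makes the helper safe to call more than once with the same event.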

### **Let's run flask now**

#### `docker-compose exec mids env FLASK_APP=/w205/project-3-FuriousGeorge19/game_api.py flask run --host 0.0.0.0`

### **With flask and kafka running, now is a good point to test whether everything we've built so far works by using apache bench to stream some events and checking that flask and kafka pick them up**

Rather than using the while loop from the live session slides, which had the apache bench calls written into it, I used the while loop to run the bash script ab.sh repeatedly, after modifying ab.sh to send the types of events I needed. I was pleasantly surprised when it worked. 

Each pass of the loop sends 90 events, then sleeps for 10 seconds.

    ```
    
    while true; do 
      ./ab.sh; 
      sleep 10;  
    done
    
    ```
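
If you'd rather drive the load from Python, the loop above can be sketched as a bounded function. This is a hedged sketch, not part of the project: `run_repeatedly` and `run_ab_script` are hypothetical helpers, and `./ab.sh` is assumed to be executable in the current directory. Unlike the bash loop, it stops after a fixed number of passes and, through `check=True`, raises if a run of ab.sh exits with an error instead of silently continuing.

```python
import subprocess
import time


def run_ab_script(path="./ab.sh"):
    """Run the load script once; check=True raises CalledProcessError on failure."""
    subprocess.run([path], check=True)


def run_repeatedly(run_once, iterations, pause_seconds=10.0, sleep=time.sleep):
    """Call run_once `iterations` times, pausing between passes.

    A bounded take on `while true; do ./ab.sh; sleep 10; done`. The sleep
    function is injectable so the loop can be exercised without waiting.
    Returns the number of completed passes.
    """
    completed = 0
    for i in range(iterations):
        run_once()
        completed += 1
        if i < iterations - 1:  # no need to sleep after the last pass
            sleep(pause_seconds)
    return completed


# Usage against the real script: run_repeatedly(run_ab_script, iterations=5)
```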

### **It works. In the terminal window where flask is running we see lines like this:**

    ```
    
    127.0.0.1 - - [05/Dec/2020 08:57:01] "GET / HTTP/1.0" 200 -
    127.0.0.1 - - [05/Dec/2020 08:57:01] "GET / HTTP/1.0" 200 -
    127.0.0.1 - - [05/Dec/2020 08:57:01] "GET / HTTP/1.0" 200 -
    127.0.0.1 - - [05/Dec/2020 08:57:01] "GET / HTTP/1.0" 200 -
    127.0.0.1 - - [05/Dec/2020 08:57:01] "GET / HTTP/1.0" 200 -
    127.0.0.1 - - [05/Dec/2020 08:57:01] "GET / HTTP/1.0" 200 -
    127.0.0.1 - - [05/Dec/2020 08:57:01] "GET / HTTP/1.0" 200 -
    127.0.0.1 - - [05/Dec/2020 08:57:01] "GET / HTTP/1.0" 200 -
    127.0.0.1 - - [05/Dec/2020 08:57:01] "GET / HTTP/1.0" 200 -
    127.0.0.1 - - [05/Dec/2020 08:57:01] "GET / HTTP/1.0" 200 -
    127.0.0.1 - - [05/Dec/2020 08:57:02] "GET /purchase_a_sword HTTP/1.0" 200 -
    127.0.0.1 - - [05/Dec/2020 08:57:02] "GET /purchase_a_sword HTTP/1.0" 200 -
    127.0.0.1 - - [05/Dec/2020 08:57:02] "GET /purchase_a_sword HTTP/1.0" 200 -
    127.0.0.1 - - [05/Dec/2020 08:57:02] "GET /purchase_a_sword HTTP/1.0" 200 -
    127.0.0.1 - - [05/Dec/2020 08:57:02] "GET /purchase_a_sword HTTP/1.0" 200 -
    127.0.0.1 - - [05/Dec/2020 08:57:02] "GET /purchase_a_sword HTTP/1.0" 200 -
    
    ```
These are flask's log entries for the raw requests from apache bench.
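
To check the mix of requests rather than eyeballing the log, a few lines of Python can tally flask's access-log lines by path and status code. A minimal sketch: `count_requests` is a hypothetical helper, and the regular expression assumes the log format shown above.

```python
import re
from collections import Counter

# Matches the request line and status code in log lines like:
# 127.0.0.1 - - [05/Dec/2020 08:57:01] "GET / HTTP/1.0" 200 -
LOG_LINE = re.compile(r'"(?P<method>[A-Z]+) (?P<path>\S+) HTTP/[\d.]+" (?P<status>\d{3})')


def count_requests(lines):
    """Return a Counter keyed by (path, status) for every parseable log line."""
    counts = Counter()
    for line in lines:
        match = LOG_LINE.search(line)
        if match:  # skip lines that are not access-log entries
            counts[(match["path"], int(match["status"]))] += 1
    return counts


sample = [
    '127.0.0.1 - - [05/Dec/2020 08:57:01] "GET / HTTP/1.0" 200 -',
    '127.0.0.1 - - [05/Dec/2020 08:57:01] "GET / HTTP/1.0" 200 -',
    '127.0.0.1 - - [05/Dec/2020 08:57:02] "GET /purchase_a_sword HTTP/1.0" 200 -',
]
print(count_requests(sample))
```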

### **In the window running kafkacat, we see the events as they land in the `events` topic, with the header fields appended:**

<font size="2.5">

{"Host": "user1.comcast.com", "sword_type": "broadsword", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} <br>   
{"Host": "user1.comcast.com", "sword_type": "broadsword", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"}   <br>   
{"Accept": "*/*", "Host": "user2.att.com", "event_type": "join_a_guild", "guild_type": "brewers", "User-Agent": "ApacheBench/2.3"} <br>   
{"Accept": "*/*", "Host": "user2.att.com", "event_type": "join_a_guild", "guild_type": "brewers", "User-Agent": "ApacheBench/2.3"} <br>   
{"Host": "user1.comcast.com", "sword_type": "longsword", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} <br>   
{"Host": "user1.comcast.com", "sword_type": "longsword", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} <br>   
{"Host": "user1.comcast.com", "sword_type": "longsword", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} <br>   

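
The same check can be run on the kafkacat output, tallying events by type and by their metadata field. This is a sketch under assumptions: `tally_events` is a hypothetical helper, it expects one JSON object per line (the trailing `<br>` above is notebook formatting, not part of the message), and it treats `sword_type` and `guild_type` as the only metadata fields.

```python
import json
from collections import Counter

METADATA_FIELDS = ("sword_type", "guild_type")  # assumed metadata keys


def tally_events(json_lines):
    """Count events by event_type and by (event_type, metadata value)."""
    by_type = Counter()
    by_metadata = Counter()
    for line in json_lines:
        line = line.strip()
        if not line:
            continue  # ignore blank lines between messages
        event = json.loads(line)
        event_type = event.get("event_type")
        by_type[event_type] += 1
        for field in METADATA_FIELDS:
            if field in event:
                by_metadata[(event_type, event[field])] += 1
    return by_type, by_metadata


messages = [
    '{"Host": "user1.comcast.com", "sword_type": "broadsword", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"}',
    '{"Accept": "*/*", "Host": "user2.att.com", "event_type": "join_a_guild", "guild_type": "brewers", "User-Agent": "ApacheBench/2.3"}',
    '{"Host": "user1.comcast.com", "sword_type": "longsword", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"}',
]
by_type, by_metadata = tally_events(messages)
print(by_type)
print(by_metadata)
```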

### **With data flowing into kafka, we can now run stream_and_hive.py**

#### This script:
#### 1. Reads the event objects from kafka.
#### 2. Uses pyspark to filter the 4 event types I'm supporting (default, purchase_sword, purchase_armor and join_a_guild) into 4 separate tables. I created the join_a_guild schema and modified purchase_sword's to support the metadata I mentioned earlier.
#### 3. Registers those tables in hive and writes them to hadoop.

#### Rather than describe the entire file, I'll show examples of some of the changes I made. 

#### This is the schema I created for joining a guild. It has an additional metadata field for the type of guild: masons, assassins or brewers. 


    ```
    from pyspark.sql.types import StructType, StructField, StringType

    def join_a_guild_event_schema():
        """
        root
        |-- Accept: string (nullable = true)
        |-- Host: string (nullable = true)
        |-- User-Agent: string (nullable = true)
        |-- event_type: string (nullable = true)
        |-- guild_type: string (nullable = true) 
        """
        return StructType([
            StructField("Accept", StringType(), True),
            StructField("Host", StringType(), True),
            StructField("User-Agent", StringType(), True),
            StructField("event_type", StringType(), True),
            StructField("guild_type", StringType(), True),
        ])
    ```
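
To see what the schema does to each message, here is a plain-Python approximation of parsing a raw event against the join_a_guild fields. It only approximates Spark's JSON parsing, under the assumption that every field is a nullable string: declared fields are kept, missing ones become None, and keys the schema doesn't declare are dropped. `parse_with_fields` and `JOIN_A_GUILD_FIELDS` are hypothetical names.

```python
import json

# Field names copied from join_a_guild_event_schema(); all nullable strings.
JOIN_A_GUILD_FIELDS = ("Accept", "Host", "User-Agent", "event_type", "guild_type")


def parse_with_fields(raw_event, fields=JOIN_A_GUILD_FIELDS):
    """Keep only the declared fields; missing fields come back as None."""
    event = json.loads(raw_event)
    return {name: event.get(name) for name in fields}


row = parse_with_fields(
    '{"Accept": "*/*", "Host": "user2.att.com", "event_type": "join_a_guild", '
    '"guild_type": "brewers", "User-Agent": "ApacheBench/2.3", "extra": 1}'
)
print(row)
```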

#### A user-defined function needs to be created to test whether or not an event is of type 'join_a_guild'.

    ```
    import json
    from pyspark.sql.functions import udf

    @udf('boolean')
    def is_join_guild(event_as_json):
        """udf for filtering events: True when the event's type is 'join_a_guild'
        """
        event = json.loads(event_as_json)
        if event['event_type'] == 'join_a_guild':
            return True
        return False
    ```
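
Since the four filters differ only in the event type they look for, one option (a hypothetical refactor, not what stream_and_hive.py does) is a factory that builds the predicate. The plain-Python core is below; it uses `.get` so an event without an `event_type` key is filtered out instead of raising a KeyError. Assuming `from pyspark.sql.functions import udf`, it could then be wrapped as `udf(make_event_filter('join_a_guild'), 'boolean')`.

```python
import json


def make_event_filter(event_type):
    """Return a predicate: True when a JSON event string has the given event_type."""

    def predicate(event_as_json):
        event = json.loads(event_as_json)
        # .get returns None for a missing key, so such events evaluate to False
        return event.get("event_type") == event_type

    return predicate


is_join_guild = make_event_filter("join_a_guild")
is_sword_purchase = make_event_filter("purchase_sword")
```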

#### The join_guilds table is registered in hive as an external table over the parquet files written to /tmp/join_guilds in hadoop
   
    ```
    spark.sql("drop table if exists join_guilds")
    sql_string_guilds = """
        create external table if not exists join_guilds (
            raw_event string,
            timestamp string,
            Accept string,
            Host string,
            `User-Agent` string,
            event_type string,
            guild_type string
            )
            stored as parquet
            location '/tmp/join_guilds'
            tblproperties ("parquet.compress"="SNAPPY")
            """
    spark.sql(sql_string_guilds)
    ```
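
The four tables share the same DDL shape, so the statement could be generated from a column list. This is a hedged sketch with a hypothetical helper, `external_table_ddl`: it backtick-quotes column names containing characters like `-` (as `User-Agent` needs) and reuses the storage clause from the statement above. The resulting string would be passed to `spark.sql` the same way.

```python
import re

SIMPLE_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def quote(name):
    """Backtick-quote identifiers that are not plain letters/digits/underscores."""
    return name if SIMPLE_NAME.match(name) else f"`{name}`"


def external_table_ddl(table, columns, location):
    """Build a create-external-table statement over parquet files at `location`.

    columns is a sequence of (name, hive_type) pairs.
    """
    column_sql = ",\n    ".join(f"{quote(name)} {col_type}" for name, col_type in columns)
    return (
        f"create external table if not exists {table} (\n    {column_sql}\n)\n"
        "stored as parquet\n"
        f"location '{location}'\n"
        'tblproperties ("parquet.compress"="SNAPPY")'
    )


guild_columns = [
    ("raw_event", "string"),
    ("timestamp", "string"),
    ("Accept", "string"),
    ("Host", "string"),
    ("User-Agent", "string"),
    ("event_type", "string"),
    ("guild_type", "string"),
]
print(external_table_ddl("join_guilds", guild_columns, "/tmp/join_guilds"))
```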

### So we'll use this command to submit stream_and_hive.py to the spark container...

#### `docker-compose exec spark spark-submit /w205/project-3-FuriousGeorge19/stream_and_hive.py`

### And then use this command to start up presto and write some queries on the data we worked so hard to create

#### `docker-compose exec presto presto --server presto:8080 --catalog hive --schema default`


### We'll look at the tables we have in presto

#### `presto:default> show tables;`


#### Our results:
   
    ```
    presto:default> show tables;
           Table       
    -------------------
     armor_purchases   
     default_purchases 
     join_guilds       
     sword_purchases   
    (4 rows)
    ```

### We'll start by focusing on the table join_guilds. We created it from scratch and want to make sure the events and metadata were passed correctly. 


### This is a good start. event_type and guild_type are where they should be. 


```
    presto:default> describe join_guilds;
       Column   |  Type   | Comment 
    ------------+---------+---------
     raw_event  | varchar |         
     timestamp  | varchar |         
     accept     | varchar |         
     host       | varchar |         
     user-agent | varchar |         
     event_type | varchar |         
     guild_type | varchar |  
```


### We'll select the most interesting columns from the table and look at the first 25 rows. This looks good: the event_type is parsed correctly and the metadata field guild_type is populated the way I expected it to be. 

```

presto:default> select timestamp, host, "user-agent", event_type, guild_type from join_guilds limit 25;
        timestamp        |     host      |   user-agent    |  event_type  | guild_type 
-------------------------+---------------+-----------------+--------------+------------
 2020-12-06 10:51:18.293 | user2.att.com | ApacheBench/2.3 | join_a_guild | brewers    
 2020-12-06 10:51:18.299 | user2.att.com | ApacheBench/2.3 | join_a_guild | brewers    
 2020-12-06 10:51:18.305 | user2.att.com | ApacheBench/2.3 | join_a_guild | brewers    
 2020-12-06 10:51:18.311 | user2.att.com | ApacheBench/2.3 | join_a_guild | brewers    
 2020-12-06 10:51:18.317 | user2.att.com | ApacheBench/2.3 | join_a_guild | brewers    
 2020-12-06 10:51:18.324 | user2.att.com | ApacheBench/2.3 | join_a_guild | brewers    
 2020-12-06 10:51:18.332 | user2.att.com | ApacheBench/2.3 | join_a_guild | brewers    
 2020-12-06 10:51:18.337 | user2.att.com | ApacheBench/2.3 | join_a_guild | brewers    
 2020-12-06 10:51:18.347 | user2.att.com | ApacheBench/2.3 | join_a_guild | brewers    
 2020-12-06 10:51:18.353 | user2.att.com | ApacheBench/2.3 | join_a_guild | brewers    
 2020-12-06 10:51:19.028 | user2.att.com | ApacheBench/2.3 | join_a_guild | masons     
 2020-12-06 10:51:19.037 | user2.att.com | ApacheBench/2.3 | join_a_guild | masons     
 2020-12-06 10:51:19.048 | user2.att.com | ApacheBench/2.3 | join_a_guild | masons     
 2020-12-06 10:51:19.057 | user2.att.com | ApacheBench/2.3 | join_a_guild | masons     
 2020-12-06 10:51:19.068 | user2.att.com | ApacheBench/2.3 | join_a_guild | masons     
 2020-12-06 10:51:19.076 | user2.att.com | ApacheBench/2.3 | join_a_guild | masons     
 2020-12-06 10:51:19.082 | user2.att.com | ApacheBench/2.3 | join_a_guild | masons     
 2020-12-06 10:51:19.087 | user2.att.com | ApacheBench/2.3 | join_a_guild | masons     
 2020-12-06 10:51:19.095 | user2.att.com | ApacheBench/2.3 | join_a_guild | masons     
 2020-12-06 10:51:19.1   | user2.att.com | ApacheBench/2.3 | join_a_guild | masons     
 2020-12-06 10:51:19.737 | user2.att.com | ApacheBench/2.3 | join_a_guild | assassins  
 2020-12-06 10:51:19.743 | user2.att.com | ApacheBench/2.3 | join_a_guild | assassins  
 2020-12-06 10:51:19.748 | user2.att.com | ApacheBench/2.3 | join_a_guild | assassins  
 2020-12-06 10:51:19.754 | user2.att.com | ApacheBench/2.3 | join_a_guild | assassins  
 2020-12-06 10:51:19.759 | user2.att.com | ApacheBench/2.3 | join_a_guild | assassins  
(25 rows)

```

### We'll count the number of guild joins (so far)
```
presto:default> select count(event_type) 
             -> from join_guilds;
 _col0 
-------
  1590 
(1 row)

```
#### 1,590 guild joins of any type have been recorded so far

### ...and how many of those joins were for the masons' guild

```
presto:default> select count(guild_type) from join_guilds where guild_type ='masons';
 _col0 
-------
   710 
(1 row)
```
#### 710 of those joins were for the masons' guild
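
Rather than one count query per guild, a single group by returns every guild's count at once; in the presto CLI that would be `select guild_type, count(*) from join_guilds group by guild_type;`. The sketch below runs the same query against a small in-memory sqlite3 table so it can be tried without the cluster. The rows are made-up toy data, not the project's results, and `guild_counts` is a hypothetical helper.

```python
import sqlite3


def guild_counts(rows):
    """Count rows per guild_type with one GROUP BY query.

    rows: iterable of (event_type, guild_type) tuples. sqlite3 stands in
    for presto here; the query itself is plain SQL.
    """
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("create table join_guilds (event_type text, guild_type text)")
        conn.executemany("insert into join_guilds values (?, ?)", rows)
        result = conn.execute(
            "select guild_type, count(*) from join_guilds group by guild_type"
        ).fetchall()
    finally:
        conn.close()
    return dict(result)


toy_rows = (
    [("join_a_guild", "brewers")] * 2
    + [("join_a_guild", "masons")] * 3
    + [("join_a_guild", "assassins")] * 1
)
print(guild_counts(toy_rows))
```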

### We'll switch to the sword_purchases table, the other table whose schema I modified

### We'll take a look at a subset of the interesting columns from the sword_purchases table. The event_type 'purchase_sword' and its metadata field sword_type look like they've been passed and parsed correctly. 
```

presto:default> select timestamp, host, "user-agent", event_type, sword_type from sword_purchases limit 25;
        timestamp        |       host        |   user-agent    |   event_type   | sword_type 
-------------------------+-------------------+-----------------+----------------+------------
 2020-12-06 10:56:58.271 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | broadsword 
 2020-12-06 10:56:58.277 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | broadsword 
 2020-12-06 10:56:58.282 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | broadsword 
 2020-12-06 10:56:58.286 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | broadsword 
 2020-12-06 10:56:58.292 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | broadsword 
 2020-12-06 10:56:58.298 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | broadsword 
 2020-12-06 10:56:58.303 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | broadsword 
 2020-12-06 10:56:58.308 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | broadsword 
 2020-12-06 10:56:58.315 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | broadsword 
 2020-12-06 10:56:58.32  | user1.comcast.com | ApacheBench/2.3 | purchase_sword | broadsword 
 2020-12-06 10:56:58.939 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | longsword  
 2020-12-06 10:56:58.955 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | longsword  
 2020-12-06 10:56:58.96  | user1.comcast.com | ApacheBench/2.3 | purchase_sword | longsword  
 2020-12-06 10:56:58.966 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | longsword  
 2020-12-06 10:56:58.975 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | longsword  
 2020-12-06 10:56:58.981 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | longsword  
 2020-12-06 10:56:58.987 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | longsword  
 2020-12-06 10:56:58.993 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | longsword  
 2020-12-06 10:56:58.997 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | longsword  
 2020-12-06 10:56:59.002 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | longsword  
 2020-12-06 10:56:59.712 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | scimitar   
 2020-12-06 10:56:59.717 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | scimitar   
 2020-12-06 10:56:59.722 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | scimitar   
 2020-12-06 10:56:59.726 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | scimitar   
 2020-12-06 10:56:59.731 | user1.comcast.com | ApacheBench/2.3 | purchase_sword | scimitar   
(25 rows)

```

### We'll see how many swords have been purchased...
```

presto:default> select count(event_type) from sword_purchases;
 _col0 
-------
  3060 
(1 row)

```
#### There have been 3,060 sword purchases so far.

### ...and how many of those swords were scimitars...
```

presto:default> select count(sword_type) from sword_purchases where sword_type = 'scimitar';
 _col0 
-------
  1130 
(1 row)

```
#### There have been 1,130 scimitars purchased so far. 
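
Putting the counts in proportion: scimitars account for about 36.9% of sword purchases (1,130 of 3,060) and masons for about 44.7% of guild joins (710 of 1,590). A tiny hypothetical helper makes the arithmetic explicit.

```python
def percent(part, whole, digits=1):
    """Return part as a percentage of whole, rounded to `digits` decimal places."""
    if whole == 0:
        raise ValueError("whole must be non-zero")
    return round(100 * part / whole, digits)


# Counts taken from the presto queries above.
scimitar_share = percent(1130, 3060)   # scimitars / all sword purchases
masons_share = percent(710, 1590)      # masons / all guild joins
print(scimitar_share, masons_share)
```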