# Project 3 Report: Peter Morgan, Bruce Lam, Eda Kavlakoglu

In [1]:
# Importing Modules
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType

## Executive Summary

My summary blah blah blah

## Repository Description
bulleted list describing all files that exist in repository

## Detailed Pipeline Breakdown

### Set Up
Below we will describe in detail how the pipeline is spun up. First, we need to change into the w205 directory and clone the project repository. This is done using the commands below.

```console
cd ~/w205
git clone https://github.com/mids-w205-martin-mims/project-3-superpeter55.git
```

Next, we move into the new directory, create a new branch named "assignment", and switch to that branch using the commands below. We also run git status to confirm that we are on the assignment branch. Its output, shown below, confirms that we successfully created and switched to the assignment branch.

```console
cd project-3-superpeter55
git branch assignment
git checkout assignment
git status
```
```console
On branch assignment
```

Next, we copy the docker-compose file from the week 13 course content so that we can run our Docker containers. This is done using the command below; the period "." at the end of the command copies the file into the current directory. Once the file is copied, we make a few changes so that the containers run properly. First, we comment out the two lines in the cloudera service that read "ports:" and "8888:8888". Then we uncommment the matching lines in the spark service, which are commented out in the original file. Only one service can publish port 8888, so moving the mapping from cloudera to spark ensures that we can run a Jupyter notebook with PySpark on port 8888.

```console
cp ~/w205/course-content/13-Understanding-Data/docker-compose.yml .
```

Now that we have copied and modified the docker-compose file, we spin up our cluster using the commands below. The -d flag runs the containers in detached mode (in the background), so we can keep using the same console. We then run docker-compose ps, which lists every container along with basic information such as its state, to make sure all the containers are up. Since all the containers report the state "Up", we proceed.

```console
docker-compose up -d
docker-compose ps
```

Now that our cluster is running, we create our Kafka topic. We use docker-compose exec to run a single command in the kafka container: kafka-topics with the --create option. The --topic option names the topic "events"; we chose this name because we are collecting game event data for this project. The --partitions and --replication-factor options specify a single partition with a single replica. The --if-not-exists option makes the command skip creation, rather than fail, if a topic with that name already exists. The --zookeeper option gives the address (host:port) of the ZooKeeper instance, zookeeper:32181, that kafka-topics uses to register the topic. If the command succeeds, the console prints "Created topic events".

```console
docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```

Next, we set up the game_api.py file, which defines the events a user can trigger. We start by copying game_api.py from the week 12 course content and then modify it. The copy is done using the command below.

```console
cp ~/w205/course-content/12-Querying-Data-II/game_api.py .
```

The game_api.py file from week 12 does not have endpoints for buying a sword or joining a guild, so we need to add them. The functions we added are shown below; they closely follow the existing purchase_a_sword() function. The @app.route decorator above each function specifies the URL path that triggers it: /buy_a_sword triggers buy_a_sword, and /join_guild triggers join_guild. Each function first builds a dictionary of event metadata. Here, the event type is buy_sword or join_guild, the sword_type is sharp, and the guild_name is morgan because I started my own guild. The log_to_kafka function then sends the event and its metadata to the Kafka topic "events". Finally, the return statement sends the user a short message confirming what happened.

```python
@app.route("/buy_a_sword")
def buy_a_sword():
    buy_sword_event = {'event_type': 'buy_sword',
                       'sword_type': 'sharp'}
    log_to_kafka('events', buy_sword_event)
    return "Sword Bought!\n"

@app.route("/join_guild")
def join_guild():
    join_guild_event = {'event_type': 'join_guild',
                        'guild_name': 'morgan'}
    log_to_kafka('events', join_guild_event)
    return "Joined Guild!\n"
```
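The log_to_kafka helper itself is not shown in this report. We assume, based on the week 12 course version, that it merges the HTTP request headers into the event before JSON-encoding it and sending it to Kafka. That assumption is consistent with the Host, Accept, and User-Agent fields that appear in the batch output below. The sketch mimics that assumed behavior in plain Python so it runs without Kafka or Flask. FakeProducer is a hypothetical in-memory stand-in for the Kafka producer, and the explicit headers argument stands in for Flask's request.headers.

```python
import json


class FakeProducer:
    """Hypothetical stand-in for a Kafka producer: stores messages in memory."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))


producer = FakeProducer()


def log_to_kafka(topic, event, headers):
    """Sketch of the assumed helper: merge request headers, encode, send.

    `headers` is passed explicitly here; in the Flask app it would come
    from the current request (an assumption about game_api.py).
    """
    event.update(headers)
    producer.send(topic, json.dumps(event).encode())


def buy_a_sword(headers):
    # Same event metadata as the /buy_a_sword route above.
    buy_sword_event = {'event_type': 'buy_sword', 'sword_type': 'sharp'}
    log_to_kafka('events', buy_sword_event, headers)
    return "Sword Bought!\n"


def join_guild(headers):
    # Same event metadata as the /join_guild route above.
    join_guild_event = {'event_type': 'join_guild', 'guild_name': 'morgan'}
    log_to_kafka('events', join_guild_event, headers)
    return "Joined Guild!\n"


if __name__ == "__main__":
    # Hypothetical request headers, shaped like those in the batch output.
    headers = {'Host': 'user1.comcast.com', 'Accept': '*/*',
               'User-Agent': 'ApacheBench/2.3'}
    print(buy_a_sword(headers), end="")
    print(join_guild(headers), end="")
    for topic, value in producer.sent:
        print(topic, json.loads(value))
```

Under this assumption, every message on the events topic carries both the event metadata and the headers of the request that triggered it.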

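With these two functions added to game_api.py, we can run the Flask app. The app itself is already defined in game_api.py, so we use docker-compose exec to run flask run inside the mids container and set the FLASK_APP environment variable to the path of game_api.py. The --host 0.0.0.0 option makes Flask listen on all network interfaces instead of only localhost, so the app can be reached from outside the container.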

```console
docker-compose exec mids env FLASK_APP=/w205/project-3-superpeter55/game_api.py flask run --host 0.0.0.0
```
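Once the app is running, each endpoint can be exercised with an HTTP GET request. The User-Agent in the data below suggests that ApacheBench was used to generate events, but those steps are not shown here. As a hedged alternative, the sketch below uses Python's standard urllib to build requests with a custom Host header, which mimics requests from different users. It assumes Flask's default port 5000 and that the app is reachable at localhost from wherever the script runs. BASE_URL, build_request, and send_events are hypothetical names.

```python
import urllib.request

# Assumption: the Flask app listens on its default port, 5000, and is
# reachable at localhost from where this script runs.
BASE_URL = "http://localhost:5000"


def build_request(endpoint, host):
    """Build (but do not send) a GET request for one game endpoint.

    Overriding the Host header mimics requests from different users.
    """
    return urllib.request.Request(f"{BASE_URL}/{endpoint}",
                                  headers={"Host": host})


def send_events(endpoint, host, n):
    """Send n requests to the endpoint and print each response body."""
    for _ in range(n):
        with urllib.request.urlopen(build_request(endpoint, host)) as resp:
            print(resp.read().decode().strip())
```

For example, with the app running, send_events("buy_a_sword", "user1.comcast.com", 10) would send ten sword purchases from user1, and each call should print the route's return message.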

Because the Flask app is now running in the foreground of this terminal window, we open a new terminal to run further commands and move into the project directory using the commands below.

```console
cd w205
cd project-3-superpeter55/
```

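The next task is to spin up a Jupyter notebook with PySpark. Before starting the notebook, we must create a symbolic link so that we can access our mounted w205 directory from it. This is done using the commands below. The first line uses docker-compose exec to open a bash shell inside the spark container. The second line creates a symbolic link named w205 in the container's current directory that points to the mounted /w205 directory. Finally, exit closes the shell and returns us to the host terminal.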

```console
docker-compose exec spark bash
ln -s /w205 w205
exit 
```

In [2]:
spark

In [3]:
sc

## Batch Mode

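In batch mode, we read every event currently stored in the events topic, from the earliest to the latest offset, in a single pass. We cast each message's raw bytes to a string and keep only the purchase_sword events with a user-defined function (UDF). Next, we unpack each JSON event into its own columns and write the result to /tmp/purchases as Parquet. We then read the Parquet files back, register them as a temporary view, query them with Spark SQL, and convert the result to a pandas DataFrame for summary statistics.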

In [10]:
@udf('boolean')
def is_purchase(event_as_json):
    """Return True for purchase_sword events (.get avoids a KeyError
    on events that have no event_type)."""
    event = json.loads(event_as_json)
    return event.get('event_type') == 'purchase_sword'

In [11]:
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [12]:
raw_events.show()

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2021-12-04 23:29:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2021-12-04 23:29:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     2|2021-12-04 23:29:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     3|2021-12-04 23:29:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     4|2021-12-04 23:29:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     5|2021-12-04 23:29:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     6|2021-12-04 23:29:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     7|2021-12-04 23:29:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0
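The value column holds each Kafka message as raw bytes, which Spark displays in hexadecimal. Decoding the first few bytes shown above confirms that each message is a JSON string. This is why the next cell casts value to a string before filtering. A quick check in plain Python:

```python
# First five bytes of the value column, copied from the output above.
hex_prefix = "7B 22 48 6F 73"

# bytes.fromhex ignores the spaces between byte pairs.
decoded = bytes.fromhex(hex_prefix).decode("utf-8")
print(decoded)  # → {"Hos
```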

In [13]:
purchase_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase('raw'))

In [14]:
purchase_events.show()

+--------------------+--------------------+
|                 raw|           timestamp|
+--------------------+--------------------+
|{"Host": "user1.c...|2021-12-04 23:30:...|
|{"Host": "user1.c...|2021-12-04 23:30:...|
|{"Host": "user1.c...|2021-12-04 23:30:...|
|{"Host": "user1.c...|2021-12-04 23:30:...|
|{"Host": "user1.c...|2021-12-04 23:30:...|
|{"Host": "user1.c...|2021-12-04 23:30:...|
|{"Host": "user1.c...|2021-12-04 23:30:...|
|{"Host": "user1.c...|2021-12-04 23:30:...|
|{"Host": "user1.c...|2021-12-04 23:30:...|
|{"Host": "user1.c...|2021-12-04 23:30:...|
|{"Host": "user2.a...|2021-12-04 23:31:...|
|{"Host": "user2.a...|2021-12-04 23:31:...|
|{"Host": "user2.a...|2021-12-04 23:31:...|
|{"Host": "user2.a...|2021-12-04 23:31:...|
|{"Host": "user2.a...|2021-12-04 23:31:...|
|{"Host": "user2.a...|2021-12-04 23:31:...|
|{"Host": "user2.a...|2021-12-04 23:31:...|
|{"Host": "user2.a...|2021-12-04 23:31:...|
|{"Host": "user2.a...|2021-12-04 23:31:...|
|{"Host": "user2.a...|2021-12-04

In [15]:
extracted_purchase_events = purchase_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()
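Cells In [13] to In [15] cast each message to a string, keep only purchase_sword events, and unpack every JSON payload into its own columns alongside the Kafka timestamp. The same logic can be sketched in plain Python without Spark. The sample messages are hypothetical: the purchase event is assembled from field values visible in the outputs above, and the join_guild event (with its timestamp) is made up to show the filter at work.

```python
import json

# Hypothetical (raw_json, timestamp) pairs. The purchase event reuses field
# values visible in the outputs above; the join_guild event is made up.
messages = [
    ('{"Host": "user1.comcast.com", "Accept": "*/*", '
     '"User-Agent": "ApacheBench/2.3", '
     '"event_type": "purchase_sword", "sword_size": "large"}',
     "2021-12-04 23:30:56.765"),
    ('{"Host": "user1.comcast.com", "event_type": "join_guild", '
     '"guild_name": "morgan"}',
     "2021-12-04 23:31:00.000"),
]


def is_purchase(raw):
    """Mirror of the is_purchase UDF: keep only purchase_sword events."""
    return json.loads(raw).get('event_type') == 'purchase_sword'


def extract(raw, timestamp):
    """Mirror of Row(timestamp=..., **json.loads(raw)): one flat record."""
    return {'timestamp': timestamp, **json.loads(raw)}


rows = [extract(raw, ts) for raw, ts in messages if is_purchase(raw)]
for row in rows:
    print(row)
```

Filtering to a single event type before calling toDF() keeps every Row's fields consistent, which matters because Spark infers the DataFrame schema from the rows themselves.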

In [16]:
extracted_purchase_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sword_size: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [17]:
extracted_purchase_events.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_size|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|us

In [18]:
extracted_purchase_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchases')

In [19]:
purchases = spark.read.parquet('/tmp/purchases')
purchases.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_size|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|us

In [20]:
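# Register the DataFrame as a temporary view for Spark SQL
# (createOrReplaceTempView replaces the deprecated registerTempTable)
purchases.createOrReplaceTempView('purchases')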

In [21]:
purchases_by_example2 = spark.sql("select * from purchases where host='user1.comcast.com'")
purchases_by_example2.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_size|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|     large|2021-12-04 23:30:...|
|   */*|us

In [22]:
newdf = purchases_by_example2.toPandas()
newdf

| | Accept | Host | User-Agent | event_type | sword_size | timestamp |
|---|---|---|---|---|---|---|
| 0 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | large | 2021-12-04 23:30:56.765 |
| 1 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | large | 2021-12-04 23:30:56.775 |
| 2 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | large | 2021-12-04 23:30:56.783 |
| 3 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | large | 2021-12-04 23:30:56.789 |
| 4 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | large | 2021-12-04 23:30:56.795 |
| 5 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | large | 2021-12-04 23:30:56.8 |
| 6 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | large | 2021-12-04 23:30:56.805 |
| 7 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | large | 2021-12-04 23:30:56.811 |
| 8 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | large | 2021-12-04 23:30:56.817 |
| 9 | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | large | 2021-12-04 23:30:56.823 |


In [23]:
newdf.describe()

| | Accept | Host | User-Agent | event_type | sword_size | timestamp |
|---|---|---|---|---|---|---|
| count | 10 | 10 | 10 | 10 | 10 | 10 |
| unique | 1 | 1 | 1 | 1 | 1 | 10 |
| top | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | large | 2021-12-04 23:30:56.823 |
| freq | 10 | 10 | 10 | 10 | 10 | 1 |
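For string columns, pandas describe() reports four statistics: count (non-null values), unique (distinct values), top (the most frequent value), and freq (how often top occurs). The sketch below computes the same four numbers with collections.Counter, using the Host and timestamp values from the toPandas() output above. When every value occurs once, any of them could be reported as top, so the sketch's choice for such a column may differ from the one pandas shows.

```python
from collections import Counter


def describe_strings(values):
    """Compute pandas-style count/unique/top/freq for a list of strings."""
    non_null = [v for v in values if v is not None]
    counts = Counter(non_null)
    if not counts:
        return {"count": 0, "unique": 0, "top": None, "freq": None}
    top, freq = counts.most_common(1)[0]
    return {"count": len(non_null), "unique": len(counts),
            "top": top, "freq": freq}


# Values copied from the toPandas() output above.
hosts = ["user1.comcast.com"] * 10
timestamps = [
    "2021-12-04 23:30:56.765", "2021-12-04 23:30:56.775",
    "2021-12-04 23:30:56.783", "2021-12-04 23:30:56.789",
    "2021-12-04 23:30:56.795", "2021-12-04 23:30:56.8",
    "2021-12-04 23:30:56.805", "2021-12-04 23:30:56.811",
    "2021-12-04 23:30:56.817", "2021-12-04 23:30:56.823",
]

print(describe_strings(hosts))
# → {'count': 10, 'unique': 1, 'top': 'user1.comcast.com', 'freq': 10}
print(describe_strings(timestamps))
```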


## Streaming Mode

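In streaming mode, we use readStream to subscribe to the events topic as a continuous stream rather than reading it once. Because the Kafka source starts from the latest offsets by default, only events published after the stream starts are processed. A streaming DataFrame cannot be converted to an RDD the way the batch extraction was, so we define the purchase event schema explicitly and parse each JSON payload with from_json. We filter for purchase_sword events with a UDF and write the matching events to /tmp/sword_purchases as Parquet every 10 seconds. The checkpoint location lets Spark record which offsets it has already processed, and the stream runs until sink.stop() is called.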

In [24]:
def purchase_sword_event_schema():
    """
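    Schema of the JSON payload of a purchase_sword event
    (timestamp comes from the Kafka message, not the JSON):
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- sword_size: string (nullable = true)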
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("sword_size", StringType(), True)
    ])
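from_json parses each JSON string against the schema above. Keys named in the schema become struct fields, keys the schema does not mention are dropped, and schema fields missing from the JSON come back as null. That is why sword_size has to appear in the StructType, and why timestamp is taken from the Kafka message metadata rather than from the JSON. The plain-Python sketch below imitates this projection for a single event. It illustrates the behavior only and is not Spark's implementation; the sample event is hypothetical.

```python
import json

# Field names from purchase_sword_event_schema() above.
SCHEMA_FIELDS = ("Accept", "Host", "User-Agent", "event_type", "sword_size")


def project(raw_json, fields=SCHEMA_FIELDS):
    """Keep only schema fields; fields absent from the JSON become None."""
    event = json.loads(raw_json)
    return {name: event.get(name) for name in fields}


# Hypothetical event: it has an extra key and lacks Accept.
raw = ('{"Host": "user1.comcast.com", "User-Agent": "ApacheBench/2.3", '
       '"event_type": "purchase_sword", "sword_size": "large", '
       '"extra": "ignored"}')
print(project(raw))
```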

In [25]:
@udf('boolean')
def is_sword_purchase(event_as_json):
    """UDF that keeps only purchase_sword events when filtering the stream.
    """
    event = json.loads(event_as_json)
    return event.get('event_type') == 'purchase_sword'

In [26]:
raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()

In [27]:
sword_purchases = raw_events \
        .filter(is_sword_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          purchase_sword_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')

In [28]:
sink = sword_purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases") \
        .option("path", "/tmp/sword_purchases") \
        .trigger(processingTime="10 seconds") \
        .start()

## Business Analytics Questions

These questions will be answered with SQL queries in Presto:

- How many events are there?
- How many events came from user1 and from user2?
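The two questions above reduce to a COUNT(*) and a GROUP BY on the host column. Until the Presto queries are written, the sketch below shows the query shape using Python's built-in sqlite3 module as a stand-in. The in-memory events table, its columns, and its rows are all hypothetical. In particular, user2's full host name is truncated in the outputs above, so the one used here is made up, as are the counts. Queries this simple should carry over to Presto unchanged.

```python
import sqlite3

# In-memory stand-in for the Presto table; table, columns, and rows are
# hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (host TEXT, event_type TEXT)")
rows = ([("user1.comcast.com", "purchase_sword")] * 3
        + [("user2.example.com", "join_guild")] * 2)  # made-up host and counts
conn.executemany("INSERT INTO events VALUES (?, ?)", rows)

# How many events are there?
total = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]

# How many events came from each user (host)?
per_host = conn.execute(
    "SELECT host, COUNT(*) AS n FROM events GROUP BY host ORDER BY host"
).fetchall()

print(total)     # → 5
print(per_host)  # → [('user1.comcast.com', 3), ('user2.example.com', 2)]
conn.close()
```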

In [29]:
sink.stop()