# Project 3

## **0. Summary of Commands for Every Week of Project**
#### 1) Cd into the Right Directory 
    cd ~/w205/project-3-hfarb
    
#### 2) Commands to Bring Up the Cluster and See if it is Up
    docker-compose up -d
    docker-compose ps
    docker ps -a
#### 3) Command to Create the Kafka Topic Events
    docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181

#### 4) Start Flask Server
    docker-compose exec mids env FLASK_APP=/w205/project-3-hfarb/game_api.py flask run --host 0.0.0.0
#### 5) Run Individual curl (Week 11) or Apache Bench (Weeks 12 and 13) Commands
     Each week is different:
     Week 11: 
      docker-compose exec mids curl http://localhost:5000/
      docker-compose exec mids curl http://localhost:5000/purchase_a_sword
      docker-compose exec mids curl http://localhost:5000/buy_a_sword
      docker-compose exec mids curl http://localhost:5000/join_a_guild
      
     Week 12: Add different metadata to game_api
      docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/
      docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword
      docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/buy_a_sword
      docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_a_guild

      docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/
      docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword
      docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/buy_a_sword
      docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_a_guild
      
     Week 13: 
      docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/
      docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword
      docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/buy_a_sword
      docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_a_guild

      docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/
      docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword
      docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/buy_a_sword
      docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_a_guild
#### 6) In Week 13 for Streaming Data: Infinite Loop to Run the Apache Bench Command
    while true; do docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword; done
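
The Week 12 and Week 13 command lists are every combination of two `Host` headers and four endpoints. As a minimal sketch (the helper name `ab_commands` is hypothetical and not part of the project), the same strings can be generated instead of typed by hand:

```python
from itertools import product

# Hosts and endpoints copied from the Week 12/13 commands above.
HOSTS = ["user1.comcast.com", "user2.att.com"]
ENDPOINTS = ["/", "/purchase_a_sword", "/buy_a_sword", "/join_a_guild"]

TEMPLATE = (
    'docker-compose exec mids ab -n {n} -H "Host: {host}" '
    "http://localhost:5000{path}"
)


def ab_commands(hosts=HOSTS, endpoints=ENDPOINTS, requests=10):
    """Return one Apache Bench command string per (host, endpoint) pair."""
    return [
        TEMPLATE.format(n=requests, host=host, path=path)
        for host, path in product(hosts, endpoints)
    ]


commands = ab_commands()
for command in commands:
    print(command)
```

The script only prints the commands; running them still happens in the shell, so the cluster setup above is unchanged.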

#### 7) Set Up to Watch Kafka
     docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning -e     

#### 8) Create a symbolic link in the Spark container to the /w205 mount point
     docker-compose exec spark bash
     ln -s /w205 w205
     exit 

#### 9) Run an Enhanced Version of the Pyspark Command Line to Target Jupyter Notebook
    docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark

#### 10) Change 0.0.0.0 to the External IP Address for Your Google Cloud Virtual Machine and Open an Incognito Browser
    34.82.86.53
    http://34.82.86.53:8888/?token=7f7c04aaa9b3a3c796f79b4f30d4d1073558741f0e5fb433
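
Jupyter prints its URL with `0.0.0.0` as the host, so only the host part needs to change; the port and token stay the same. A small sketch of that substitution follows. The function name `with_external_ip` and the token `abc123` are placeholders, not values from this project.

```python
from urllib.parse import urlsplit, urlunsplit


def with_external_ip(notebook_url, external_ip):
    """Swap the host in a Jupyter URL while keeping port, path, and token."""
    parts = urlsplit(notebook_url)
    if parts.port:
        netloc = "{}:{}".format(external_ip, parts.port)
    else:
        netloc = external_ip
    return urlunsplit(
        (parts.scheme, netloc, parts.path, parts.query, parts.fragment)
    )


# Placeholder URL in the shape Jupyter prints at startup.
printed_url = "http://0.0.0.0:8888/?token=abc123"
browser_url = with_external_ip(printed_url, "34.82.86.53")
print(browser_url)
```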

#### 11) In Week 13 for Streaming Data: Hive command to create an external table for schema on read
     docker-compose exec cloudera hive
     create external table if not exists default.sword_purchases (Accept string, Host string, User_Agent string, event_type string, timestamp string) stored as parquet location '/tmp/sword_purchases'  tblproperties ("parquet.compress"="SNAPPY");

#### 12) In Week 13 for Streaming Data: Run Presto so you can query against the external table
     docker-compose exec presto presto --server presto:8080 --catalog hive --schema default
     
#### 13) Shutdown the Cluster and Make Sure it Shut Down Correctly 
     docker-compose down 
     docker-compose ps
     docker ps -a

### After Week 11

In [1]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf

In [2]:
spark

In [3]:
@udf('string')
def munge_event(event_as_json):
    event = json.loads(event_as_json)
    event['Host'] = "moe"
    event['Cache-Control'] = "no-cache"
    return json.dumps(event)
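
To see what `munge_event` does to a single record without starting Spark, the same body can be run as a plain function. This is a sketch only: the name `munge_event_plain` and the sample event are hypothetical, shaped like the Week 11 curl events.

```python
import json


def munge_event_plain(event_as_json):
    """Same body as the munge_event UDF, minus the @udf decorator."""
    event = json.loads(event_as_json)
    event['Host'] = "moe"                # overwrite the original Host header
    event['Cache-Control'] = "no-cache"  # add a key the event did not have
    return json.dumps(event)


# Hypothetical input shaped like one Kafka message value.
sample = json.dumps({"Host": "localhost:5000", "event_type": "purchase_sword"})
munged = json.loads(munge_event_plain(sample))
print(munged)
```

Keys that the function does not touch, such as `event_type`, pass through unchanged.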

In [4]:
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [5]:
raw_events.show()

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2020-12-05 19:30:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2020-12-05 19:30:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     2|2020-12-05 19:30:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     3|2020-12-05 19:30:...|            0|
+----+--------------------+------+---------+------+--------------------+-------------+



In [6]:
munged_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .withColumn('munged', munge_event('raw'))

In [7]:
munged_events.show()

+--------------------+--------------------+--------------------+
|                 raw|           timestamp|              munged|
+--------------------+--------------------+--------------------+
|{"Host": "localho...|2020-12-05 19:30:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-12-05 19:30:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-12-05 19:30:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-12-05 19:30:...|{"Host": "moe", "...|
+--------------------+--------------------+--------------------+



In [8]:
extracted_events = munged_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.munged))) \
        .toDF()

In [9]:
extracted_events.show()

+------+-------------+----+-----------+--------------+--------------------+
|Accept|Cache-Control|Host| User-Agent|    event_type|           timestamp|
+------+-------------+----+-----------+--------------+--------------------+
|   */*|     no-cache| moe|curl/7.47.0|       default|2020-12-05 19:30:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2020-12-05 19:30:...|
|   */*|     no-cache| moe|curl/7.47.0|     buy_sword|2020-12-05 19:30:...|
|   */*|     no-cache| moe|curl/7.47.0|    join_guild|2020-12-05 19:30:...|
+------+-------------+----+-----------+--------------+--------------------+
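
The `Row(timestamp=r.timestamp, **json.loads(r.munged))` call in cell 8 turns every JSON key into its own column by keyword unpacking. The plain-Python sketch below mimics that step with a dict. The timestamp value is made up for illustration, and sorting the keys is an assumption about how older PySpark versions order keyword fields in a `Row`; it happens to match the column order shown above.

```python
import json

# One munged event, shaped like the rows shown above.
munged_json = (
    '{"Host": "moe", "event_type": "default", "Accept": "*/*", '
    '"User-Agent": "curl/7.47.0", "Cache-Control": "no-cache"}'
)

# Keyword unpacking merges the JSON keys with the extra timestamp field.
# The timestamp below is a hypothetical value, not taken from the data.
record = dict(timestamp="2020-01-01 00:00:00", **json.loads(munged_json))

# Assumption: keyword fields end up ordered by name.
columns = sorted(record)
print(columns)
```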



In [10]:
sword_purchases = extracted_events \
        .filter(extracted_events.event_type == 'purchase_sword')

In [11]:
sword_purchases.show()

+------+-------------+----+-----------+--------------+--------------------+
|Accept|Cache-Control|Host| User-Agent|    event_type|           timestamp|
+------+-------------+----+-----------+--------------+--------------------+
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2020-12-05 19:30:...|
+------+-------------+----+-----------+--------------+--------------------+



In [12]:
default_hits = extracted_events \
        .filter(extracted_events.event_type == 'default')

In [13]:
default_hits.show()

+------+-------------+----+-----------+----------+--------------------+
|Accept|Cache-Control|Host| User-Agent|event_type|           timestamp|
+------+-------------+----+-----------+----------+--------------------+
|   */*|     no-cache| moe|curl/7.47.0|   default|2020-12-05 19:30:...|
+------+-------------+----+-----------+----------+--------------------+



### After Week 12

After Week 12, modify the game_api.py file so that each event carries extra metadata: the sword type is silver, the sword length is short, and the guild name is farber.

@app.route("/purchase_a_sword")
def purchase_a_sword():
    purchase_sword_event = {'event_type': 'purchase_sword',
                            'sword_type': 'silver'}
    log_to_kafka('events', purchase_sword_event)
    return "Sword Purchased!\n"

@app.route("/buy_a_sword")
def buy_a_sword():
    buy_sword_event = {'event_type': 'buy_sword',
                       'sword_length': 'short'}
    log_to_kafka('events', buy_sword_event)
    return "Sword Bought!\n"

@app.route("/join_a_guild")
def join_guild():
    join_guild_event = {'event_type': 'join_guild',
                        'guild_name': 'farber'}
    log_to_kafka('events', join_guild_event)
    return "Guild Joined!\n"
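
The routes above hand each event to `log_to_kafka`, whose body is not shown in this README. Since the extracted events contain `Host`, `Accept`, and `User-Agent`, it presumably merges the request headers into the event before sending. The sketch below illustrates that idea under stated assumptions: `FakeProducer` and `log_to_kafka_sketch` are hypothetical stand-ins with a different signature from the real helper, and no Kafka connection is made.

```python
import json


class FakeProducer:
    """Stand-in for a Kafka producer; it only records what would be sent."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))


def log_to_kafka_sketch(producer, topic, event, headers):
    """Merge request headers into a copy of the event and send it as JSON bytes."""
    payload = dict(event)     # copy so the caller's dict is not modified
    payload.update(headers)
    producer.send(topic, json.dumps(payload).encode())


producer = FakeProducer()
event = {'event_type': 'purchase_sword', 'sword_type': 'silver'}
headers = {'Host': 'user1.comcast.com', 'Accept': '*/*',
           'User-Agent': 'ApacheBench/2.3'}
log_to_kafka_sketch(producer, 'events', event, headers)

topic, value = producer.sent[0]
decoded = json.loads(value.decode())
print(topic, decoded)
```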


#### Code from filtered_writes.py

In [1]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf

In [2]:
@udf('boolean')
def is_purchase(event_as_json):
    event = json.loads(event_as_json)
    # The comparison already yields the boolean the UDF must return.
    return event['event_type'] == 'purchase_sword'
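
The filter assumes every message is valid JSON with an `event_type` key. If the topic might ever contain other messages, a more tolerant predicate is one option. The sketch below is plain Python with a hypothetical name, `is_purchase_safe`; wrapping it with `@udf('boolean')` would be the same as for the function above.

```python
import json


def is_purchase_safe(event_as_json):
    """Return True only for well-formed purchase_sword events."""
    try:
        event = json.loads(event_as_json)
    except (TypeError, ValueError):
        # None values or text that is not JSON are treated as non-purchases.
        return False
    return isinstance(event, dict) and event.get('event_type') == 'purchase_sword'


print(is_purchase_safe('{"event_type": "purchase_sword"}'))
print(is_purchase_safe('{"Host": "user1.comcast.com"}'))
print(is_purchase_safe('not json'))
```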

In [3]:
spark

In [4]:
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [5]:
purchase_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase('raw'))

In [6]:
extracted_purchase_events = purchase_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()

In [7]:
extracted_purchase_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sword_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [8]:
extracted_purchase_events.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|us

In [9]:
extracted_purchase_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchases')

#### pyspark code

In [10]:
purchases = spark.read.parquet('/tmp/purchases')

In [11]:
purchases.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|us

In [12]:
purchases.createOrReplaceTempView('purchases')

In [13]:
purchases_by_example2 = spark.sql("select * from purchases where Host = 'user1.comcast.com'")

In [14]:
purchases_by_example2.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:...|
|   */*|us

In [15]:
df = purchases_by_example2.toPandas()

In [16]:
df

+---+------+-----------------+---------------+--------------+----------+-----------------------+
|   |Accept|             Host|     User-Agent|    event_type|sword_type|              timestamp|
+---+------+-----------------+---------------+--------------+----------+-----------------------+
|  0|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:58.916|
|  1|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:58.919|
|  2|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:58.923|
|  3|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:58.933|
|  4|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:58.937|
|  5|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver| 2020-12-08 20:12:58.94|
|  6|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:58.942|
|  7|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:58.945|
|  8|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:58.947|
|  9|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|    silver|2020-12-08 20:12:58.951|
+---+------+-----------------+---------------+--------------+----------+-----------------------+


### After Week 13

In [1]:
import json
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType

In [2]:
def purchase_sword_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
    ])
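
When `from_json` is given an explicit schema, only the fields named in that schema become columns; keys such as `sword_type` remain visible only in the raw JSON string, and missing keys come back as null. The `timestamp` listed in the docstring is not part of the `StructType` because it is selected separately from the Kafka record. The sketch below imitates that projection in plain Python. It is an illustration of the behavior, not Spark code, and `project_to_schema` is a hypothetical name.

```python
import json

# Field names taken from purchase_sword_event_schema() above.
SCHEMA_FIELDS = ["Accept", "Host", "User-Agent", "event_type"]


def project_to_schema(event_as_json, fields=SCHEMA_FIELDS):
    """Keep only the schema fields; fields absent from the event become None."""
    event = json.loads(event_as_json)
    return {name: event.get(name) for name in fields}


raw_event = json.dumps({
    "Host": "user1.comcast.com",
    "sword_type": "silver",
    "event_type": "purchase_sword",
    "Accept": "*/*",
    "User-Agent": "ApacheBench/2.3",
})
projected = project_to_schema(raw_event)
print(projected)
```

Adding a `StructField("sword_type", StringType(), True)` to the schema would be one way to surface that value as a column, if it is needed downstream.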

In [3]:
@udf('boolean')
def is_sword_purchase(event_as_json):
    """udf for filtering events
    """
    event = json.loads(event_as_json)
    # The comparison already yields the boolean the UDF must return.
    return event['event_type'] == 'purchase_sword'

In [4]:
spark

In [5]:
raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()

In [6]:
sword_purchases = raw_events \
        .filter(is_sword_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          purchase_sword_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')

In [7]:
sink = sword_purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases") \
        .option("path", "/tmp/sword_purchases") \
        .trigger(processingTime="10 seconds") \
        .start()

#### 1) Command to Open Hive and Create an External Table
    docker-compose exec cloudera hive 
    create external table if not exists default.sword_purchases (Accept string, Host string, User_Agent string, event_type string, timestamp string) stored as parquet location '/tmp/sword_purchases'  tblproperties ("parquet.compress"="SNAPPY");

#### 2) Command to Open Presto so we Can Query Against External Tables
    docker-compose exec presto presto --server presto:8080 --catalog hive --schema default

#### 3) Query Against External Tables Using Presto
     Query 1: All Info from Users About Purchased Swords
     select * from sword_purchases limit 5;
                                                                                                                                                                                                                                  
                                                                 accept                                                                  |          host           | user_agent |    event_type     |    timestamp    
-----------------------------------------------------------------------------------------------------------------------------------------+-------------------------+------------+-------------------+-----------------
 {"Host": "user1.comcast.com", "sword_type": "silver", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-09 04:46:10.012 | */*        | user1.comcast.com | ApacheBench/2.3 
 {"Host": "user1.comcast.com", "sword_type": "silver", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-09 04:46:10.016 | */*        | user1.comcast.com | ApacheBench/2.3 
 {"Host": "user1.comcast.com", "sword_type": "silver", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-09 04:46:10.018 | */*        | user1.comcast.com | ApacheBench/2.3 
 {"Host": "user1.comcast.com", "sword_type": "silver", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-09 04:46:10.021 | */*        | user1.comcast.com | ApacheBench/2.3 
 {"Host": "user1.comcast.com", "sword_type": "silver", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-09 04:46:10.026 | */*        | user1.comcast.com | ApacheBench/2.3 
   
     Query 2: How Many Swords Were Purchased?
     select count(*) from sword_purchases;
_col0 
-------
 14810 
(1 row)
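
In the Query 1 output, the values look shifted relative to the column names: the raw JSON appears under `accept`, the timestamp under `host`, and so on. One possible reading, offered here as an assumption rather than a confirmed diagnosis, is that the columns are being matched by position: the streaming job writes `raw_event` and `timestamp` first, while the Hive table declares `Accept` first. The sketch below reproduces that pairing in plain Python; `match_by_position` is a hypothetical helper.

```python
# Column order declared in the Hive DDL, as Presto displays it.
DECLARED = ["accept", "host", "user_agent", "event_type", "timestamp"]

# One streamed row, in the order the Spark job selects its columns:
# raw_event, timestamp, Accept, Host, User-Agent, event_type.
WRITTEN_ROW = [
    '{"Host": "user1.comcast.com", ...}',  # raw_event, abbreviated here
    "2020-12-09 04:46:10.012",
    "*/*",
    "user1.comcast.com",
    "ApacheBench/2.3",
    "purchase_sword",
]


def match_by_position(declared, row):
    """Pair declared column names with values by position only."""
    return dict(zip(declared, row))


shifted = match_by_position(DECLARED, WRITTEN_ROW)
for name, value in shifted.items():
    print(name, "=>", value)
```

If this reading is right, declaring the table columns in the same order the job writes them would be one thing to try; the count in Query 2 is unaffected either way.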

#### Stop the Stream 

In [8]:
sink.stop()