# <center>Project 3 w205</center>

### <center>Daniel Lampert</center>


### Load libraries

In [3]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf
import json
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType

## Post week 11

### Week 11 Linux Commands
1. Command to startup cluster 
    - docker-compose up -d
2. Command to create topic
    - docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
3. Command to bring up flask
    - docker-compose exec mids env FLASK_APP=/w205/project-3-dtascidan/game_api.py flask run --host 0.0.0.0
4. Command to shutdown cluster
    - docker-compose down

### User defined functions

### Curl Commands
1. docker-compose exec mids curl http://localhost:5000/
2. docker-compose exec mids curl http://localhost:5000/purchase_a_sword
3. docker-compose exec mids curl http://localhost:5000/buy_a_sword
4. docker-compose exec mids curl http://localhost:5000/join_guild

In [4]:
@udf('string')
def munge_event(event_as_json):
    """Overwrite the Host and Cache-Control fields of a JSON-encoded event.

    The event is parsed from its JSON string, the two fields are set to
    fixed values, and the result is serialized back to a JSON string.
    """
    payload = json.loads(event_as_json)
    # Apply the fixed overrides in the same order: Host first, then Cache-Control.
    for field, value in (('Host', "moe"), ('Cache-Control', "no-cache")):
        payload[field] = value
    return json.dumps(payload)

### Makes raw events table

In [5]:
# Batch-read the "events" topic from Kafka, from the earliest to the latest offset.
raw_events = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)


In [6]:
# Print a tabular preview of the raw Kafka records (value is still binary here).
raw_events.show()

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2020-12-07 05:21:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2020-12-07 05:21:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     2|2020-12-07 05:21:...|            0|
|null|[7B 22 41 63 63 6...|events|        0|     3|2020-12-07 05:22:...|            0|
+----+--------------------+------+---------+------+--------------------+-------------+



In [7]:
 # Cast the Kafka value and timestamp to strings, then add the munged JSON column.
 munged_events = (
     raw_events
     .select(
         raw_events.value.cast('string').alias('raw'),
         raw_events.timestamp.cast('string'),
     )
     .withColumn('munged', munge_event('raw'))
 )

In [8]:
# Compare the original JSON ('raw') with the output of munge_event ('munged').
munged_events.show()

+--------------------+--------------------+--------------------+
|                 raw|           timestamp|              munged|
+--------------------+--------------------+--------------------+
|{"Host": "localho...|2020-12-07 05:21:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-12-07 05:21:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-12-07 05:21:...|{"Host": "moe", "...|
|{"Accept": "*/*",...|2020-12-07 05:22:...|{"Accept": "*/*",...|
+--------------------+--------------------+--------------------+



In [11]:
# Expand each munged JSON document into one column per key, keeping the
# Kafka timestamp alongside, and let Spark infer the resulting schema.
munged_rows = munged_events.rdd.map(
    lambda r: Row(timestamp=r.timestamp, **json.loads(r.munged))
)
extracted_events = munged_rows.toDF()
extracted_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Cache-Control: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [13]:
# Keep only the sword-purchase events.
# TODO: have to add purchase sword function
is_sword_event = extracted_events.event_type == 'purchase_sword'
sword_purchases = extracted_events.filter(is_sword_event)
sword_purchases.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Cache-Control: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [14]:
 # Keep only the events produced by the default route.
 is_default_event = extracted_events.event_type == 'default'
 default_hits = extracted_events.filter(is_default_event)
 default_hits.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Cache-Control: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



## Post Week 12

### Week 12 Linux Commands
1. Apache bench commands 
    - docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/
    - docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword
    - docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_a_guild
    - docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/buy_a_sword
    
    - docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/
    - docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword
    - docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_a_guild
    - docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/buy_a_sword
  


#### Code from filtered_writes.py

In [21]:
@udf('boolean')
def is_purchase(event_as_json):
    """Tell whether a JSON-encoded event is a sword purchase.

    Args:
        event_as_json: the event payload as a JSON string (the Kafka value
            cast to string); may be None for records without a value.

    Returns:
        True only when the payload parses to a JSON object whose
        'event_type' equals 'purchase_sword'. Null values, malformed JSON,
        non-object payloads and events without an 'event_type' key yield
        False instead of raising, so one bad record cannot fail the job.
    """
    if event_as_json is None:
        return False
    try:
        event = json.loads(event_as_json)
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass: not valid JSON.
        return False
    return isinstance(event, dict) and event.get('event_type') == 'purchase_sword'

In [22]:
# Batch-read the "events" topic again, now including the Apache Bench traffic.
raw_events = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

In [23]:
 # Cast value/timestamp to strings and keep only rows accepted by is_purchase.
 purchase_events = (
     raw_events
     .select(
         raw_events.value.cast('string').alias('raw'),
         raw_events.timestamp.cast('string'),
     )
     .filter(is_purchase('raw'))
 )

In [24]:
# Turn each purchase's raw JSON into one column per key, plus the Kafka timestamp.
purchase_rows = purchase_events.rdd.map(
    lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))
)
extracted_purchase_events = purchase_rows.toDF()

In [25]:
# Show the columns inferred from the purchase events' JSON keys.
extracted_purchase_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sword_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [26]:
# Print a tabular preview of the extracted purchase events.
extracted_purchase_events.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|   localhost:5000|    curl/7.47.0|purchase_sword|   knights|2020-12-06 21:24:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|us

In [27]:
# Persist the purchase events as parquet, replacing any earlier output.
purchase_writer = extracted_purchase_events.write.mode('overwrite')
purchase_writer.parquet('/tmp/purchases')

#### pyspark code

In [28]:
# Load the purchase events back from the parquet files written to /tmp/purchases.
purchases = spark.read.parquet('/tmp/purchases')

In [29]:
# Print a tabular preview of the purchases read from parquet.
purchases.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|   localhost:5000|    curl/7.47.0|purchase_sword|   knights|2020-12-06 21:24:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|us

In [30]:
# Expose the DataFrame to Spark SQL under the name 'purchases'.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0; it also makes re-running this cell safe.
purchases.createOrReplaceTempView('purchases')

In [31]:
# Select every purchase whose Host header is user1.comcast.com.
purchases_by_example2 = spark.sql("select * from purchases where Host = 'user1.comcast.com'")

In [32]:
# Print a tabular preview of the Comcast-only purchases.
purchases_by_example2.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-06 21:29:...|
|   */*|us

In [33]:
# Convert the Spark query result into a pandas DataFrame for local inspection.
df = purchases_by_example2.toPandas()


In [34]:
# Display the first rows of the pandas DataFrame.
df.head()

Unnamed: 0,Accept,Host,User-Agent,event_type,sword_type,timestamp
0,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,knights,2020-12-06 21:29:18.122
1,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,knights,2020-12-06 21:29:18.126
2,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,knights,2020-12-06 21:29:18.131
3,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,knights,2020-12-06 21:29:18.137
4,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,knights,2020-12-06 21:29:18.14


### Basic Analytics Using Spark SQL on the Purchases Table

1. Were more hosts connecting from comcast or att?
2. When were the first and last purchase timestamps?

In [35]:
# Question 1: count the purchases per provider, one query per Host value.
comcast_table = spark.sql("SELECT COUNT(Host) as Comcast_Count FROM purchases WHERE Host = 'user1.comcast.com'")
att_table = spark.sql("SELECT COUNT(Host) as ATT_Count FROM purchases WHERE Host = 'user2.att.com'")

In [36]:
# Print both counts so they can be compared side by side.
comcast_table.show()
att_table.show()

+-------------+
|Comcast_Count|
+-------------+
|           10|
+-------------+

+---------+
|ATT_Count|
+---------+
|       10|
+---------+



In [37]:
# Question 2: earliest and latest purchase timestamps.
# NOTE(review): timestamp is stored as a string column, so MIN/MAX presumably
# compare it as text rather than as a timestamp type; confirm this is intended.
time_stamp_table = spark.sql("SELECT MIN(timestamp), MAX(timestamp) FROM purchases")

In [38]:
# Print the first and last timestamps.
time_stamp_table.show()

+--------------------+--------------------+
|      min(timestamp)|      max(timestamp)|
+--------------------+--------------------+
|2020-12-06 21:24:...|2020-12-06 21:29:...|
+--------------------+--------------------+



## Post Week 13

### Week 13 Linux Commands
1. Infinite loop to run Apache Bench
    - while true; do docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword; sleep 5; done
2. Command to see if infinite loop wrote to HDFS
    - docker-compose exec cloudera hadoop fs -ls /tmp/sword_purchases

In [47]:
def purchase_sword_event_schema():
    """Build the schema used to parse the JSON payload of a purchase_sword event.

    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- sword_type: string (nullable = true)

    The timestamp is not declared here: it is taken from the Kafka record,
    not from the JSON payload. sword_type is declared so that from_json does
    not silently drop it; payloads without that key parse to null.

    Returns:
        A StructType whose fields are all nullable strings.
    """
    field_names = ("Accept", "Host", "User-Agent", "event_type", "sword_type")
    return StructType([
        StructField(field_name, StringType(), True)
        for field_name in field_names
    ])

In [48]:
@udf('boolean')
def is_sword_purchase(event_as_json):
    """udf for filtering events: True when the event is a sword purchase.

    Args:
        event_as_json: the event payload as a JSON string (the Kafka value
            cast to string); may be None for records without a value.

    Returns:
        True only when the payload parses to a JSON object whose
        'event_type' equals 'purchase_sword'. Null values, malformed JSON,
        non-object payloads and events without an 'event_type' key yield
        False instead of raising, so one bad record cannot stop the stream.
    """
    if event_as_json is None:
        return False
    try:
        event = json.loads(event_as_json)
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass: not valid JSON.
        return False
    return isinstance(event, dict) and event.get('event_type') == 'purchase_sword'

In [49]:
# Open a streaming read on the "events" Kafka topic.
raw_events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "events")
    .load()
)

In [50]:
# Keep only sword purchases, then emit the raw JSON, the Kafka timestamp and
# the fields parsed from the JSON according to purchase_sword_event_schema().
event_json = raw_events.value.cast('string')
sword_purchases = (
    raw_events
    .filter(is_sword_purchase(event_json))
    .select(
        event_json.alias('raw_event'),
        raw_events.timestamp.cast('string'),
        from_json(event_json, purchase_sword_event_schema()).alias('json'),
    )
    .select('raw_event', 'timestamp', 'json.*')
)

In [51]:
# Start the streaming query: write sword purchases as parquet every 10 seconds.
sink = (
    sword_purchases.writeStream
    .format("parquet")
    .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases")
    .option("path", "/tmp/sword_purchases")
    .trigger(processingTime="10 seconds")
    .start()
)

### Hive Commands to make table with Schema on read
1. Command to open the Hive shell (used to create the external table for schema on read)
    - docker-compose exec cloudera hive
2. Command to create external table
    - create external table if not exists default.sword_purchases (Accept string, Host string, User_Agent string, event_type string, timestamp string) stored as parquet location '/tmp/sword_purchases'  tblproperties ("parquet.compress"="SNAPPY");

### Presto queries against external table
1. Command to query with Presto
    - docker-compose exec presto presto --server presto:8080 --catalog hive --schema default
2. Example queries and results
    - SELECT * FROM sword_purchases;
    - {"Host": "user1.comcast.com", "sword_type": "knights", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-02 18:56:54.56  | */*        | user1.comcast.com | ApacheBench/2.3 
 {"Host": "user1.comcast.com", "sword_type": "knights", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-02 18:56:54.564 | */*        | user1.comcast.com | ApacheBench/2.3 
 {"Host": "user1.comcast.com", "sword_type": "knights", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-02 18:56:54.566 | */*        | user1.comcast.com | ApacheBench/2.3 
 {"Host": "user1.comcast.com", "sword_type": "knights", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-02 18:56:54.569 | */*        | user1.comcast.com | ApacheBench/2.3 
 {"Host": "user1.comcast.com", "sword_type": "knights", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-02 18:56:54.573 | */*        | user1.comcast.com | ApacheBench/2.3 
    - SELECT COUNT(*) FROM sword_purchases;
    - 1970

In [52]:
# Stop the streaming query started by the writeStream cell.
sink.stop()