### Linux Commands

#### Starting up the cluster
    docker-compose up -d
    docker-compose ps
    docker ps -a
    
#### Creating a topic
    docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
    
#### Starting up the flask server
    docker-compose exec mids env FLASK_APP=/w205/project-3-caseyhyoon/game_api.py flask run --host 0.0.0.0

#### Shutting down the cluster
    docker-compose down
    docker-compose ps
    docker ps -a

### After Week 11

#### Code from separate_events.py

In [1]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf

In [2]:
@udf('string')
def munge_event(event_as_json):
    """Spark UDF: overwrite two header fields of a JSON-encoded event.

    The event is decoded, its 'Host' is forced to "moe" and its
    'Cache-Control' to "no-cache", and the result is re-encoded as JSON text.
    """
    payload = json.loads(event_as_json)
    for header, forced_value in (('Host', "moe"), ('Cache-Control', "no-cache")):
        payload[header] = forced_value
    return json.dumps(payload)

In [3]:
# Batch-read the `events` topic from the Kafka broker at kafka:29092,
# spanning the earliest through the latest offsets.
raw_events = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

In [4]:
# Preview the raw Kafka records; `value` has not been cast to a string yet.
raw_events.show()

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2020-12-10 04:07:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2020-12-10 04:07:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     2|2020-12-10 04:07:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     3|2020-12-10 04:07:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     4|2020-12-10 04:07:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     5|2020-12-10 04:07:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     6|2020-12-10 04:07:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     7|2020-12-10 04:07:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0

In [5]:
# Cast the Kafka payload to text (`raw`), keep the record timestamp as a
# string, and add a `munged` column produced by the munge_event UDF.
munged_events = (
    raw_events
    .select(
        raw_events.value.cast('string').alias('raw'),
        raw_events.timestamp.cast('string'),
    )
    .withColumn('munged', munge_event('raw'))
)

In [7]:
# Compare the original JSON (`raw`) with the rewritten JSON (`munged`).
munged_events.show()

+--------------------+--------------------+--------------------+
|                 raw|           timestamp|              munged|
+--------------------+--------------------+--------------------+
|{"Host": "user1.c...|2020-12-10 04:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 04:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 04:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 04:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 04:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 04:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 04:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 04:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 04:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 04:07:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 04:08:...|{"Host": "moe", "...|
|{"Host": "user1.c...|2020-12-10 04:08:...|{"Host": "moe", "...|
|{"Host": "user1.c...|202

In [8]:
# Turn every munged JSON document into a Row whose fields are the JSON keys
# plus the record timestamp, then convert the RDD back into a DataFrame.
extracted_events = (
    munged_events.rdd
    .map(lambda rec: Row(timestamp=rec.timestamp, **json.loads(rec.munged)))
    .toDF()
)

In [9]:
# Each JSON key is now its own column alongside `timestamp`.
extracted_events.show()

+------+-------------+----+-----------+--------------+--------------------+
|Accept|Cache-Control|Host| User-Agent|    event_type|           timestamp|
+------+-------------+----+-----------+--------------+--------------------+
|   */*|     no-cache| moe|curl/7.47.0|       default|2020-12-08 12:18:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2020-12-08 12:18:...|
|   */*|     no-cache| moe|curl/7.47.0|     buy_sword|2020-12-08 12:18:...|
|   */*|     no-cache| moe|curl/7.47.0|    join_guild|2020-12-08 12:18:...|
+------+-------------+----+-----------+--------------+--------------------+



In [10]:
# Keep only the rows whose event_type is 'purchase_sword'.
sword_purchases = extracted_events.filter(extracted_events.event_type == 'purchase_sword')

In [11]:
# Display the purchase_sword rows selected above.
sword_purchases.show()

+------+-------------+----+-----------+--------------+--------------------+
|Accept|Cache-Control|Host| User-Agent|    event_type|           timestamp|
+------+-------------+----+-----------+--------------+--------------------+
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2020-12-08 12:18:...|
+------+-------------+----+-----------+--------------+--------------------+



In [12]:
# Keep only the rows whose event_type is 'default'.
default_hits = extracted_events.filter(extracted_events.event_type == 'default')

In [13]:
# Display the 'default' rows selected above.
default_hits.show()

+------+-------------+----+-----------+----------+--------------------+
|Accept|Cache-Control|Host| User-Agent|event_type|           timestamp|
+------+-------------+----+-----------+----------+--------------------+
|   */*|     no-cache| moe|curl/7.47.0|   default|2020-12-08 12:18:...|
+------+-------------+----+-----------+----------+--------------------+



### After Week 12

### Linux Commands

#### Individual Apache bench commands
    docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/
    docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword
    docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/buy_a_sword
    docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_guild

    docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/
    docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword
    docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/buy_a_sword
    docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_guild


#### Code from filtered_writes.py

In [1]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf

In [2]:
@udf('boolean')
def is_purchase(event_as_json):
    """Spark UDF: report whether a JSON-encoded event is a sword purchase.

    Parameters
    ----------
    event_as_json : str or None
        JSON text of a single event.

    Returns
    -------
    bool
        True only when the text decodes to a JSON object whose 'event_type'
        is 'purchase_sword'. A null value, unparseable text, a non-object
        document, or an object without 'event_type' yields False, so one bad
        record is filtered out instead of failing the whole job.
    """
    try:
        event = json.loads(event_as_json)
    except (TypeError, ValueError):
        # TypeError: the value was null (None); ValueError: invalid JSON
        # (json.JSONDecodeError is a ValueError subclass).
        return False
    return isinstance(event, dict) and event.get('event_type') == 'purchase_sword'

In [3]:
# Echo the `spark` object the following cells rely on (sanity check).
spark

In [13]:
# Batch-read the `events` topic from the Kafka broker at kafka:29092,
# spanning the earliest through the latest offsets.
raw_events = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

In [14]:
# Cast the Kafka payload to text (`raw`), keep the record timestamp as a
# string, and retain only the rows the is_purchase UDF accepts.
purchase_events = (
    raw_events
    .select(
        raw_events.value.cast('string').alias('raw'),
        raw_events.timestamp.cast('string'),
    )
    .filter(is_purchase('raw'))
)

In [15]:
# Turn every purchase event's JSON into a Row whose fields are the JSON keys
# plus the record timestamp, then convert the RDD back into a DataFrame.
extracted_purchase_events = (
    purchase_events.rdd
    .map(lambda rec: Row(timestamp=rec.timestamp, **json.loads(rec.raw)))
    .toDF()
)

In [16]:
# Inspect the column names and types derived from the purchase events.
extracted_purchase_events.printSchema()


root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sword_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [17]:
# Preview the extracted purchase events before writing them out.
extracted_purchase_events.show()


+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|us

In [18]:
# Persist the purchase events as parquet under /tmp/purchases, replacing
# whatever an earlier run left there.
(
    extracted_purchase_events.write
    .mode('overwrite')
    .parquet('/tmp/purchases')
)

#### Pyspark code

In [19]:
# Load the parquet data written to /tmp/purchases back into a DataFrame.
purchases = spark.read.parquet('/tmp/purchases')

# Preview the reloaded rows.
purchases.show()


+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|us

In [20]:
# Expose the DataFrame to Spark SQL under the name `purchases`.
# createOrReplaceTempView supersedes registerTempTable, which has been
# deprecated since Spark 2.0; re-running this cell simply replaces the view.
purchases.createOrReplaceTempView('purchases')


In [21]:
# Select every purchase whose Host is user1.comcast.com from the
# `purchases` temp table.
user1_purchases_query = "select * from purchases where Host = 'user1.comcast.com'"
purchases_by_example2 = spark.sql(user1_purchases_query)

purchases_by_example2.show()


+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-10 06:08:...|
|   */*|us

In [22]:
# Convert the query result to a pandas DataFrame.
df = purchases_by_example2.toPandas()

# Summary statistics (count / unique / top / freq) for each column.
df.describe()

Unnamed: 0,Accept,Host,User-Agent,event_type,sword_type,timestamp
count,10,10,10,10,10,10
unique,1,1,1,1,1,10
top,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,knights,2020-12-10 06:08:36.506
freq,10,10,10,10,10,1


#### Simple analytics using Spark SQL on the Spark Dataframe in Memory

#### How many users are there?

In [48]:
# Each distinct Host value is counted as one user.
spark.sql("select count(DISTINCT Host) AS num_users from purchases").show()

+---------+
|num_users|
+---------+
|        2|
+---------+



#### How many swords does each user have?

In [49]:
# Count the purchase rows that carry a sword_type, grouped by Host.
spark.sql("select Host, count(sword_type) AS num_swords from purchases group by Host").show()

+-----------------+----------+
|             Host|num_swords|
+-----------------+----------+
|    user2.att.com|        10|
|user1.comcast.com|        10|
+-----------------+----------+



#### How many sword_types are there?

In [50]:
# Count how many different sword_type values appear among the purchases.
spark.sql("select count(DISTINCT sword_type) as num_sword_types from purchases").show()

+---------------+
|num_sword_types|
+---------------+
|              1|
+---------------+



### After Week 13

In [1]:
import json
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType

In [2]:
def purchase_sword_event_schema():
    """Build the explicit schema used to parse purchase_sword event JSON.

    Only fields carried inside the JSON payload are declared, each as a
    nullable string:

    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)

    No timestamp field is declared here; in this notebook the streaming
    query selects the Kafka record timestamp as a separate column.

    Returns
    -------
    StructType
        The four StringType fields listed above, in that order.
    """
    field_names = ("Accept", "Host", "User-Agent", "event_type")
    return StructType([StructField(name, StringType(), True) for name in field_names])


In [3]:
@udf('boolean')
def is_sword_purchase(event_as_json):
    """Spark UDF for filtering events down to sword purchases.

    Parameters
    ----------
    event_as_json : str or None
        JSON text of a single event.

    Returns
    -------
    bool
        True only when the text decodes to a JSON object whose 'event_type'
        is 'purchase_sword'. A null value, unparseable text, a non-object
        document, or an object without 'event_type' yields False, so one bad
        record is dropped instead of failing the query that applies this
        filter.
    """
    try:
        event = json.loads(event_as_json)
    except (TypeError, ValueError):
        # TypeError: the value was null (None); ValueError: invalid JSON
        # (json.JSONDecodeError is a ValueError subclass).
        return False
    return isinstance(event, dict) and event.get('event_type') == 'purchase_sword'

In [4]:
# Echo the `spark` object the following cells rely on (sanity check).
spark

In [5]:
# Open a streaming read on the `events` topic of the Kafka broker at
# kafka:29092 (readStream rather than the batch `read` used earlier).
raw_events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "events")
    .load()
)

In [6]:
# The Kafka payload cast to text; reused by the filter and both projections.
event_json = raw_events.value.cast('string')

# Keep sword purchases only, then expose the raw JSON, the record timestamp
# and the JSON fields parsed with the explicit schema as flat columns.
sword_purchases = (
    raw_events
    .filter(is_sword_purchase(event_json))
    .select(
        event_json.alias('raw_event'),
        raw_events.timestamp.cast('string'),
        from_json(event_json, purchase_sword_event_schema()).alias('json'),
    )
    .select('raw_event', 'timestamp', 'json.*')
)

In [7]:
# Start the streaming query: write sword purchases as parquet under
# /tmp/sword_purchases, triggered every 10 seconds, with checkpoint data
# kept in /tmp/checkpoints_for_sword_purchases.
sink = (
    sword_purchases.writeStream
    .format("parquet")
    .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases")
    .option("path", "/tmp/sword_purchases")
    .trigger(processingTime="10 seconds")
    .start()
)

### Hive Command

#### Getting onto hive
    docker-compose exec cloudera hive

#### Create external table for schema on read
    create external table if not exists default.sword_purchases (Accept string, Host string, User_Agent string, event_type string, timestamp string) stored as parquet location '/tmp/sword_purchases'  tblproperties ("parquet.compress"="SNAPPY");

### Presto

#### Getting onto presto
    docker-compose exec presto presto --server presto:8080 --catalog hive --schema default
    
#### Showing results
    select * from sword_purchases limit 5;
    select count(*) from sword_purchases;

In [8]:
# Stop the streaming query started by the `sink` cell.
sink.stop()