### After Week 11

Copy this week's docker-compose.yml file

cp ~/w205/course-content/11-Storing-Data-III/docker-compose.yml ~/w205/project-3-asozer

Bring up the cluster:

docker-compose up -d

(check with docker-compose ps -a)

-----

Copy game_api.py from the week 11 course content folder

cp ~/w205/course-content/11-Storing-Data-III/*.py ~/w205/project-3-asozer

-----

Added new endpoints in game_api.py for 'buy a sword' & 'join guild'
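
A rough sketch of the event each endpoint might emit, assuming game_api.py merges the request headers into a small event dictionary before publishing it (the notebook output later in this section shows Host, Accept and User-Agent next to event_type, which suggests this). The `ROUTES` mapping, the `build_event` helper and the sample headers are hypothetical; the event type names are the ones that appear in the notebook output.

```python
import json

# Hypothetical mapping from URL path to the event_type logged for it.
ROUTES = {
    "/": "default",
    "/purchase_a_sword": "purchase_sword",
    "/buy_a_sword": "buy_sword",
    "/join_guild": "join_guild",
}


def build_event(path, headers):
    """Return the JSON payload that would be published to the events topic."""
    event = {"event_type": ROUTES[path]}
    event.update(headers)  # request headers become top-level event fields
    return json.dumps(event, sort_keys=True)


# Sample headers resembling what curl sends (values are illustrative).
sample_headers = {"Accept": "*/*", "Host": "localhost:5000", "User-Agent": "curl/7.47.0"}

for path in ROUTES:
    print(path, "->", build_event(path, sample_headers))
```

In the real file each path would be a Flask route and the payload would go to a Kafka producer; both are left out here so the sketch runs on its own.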

-----

Create topic events:

docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181

-----

Start up the Flask server (you may need to modify the directory and file name):


docker-compose exec mids env FLASK_APP=/w205/project-3-asozer/game_api.py flask run --host 0.0.0.0

-----

Generate events with curl for all endpoints (including the new endpoints):

docker-compose exec mids curl http://localhost:5000/

docker-compose exec mids curl http://localhost:5000/purchase_a_sword

docker-compose exec mids curl http://localhost:5000/buy_a_sword

docker-compose exec mids curl http://localhost:5000/join_guild

----

Prepare for Jupyter Notebook by creating a symbolic link inside the spark container:

docker-compose exec spark bash

ln -s /w205 w205

exit

----

Start Jupyter Notebook with a PySpark kernel (note: change the IP address in the link to your own IP):

docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark

----

In Jupyter Notebook:

In [1]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf

In [2]:
@udf('string')
def munge_event(event_as_json):
    event = json.loads(event_as_json)
    event['Host'] = "moe"
    event['Cache-Control'] = "no-cache"
    return json.dumps(event)

In [3]:
raw_events = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

In [5]:
raw_events.show()

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2020-12-03 21:05:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2020-12-03 21:05:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     2|2020-12-03 21:06:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     3|2020-12-03 21:06:...|            0|
+----+--------------------+------+---------+------+--------------------+-------------+
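
The value column is displayed as raw bytes in hex, which is why a later cell casts it to a string. As a quick check outside Spark, the first few bytes visible in the output decode to the start of a JSON object:

```python
# First five bytes shown in the value column above.
visible_prefix = "7B 22 48 6F 73"

# bytes.fromhex ignores the spaces between byte pairs.
decoded = bytes.fromhex(visible_prefix).decode("utf-8")
print(decoded)  # {"Hos
```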



In [6]:
munged_events = raw_events \
    .select(raw_events.value.cast('string').alias('raw'),
            raw_events.timestamp.cast('string')) \
    .withColumn('munged', munge_event('raw'))

In [7]:
munged_events.show()

+--------------------+--------------------+--------------------+
|                 raw|           timestamp|              munged|
+--------------------+--------------------+--------------------+
|{"Host": "localho...|2020-12-03 21:05:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-12-03 21:05:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-12-03 21:06:...|{"Host": "moe", "...|
|{"Host": "localho...|2020-12-03 21:06:...|{"Host": "moe", "...|
+--------------------+--------------------+--------------------+
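
The same transformation can be tried in plain Python, without Spark, to see what munge_event does to a single message. The sample payload below is made up to resemble a curl request; only the two overwritten fields come from the notebook.

```python
import json


def munge_event_plain(event_as_json):
    """Plain-Python version of the munge_event UDF body."""
    event = json.loads(event_as_json)
    event["Host"] = "moe"                # overwrite the original Host header
    event["Cache-Control"] = "no-cache"  # add a field the raw event lacks
    return json.dumps(event)


# Illustrative raw message (not copied from the topic).
raw = json.dumps({"Host": "localhost:5000", "event_type": "default",
                  "Accept": "*/*", "User-Agent": "curl/7.47.0"})

munged = json.loads(munge_event_plain(raw))
print(munged["Host"], munged["Cache-Control"])  # moe no-cache
```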



In [8]:
extracted_events = munged_events \
    .rdd \
    .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.munged))) \
    .toDF()

In [9]:
extracted_events.show()

+------+-------------+----+-----------+--------------+--------------------+
|Accept|Cache-Control|Host| User-Agent|    event_type|           timestamp|
+------+-------------+----+-----------+--------------+--------------------+
|   */*|     no-cache| moe|curl/7.47.0|       default|2020-12-03 21:05:...|
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2020-12-03 21:05:...|
|   */*|     no-cache| moe|curl/7.47.0|     buy_sword|2020-12-03 21:06:...|
|   */*|     no-cache| moe|curl/7.47.0|    join_guild|2020-12-03 21:06:...|
+------+-------------+----+-----------+--------------+--------------------+
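
The column order in this output is alphabetical (capital letters first) rather than the order of the keys in the JSON. In Spark 2.x, a Row built from keyword arguments sorts its fields by name; that changed in Spark 3.0, so treat the sketch below as a description of the older behavior that this output appears to reflect. It uses plain Python to mimic the unpacking in cell In [8]; the `to_fields` helper and the sample record are illustrative.

```python
import json

munged = json.dumps({"Host": "moe", "event_type": "join_guild", "Accept": "*/*",
                     "User-Agent": "curl/7.47.0", "Cache-Control": "no-cache"})
timestamp = "2020-12-03 21:06:00"  # illustrative value


def to_fields(timestamp, **event):
    """Mimic Row(timestamp=..., **event) with fields sorted by name."""
    fields = dict(event, timestamp=timestamp)
    return sorted(fields.items())


columns = [name for name, _ in to_fields(timestamp, **json.loads(munged))]
print(columns)
# ['Accept', 'Cache-Control', 'Host', 'User-Agent', 'event_type', 'timestamp']
```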



In [10]:
sword_purchases = extracted_events \
    .filter(extracted_events.event_type == 'purchase_sword')

In [11]:
sword_purchases.show()

+------+-------------+----+-----------+--------------+--------------------+
|Accept|Cache-Control|Host| User-Agent|    event_type|           timestamp|
+------+-------------+----+-----------+--------------+--------------------+
|   */*|     no-cache| moe|curl/7.47.0|purchase_sword|2020-12-03 21:05:...|
+------+-------------+----+-----------+--------------+--------------------+



In [12]:
default_hits = extracted_events \
        .filter(extracted_events.event_type == 'default')

In [13]:
default_hits.show()

+------+-------------+----+-----------+----------+--------------------+
|Accept|Cache-Control|Host| User-Agent|event_type|           timestamp|
+------+-------------+----+-----------+----------+--------------------+
|   */*|     no-cache| moe|curl/7.47.0|   default|2020-12-03 21:05:...|
+------+-------------+----+-----------+----------+--------------------+



Shut down the cluster:

docker-compose down

(check with docker-compose ps -a)

----

### After Week 12

Copy this week's docker-compose.yml file

cp ~/w205/course-content/12-Querying-Data-II/docker-compose.yml .

Bring up the cluster:

docker-compose up -d

(check with docker-compose ps -a)

----

Edit game_api.py file:

Added sword_type: knights to /purchase_a_sword endpoint

Added sword_length: long to /buy_a_sword endpoint

Added guild_name: smith to /join_guild

(done to make sure our filter is actually filtering later on)

----

Create Kafka topic events

docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181


-----

Run the Flask API server:

docker-compose exec mids env FLASK_APP=/w205/project-3-asozer/game_api.py flask run --host 0.0.0.0

-----

Run the following Apache Bench commands individually:

docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/

docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword

docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/buy_a_sword

docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_guild

---

docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/

docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword

docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/buy_a_sword

docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_guild

-----

Run the following. We should see that kafkacat reaches offset 80, and that the events have different schemas:

docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning -e
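
The expected offset follows from the ab runs above: two Host values, four endpoints each, ten requests per run. A quick arithmetic check, assuming the topic was empty before these runs and every request produced exactly one message:

```python
hosts = ["user1.comcast.com", "user2.att.com"]
endpoints = ["/", "/purchase_a_sword", "/buy_a_sword", "/join_guild"]
requests_per_run = 10  # the -n 10 flag passed to ab

expected_messages = len(hosts) * len(endpoints) * requests_per_run
print(expected_messages)  # 80

# With a single partition, the messages occupy offsets 0..79,
# so the next offset to be written is 80.
last_offset = expected_messages - 1
print(last_offset)  # 79
```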

-----

Get ready for Jupyter Notebook by opening a shell in the spark container and adding a symbolic link:
    
docker-compose exec spark bash

ln -s /w205 w205

exit

-----

Run the following to get the Jupyter Notebook link. Note that we need to change the IP address to our own:

docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark

-----

In Jupyter Notebook:

Code from filtered_writes.py:

In [66]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf

In [67]:
@udf('boolean')
def is_purchase(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

In [68]:
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [69]:
purchase_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase('raw'))

In [70]:
extracted_purchase_events = purchase_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()

In [71]:
extracted_purchase_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sword_type: string (nullable = true)
 |-- timestamp: string (nullable = true)
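
Because the three endpoints now carry different extra fields, the schema printed here (with sword_type but no sword_length or guild_name) is a sign that the filter removed the other event types. The plain-Python sketch below mirrors that reasoning on a few hand-written events; the helper name and the sample events are illustrative.

```python
import json

# Hand-written events with the extra fields described in this section.
events = [
    json.dumps({"event_type": "default"}),
    json.dumps({"event_type": "purchase_sword", "sword_type": "knights"}),
    json.dumps({"event_type": "buy_sword", "sword_length": "long"}),
    json.dumps({"event_type": "join_guild", "guild_name": "smith"}),
]


def is_purchase_plain(event_as_json):
    """Same test as the is_purchase UDF, without Spark."""
    return json.loads(event_as_json)["event_type"] == "purchase_sword"


kept = [json.loads(e) for e in events if is_purchase_plain(e)]
schemas = {tuple(sorted(e)) for e in kept}
print(schemas)  # {('event_type', 'sword_type')}
```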



In [72]:
extracted_purchase_events.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|us

In [73]:
extracted_purchase_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchases')
        

pyspark code:

In [74]:
purchases = spark.read.parquet('/tmp/purchases')

In [75]:
purchases.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|us

In [76]:
purchases.registerTempTable('purchases')


In [77]:
purchases_by_example2 = spark.sql("select * from purchases where Host = 'user1.comcast.com'")
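
To sanity-check the query logic without a Spark session, the same WHERE clause can be run against a tiny in-memory SQLite table. SQLite is a stand-in here, not what the notebook uses, and the rows are made up; the two engines differ in many ways, but a simple equality filter like this one behaves the same.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table purchases (Host text, event_type text, sword_type text)")
conn.executemany(
    "insert into purchases values (?, ?, ?)",
    [
        ("user1.comcast.com", "purchase_sword", "knights"),
        ("user2.att.com", "purchase_sword", "knights"),
        ("user1.comcast.com", "purchase_sword", "knights"),
    ],
)

# Same SQL text as the spark.sql call above.
rows = conn.execute(
    "select * from purchases where Host = 'user1.comcast.com'"
).fetchall()
print(len(rows))  # 2
conn.close()
```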


In [78]:
purchases_by_example2.show()


+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|   knights|2020-12-05 21:20:...|
|   */*|us

In [79]:
df = purchases_by_example2.toPandas()


In [80]:
df.describe()

        Accept               Host       User-Agent      event_type  sword_type                timestamp
count     9200               9200             9200            9200        9200                     9200
unique       1                  1                1               1           1                     9200
top        */*  user1.comcast.com  ApacheBench/2.3  purchase_sword     knights  2020-12-05 22:11:06.214
freq      9200               9200             9200            9200        9200                        1


In [81]:
df.head()

   Accept               Host       User-Agent      event_type  sword_type                timestamp
0     */*  user1.comcast.com  ApacheBench/2.3  purchase_sword     knights  2020-12-05 21:20:12.395
1     */*  user1.comcast.com  ApacheBench/2.3  purchase_sword     knights  2020-12-05 21:20:12.398
2     */*  user1.comcast.com  ApacheBench/2.3  purchase_sword     knights  2020-12-05 21:20:12.399
3     */*  user1.comcast.com  ApacheBench/2.3  purchase_sword     knights  2020-12-05 21:20:12.401
4     */*  user1.comcast.com  ApacheBench/2.3  purchase_sword     knights  2020-12-05 21:20:12.405


Simple analytics using Spark SQL on the in-memory Spark DataFrame:

Business question 1: How many events are there in total?

In [85]:
purchases.count()

9200

We see there are 9200 events in total.

Business question 2: How many purchase_sword events are there?

In [86]:
purchases[purchases['event_type'] == "purchase_sword"].count()


9200

We see that all 9200 events are sword purchases. This is expected, since only events that passed the is_purchase filter were written to /tmp/purchases.

### After Week 13

Copy this week's docker-compose.yml file

cp ~/w205/course-content/13-Understanding-Data/docker-compose.yml .

Bring up the cluster:

docker-compose up -d

(check with docker-compose ps -a)

----

Note that we need to change the docker-compose.yml file:

Comment out the "ports" and "- 8888:8888" lines in cloudera

Uncomment the "ports" and "- 8888:8888" lines in spark

----

Create Kafka topic events

docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181

-----

Start the Flask server:

docker-compose exec mids env FLASK_APP=/w205/project-3-asozer/game_api.py flask run --host 0.0.0.0

------

Run the Apache Bench command in an infinite loop:

while true; do docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword; sleep 5; done

------

In [22]:
import json
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType

In [23]:
def purchase_sword_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
    ])

In [24]:
@udf('boolean')
def is_sword_purchase(event_as_json):
    """udf for filtering events
    """
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False


In [25]:
spark

In [26]:
# read events from Kafka in streaming mode (they are written to HDFS by the sink below)
raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()


In [27]:
# filter first, then impose our own schema rather than inferring it
sword_purchases = raw_events \
        .filter(is_sword_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          purchase_sword_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')
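
Imposing a schema with from_json means only the declared fields are extracted: a field missing from the message comes back as null, and a field present in the message but absent from the schema (such as sword_type here) does not become a column. A plain-Python approximation of that behavior, with a hypothetical helper name and a made-up message:

```python
import json

# Field names declared in purchase_sword_event_schema().
SCHEMA_FIELDS = ["Accept", "Host", "User-Agent", "event_type"]


def apply_schema(event_as_json, fields=SCHEMA_FIELDS):
    """Keep only declared fields; missing ones become None (null)."""
    event = json.loads(event_as_json)
    return {name: event.get(name) for name in fields}


# Illustrative message: has sword_type, lacks User-Agent.
message = json.dumps({"Host": "user1.comcast.com", "sword_type": "knights",
                      "event_type": "purchase_sword", "Accept": "*/*"})

row = apply_schema(message)
print(row["User-Agent"])    # None
print("sword_type" in row)  # False
```

The full message is still available in the raw_event column, so nothing is lost by leaving sword_type out of the schema.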

In [28]:
#Tell our sink to start
sink = sword_purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases") \
        .option("path", "/tmp/sword_purchases") \
        .trigger(processingTime="10 seconds") \
        .start()

Run the following Hive command to create an external table for schema on read. Note that the table name is now sword_purchases and the directory is /tmp/sword_purchases:

create external table if not exists default.sword_purchases (Accept string, Host string, User_Agent string, event_type string, timestamp string) stored as parquet location '/tmp/sword_purchases'  tblproperties ("parquet.compress"="SNAPPY");

----------------------------------------

Start Presto to query the table:

docker-compose exec presto presto --server presto:8080 --catalog hive --schema default

----------------------------------------

Basic queries against the Hive table, run in Presto:

(1) select * from sword_purchases limit 5;

returns:
                                                                  accept                                                                  |          host           | user_agent |    event_type     |    timestamp    
------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+------------+-------------------+-----------------
 {"Host": "user1.comcast.com", "sword_type": "knights", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-05 21:37:31.822 | */*        | user1.comcast.com | ApacheBench/2.3 
 {"Host": "user1.comcast.com", "sword_type": "knights", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-05 21:37:31.828 | */*        | user1.comcast.com | ApacheBench/2.3 
 {"Host": "user1.comcast.com", "sword_type": "knights", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-05 21:37:31.833 | */*        | user1.comcast.com | ApacheBench/2.3 
 {"Host": "user1.comcast.com", "sword_type": "knights", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-05 21:37:31.836 | */*        | user1.comcast.com | ApacheBench/2.3 
 {"Host": "user1.comcast.com", "sword_type": "knights", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-05 21:37:31.841 | */*        | user1.comcast.com | ApacheBench/2.3 
 
 -----
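
In the result of query (1), the values look shifted relative to the headers: the accept column holds the raw JSON, host holds the timestamp, and so on. One plausible explanation, which is an assumption and not something these notes confirm, is that the query engine matched Parquet columns to table columns by position rather than by name. The stream writes raw_event and timestamp first, while the table declares Accept first. The sketch below pairs the two orderings positionally:

```python
# Column order written by the streaming job:
# .select('raw_event', 'timestamp', 'json.*')
written = ["raw_event", "timestamp", "Accept", "Host", "User-Agent", "event_type"]

# Column order declared in the external table.
declared = ["accept", "host", "user_agent", "event_type", "timestamp"]

# Positional pairing; zip stops at the shorter list,
# so the last written column has no table column to land in.
positional = dict(zip(declared, written))
for table_col, file_col in positional.items():
    print(f"{table_col:<10} <- {file_col}")
```

If that is the cause, declaring the table columns in the order they are written (raw_event, timestamp, Accept, Host, User_Agent, event_type) would be one way to line them up.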
 


(2) select count(*) from sword_purchases;

returns:
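 _col0 
-------
  1670 

(^ the count keeps growing while the ab loop is running: the loop sends 10 more events roughly every 5 seconds, and the sink writes them out every 10 seconds)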

--------

In [29]:
#stop the stream
sink.stop()