# Project 2

## Initialization: Linux Commands

```
cd ~/w205/project-2-AshQTan
curl -L -o assessment-attempts-20180128-121051-nested.json https://goo.gl/ME6hjp
cp ~/w205/course-content/08-Querying-Data/docker-compose.yml ~/w205/project-2-AshQTan/
```

Modify the `spark` section of `docker-compose.yml` so that Jupyter Notebook is reachable on port 8888 (indent with spaces; YAML does not allow tabs):
```
    expose:
      - "8888"
    ports:
      - "8888:8888"
```


1) Command to bring up the cluster and confirm that the containers are running:

```
docker-compose up -d
docker-compose ps
docker ps -a
```

2) Command to create the Kafka topic with a meaningful name (most students choose `assessments`):

```
docker-compose exec kafka kafka-topics --create --topic assessments --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```

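3) Command to publish the assessments JSON data to the Kafka topic using kafkacat. `jq '.[]' -c` splits the JSON array into one compact object per line and `kafkacat -P` publishes each line as a message; the second command reads the topic back with `kafkacat -C` to verify it. This assumes the compose file mounts `~/w205` at `/w205` in the `mids` container.
```
docker-compose exec mids bash -c "cat /w205/project-2-AshQTan/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments"
docker-compose exec mids bash -c "kafkacat -C -b kafka:29092 -t assessments -o beginning -e"
```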
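kafkacat in producer mode (`-P`) treats each input line as one message by default, so the nested file has to become newline-delimited JSON before it is published. A minimal plain-Python sketch of that conversion, assuming the file holds a single top-level JSON array; `split_json_array` and the two-record `sample` are hypothetical, and number formatting may differ slightly from what `jq` emits.

```python
import json

def split_json_array(text):
    """Turn a JSON array of objects into newline-delimited JSON,
    one compact object per line (roughly what `jq '.[]' -c` produces)."""
    records = json.loads(text)  # assumes the top level is a JSON array
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in records)

# hypothetical two-record sample standing in for the real assessments file
sample = '[{"keen_id": "a1", "exam_name": "Learning Git"}, {"keen_id": "b2", "exam_name": "Learning SQL"}]'
print(split_json_array(sample))
# {"keen_id":"a1","exam_name":"Learning Git"}
# {"keen_id":"b2","exam_name":"Learning SQL"}
```

Each output line then becomes exactly one Kafka message, which is what the Spark code below expects when it parses every message value as one JSON object.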

4) Command to shut down the cluster and confirm that no containers are left running:

```
docker-compose down
docker-compose ps
docker ps -a
```


See `history.txt` for the full list of commands used.

## Set Up

```
import json
from pyspark.sql import Row
import pandas as pd

# `spark` is the SparkSession provided by the pyspark notebook

# 1) create a data frame by subscribing to the kafka topic
raw_assessments = (spark.read.format("kafka")
                   .option("kafka.bootstrap.servers", "kafka:29092")
                   .option("subscribe", "assessments")
                   .option("startingOffsets", "earliest")
                   .option("endingOffsets", "latest")
                   .load())
raw_assessments.cache()

# 2) cast the binary Kafka message value to a string in a new data frame
assessments = raw_assessments.select(raw_assessments.value.cast('string'))

# 3) extract / unroll the top level of each json message into columns of a new data frame
extracted_assessments = assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()

# 4) register the data frame as a temporary view to allow in-memory queries against it
extracted_assessments.createOrReplaceTempView('assessments')

# 6) perform a write to HDFS in parquet format for each data frame you created
assessments.write.mode('overwrite').parquet("/tmp/assessments")
```
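Step 3 only unrolls the top level of each message: every top-level key becomes a column, while nested objects stay nested. A plain-Python sketch of what `Row(**json.loads(x.value))` does to each message, using hypothetical message strings; the column names are sorted to mirror the alphabetical column order in the output below, since PySpark 2.x sorts `Row` keyword fields by name.

```python
import json

# hypothetical Kafka message values: each is one assessment serialized as a JSON string
messages = [
    '{"keen_id": "k1", "exam_name": "Learning Git", "sequences": {"id": "s1"}}',
    '{"keen_id": "k2", "exam_name": "Learning SQL", "sequences": {"id": "s2"}}',
]

# plain-Python analogue of assessments.rdd.map(lambda x: Row(**json.loads(x.value))):
# each string is parsed into a dict whose top-level keys become the column names
records = [json.loads(m) for m in messages]

# only the top level is flattened; "sequences" is still a nested dict,
# which is why that column shows up as Map(questions -> ...) in Spark
columns = sorted({key for r in records for key in r})
print(columns)                        # ['exam_name', 'keen_id', 'sequences']
print(type(records[0]["sequences"]))  # <class 'dict'>
```

Getting at anything inside `sequences` therefore needs a second transform, which is what the following sections do.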


```
spark.sql("select * from assessments limit 10").show()
```

```
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|Map(questions -> ...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|Map(questions -> ...|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false
```

## Nested multi-valued data as a dictionary

We can extract `sequences.id` by writing a custom lambda transform, creating a separate data frame, registering it as a temp table, and using Spark SQL to join it to the outer nesting layer:

```
def my_lambda_sequences_id(x):
    raw_dict = json.loads(x.value)
    my_dict = {"keen_id" : raw_dict["keen_id"], "sequences_id" : raw_dict["sequences"]["id"]}
    return Row(**my_dict)

my_sequences = assessments.rdd.map(my_lambda_sequences_id).toDF()
my_sequences.createOrReplaceTempView('sequences')
my_sequences.write.mode('overwrite').parquet("/tmp/sequences")

spark.sql("select a.keen_id, a.keen_timestamp, s.sequences_id from assessments a join sequences s on a.keen_id = s.keen_id limit 10").show()
```

```
+--------------------+------------------+--------------------+
|             keen_id|    keen_timestamp|        sequences_id|
+--------------------+------------------+--------------------+
|5a17a67efa1257000...|1511499390.3836269|8ac691f8-8c1a-403...|
|5a26ee9cbf5ce1000...|1512500892.4166169|9bd87823-4508-4e0...|
|5a29dcac74b662000...|1512692908.8423469|e7110aed-0d08-4cb...|
|5a2fdab0eabeda000...|1513085616.2275269|cd800e92-afc3-447...|
|5a30105020e9d4000...|1513099344.8624721|8ac691f8-8c1a-403...|
|5a3a6fc3f0a100000...| 1513779139.354213|e7110aed-0d08-4cb...|
|5a4e17fe08a892000...|1515067390.1336551|9abd5b51-6bd8-11e...|
|5a4f3c69cc6444000...| 1515142249.858722|083844c5-772f-48d...|
|5a51b21bd0480b000...| 1515303451.773272|e7110aed-0d08-4cb...|
|5a575a85329e1a000...| 1515674245.348099|25ca21fe-4dbb-446...|
+--------------------+------------------+--------------------+
```
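The extract-then-join pattern above can be mimicked without Spark to see why `keen_id` works as the join key: both tables are derived from the same messages, and each message carries one `keen_id` and one `sequences.id`. A minimal sketch with hypothetical messages; `sequences_id`, `outer`, `inner`, and `joined` are illustrative names, and the dict lookup plays the role of an inner hash join.

```python
import json

# hypothetical messages standing in for the Kafka values
messages = [
    '{"keen_id": "k1", "keen_timestamp": "1511499390.38", "sequences": {"id": "s-8ac6"}}',
    '{"keen_id": "k2", "keen_timestamp": "1512500892.41", "sequences": {"id": "s-9bd8"}}',
]

def sequences_id(value):
    """Same extraction as my_lambda_sequences_id, returning a plain dict instead of a Row."""
    raw = json.loads(value)
    return {"keen_id": raw["keen_id"], "sequences_id": raw["sequences"]["id"]}

outer = [json.loads(m) for m in messages]  # plays the role of the assessments table
inner = {s["keen_id"]: s["sequences_id"] for s in map(sequences_id, messages)}

# inner join on keen_id, like "... join sequences s on a.keen_id = s.keen_id"
joined = [(a["keen_id"], a["keen_timestamp"], inner[a["keen_id"]])
          for a in outer if a["keen_id"] in inner]
for row in joined:
    print(row)
```

Because each assessment has exactly one `sequences` object, the relationship is one-to-one here; the list-valued case in the next section is one-to-many.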



## Nested multi-valued data as a list
Let's look at a multi-valued field in the form of a list. Previously, we saw that we can pull out one item using the `[]` operator. In this example, we pull out every value in the list by writing a custom lambda transform, creating another data frame, registering it as a temp table, and joining it to the data frames of the outer nesting layers.

```
def my_lambda_questions(x):
    raw_dict = json.loads(x.value)
    my_list = []
    # emit one Row per question; my_count numbers the questions from 1
    for my_count, question in enumerate(raw_dict["sequences"]["questions"], start=1):
        my_dict = {"keen_id" : raw_dict["keen_id"], "my_count" : my_count, "id" : question["id"]}
        my_list.append(Row(**my_dict))
    return my_list

my_questions = assessments.rdd.flatMap(my_lambda_questions).toDF()
my_questions.createOrReplaceTempView('questions')
my_questions.write.mode('overwrite').parquet("/tmp/questions")

spark.sql("select q.keen_id, a.keen_timestamp, q.id from assessments a join questions q on a.keen_id = q.keen_id limit 10").show()
```

```
+--------------------+------------------+--------------------+
|             keen_id|    keen_timestamp|                  id|
+--------------------+------------------+--------------------+
|5a17a67efa1257000...|1511499390.3836269|803fc93f-7eb2-412...|
|5a17a67efa1257000...|1511499390.3836269|f3cb88cc-5b79-41b...|
|5a17a67efa1257000...|1511499390.3836269|32fe7d8d-6d89-4db...|
|5a17a67efa1257000...|1511499390.3836269|5c34cf19-8cfd-4f5...|
|5a26ee9cbf5ce1000...|1512500892.4166169|0603e6f4-c3f9-4c2...|
|5a26ee9cbf5ce1000...|1512500892.4166169|26a06b88-2758-45b...|
|5a26ee9cbf5ce1000...|1512500892.4166169|25b6effe-79b0-4c4...|
|5a26ee9cbf5ce1000...|1512500892.4166169|6de03a9b-2a78-46b...|
|5a26ee9cbf5ce1000...|1512500892.4166169|aaf39991-fa83-470...|
|5a26ee9cbf5ce1000...|1512500892.4166169|aab2e817-73dc-4ff...|
+--------------------+------------------+--------------------+
```
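The key difference from the dictionary case is `flatMap` instead of `map`: the transform returns a list of rows per message, and `flatMap` concatenates those lists so that one assessment can yield several question rows (which is why each `keen_id` repeats in the output above). A plain-Python sketch of that difference, using a hypothetical message and an illustrative `questions` function that mirrors `my_lambda_questions`.

```python
import json

# hypothetical message with a two-question list under sequences.questions
message = '{"keen_id": "k1", "sequences": {"questions": [{"id": "q-803f"}, {"id": "q-f3cb"}]}}'

def questions(value):
    """Like my_lambda_questions: emit one record per question, numbered from 1."""
    raw = json.loads(value)
    return [{"keen_id": raw["keen_id"], "my_count": n, "id": q["id"]}
            for n, q in enumerate(raw["sequences"]["questions"], start=1)]

# map() keeps one output per input, so the result is a list of lists ...
mapped = list(map(questions, [message]))
# ... while flattening concatenates them, which is what rdd.flatMap does
flat = [row for rows in mapped for row in rows]
print(len(mapped), len(flat))  # 1 2
```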



## How to handle "holes" in json data

Not every record is guaranteed to contain every nested key. Checking each level of nesting before reading it lets the transform skip incomplete records instead of failing with a `KeyError`.

```
def my_lambda_correct_total(x):
    raw_dict = json.loads(x.value)
    my_list = []
    # look up each nesting level with a default, so a missing key yields an empty dict
    counts = raw_dict.get("sequences", {}).get("counts", {})
    if "correct" in counts and "total" in counts:
        my_dict = {"correct": counts["correct"], "total": counts["total"]}
        my_list.append(Row(**my_dict))
    return my_list

my_correct_total = assessments.rdd.flatMap(my_lambda_correct_total).toDF()
my_correct_total.createOrReplaceTempView('ct')
spark.sql("select * from ct limit 10").show()
```

```
+-------+-----+
|correct|total|
+-------+-----+
|      2|    4|
|      1|    4|
|      3|    4|
|      2|    4|
|      3|    4|
|      5|    5|
|      1|    1|
|      5|    5|
|      4|    4|
|      0|    5|
+-------+-----+
```
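The guards can be exercised outside Spark by feeding in records with deliberately missing pieces. Because the transform returns an empty list for an incomplete record and is applied with `flatMap`, that record contributes no row at all, so the `ct` table can be shorter than `assessments`. A minimal sketch; `correct_total` and the three messages are hypothetical.

```python
import json

def correct_total(value):
    """Same guards as my_lambda_correct_total, returning plain tuples."""
    raw = json.loads(value)
    counts = raw.get("sequences", {}).get("counts", {})
    if "correct" in counts and "total" in counts:
        return [(counts["correct"], counts["total"])]
    return []  # a "hole": nothing is emitted, so flattening drops this record

# hypothetical messages: one complete, one with no counts, one with no sequences at all
messages = [
    '{"sequences": {"counts": {"correct": 2, "total": 4}}}',
    '{"sequences": {"id": "s1"}}',
    '{"keen_id": "k3"}',
]
rows = [row for m in messages for row in correct_total(m)]
print(rows)  # [(2, 4)]
```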



### Question 1: How many assessments are included in this dataset?

```
# 5) perform SQL queries against the dataframes you registered
assessments = raw_assessments.select(raw_assessments.value.cast('string'))
extracted_assessments = assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()
extracted_assessments.createOrReplaceTempView('assessments')
spark.sql("select count(*) from assessments").show()
```

```
+--------+
|count(1)|
+--------+
|    3280|
+--------+
```



There are 3280 assessments included in this dataset.
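`count(*)` counts rows, that is, Kafka messages read from the topic. Re-running the publish step appends to the topic rather than replacing it, so the same assessment could be counted twice. Comparing the row count with `count(distinct keen_id)` is a simple sanity check, assuming `keen_id` uniquely identifies an assessment. A plain-Python sketch of that comparison with hypothetical messages, one of them a deliberate repeat.

```python
import json

# hypothetical message values; the third repeats the first, as a re-published record would
messages = [
    '{"keen_id": "k1", "exam_name": "Learning Git"}',
    '{"keen_id": "k2", "exam_name": "Learning SQL"}',
    '{"keen_id": "k1", "exam_name": "Learning Git"}',
]
keen_ids = [json.loads(m)["keen_id"] for m in messages]
total = len(keen_ids)          # what select count(*) reports
distinct = len(set(keen_ids))  # what select count(distinct keen_id) would report
print(total, distinct)  # 3 2
```

If the two numbers differ on the real data, the duplicates should be removed (or the topic recreated) before answering count questions.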

### Question 2: How many people took *Learning Git*?

```
spark.sql('select exam_name, count(exam_name) as name_count from assessments group by exam_name order by name_count desc').show()
```

```
+--------------------+----------+
|           exam_name|name_count|
+--------------------+----------+
|        Learning Git|       394|
|Introduction to P...|       162|
|Introduction to J...|       158|
|Intermediate Pyth...|       158|
|Learning to Progr...|       128|
|Introduction to M...|       119|
|Software Architec...|       109|
|Beginning C# Prog...|        95|
|    Learning Eclipse|        85|
|Learning Apache M...|        80|
|Beginning Program...|        79|
|       Mastering Git|        77|
|Introduction to B...|        75|
|Advanced Machine ...|        67|
|Learning Linux Sy...|        59|
|JavaScript: The G...|        58|
|        Learning SQL|        57|
|Practical Java Pr...|        53|
|    HTML5 The Basics|        52|
|   Python Epiphanies|        51|
+--------------------+----------+
only showing top 20 rows
```



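Learning Git was taken 394 times. Strictly, `count(exam_name)` counts assessment records rather than distinct people, so a person who took the exam more than once is counted once per attempt.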
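The `group by ... order by name_count desc` query is a frequency count sorted from most to least common. A plain-Python sketch of the same aggregation with `collections.Counter`, over a hypothetical list of exam names (one per assessment row).

```python
from collections import Counter

# hypothetical exam names, one per assessment row
exam_names = ["Learning Git", "Learning SQL", "Learning Git", "Mastering Git", "Learning Git"]

# equivalent of: select exam_name, count(exam_name) as name_count
#                from assessments group by exam_name order by name_count desc
name_counts = Counter(exam_names).most_common()
print(name_counts[0])  # ('Learning Git', 3)
```

Like the SQL version, this counts rows per exam name, so it measures attempts; counting people would require grouping on a per-person identifier as well.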

```
exit()
```