In [1]:
import json
import pprint
import os
from pyspark import Row

# Part I: Linux Commands

### Getting the Assessments Data

```
curl -L -o assessment-attempts-20180128-121051-nested.json https://goo.gl/ME6hjp`
```

### Bringing up the cluster

```
docker-compose up -d
```

### Creating the kafka topic with a meaningful name (assessments)

```
docker-compose exec kafka kafka-topics --create --topic assessments --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```

### Checking the Topic

```
docker-compose exec kafka kafka-topics --describe --topic assessments --zookeeper zookeeper:32181
```

### Publish the assessments data to the kafka topic using kafkacat

```
docker-compose exec mids bash -c "cat /w205/project-2-daphne-yang/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments"
```

### Consuming the messages from Kafka

* Consumes the messages written to Kafka on the topic "assessments"
```
docker-compose exec mids bash -c "kafkacat -C -b kafka:29092 -t assessments -o beginning -e"
```
* Consumes the messages written to Kafka on the topic "assessments" and pipes and outputs the count of the messages
```
docker-compose exec mids bash -c "kafkacat -C -b kafka:29092 -t assessments -o beginning -e" | wc -l
```

## Launching Jupyter Notebook

### 1. Execute a bash shell into the spark containter

```
docker-compose exec spark bash
```

### 2. Create a symbolic link from the spark directory to /w205 container

```
ln -s /w205 w205
```

### 3. Exit the Spark Container

```
exit
```

### 4. From Command Line, run an enhanced version of pyspark command line to target Jupyter Notebook

```
docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark
```

## At the End of the Project, command to shutdown the cluster.

```
docker-compose down
```

# Part II: Unrolling the Json File, Spark SQL

In [2]:
p = pprint.PrettyPrinter(indent=1)

In [3]:
f = open("assessment-attempts-20180128-121051-nested.json","r")

In [4]:
s = f.read()

In [5]:
json_data = json.loads(s)

In [6]:
f.close()

In [7]:
len(json_data)

3280

### Extracting/Unrolling Json Data

In [8]:
# this will pretty print the json in alphabetic order which may or may not match the file order
example = p.pprint(json_data[0])
example

{'base_exam_id': '37f0a30a-7464-11e6-aa92-a8667f27e5dc',
 'certification': 'false',
 'exam_name': 'Normal Forms and All That Jazz Master Class',
 'keen_created_at': '1516717442.735266',
 'keen_id': '5a6745820eb8ab00016be1f1',
 'keen_timestamp': '1516717442.735266',
 'max_attempts': '1.0',
 'sequences': {'attempt': 1,
               'counts': {'all_correct': False,
                          'correct': 2,
                          'incomplete': 1,
                          'incorrect': 1,
                          'submitted': 4,
                          'total': 4,
                          'unanswered': 0},
               'id': '5b28a462-7a3b-42e0-b508-09f3906d1703',
               'questions': [{'id': '7a2ed6d3-f492-49b3-b8aa-d080a8aad986',
                              'options': [{'at': '2018-01-23T14:23:24.670Z',
                                           'checked': True,
                                           'correct': True,
                                           'id': '

In [9]:
def recursive_walk_json_object(j, level):
    """recursively walk through a json object to explore the structure
       dictionaries will be put in alphabetic order to match the pretty print above"""
    
    level += 1
    
    if type(j) is dict:
        dict_2_list = list(j.keys())
        dict_2_list.sort()
        for k in dict_2_list:
            print("   " * level + "L" + str(level), k)
            recursive_walk_json_object(j[k], level)
    
    elif type(j) is list:
        for (i, l) in enumerate(j):
            print("  " * level + "  [" + str(i) + "]")
            recursive_walk_json_object(l, level)
            
    else:
        print("   " * level + " value:", j)

In [10]:
recursive_walk_json_object(json_data[0], -1)

L0 base_exam_id
    value: 37f0a30a-7464-11e6-aa92-a8667f27e5dc
L0 certification
    value: false
L0 exam_name
    value: Normal Forms and All That Jazz Master Class
L0 keen_created_at
    value: 1516717442.735266
L0 keen_id
    value: 5a6745820eb8ab00016be1f1
L0 keen_timestamp
    value: 1516717442.735266
L0 max_attempts
    value: 1.0
L0 sequences
   L1 attempt
       value: 1
   L1 counts
      L2 all_correct
          value: False
      L2 correct
          value: 2
      L2 incomplete
          value: 1
      L2 incorrect
          value: 1
      L2 submitted
          value: 4
      L2 total
          value: 4
      L2 unanswered
          value: 0
   L1 id
       value: 5b28a462-7a3b-42e0-b508-09f3906d1703
   L1 questions
      [0]
         L3 id
             value: 7a2ed6d3-f492-49b3-b8aa-d080a8aad986
         L3 options
          [0]
               L5 at
                   value: 2018-01-23T14:23:24.670Z
               L5 checked
                   value: True
               L

### Create a Spark DataFrame by Subscribing to the Assessments Kafka Topic

In [11]:
raw_assessments = spark.read.format("kafka").option("kafka.bootstrap.servers", "kafka:29092").option("subscribe","assessments").option("startingOffsets", "earliest").option("endingOffsets", "latest").load() 

In [12]:
raw_assessments.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [13]:
raw_assessments.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



### Convert Json Data as a string to a new DataFrame

In [14]:
assessments = raw_assessments.select(raw_assessments.value.cast('string'))

In [15]:
extracted_assessments = assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()

### Register DataFrames as Temporary Tables

In [16]:
extracted_assessments.registerTempTable('assessments')

### Performing Spark SQL against the DataFrames created

In [17]:
spark.sql("select keen_id from assessments limit 10").show()

+--------------------+
|             keen_id|
+--------------------+
|5a6745820eb8ab000...|
|5a674541ab6b0a000...|
|5a67999d3ed3e3000...|
|5a6799694fc7c7000...|
|5a6791e824fccd000...|
|5a67a0b6852c2a000...|
|5a67b627cc80e6000...|
|5a67ac8cb0a5f4000...|
|5a67a9ba060087000...|
|5a67ac54411aed000...|
+--------------------+



In [18]:
spark.sql("select keen_timestamp, sequences.questions[0].user_incomplete from assessments limit 10").show()

+------------------+-------------------------------------------------------+
|    keen_timestamp|sequences[questions] AS `questions`[0][user_incomplete]|
+------------------+-------------------------------------------------------+
| 1516717442.735266|                                                   true|
| 1516717377.639827|                                                  false|
| 1516738973.653394|                                                  false|
|1516738921.1137421|                                                  false|
| 1516737000.212122|                                                  false|
| 1516740790.309757|                                                  false|
|1516746279.3801291|                                                  false|
| 1516743820.305464|                                                  false|
|  1516743098.56811|                                                  false|
| 1516743764.813107|                                                  false|

### Extracting/Unrolling Json Data

In [19]:
def my_lambda_sequences_id(x):
    raw_dict = json.loads(x.value)
    my_dict = {"keen_id" : raw_dict["keen_id"], "sequences_id" : raw_dict["sequences"]["id"]}
    return Row(**my_dict)

### Register DataFrames as Temporary Tables

In [20]:
my_sequences = assessments.rdd.map(my_lambda_sequences_id).toDF()

In [21]:
my_sequences.registerTempTable('sequences')

### Performing Spark SQL against the DataFrames that We Created

In [22]:
spark.sql("select sequences_id from sequences limit 10").show()

+--------------------+
|        sequences_id|
+--------------------+
|5b28a462-7a3b-42e...|
|5b28a462-7a3b-42e...|
|b370a3aa-bf9e-4c1...|
|b370a3aa-bf9e-4c1...|
|04a192c1-4f5c-4ac...|
|e7110aed-0d08-4cb...|
|5251db24-2a6e-424...|
|066b5326-e547-4da...|
|8ac691f8-8c1a-403...|
|066b5326-e547-4da...|
+--------------------+



In [23]:
spark.sql("select a.keen_id, a.keen_timestamp, s.sequences_id from assessments a join sequences s on a.keen_id = s.keen_id limit 10").show()

+--------------------+------------------+--------------------+
|             keen_id|    keen_timestamp|        sequences_id|
+--------------------+------------------+--------------------+
|5a17a67efa1257000...|1511499390.3836269|8ac691f8-8c1a-403...|
|5a26ee9cbf5ce1000...|1512500892.4166169|9bd87823-4508-4e0...|
|5a29dcac74b662000...|1512692908.8423469|e7110aed-0d08-4cb...|
|5a2fdab0eabeda000...|1513085616.2275269|cd800e92-afc3-447...|
|5a30105020e9d4000...|1513099344.8624721|8ac691f8-8c1a-403...|
|5a3a6fc3f0a100000...| 1513779139.354213|e7110aed-0d08-4cb...|
|5a4e17fe08a892000...|1515067390.1336551|9abd5b51-6bd8-11e...|
|5a4f3c69cc6444000...| 1515142249.858722|083844c5-772f-48d...|
|5a51b21bd0480b000...| 1515303451.773272|e7110aed-0d08-4cb...|
|5a575a85329e1a000...| 1515674245.348099|25ca21fe-4dbb-446...|
+--------------------+------------------+--------------------+



### Extracting/Unrolling Json Data

In [24]:
def my_lambda_questions(x):
    raw_dict = json.loads(x.value)
    my_list = []
    my_count = 0
    for l in raw_dict["sequences"]["questions"]:
        my_count += 1
        my_dict = {"keen_id" : raw_dict["keen_id"], "my_count" : my_count, "id" : l["id"]}
        my_list.append(Row(**my_dict))
    return my_list

In [25]:
my_questions = assessments.rdd.flatMap(my_lambda_questions).toDF()

### Register DataFrames as Temporary Tables

In [26]:
my_questions.registerTempTable('questions')

### Performing Spark SQL against the DataFrames that We Created

In [27]:
spark.sql("select id, my_count from questions limit 10").show()

+--------------------+--------+
|                  id|my_count|
+--------------------+--------+
|7a2ed6d3-f492-49b...|       1|
|bbed4358-999d-446...|       2|
|e6ad8644-96b1-461...|       3|
|95194331-ac43-454...|       4|
|95194331-ac43-454...|       1|
|bbed4358-999d-446...|       2|
|e6ad8644-96b1-461...|       3|
|7a2ed6d3-f492-49b...|       4|
|b9ff2e88-cf9d-4bd...|       1|
|bec23e7b-4870-49f...|       2|
+--------------------+--------+



In [28]:
spark.sql("select q.keen_id, a.keen_timestamp, q.id from assessments a join questions q on a.keen_id = q.keen_id limit 10").show()

+--------------------+------------------+--------------------+
|             keen_id|    keen_timestamp|                  id|
+--------------------+------------------+--------------------+
|5a17a67efa1257000...|1511499390.3836269|803fc93f-7eb2-412...|
|5a17a67efa1257000...|1511499390.3836269|f3cb88cc-5b79-41b...|
|5a17a67efa1257000...|1511499390.3836269|32fe7d8d-6d89-4db...|
|5a17a67efa1257000...|1511499390.3836269|5c34cf19-8cfd-4f5...|
|5a26ee9cbf5ce1000...|1512500892.4166169|0603e6f4-c3f9-4c2...|
|5a26ee9cbf5ce1000...|1512500892.4166169|26a06b88-2758-45b...|
|5a26ee9cbf5ce1000...|1512500892.4166169|25b6effe-79b0-4c4...|
|5a26ee9cbf5ce1000...|1512500892.4166169|6de03a9b-2a78-46b...|
|5a26ee9cbf5ce1000...|1512500892.4166169|aaf39991-fa83-470...|
|5a26ee9cbf5ce1000...|1512500892.4166169|aab2e817-73dc-4ff...|
+--------------------+------------------+--------------------+



### Handling Holes in Json Data

In [29]:
def my_lambda_correct_total(x):
    
    raw_dict = json.loads(x.value)
    my_list = []
    
    if "sequences" in raw_dict:
        
        if "counts" in raw_dict["sequences"]:
            
            if "correct" in raw_dict["sequences"]["counts"] and "total" in raw_dict["sequences"]["counts"]:
                    
                my_dict = {"correct": raw_dict["sequences"]["counts"]["correct"], 
                           "total": raw_dict["sequences"]["counts"]["total"]}
                my_list.append(Row(**my_dict))
    
    return my_list

### Register DataFrames as Temporary Tables

In [30]:
my_correct_total = assessments.rdd.flatMap(my_lambda_correct_total).toDF()

In [31]:
my_correct_total.registerTempTable('ct')

### Performing Spark SQL against the DataFrames that We Created

In [32]:
spark.sql("select * from ct limit 10").show()

+-------+-----+
|correct|total|
+-------+-----+
|      2|    4|
|      1|    4|
|      3|    4|
|      2|    4|
|      3|    4|
|      5|    5|
|      1|    1|
|      5|    5|
|      4|    4|
|      0|    5|
+-------+-----+



In [33]:
spark.sql("select correct / total as score from ct limit 10").show()

+-----+
|score|
+-----+
|  0.5|
| 0.25|
| 0.75|
|  0.5|
| 0.75|
|  1.0|
|  1.0|
|  1.0|
|  1.0|
|  0.0|
+-----+



In [34]:
spark.sql("select avg(correct / total)*100 as avg_score from ct limit 10").show()

+-----------------+
|        avg_score|
+-----------------+
|62.65699745547047|
+-----------------+



In [35]:
spark.sql("select stddev(correct / total) as standard_deviation from ct limit 10").show()

+-------------------+
| standard_deviation|
+-------------------+
|0.31086692286170553|
+-------------------+



# Part III: Spark SQL to Answer Business Questions

### 1. How many students took the class "Learning Git"? 

* 394 Students took the class "Learning Git".

In [36]:
spark.sql('SELECT count(*) FROM assessments WHERE exam_name like "Learning Git"').show()

+--------+
|count(1)|
+--------+
|     394|
+--------+



### 2. What is the most commonly taken course and what is the least popular course?

* The most commonly taken course was Learning Git with 394 students enrolled
* There were 4 least commonly take courses were tied 

In [37]:
spark.sql('SELECT exam_name as most_common, count(*) AS num_students FROM assessments GROUP BY exam_name ORDER BY num_students DESC Limit 1').show()

+------------+------------+
| most_common|num_students|
+------------+------------+
|Learning Git|         394|
+------------+------------+



In [38]:
spark.sql('SELECT exam_name AS least_common, count(*) AS num_students FROM assessments GROUP BY exam_name ORDER BY num_students ASC Limit 4').show()

+--------------------+------------+
|        least_common|num_students|
+--------------------+------------+
|Native Web Apps f...|           1|
|Learning to Visua...|           1|
|Nulls, Three-valu...|           1|
|Operating Red Hat...|           1|
+--------------------+------------+



# Part IV: Writing Parquet Files to HDFS for each Data Frame Created

In [39]:
extracted_assessments.write.mode('overwrite').parquet("/tmp/assessments")

In [40]:
my_sequences.write.mode('overwrite').parquet("/tmp/sequences")

In [41]:
my_questions.write.mode('overwrite').parquet("/tmp/questions")

In [42]:
my_correct_total.write.mode('overwrite').parquet("/tmp/correct_total")