# Project 2: Tracking User Activity

In this project, you work at an ed-tech firm. You've created a service that
delivers assessments, and now lots of different customers (e.g., Pearson) want
to publish their assessments on it. You need to get ready for data scientists
who work for these customers to run queries on the data. 

Included below are directions for data pipeline setup, assumptions about the data entered into the pipeline, table creation using pyspark and table writing to HDFS using parquet, as well as examples of applicable business questions.


## Data Pipeline Setup Linux Commands

### Docker
***
##### Spin up and Check Cluster (run everytime to start up cluster)
* remove stray containers or networks to avoid port conflicts

```
docker-compose up -d
docker-compose ps
docker ps -a
```
***
### Kafka 
***
##### Create kafka topic assessments
```
docker-compose exec kafka kafka-topics --create --topic assessments --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```
##### Check kafka topic (optional)
```
docker-compose exec kafka kafka-topics --describe --topic assessments --zookeeper zookeeper:32181
```

##### Publish messages to kafka using kafkacat
```
docker-compose exec mids bash -c "cat /w205/project-2-Cuffnela/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments"
```
***

### PySpark 
***
##### Spin up pyspark
```
docker-compose exec spark pyspark
```
***

## Assumptions

There are some assumptions that we make about our data as we unroll and query the data. The assumptions we have made in this Jupyter notebook are the following:

* User_id is unique to an exam taker. i.e. they have the same user_id for each exam they take within this system. And all entries have a user_id
* The order of question_ids within the questions list is in the order the questions appear on the exam.
* Exam names are consistent throughout the Json file. For example, the exam name is always Intermediate Python Programming and never abbreviated to Int. Python Programming or Python Programming (Intermediate).

## Use PySpark to Create Queriable Tables

### Subscribe to Kakfa assessments topic and cache

In [294]:
raw_assessments = spark.read.format("kafka").option("kafka.bootstrap.servers", "kafka:29092").option("subscribe","assessments").option("startingOffsets", "earliest").option("endingOffsets", "latest").load() 

raw_assessments.cache()

raw_assessments.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



### Create Assessments Table

The assessments table unrolls the outer layer of the initial json file. Values within this table are still nested, which can be seen in the columns with map() values.

In [512]:
assessments = raw_assessments.select(raw_assessments.value.cast('string'))

In [513]:
import json
from pyspark.sql import Row

extracted_assessments = assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()

In [514]:
extracted_assessments.registerTempTable('assessments')

In [515]:
extracted_assessments.show()

+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|Map(questions -> ...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|Map(questions -> ...|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false

Write out to HDFS.

In [516]:
extracted_assessments.write.mode("overwrite").parquet("/tmp/assessments")

### Create Sequences Table 

The `sequence_id` is nested within the `sequences` dictionary. The lambda function below pulls out the `sequences_id` using the [] operator.

In [517]:
def my_lambda_sequences_id(x):
    raw_dict = json.loads(x.value)
    my_dict = {"keen_id" : raw_dict["keen_id"], "sequences_id" : raw_dict["sequences"]["id"]}
    return Row(**my_dict)

my_sequences = assessments.rdd.map(my_lambda_sequences_id).toDF()

In [518]:
my_sequences.registerTempTable('sequences')

Write out to hdfs.

In [519]:
my_sequences.write.mode("overwrite").parquet("/tmp/sequences")

##### SQL example queries

In [520]:
spark.sql("select distinct sequences_id from sequences limit 10").show(truncate=False)

+------------------------------------+
|sequences_id                        |
+------------------------------------+
|ced04db4-0842-47ee-bfa2-8cf45db9f433|
|355710e7-c2ad-46bc-8844-0cb71a9148f0|
|7214e626-ac4b-42be-ad86-109e764c6d85|
|9acbffb6-6ec0-41e5-aaf0-f24bb64040de|
|cec4308a-64dc-484e-ac96-f87e74c33a75|
|3585eaaa-512d-4ff5-9f36-aeb54477b1c5|
|1a64dcab-b2d8-4fd0-a294-62ef4b090491|
|1be180d1-2c0b-4ca9-b39c-09bf3d3761a9|
|f02fa40c-5572-458f-b1ed-2d45248df61c|
|0b14f3a0-7068-411f-bad6-e0eb4cc64e6e|
+------------------------------------+



The following sql joins together the assessments temporary table and the sequences temporary table, joining over the `keen_id`.

In [521]:
spark.sql("select a.keen_id, a.keen_timestamp, s.sequences_id from assessments a join sequences s on a.keen_id = s.keen_id limit 10").show()


+--------------------+------------------+--------------------+
|             keen_id|    keen_timestamp|        sequences_id|
+--------------------+------------------+--------------------+
|5a17a67efa1257000...|1511499390.3836269|8ac691f8-8c1a-403...|
|5a26ee9cbf5ce1000...|1512500892.4166169|9bd87823-4508-4e0...|
|5a29dcac74b662000...|1512692908.8423469|e7110aed-0d08-4cb...|
|5a2fdab0eabeda000...|1513085616.2275269|cd800e92-afc3-447...|
|5a30105020e9d4000...|1513099344.8624721|8ac691f8-8c1a-403...|
|5a3a6fc3f0a100000...| 1513779139.354213|e7110aed-0d08-4cb...|
|5a4e17fe08a892000...|1515067390.1336551|9abd5b51-6bd8-11e...|
|5a4f3c69cc6444000...| 1515142249.858722|083844c5-772f-48d...|
|5a51b21bd0480b000...| 1515303451.773272|e7110aed-0d08-4cb...|
|5a575a85329e1a000...| 1515674245.348099|25ca21fe-4dbb-446...|
+--------------------+------------------+--------------------+



### Create Questions Table

This table assigns a number to each problem in a user's test. Here we assume that the order of the ids stored in the value of the questions is the same order the questions appear to the exam user. This table also includes boolean values for if the user was correct if the question was incomplete, and if the user submitted this question. 

The lambda function below extracts information from multi-level nest dictionaries. We do not assume that user_correct, user_imcomplete, and user_submitted exist for each question. As such, we must check these keys exist in the questions dictionary. If they do not exists they are cast as NULL otherwise their value is added to the list. 

In [522]:
def my_lambda_questions(x):
    raw_dict = json.loads(x.value)
    my_list = []
    my_count = 0
    for l in raw_dict["sequences"]["questions"]:
        my_count += 1
        for x in ["user_correct", "user_incomplete","user_submitted"]:
            if x in l:
                continue
            else:
                l[x]=None
        my_dict = {"keen_id" : raw_dict["keen_id"], "number" : my_count, "question_id" : l["id"], 
                  "correct" : l["user_correct"], "incomplete" : l["user_incomplete"], 
                   "submitted" : l["user_submitted"]}
        my_list.append(Row(**my_dict))
    return my_list

my_questions = assessments.rdd.flatMap(my_lambda_questions).toDF()

In [523]:
my_questions.registerTempTable('questions')

Write out to hdfs.

In [524]:
my_questions.write.mode("overwrite").parquet("/tmp/questions")

##### SQL example queries

This query only pulls information from the questions table. The query result shows the question id, the number of the question on the user's exam, the result, and incomplete status.

In [525]:
spark.sql("select question_id, number, correct, incomplete from questions limit 10").show()

+--------------------+------+-------+----------+
|         question_id|number|correct|incomplete|
+--------------------+------+-------+----------+
|7a2ed6d3-f492-49b...|     1|  false|      true|
|bbed4358-999d-446...|     2|  false|     false|
|e6ad8644-96b1-461...|     3|   true|     false|
|95194331-ac43-454...|     4|   true|     false|
|95194331-ac43-454...|     1|   true|     false|
|bbed4358-999d-446...|     2|  false|      true|
|e6ad8644-96b1-461...|     3|  false|     false|
|7a2ed6d3-f492-49b...|     4|  false|      true|
|b9ff2e88-cf9d-4bd...|     1|  false|     false|
|bec23e7b-4870-49f...|     2|   true|     false|
+--------------------+------+-------+----------+



The query below joins the assessments table and questions table to include the exam name in addition to the question id, number of the question within an exam, and the result. 

In [526]:
spark.sql("select q.keen_id, q.question_id, a.exam_name, q.number as question_number, q.correct from assessments a join questions q on a.keen_id = q.keen_id  where exam_name = 'A Practical Introduction to React.js' limit 10").show()


+--------------------+--------------------+--------------------+---------------+-------+
|             keen_id|         question_id|           exam_name|question_number|correct|
+--------------------+--------------------+--------------------+---------------+-------+
|5a4f3c69cc6444000...|a6effaf7-94ba-458...|A Practical Intro...|              1|   true|
|5a4f3c69cc6444000...|dab47905-63c6-46b...|A Practical Intro...|              2|   true|
|5a4f3c69cc6444000...|7ff41d87-1f73-406...|A Practical Intro...|              3|   true|
|5a4f3c69cc6444000...|80aad87e-c7b2-4e1...|A Practical Intro...|              4|   true|
|5a3c02bc6863ce000...|80aad87e-c7b2-4e1...|A Practical Intro...|              1|  false|
|5a3c02bc6863ce000...|a6effaf7-94ba-458...|A Practical Intro...|              2|   true|
|5a3c02bc6863ce000...|dab47905-63c6-46b...|A Practical Intro...|              3|  false|
|5a3c02bc6863ce000...|7ff41d87-1f73-406...|A Practical Intro...|              4|   true|
|5a4f3ba34ce2fc000...

The query below joins all three previously created tables to combine this information. This new table can be stored as a view or temporary table for additional queries.

In [528]:
spark.sql("select q.keen_id, a.keen_timestamp, s.sequences_id, q.question_id, a.exam_name, q.number from assessments a join questions q on a.keen_id = q.keen_id join sequences s on s.keen_id = q.keen_id where s.sequences_id = 'ced04db4-0842-47ee-bfa2-8cf45db9f433' limit 30").show()


+--------------------+------------------+--------------------+--------------------+--------------------+------+
|             keen_id|    keen_timestamp|        sequences_id|         question_id|           exam_name|number|
+--------------------+------------------+--------------------+--------------------+--------------------+------+
|5a156f32621ed9000...|1511354162.7768071|ced04db4-0842-47e...|5a244dab-fea1-492...|Networking for Pe...|     1|
|5a156f32621ed9000...|1511354162.7768071|ced04db4-0842-47e...|3fd6f8d9-3f2a-4d8...|Networking for Pe...|     2|
|5a156f32621ed9000...|1511354162.7768071|ced04db4-0842-47e...|09201e96-9485-44a...|Networking for Pe...|     3|
|5a1579f36d8efa000...|  1511356915.58588|ced04db4-0842-47e...|3fd6f8d9-3f2a-4d8...|Networking for Pe...|     1|
|5a1579f36d8efa000...|  1511356915.58588|ced04db4-0842-47e...|5a244dab-fea1-492...|Networking for Pe...|     2|
|5a1579f36d8efa000...|  1511356915.58588|ced04db4-0842-47e...|09201e96-9485-44a...|Networking for Pe...|

### Create Time Table

This table pulls the starting time and ending time for each exam. Ending time is the final (maximum) timestamp for each assessment. Ending time is extracted from a dictionary nested inside a list that is nested inside multiple dictionaries. Not all questions have an `at` key which is a possible ending time. 

The lambda function below finds all `at` times given for each question on an assessment. It then assigns the latest (max) `at` time as the ending time for the assessment. To maximize parallelization, the lambda function converts both the starting time and ending time into an easier-to-use DateTime format and then calculates the exam time in seconds. 

Note: There are clear anomalies in the time data. The minimum exam time is approximately -88 seconds and there are 18 assessments, roughly 0.5%, that return a negative exam time. As the end-time generated by the lambda function is the maximum of all `at` times this implies that all `at` times in that assessment occur before the end time. 

In [529]:
from datetime import datetime

def my_lambda_times(x):
    raw_dict = json.loads(x.value)
    my_list = []
    times = []
    
    for l in raw_dict["sequences"]["questions"]:
        if "options" in l:
            for j in l["options"]:
                if "at" in j:
                    times.append(j['at'])
                    
    start_time = datetime.strptime(raw_dict["started_at"] , '%Y-%m-%dT%H:%M:%S.%fZ')
    if len(times) == 0:
        total = 0
        end_time = None
    else:
        end_time = datetime.strptime(max(times) , '%Y-%m-%dT%H:%M:%S.%fZ')
        total = (end_time - start_time).total_seconds()
        
    my_dict = {"keen_id" : raw_dict["keen_id"], "started_at" : start_time, "ended_at" : end_time,
                "exam_time" : total }
    my_list.append(Row(**my_dict))
    return my_list

my_times = assessments.rdd.flatMap(my_lambda_times).toDF()

In [530]:
my_times.show(truncate = False)

+-----------------------+---------+------------------------+-----------------------+
|ended_at               |exam_time|keen_id                 |started_at             |
+-----------------------+---------+------------------------+-----------------------+
|2018-01-23 14:24:00.807|41.725   |5a6745820eb8ab00016be1f1|2018-01-23 14:23:19.082|
|2018-01-23 14:22:55.494|67.989   |5a674541ab6b0a0001c6e723|2018-01-23 14:21:47.505|
|2018-01-23 20:22:52.423|29.839   |5a67999d3ed3e300016ef0f1|2018-01-23 20:22:22.584|
|2018-01-23 20:21:59.075|48.242   |5a6799694fc7c70001034706|2018-01-23 20:21:10.833|
|2018-01-23 19:49:56.912|74.435   |5a6791e824fccd00018c3ff9|2018-01-23 19:48:42.477|
|2018-01-23 20:53:08.554|81.272   |5a67a0b6852c2a00018891fa|2018-01-23 20:51:47.282|
|2018-01-23 22:30:58.374|384.182  |5a67b627cc80e60001343664|2018-01-23 22:24:34.192|
|2018-01-23 21:43:39.445|33.296   |5a67ac8cb0a5f400017d9919|2018-01-23 21:43:06.149|
|2018-01-23 21:31:31.708|23.248   |5a67a9ba0600870001247a04|2018-

In [531]:
my_times.registerTempTable('times')

Write out to hdfs.

In [532]:
my_times.write.mode("overwrite").parquet("/tmp/times")


##### SQL example queries

Summary data for all assessments in the database.

In [534]:
spark.sql("select min(exam_time)/60 as min_min, max(exam_time)/60 as max_min, avg(exam_time)/60 as avg_min, std(exam_time)/60 as std_min from times t limit 20").show(truncate=False)


+--------+-----------------+-----------------+-----------------+
|min_min |max_min          |avg_min          |std_min          |
+--------+-----------------+-----------------+-----------------+
|-1.46735|963.7744166666666|7.360868511722728|60.54505247064996|
+--------+-----------------+-----------------+-----------------+



Summary data broken down by exam. This is down by joining the times temporary table and assessments temporary table.

In [535]:
spark.sql("select a.exam_name, min(t.exam_time) as min_sec, avg(t.exam_time) as avg_sec, max(t.exam_time) as max_sec from times t join assessments a on t.keen_id = a.keen_id group by exam_name order by avg_sec desc limit 10").show(truncate=False)


+----------------------------------------------------+-------+------------------+---------+
|exam_name                                           |min_sec|avg_sec           |max_sec  |
+----------------------------------------------------+-------+------------------+---------+
|HTML5 The Basics                                    |11.212 |11210.113038461535|57826.465|
|Architectural Considerations for Hadoop Applications|24.135 |2549.5588         |24861.206|
|Client-Side Data Storage for Web Developers         |131.608|2362.5325000000003|4593.457 |
|Introduction to Shiny                               |7.686  |2038.1042962962965|23624.016|
|Getting Ready for Angular 2                         |22.917 |1133.0826666666667|3325.964 |
|Hadoop Fundamentals for Data Scientists             |11.708 |1079.3948333333333|6356.736 |
|Practical Java Programming                          |9.521  |967.199811320755  |37912.611|
|Using Web Components                                |34.801 |888.8933333333333 

Assessments in the database with negative exam times.

In [536]:
spark.sql("select * from times t where(exam_time<0)  limit 30").show(truncate=False)


+-----------------------+---------+------------------------+-----------------------+
|ended_at               |exam_time|keen_id                 |started_at             |
+-----------------------+---------+------------------------+-----------------------+
|2017-12-25 17:11:08.649|-88.041  |5a41318a1786c600014c7978|2017-12-25 17:12:36.69 |
|2017-12-25 17:09:51.873|-29.84   |5a41313d5682ed000101d065|2017-12-25 17:10:21.713|
|2017-12-25 17:10:47.386|-63.75   |5a413176f21cc20001f47b56|2017-12-25 17:11:51.136|
|2018-01-07 13:48:54.597|-1.996   |5a52254b4fc7c7000149956b|2018-01-07 13:48:56.593|
|2017-12-11 14:30:31.919|-60.501  |5a2e96e4aaa9b70001d283b6|2017-12-11 14:31:32.42 |
|2018-01-25 10:20:17.121|-24.823  |5a69af8b0a940000016a73f8|2018-01-25 10:20:41.944|
|2018-01-17 05:00:04.452|-18.991  |5a5ed87fbe96950001e665e2|2018-01-17 05:00:23.443|
|2018-01-01 22:03:41.578|-9.148   |5a4ab04b08a1890001f55fed|2018-01-01 22:03:50.726|
|2018-01-01 22:03:33.82 |-9.178   |5a4ab043d93f790001c1ac36|2018-

## Create Results Table 

The custom lambda function below unrolls nested exam data relevant to the responses for each exam attempt in the data. This nesting is three levels deep. A new temporary table `results` contains the number of `keen_id`, the number of correct responses, and the number of total questions on the exam. We assume here that all assessments contain values for `correct` and `total`.

In [537]:
def my_lambda_results(x):
    
    raw_dict = json.loads(x.value)
    my_list = []
    
    if "sequences" in raw_dict:
        
        if "counts" in raw_dict["sequences"]:
            
            if "correct" in raw_dict["sequences"]["counts"] and "total" in raw_dict["sequences"]["counts"]:
                    
                my_dict = {"keen_id" : raw_dict["keen_id"],"correct": raw_dict["sequences"]["counts"]["correct"], 
                           "total": raw_dict["sequences"]["counts"]["total"]}
                my_list.append(Row(**my_dict))
    
    return my_list

my_results = assessments.rdd.flatMap(my_lambda_results).toDF()

my_results.registerTempTable('results')

Write out to hdfs.

In [538]:
my_results.write.mode("overwrite").parquet("/tmp/results")


##### SQL example queries

Using sql queries we can join this table with the previous assessments table, to get the exam name, and then calculate the exam score using `correct/total*100`. 

In [539]:
spark.sql("select a.keen_id, a.exam_name, r.correct, r.total, r.correct / r.total*100 as score from assessments a join results r on a.keen_id = r.keen_id limit 10").show()

+--------------------+--------------------+-------+-----+-----+
|             keen_id|           exam_name|correct|total|score|
+--------------------+--------------------+-------+-----+-----+
|5a17a67efa1257000...|Intermediate Pyth...|      1|    4| 25.0|
|5a26ee9cbf5ce1000...|Learning to Progr...|      7|    7|100.0|
|5a29dcac74b662000...|        Learning Git|      4|    5| 80.0|
|5a2fdab0eabeda000...|        Learning SQL|      3|    4| 75.0|
|5a30105020e9d4000...|Intermediate Pyth...|      1|    4| 25.0|
|5a3a6fc3f0a100000...|        Learning Git|      5|    5|100.0|
|5a4e17fe08a892000...|Introduction to H...|      0|    1|  0.0|
|5a4f3c69cc6444000...|A Practical Intro...|      4|    4|100.0|
|5a51b21bd0480b000...|        Learning Git|      5|    5|100.0|
|5a575a85329e1a000...|Beginning C# Prog...|      3|    4| 75.0|
+--------------------+--------------------+-------+-----+-----+



Average score across all exams.

In [540]:
spark.sql("select round(avg(correct / total)*100,2) as average_score from results limit 10").show()

+-------------+
|average_score|
+-------------+
|        62.66|
+-------------+



Standard deviation of scores across all exams.

In [541]:
spark.sql("select stddev(correct / total)*100 as standard_deviation from results limit 10").show()

+------------------+
|standard_deviation|
+------------------+
|31.086692286170553|
+------------------+



# Sample Buisness Questions

Below you will find several examples of business questions that can be answered using this dataset and the temporary tables created using the custom lambda functions above. Each example below provides the business question, corresponding SQL query, and output (limited to 10 lines).

## How many assessments are in the dataset?

In [542]:
spark.sql("select count(*) as count from assessments").show(truncate=False)

+-----+
|count|
+-----+
|3280 |
+-----+



## How many people took [insert exam]?
This example is for the course "Introduction to Python".

In [543]:
spark.sql("select count(*) as count from assessments where exam_name = 'Introduction to Python'").show(truncate=False)

+-----+
|count|
+-----+
|162  |
+-----+



## What are the top 10 most popular assessments?

In [544]:
spark.sql("select exam_name, count(*) as count from assessments group by exam_name order by count desc limit 10").show(truncate=False)

+-----------------------------------------------------------+-----+
|exam_name                                                  |count|
+-----------------------------------------------------------+-----+
|Learning Git                                               |394  |
|Introduction to Python                                     |162  |
|Introduction to Java 8                                     |158  |
|Intermediate Python Programming                            |158  |
|Learning to Program with R                                 |128  |
|Introduction to Machine Learning                           |119  |
|Software Architecture Fundamentals Understanding the Basics|109  |
|Beginning C# Programming                                   |95   |
|Learning Eclipse                                           |85   |
|Learning Apache Maven                                      |80   |
+-----------------------------------------------------------+-----+



## Which assessments have the most retakes?

In [545]:
spark.sql("select exam_name, count(*)-count(distinct user_exam_id) as retakes from assessments group by exam_name order by retakes desc limit 10").show(truncate=False)

+-------------------------------------------------------+-------+
|exam_name                                              |retakes|
+-------------------------------------------------------+-------+
|Beginning C# Programming                               |12     |
|Learning C# Best Practices                             |6      |
|Learning Git                                           |4      |
|Learning C# Design Patterns                            |4      |
|Intermediate C# Programming                            |4      |
|Introduction to Big Data                               |2      |
|An Introduction to d3.js: From Scattered to Scatterplot|2      |
|Learning DNS                                           |2      |
|Intermediate Python Programming                        |2      |
|A Practical Introduction to React.js                   |0      |
+-------------------------------------------------------+-------+



## What are the five assessments with the lowest average score?

In [546]:
spark.sql("select a.exam_name, avg(s.correct / s.total)*100 as avg_score from assessments a join ct s on a.keen_id = s.keen_id group by a.exam_name order by avg_score asc limit 5").show(truncate=False)


+-------------------------------------------+------------------+
|exam_name                                  |avg_score         |
+-------------------------------------------+------------------+
|Client-Side Data Storage for Web Developers|20.0              |
|Native Web Apps for Android                |25.0              |
|View Updating                              |25.0              |
|Arduino Prototyping Techniques             |33.33333333333333 |
|Mastering Advanced Git                     |36.029411764705884|
+-------------------------------------------+------------------+



## What are the five assessments with the highest average score?


In [547]:
spark.sql("select a.exam_name, avg(s.correct / s.total)*100 as avg_score from assessments a join ct s on a.keen_id = s.keen_id group by a.exam_name order by avg_score desc limit 5").show(truncate=False)


+-------------------------------------------------+-----------------+
|exam_name                                        |avg_score        |
+-------------------------------------------------+-----------------+
|The Closed World Assumption                      |100.0            |
|Learning to Visualize Data with D3.js            |100.0            |
|Nulls, Three-valued Logic and Missing Information|100.0            |
|Learning SQL for Oracle                          |97.72727272727273|
|Introduction to Java 8                           |87.59493670886073|
+-------------------------------------------------+-----------------+



## What was the most missed question on the Learning Git Exam?

In [548]:
spark.sql("select q.question_id, round(count( case when q.correct='false' then 1 else null end)/count(*)*100,2) as percent_incorrect, round(count( case when q.correct='true' then 1 else null end)/count(*)*100, 2) as percent_correct from questions q join assessments a on q.keen_id = a.keen_id where a.exam_name = 'Learning Git' group by q.question_id order by percent_incorrect desc limit 1").show(truncate=False)

+------------------------------------+-----------------+---------------+
|question_id                         |percent_incorrect|percent_correct|
+------------------------------------+-----------------+---------------+
|5cc84ec3-713c-45f0-a9b8-711a369d0dfc|44.33            |55.67          |
+------------------------------------+-----------------+---------------+



## Is there a relationship between when a user is asked a question and their performance on the question?

The sequel below returns a table with the total number of incorrect response for each question for each observed numeric placement of that question on the exam for 'Intermediate Python Programming'. This can be used to answer the question above with the additional of some additional statistical exploration. 

In [549]:
spark.sql("select q.question_id, q.number, count(q.question_id) as incorrect from questions q join assessments a on q.keen_id = a.keen_id where q.correct = 'false' and a.exam_name = 'Intermediate Python Programming' group by q.question_id, q.number order by question_id, incorrect desc limit 10").show(truncate=False)

+------------------------------------+------+---------+
|question_id                         |number|incorrect|
+------------------------------------+------+---------+
|32fe7d8d-6d89-4db4-a17a-a368c5ea3ca0|3     |28       |
|32fe7d8d-6d89-4db4-a17a-a368c5ea3ca0|4     |24       |
|32fe7d8d-6d89-4db4-a17a-a368c5ea3ca0|1     |19       |
|32fe7d8d-6d89-4db4-a17a-a368c5ea3ca0|2     |19       |
|3477bf63-4db1-4351-8d8c-914b5fc46266|2     |1        |
|3477bf63-4db1-4351-8d8c-914b5fc46266|1     |1        |
|3477bf63-4db1-4351-8d8c-914b5fc46266|3     |1        |
|3f9ff9ee-4497-49b2-8c7c-ac1ce5d7bd92|4     |1        |
|3f9ff9ee-4497-49b2-8c7c-ac1ce5d7bd92|3     |1        |
|5c34cf19-8cfd-4f56-91c2-0a109dc990b9|2     |22       |
+------------------------------------+------+---------+



## Shutdown 


### Clean exit if using Jupyter notebook or pyspark 
***
Clean exit notebook
```
Save and Checkpoint
Close and Halt
```

Clean exit pyspark
```
exit()
```

***

### Tear Down Cluster
***
```
docker-compose down
```
***

### Check everything is down
***
```
docker-compose ps
docker ps -a
```
***