In [1]:
# Importing necessary libraries 
import json
import pandas as pd
from pyspark.sql import Row
import pprint
from pyspark.sql.functions import desc

## Linux commands for the entire pipeline

#### 1. Docker commands to bring up/ tear down/ check out the Docker cluster: 
    docker-compose up -d  
    docker-compose down  
    docker-compose ps -a   
     
    
#### 2. Command to create the kafka topic with a partition and replication for kafka cluster using zookeeper as its cluster manager:   
    docker-compose exec kafka kafka-topics --create --topic assessment --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181  
    
    
#### 3. Command to publish the assessments json data to the kafka topic using kafkacat:  
    docker-compose exec mids bash -c "cat /w205/project-2-alicehua11/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessment && echo 'Produced assessment.'"  
    
    
#### 4. Commands to create a symbolic link to the /w205 mount point inside the Spark container and bring up the Jupyter notebook:
    docker-compose exec spark bash  
    ln -s /w205 w205  
    exit  
    docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark
    
#### 5. Command to check out Hadoop HDFS parquet files  
    docker-compose exec cloudera hadoop fs -ls /tmp/
    

## Part 1: Create a data frame by subscribing to the Kafka topic

In [2]:
# Batch-read the whole "assessment" Kafka topic, from the earliest offset through the latest
raw_assessments = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "assessment")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

In [3]:
# Cache the PySpark dataframe so the data persists in memory for subsequent actions without re-evaluation
raw_assessments.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [4]:
# A quick peek at the dataframe; note the `topic` column (assessment) and the Kafka `partition` column
raw_assessments.show()

+----+--------------------+----------+---------+------+--------------------+-------------+
| key|               value|     topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------+---------+------+--------------------+-------------+
|null|[7B 22 6B 65 65 6...|assessment|        0|     0|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessment|        0|     1|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessment|        0|     2|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessment|        0|     3|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessment|        0|     4|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessment|        0|     5|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessment|        0|     6|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessment|        0|     7|1969-12-31 23:59:...|            0|

In [5]:
# Check out the schema of this PySpark dataframe
raw_assessments.printSchema()
# Last expression of the cell: displays the Python type of the dataframe object
type(raw_assessments)

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



pyspark.sql.dataframe.DataFrame

## Part 2: Convert the json data as a string into dataframe

In [6]:
# Keep only the `value` column of the Kafka dataframe, cast from binary to string for analysis
value_as_string = raw_assessments["value"].cast('string')
assessments = raw_assessments.select(value_as_string)

In [7]:
# Recheck the schema to confirm that `value` was cast to string correctly
assessments.printSchema()

root
 |-- value: string (nullable = true)



In [8]:
# Take a quick peek at the newly cast dataframe
assessments.show()

+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 20 rows



In [9]:
# Check out the first row of the dataframe (a Row whose `value` field holds the JSON string)
assessments.first()

Row(value='{"keen_timestamp":"1516717442.735266","max_attempts":"1.0","started_at":"2018-01-23T14:23:19.082Z","base_exam_id":"37f0a30a-7464-11e6-aa92-a8667f27e5dc","user_exam_id":"6d4089e4-bde5-4a22-b65f-18bce9ab79c8","sequences":{"questions":[{"user_incomplete":true,"user_correct":false,"options":[{"checked":true,"at":"2018-01-23T14:23:24.670Z","id":"49c574b4-5c82-4ffd-9bd1-c3358faf850d","submitted":1,"correct":true},{"checked":true,"at":"2018-01-23T14:23:25.914Z","id":"f2528210-35c3-4320-acf3-9056567ea19f","submitted":1,"correct":true},{"checked":false,"correct":true,"id":"d1bf026f-554f-4543-bdd2-54dcf105b826"}],"user_submitted":true,"id":"7a2ed6d3-f492-49b3-b8aa-d080a8aad986","user_result":"missed_some"},{"user_incomplete":false,"user_correct":false,"options":[{"checked":true,"at":"2018-01-23T14:23:30.116Z","id":"a35d0e80-8c49-415d-b8cb-c21a02627e2b","submitted":1},{"checked":false,"correct":true,"id":"bccd6e2e-2cef-4c72-8bfa-317db0ac48bb"},{"checked":true,"at":"2018-01-23T14:23:41.

## Part 3: Extract / unroll the json data into new dataframes to answer your business questions

In [10]:
# Parse the JSON string in each row's `value` column with an RDD map(), turning the
# top-level JSON keys into Row fields, then convert the result into a PySpark dataframe
parsed_assessment_rows = assessments.rdd.map(lambda record: Row(**json.loads(record.value)))
extracted_assessments = parsed_assessment_rows.toDF()


# Function for flatMap() that unnests the questions of the `sequences` column
def my_lambda_questions(x):
    """Emit one Row per question found in an assessment record.

    Args:
        x: a row whose `value` attribute holds the assessment as a JSON string.

    Returns:
        A list of Row(exam_name, my_count), one per entry of
        sequences.questions, where my_count is the 1-based position of the
        question. The list is empty when the record has no `sequences` or no
        `questions` entry (or either is null), so flatMap() simply skips the
        record instead of failing the whole job with a KeyError/TypeError.
    """
    raw_dict = json.loads(x.value)
    # `or {}` / `or []` also cover keys that are present but hold a JSON null
    sequences = raw_dict.get("sequences") or {}
    question_entries = sequences.get("questions") or []
    return [
        Row(exam_name=raw_dict["exam_name"], my_count=position)
        for position, _ in enumerate(question_entries, start=1)
    ]

# Unroll every assessment into one row per question, then build the dataframe
question_rows = assessments.rdd.flatMap(my_lambda_questions)
questions = question_rows.toDF()


# Function for flatMap() that unnests the score counts of the `sequences` column
def lambda_correct_total(x):
    """Extract the score counts of an assessment record.

    Args:
        x: a row whose `value` attribute holds the assessment as a JSON string.

    Returns:
        A single-element list holding Row(exam_name, correct, total, incorrect,
        unanswered, incomplete) when sequences.counts contains all five count
        fields; otherwise an empty list, so flatMap() skips the record.
    """
    raw_dict = json.loads(x.value)
    # `or {}` also covers keys that are present but hold a JSON null
    counts = (raw_dict.get("sequences") or {}).get("counts") or {}
    # Every field read below must be present; checking only some of them
    # would let a partially filled `counts` raise a KeyError
    required_fields = ("correct", "total", "incorrect", "unanswered", "incomplete")
    if not all(field in counts for field in required_fields):
        return []
    my_dict = {"exam_name": raw_dict["exam_name"],
               "correct": counts["correct"],
               "total": counts["total"],
               "incorrect": counts["incorrect"],
               "unanswered": counts["unanswered"],
               "incomplete": counts["incomplete"]}
    return [Row(**my_dict)]

# Unroll the score counts of every assessment, then build the dataframe
score_count_rows = assessments.rdd.flatMap(lambda_correct_total)
correct_total = score_count_rows.toDF()

## Part 4: Register dataframes as temporary tables to allow in memory queries against them

In [11]:
# Register each PySpark DataFrame as a temporary view so it can be queried with spark.sql().
# createOrReplaceTempView is the replacement for registerTempTable, deprecated since Spark 2.0.
extracted_assessments.createOrReplaceTempView('extracted_assessments')
questions.createOrReplaceTempView('questions')
correct_total.createOrReplaceTempView('ct')

## Part 5: Perform SQL queries against the dataframes you registered

#### Question 1: How many assessments are in the dataset?

In [12]:
# Count the rows of the extracted_assessments view
total_assessments_query = "select count(*) as total_assessments from extracted_assessments"
spark.sql(total_assessments_query).show()

+-----------------+
|total_assessments|
+-----------------+
|             3280|
+-----------------+



#### Question 2: What are the least common courses taken?

In [13]:
# Ten exam names with the fewest assessments (ascending count)
least_common_query = "select exam_name, count(*) as course_count from extracted_assessments group by exam_name order by course_count limit 10"
least_common_courses = spark.sql(least_common_query)
least_common_courses.show(truncate=False)

+-------------------------------------------------+------------+
|exam_name                                        |course_count|
+-------------------------------------------------+------------+
|Nulls, Three-valued Logic and Missing Information|1           |
|Native Web Apps for Android                      |1           |
|Learning to Visualize Data with D3.js            |1           |
|Operating Red Hat Enterprise Linux Servers       |1           |
|The Closed World Assumption                      |2           |
|Arduino Prototyping Techniques                   |2           |
|What's New in JavaScript                         |2           |
|Learning Spring Programming                      |2           |
|Hibernate and JPA Fundamentals                   |2           |
|Understanding the Grails 3 Domain Model          |2           |
+-------------------------------------------------+------------+



#### Question 3: What are the most common courses taken?

In [14]:
# Ten exam names with the most assessments (descending count)
most_common_query = "select exam_name, count(*) as course_count from extracted_assessments group by exam_name order by course_count desc limit 10"
most_common_courses = spark.sql(most_common_query)
most_common_courses.show(truncate=False)

+-----------------------------------------------------------+------------+
|exam_name                                                  |course_count|
+-----------------------------------------------------------+------------+
|Learning Git                                               |394         |
|Introduction to Python                                     |162         |
|Intermediate Python Programming                            |158         |
|Introduction to Java 8                                     |158         |
|Learning to Program with R                                 |128         |
|Introduction to Machine Learning                           |119         |
|Software Architecture Fundamentals Understanding the Basics|109         |
|Beginning C# Programming                                   |95          |
|Learning Eclipse                                           |85          |
|Learning Apache Maven                                      |80          |
+------------------------

#### Question 4: What are some courses with the most number of questions?

In [15]:
# The largest question position per exam is its number of questions; show the top ten
most_questions_query = "select exam_name, max(my_count) as num_ques from questions group by exam_name order by num_ques desc limit 10"
most_questions = spark.sql(most_questions_query)
most_questions.show(truncate=False)

+------------------------------------------+--------+
|exam_name                                 |num_ques|
+------------------------------------------+--------+
|Operating Red Hat Enterprise Linux Servers|20      |
|Great Bash                                |10      |
|Learning Linux System Administration      |8       |
|Understanding the Grails 3 Domain Model   |7       |
|Learning to Program with R                |7       |
|What's New in JavaScript                  |7       |
|Introduction to Data Science with R       |7       |
|Being a Better Introvert                  |7       |
|Arduino Inputs                            |6       |
|Using Web Components                      |6       |
+------------------------------------------+--------+



#### Question 5: What are the summary statistics of average scores for all courses?

In [16]:
# Average percentage score per exam (rounded to 2 decimals), then summary statistics of those averages
avg_score_query = "select exam_name, round((avg(correct / total)*100), 2) as avg_score from ct group by exam_name order by avg_score desc"
avg_score = spark.sql(avg_score_query)
avg_score.describe("avg_score").show()

+-------+------------------+
|summary|         avg_score|
+-------+------------------+
|  count|               102|
|   mean| 61.43039215686273|
| stddev|14.974404559911491|
|    min|              20.0|
|    max|             100.0|
+-------+------------------+



#### Question 6: What are the courses with the highest number of perfect scores?

In [17]:
# Percentage score of every assessment in the ct view
score = spark.sql("select exam_name, (correct / total)*100 as score from ct order by score desc")
# Count the perfect scores per exam and show the ten exams with the most of them.
# `score` is a numeric column, so compare it with a numeric literal rather than
# the string "100.0", which only worked through Spark's implicit string-to-double cast.
(
    score.filter(score.score == 100.0)
    .groupBy(score.exam_name)
    .count()
    .sort(desc('count'))
    .limit(10)
    .show(truncate=False)
)

+-------------------------------------+-----+
|exam_name                            |count|
+-------------------------------------+-----+
|Learning Git                         |130  |
|Introduction to Java 8               |94   |
|Introduction to Machine Learning     |46   |
|Beginning Programming with JavaScript|29   |
|Advanced Machine Learning            |24   |
|Intermediate Python Programming      |23   |
|Git Fundamentals for Web Developers  |21   |
|Learning Apache Maven                |21   |
|Learning Eclipse                     |21   |
|Learning to Program with R           |20   |
+-------------------------------------+-----+



#### Question 7: What are the courses with highest number of scores lower than 50?

In [18]:
# Count the scores below 50 per exam and show the ten exams with the most of them.
# `score` is a numeric column, so compare it with a numeric literal rather than
# the string "50.0", which only worked through Spark's implicit string-to-double cast.
(
    score.filter(score.score < 50.0)
    .groupBy(score.exam_name)
    .count()
    .sort(desc('count'))
    .limit(10)
    .show(truncate=False)
)

+-----------------------------------------------------------+-----+
|exam_name                                                  |count|
+-----------------------------------------------------------+-----+
|Learning Git                                               |107  |
|Introduction to Python                                     |67   |
|Learning to Program with R                                 |58   |
|Intermediate Python Programming                            |56   |
|Software Architecture Fundamentals Understanding the Basics|42   |
|Beginning C# Programming                                   |28   |
|Learning C# Best Practices                                 |23   |
|Introduction to Data Science with R                        |23   |
|Introduction to Machine Learning                           |22   |
|Mastering Advanced Git                                     |20   |
+-----------------------------------------------------------+-----+



#### Question 8: What are the courses with highest number of incomplete and unanswered questions?

In [19]:
# Total incomplete and unanswered questions per exam; top ten exams by incomplete questions
incomplete = (
    correct_total
    .groupBy("exam_name")
    .sum("incomplete", "unanswered")
    .orderBy(desc("sum(incomplete)"))
    .limit(10)
)
incomplete.show(truncate=False)

+-----------------------------------------------------------+---------------+---------------+
|exam_name                                                  |sum(incomplete)|sum(unanswered)|
+-----------------------------------------------------------+---------------+---------------+
|Learning to Program with R                                 |153            |23             |
|Introduction to Python                                     |126            |42             |
|Introduction to Data Science with R                        |72             |2              |
|Beginning C# Programming                                   |64             |8              |
|Intermediate Python Programming                            |50             |27             |
|Introduction to Machine Learning                           |49             |1              |
|Learning Eclipse                                           |49             |7              |
|Software Architecture Fundamentals Understanding the Basics

In [20]:
# Total incomplete and unanswered questions per exam; top ten exams by unanswered questions
unanswered = (
    correct_total
    .groupBy("exam_name")
    .sum("incomplete", "unanswered")
    .orderBy(desc("sum(unanswered)"))
    .limit(10)
)
unanswered.show(truncate=False)

+--------------------------------------------------------------+---------------+---------------+
|exam_name                                                     |sum(incomplete)|sum(unanswered)|
+--------------------------------------------------------------+---------------+---------------+
|Learning Git                                                  |0              |119            |
|Introduction to Python                                        |126            |42             |
|Software Architecture Fundamentals Understanding the Basics   |41             |32             |
|Intermediate Python Programming                               |50             |27             |
|Mastering Advanced Git                                        |14             |24             |
|Learning to Program with R                                    |153            |23             |
|Learning C# Best Practices                                    |30             |21             |
|Great Bash                   

## Part 6: Write the PySpark dataframes to Hadoop HDFS in Parquet format, creating batch and serving layers for SQL queries at scale

In [21]:
# Write every dataframe to its own Parquet directory, replacing any previous output
frames_and_paths = [
    (raw_assessments, "/tmp/raw_assessments"),
    (assessments, "/tmp/assessments"),
    (extracted_assessments, "/tmp/extracted_assessments"),
    (questions, "/tmp/questions"),
    (correct_total, "/tmp/correct_total"),
]
for frame, parquet_path in frames_and_paths:
    frame.write.mode("overwrite").parquet(parquet_path)

## Appendix to view the json data in pprint format

In [22]:
p = pprint.PrettyPrinter(indent=1)
# Read the raw JSON file; the `with` block closes the file even if reading raises,
# and parsing happens only after the handle has been released
with open("assessment-attempts-20180128-121051-nested.json","r") as f:
    s = f.read()
json_data = json.loads(s)

In [23]:
# Pretty-print one record of the file; pprint lists the keys in alphabetic order,
# which may or may not match the order in the file
sample_record = json_data[79]
p.pprint(sample_record)

{'base_exam_id': '94b741b2-fc67-4db4-adc2-aafae130848f',
 'certification': 'false',
 'exam_name': 'Intermediate C# Programming',
 'keen_created_at': '1512281212.867346',
 'keen_id': '5a23947cf84983000111f60a',
 'keen_timestamp': '1512281212.867346',
 'max_attempts': '1.0',
 'sequences': {'attempt': 1,
               'counts': {'all_correct': False,
                          'correct': 1,
                          'incomplete': 0,
                          'incorrect': 3,
                          'submitted': 4,
                          'total': 4,
                          'unanswered': 0},
               'id': '4e0847c6-88fe-4c45-8ff9-820ad406f1f9',
               'questions': [{'id': '12ea2f83-fec4-4032-a1af-92ed8e1cd2d1',
                              'options': [{'at': '2017-12-03T06:05:34.832Z',
                                           'checked': True,
                                           'id': '3ebac309-aa96-4c0a-a478-82fc9c748b8c',
                                     