# Project 2 Report

## Summary

This project uses Kafka to publish messages from a given JSON file in a single batch. Spark then consumes those messages, transforms them, and unrolls the nested JSON into a format that can be stored in HDFS and queried using Spark SQL.

## Description of the pipeline

The docker-compose.yml file contains the information required to spin up each of the containers. The first image is ZooKeeper, which works with Kafka to track the status of the nodes in the Kafka cluster and to maintain metadata such as the list of Kafka topics. The second image is Kafka, the broker that receives messages from a producer and stores them in a topic. We use a single broker, and the topic has 1 partition and a replication factor of 1 (no replication). We named the topic assessments because the data is about assessments.

The next image is Cloudera, an ecosystem that lets us use Hadoop and HDFS to store the data. Spark is the service that consumes the messages; it depends on the Cloudera container so that it can save data to HDFS, and it exposes port 8889 so that we can connect to the notebook later. We use Spark to consume the messages from Kafka, unroll the nested JSON data using a forced schema, and store the result in HDFS as Parquet files, which we can then query with Spark SQL. The last container is the MIDS container, an Ubuntu machine that mirrors the host machine.

## Description of the Data

The data is provided as a JSON file, which we consume all at once in batch fashion. It describes a series of assessments taken by users on various topics. The data is nested: general information about the assessment and the user sits on the first level, and the first-level sequences field contains four more keys (attempt, id, questions and counts). The questions key holds the list of questions in that particular assessment, and each question in turn has keys for all of its options along with information about how the user answered it. Because each assessment has a different number of questions, unrolling this part of the JSON file is more difficult. It can be done by declaring questions as an ArrayType when forcing the schema, but the information under questions is not required for our purposes, so I have not attempted it. The counts key under sequences contains all the useful information about the user's performance in the assessment.
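Although questions is not unrolled in this project, the idea behind an ArrayType schema plus Spark's `explode` can be sketched in plain Python: every element of the questions array becomes its own row, so the varying number of questions per assessment stops being a problem. This is a minimal sketch under stated assumptions: the record is hypothetical and heavily trimmed (only the field names come from the data), and `explode_questions` is an illustrative helper, not part of the pipeline.

```python
import json

# Hypothetical, trimmed record shaped like one assessment in the input file.
# Field names follow the data; the values are made up for illustration.
SAMPLE = json.loads("""
{
  "exam_name": "Learning Git",
  "user_exam_id": "u-1",
  "sequences": {
    "attempt": 1,
    "id": "seq-1",
    "questions": [
      {"id": "q-1", "user_correct": true,
       "options": [{"id": "o-1", "checked": true, "correct": true}]},
      {"id": "q-2", "user_correct": false,
       "options": [{"id": "o-2", "checked": false, "correct": true},
                   {"id": "o-3", "checked": true}]}
    ],
    "counts": {"correct": 1, "total": 2, "all_correct": false}
  }
}
""")


def explode_questions(record):
    """Yield one flat row per question, similar to explode() on an array column."""
    seq = record.get("sequences") or {}
    for q in seq.get("questions") or []:
        yield {
            "user_exam_id": record.get("user_exam_id"),
            "exam_name": record.get("exam_name"),
            "question_id": q.get("id"),
            "user_correct": q.get("user_correct"),
            "num_options": len(q.get("options") or []),
        }


rows = list(explode_questions(SAMPLE))
for row in rows:
    print(row)
```

In Spark, the equivalent would presumably be declaring questions as an `ArrayType` of a `StructType` in the forced schema and selecting `explode(sequences.questions)`.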

The questions answered by the queries at the end of this report are:

1. What is the count of assessments?
2. Which are the most common courses taken?
3. Which are the least common courses taken?
4. How many assessments had all questions answered correctly, and how many did not?
5. What is the distribution of questions per assessment?
6. Which courses have the highest average scores among the users who took assessments in them?


## Relevant portion of the history file

The commands used to create the pipeline are shown below. Note that I restarted my virtual machine just before running these, so the command history from earlier in the semester was lost. 


    16  docker-compose up -d
    17  docker-compose logs zookeeper | grep -i binding
    18  docker-compose logs kafka | grep -i started
    19  docker-compose ps
    20  docker-compose logs kafka
    22  docker-compose exec kafka kafka-topics --create --topic assessments --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
    23  docker-compose exec kafka kafka-topics --describe --topic assessments --zookeeper zookeeper:32181
    24  docker-compose exec mids bash -c "cat /w205/project-2-drkulkarni236/assessment-attempts-20180128-121051-nested.json | jq ".[]" -c | kafkacat -P -b kafka:29092 -t assessments "
    25  docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:29092 --topic assessments --from-beginning
    27  docker-compose exec mids bash -c "kafkacat -C -b kafka:29092 -t assessments -o beginning -e"
    29  history > drkulkarni236-history.txt
    
    
Following is a brief description of what the commands are doing.

- Line 16 - Spin up the containers with docker-compose according to the docker-compose.yml file.
- Lines 17-20 - Sanity checks to make sure all the services are up.
- Line 22 - Create a Kafka topic, i.e. the feed name to which the records are published and stored.
- Line 23 - Sanity check to make sure the Kafka topic was created.
- Line 24 - Through bash in the mids container, read the JSON file and pipe it to jq: `.[]` splits the top-level array into individual records and `-c` prints each record compactly on a single line. kafkacat then acts as the producer (-P) for the given broker (-b) and topic (-t); by default it treats each line of input as a separate message, which is why every record must end with a newline.
- Lines 25, 27 - Sanity checks: consume the messages to make sure they were produced properly.
- Line 29 - Save all the commands to the history file.
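The jq step in line 24 is what turns one big JSON document into one Kafka message per assessment. A rough Python equivalent of `jq '.[]' -c` is sketched below, assuming (as the `.[]` filter implies) that the file's top level is an array of assessment objects; the input string is a hypothetical, trimmed stand-in for the real file and `to_ndjson` is an illustrative helper.

```python
import json

# Hypothetical, trimmed stand-in for the input file: a top-level JSON array.
raw = ('[{"exam_name": "Learning Git", "sequences": {"counts": {"total": 4}}},'
       ' {"exam_name": "Python Epiphanies"}]')


def to_ndjson(text):
    """Mimic `jq '.[]' -c`: one compact JSON object per line."""
    records = json.loads(text)
    return "\n".join(
        json.dumps(r, separators=(",", ":"), ensure_ascii=False) for r in records
    )


# Each line is what kafkacat -P would send as a separate message.
messages = to_ndjson(raw).splitlines()
for m in messages:
    print(m)
```

Compact output matters because kafkacat splits on newlines: a pretty-printed record spanning several lines would be sent as several broken messages.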

The last command, which opens this notebook, is missing from the history file because it was run after the history file was created.
    
    docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8889 --ip 0.0.0.0 --allow-root --notebook-dir=/w205/' pyspark

## Using Spark to unroll the JSON file

In [1]:
```python
# Import libraries
import json
import pandas as pd
from pyspark.sql.functions import explode, split
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
import warnings
```

In [2]:
```python
# Read raw data from the Kafka topic as a batch (all offsets, earliest to latest)
raw_assessments = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka:29092") \
  .option("subscribe", "assessments") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
```

In [3]:
```python
# Cache the data
raw_assessments.cache()
```

```
DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]
```

In [4]:
```python
# Cast the value column to string, because Kafka delivers message values in binary format
assessments = raw_assessments.select(raw_assessments.value.cast('string'))
```
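What the cast does to each message can be illustrated outside Spark: the value column holds raw bytes, and turning it into a string amounts, roughly, to decoding those bytes as UTF-8 text, which `json.loads` can then parse. The byte string below is a hypothetical, shortened message built from the first fields of the first record shown in the next cells.

```python
import json

# Hypothetical, shortened Kafka message value (raw bytes, as stored in the topic)
raw_value = b'{"keen_timestamp":"1516717442.735266","max_attempts":"1.0"}'

# Decode the bytes as UTF-8 text, roughly what cast('string') does in Spark
text = raw_value.decode("utf-8")

# Parse the text into a dict, as the later cells do with json.loads(x.value)
record = json.loads(text)
print(record["max_attempts"])
```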

In [5]:
```python
# Check the assessments object
assessments.show()
```

```
+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 20 rows
```



In [6]:
```python
# Look at the first message
assessments.collect()[0]
```

```
Row(value='{"keen_timestamp":"1516717442.735266","max_attempts":"1.0","started_at":"2018-01-23T14:23:19.082Z","base_exam_id":"37f0a30a-7464-11e6-aa92-a8667f27e5dc","user_exam_id":"6d4089e4-bde5-4a22-b65f-18bce9ab79c8","sequences":{"questions":[{"user_incomplete":true,"user_correct":false,"options":[{"checked":true,"at":"2018-01-23T14:23:24.670Z","id":"49c574b4-5c82-4ffd-9bd1-c3358faf850d","submitted":1,"correct":true},{"checked":true,"at":"2018-01-23T14:23:25.914Z","id":"f2528210-35c3-4320-acf3-9056567ea19f","submitted":1,"correct":true},{"checked":false,"correct":true,"id":"d1bf026f-554f-4543-bdd2-54dcf105b826"}],"user_submitted":true,"id":"7a2ed6d3-f492-49b3-b8aa-d080a8aad986","user_result":"missed_some"},{"user_incomplete":false,"user_correct":false,"options":[{"checked":true,"at":"2018-01-23T14:23:30.116Z","id":"a35d0e80-8c49-415d-b8cb-c21a02627e2b","submitted":1},{"checked":false,"correct":true,"id":"bccd6e2e-2cef-4c72-8bfa-317db0ac48bb"},{"checked":true,"at":"2018-01-23T14:23:41.
```

In [7]:
```python
# Write the raw data in Parquet format
assessments.write.parquet("/tmp/assessments")
```

In [8]:
```python
# Read back from Parquet
read_assessments = spark.read.parquet('/tmp/assessments')
read_assessments.show()
```

```
+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 20 rows
```



## Spark Inferred Schema

In [9]:
```python
# Parse each JSON string and convert to a DataFrame, letting Spark infer the schema
extracted_assessments = assessments.rdd.map(lambda x: json.loads(x.value)).toDF()
```



In [10]:
```python
# Look at the DataFrame: the data is not unrolled properly because of the nested structure of the JSON file
extracted_assessments.show()
```

```
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|Map(questions -> ...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|Map(questions -> ...|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false
```

In [11]:
```python
# Look at the schema: sequences is inferred as a single map whose values are
# arrays of string-to-boolean maps, which does not fit keys such as counts
extracted_assessments.printSchema()
```

```
root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)
```
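The shape of this wrong schema can be explained with a simplified model of PySpark's schema inference for nested values. To my understanding (an assumption, based on how Spark 2.x infers types from Python objects), a nested dict becomes a map typed from its first non-null key/value pair, and a list becomes an array typed from its first non-null element. Since the first key of sequences is questions, and the first key of the first question is the boolean user_incomplete, everything under sequences ends up typed as a map of arrays of string-to-boolean maps. The `infer` function below is a hypothetical toy model, not PySpark's code, and it lumps all numbers together as "number".

```python
def infer(value):
    """Toy model of type inference: dicts/lists are typed from their first non-null entry."""
    if value is None:
        return "null"
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return "boolean"
    if isinstance(value, (int, float)):
        return "number"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        for item in value:
            if item is not None:
                return f"array<{infer(item)}>"
        return "array<null>"
    if isinstance(value, dict):
        for k, v in value.items():
            if k is not None and v is not None:
                return f"map<{infer(k)},{infer(v)}>"
        return "map<null,null>"
    return "unknown"


# Trimmed version of the first message's sequences field; key order follows the
# first Row printed earlier, the counts values are illustrative only.
sequences = {
    "questions": [{"user_incomplete": True, "user_correct": False, "options": []}],
    "attempt": 1,
    "counts": {"incomplete": 1, "total": 4},
}
inferred = infer(sequences)
print(inferred)
```

Only the first entry is consulted, so the other keys (attempt, id, counts) are forced into the same map value type, which is why forcing a schema is needed.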



## Force the Schema

In [12]:
```python
# Inspect the JSON file and force a schema that matches its structure
final_schema = StructType([StructField('keen_timestamp', StringType(), True),
                     StructField('max_attempts', StringType(), True),
                     StructField('started_at', StringType(), True),
                     StructField('base_exam_id', StringType(), True),
                     StructField('user_exam_id', StringType(), True),
                     StructField('keen_created_at', StringType(), True),
                     StructField('certification', StringType(), True),
                     StructField('keen_id', StringType(), True),
                     StructField('exam_name', StringType(), True),
                     StructField('sequences', StructType([
                         StructField('attempt', StringType(), True),
                         StructField('id', StringType(), True),
                         StructField('questions', StringType(), True),
                         StructField('counts', StructType([
                             StructField('incomplete', StringType(), True),
                             StructField('submitted', StringType(), True),
                             StructField('incorrect', StringType(), True),
                             StructField('all_correct', StringType(), True),
                             StructField('correct', StringType(), True),
                             StructField('total', StringType(), True),
                             StructField('unanswered', StringType(), True)
                         ]))]))])
```

In [13]:
```python
# Convert to a DataFrame using the forced schema
focused_extracted_assessments = assessments.rdd.map(lambda x: json.loads(x.value)).toDF(schema=final_schema)
```

In [14]:
```python
# Inspect the first five rows: the structure makes more sense now
focused_extracted_assessments.show(5)
```

```
+------------------+------------+--------------------+--------------------+--------------------+------------------+-------------+--------------------+--------------------+--------------------+
|    keen_timestamp|max_attempts|          started_at|        base_exam_id|        user_exam_id|   keen_created_at|certification|             keen_id|           exam_name|           sequences|
+------------------+------------+--------------------+--------------------+--------------------+------------------+-------------+--------------------+--------------------+--------------------+
| 1516717442.735266|         1.0|2018-01-23T14:23:...|37f0a30a-7464-11e...|6d4089e4-bde5-4a2...| 1516717442.735266|        false|5a6745820eb8ab000...|Normal Forms and ...|[1,5b28a462-7a3b-...|
| 1516717377.639827|         1.0|2018-01-23T14:21:...|37f0a30a-7464-11e...|2fec1534-b41f-441...| 1516717377.639827|        false|5a674541ab6b0a000...|Normal Forms and ...|[1,5b28a462-7a3b-...|
| 1516738973.653394|         1.0|20
```

In [15]:
```python
# Print the forced schema: it now matches the structure we want
focused_extracted_assessments.printSchema()
```

```
root
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- started_at: string (nullable = true)
 |-- base_exam_id: string (nullable = true)
 |-- user_exam_id: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- sequences: struct (nullable = true)
 |    |-- attempt: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- questions: string (nullable = true)
 |    |-- counts: struct (nullable = true)
 |    |    |-- incomplete: string (nullable = true)
 |    |    |-- submitted: string (nullable = true)
 |    |    |-- incorrect: string (nullable = true)
 |    |    |-- all_correct: string (nullable = true)
 |    |    |-- correct: string (nullable = true)
 |    |    |-- total: string (nullable = true)
 |    |    |-- unanswered: string (nullable = true)
```
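A forced schema acts as a projection: fields that are not declared are dropped, and a declared field that is missing from a record comes back as null. A plausible (unverified) explanation for the 5 null rows in the all_correct and total queries later on is that those assessments have no counts object, or lack those fields. The sketch below models that projection in plain Python; both records and the helper `project_counts` are hypothetical.

```python
import json

# Fields declared under sequences.counts in the forced schema
COUNT_FIELDS = ["incomplete", "submitted", "incorrect", "all_correct",
                "correct", "total", "unanswered"]


def project_counts(record):
    """Pick sequences.counts.* from a parsed record; anything missing becomes None."""
    counts = (record.get("sequences") or {}).get("counts") or {}
    return {field: counts.get(field) for field in COUNT_FIELDS}


records = [
    json.loads('{"sequences": {"counts": {"correct": 3, "total": 4, "all_correct": false}}}'),
    json.loads('{"sequences": {"attempt": 1}}'),  # hypothetical record without counts
]
projected = [project_counts(r) for r in records]

# Rows whose total is null would land in the "null" group of a GROUP BY
missing = sum(1 for p in projected if p["total"] is None)
print(missing)
```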



In [55]:
```python
# Write the structured data in Parquet format
focused_extracted_assessments.write.parquet("/tmp/focussed_assessments")
```

In [17]:
```python
# Register a temporary view over the structured data so it can be queried with Spark SQL
focused_extracted_assessments.createOrReplaceTempView('focused_assessments')
```

In [25]:
```python
# Count of assessments
spark.sql("select count(*) from focused_assessments").show()
```

```
+--------+
|count(1)|
+--------+
|    3280|
+--------+
```



In [29]:
```python
# Which are the most common courses taken?
spark.sql("""
    select exam_name, count(user_exam_id) as user_counts
    from focused_assessments
    group by exam_name
    order by user_counts DESC
    limit 10
""").show()
```

```
+--------------------+-----------+
|           exam_name|user_counts|
+--------------------+-----------+
|        Learning Git|        394|
|Introduction to P...|        162|
|Intermediate Pyth...|        158|
|Introduction to J...|        158|
|Learning to Progr...|        128|
|Introduction to M...|        119|
|Software Architec...|        109|
|Beginning C# Prog...|         95|
|    Learning Eclipse|         85|
|Learning Apache M...|         80|
+--------------------+-----------+
```



In [28]:
# Which are the least common courses taken?
spark.sql("select exam_name, count(user_exam_id) as user_counts from focused_assessments\
          group by exam_name order by user_counts limit 10").show()

+--------------------+-----------+
|           exam_name|user_counts|
+--------------------+-----------+
|Nulls, Three-valu...|          1|
|Learning to Visua...|          1|
|Native Web Apps f...|          1|
|Operating Red Hat...|          1|
|Client-Side Data ...|          2|
|Arduino Prototypi...|          2|
|What's New in Jav...|          2|
|Understanding the...|          2|
|Hibernate and JPA...|          2|
|Learning Spring P...|          2|
+--------------------+-----------+



In [27]:
```python
# How many assessments had all questions answered correctly, and how many did not?
spark.sql("""
    select count(*), sequences.counts.all_correct
    from focused_assessments
    group by sequences.counts.all_correct
""").show()
```

```
+--------+-----------+
|count(1)|all_correct|
+--------+-----------+
|       5|       null|
|    2434|      false|
|     841|       true|
+--------+-----------+
```



In [26]:
```python
# What is the distribution of questions per assessment?
spark.sql("""
    select count(*), CAST(sequences.counts.total as int) as total_questions
    from focused_assessments
    group by total_questions
    order by total_questions
""").show()
```

```
+--------+---------------+
|count(1)|total_questions|
+--------+---------------+
|       5|           null|
|      56|              1|
|      21|              2|
|     239|              3|
|    1646|              4|
|     940|              5|
|     122|              6|
|     177|              7|
|      59|              8|
|      14|             10|
|       1|             20|
+--------+---------------+
```
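The score query for question 6 first computes a percentage for each assessment and then averages those percentages per course. That is not the same as pooling all of a course's questions into one ratio, and it also means a course taken only once (for example Nulls, Three-valu... or Learning to Visua..., each with a user count of 1) can reach the top with a single perfect attempt. The arithmetic is sketched below with hypothetical (correct, total) pairs for one course.

```python
# Hypothetical (correct, total) pairs for three attempts at the same course
attempts = [(3, 4), (4, 4), (1, 2)]

# What the SQL query computes: a percentage per assessment, then their mean
mean_of_percents = sum(100 * c / t for c, t in attempts) / len(attempts)

# Alternative: pool all questions first, then compute a single percentage
pooled_percent = 100 * sum(c for c, _ in attempts) / sum(t for _, t in attempts)

print(mean_of_percents, pooled_percent)  # 75.0 80.0
```

The per-assessment mean weights every attempt equally regardless of its length, while the pooled ratio weights longer assessments more; either is defensible, but they can rank courses differently.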



In [54]:
```python
# Which courses have the highest average scores among the users who took assessments in them?
spark.sql("""
    select exam_name, avg(percent_correct) as avg_correct_percent
    from (select exam_name,
                 100 * CAST(sequences.counts.correct as int) / CAST(sequences.counts.total as int)
                     as percent_correct
          from focused_assessments) t
    group by exam_name
    order by avg_correct_percent DESC
""").show()
```

+--------------------+-------------------+
|           exam_name|avg_correct_percent|
+--------------------+-------------------+
|The Closed World ...|              100.0|
|Nulls, Three-valu...|              100.0|
|Learning to Visua...|              100.0|
|Learning SQL for ...|  97.72727272727273|
|Introduction to J...|  87.59493670886076|
|Introduction to A...|  83.33333333333333|
|Introduction to A...|  83.33333333333333|
|Getting Ready for...|               80.0|
|Cloud Native Arch...|               80.0|
|Understanding the...|  78.57142857142857|
|Introduction to A...|  76.92307692307692|
|Beginning Program...|  76.58227848101266|
|Learning Apache H...|            76.5625|
|Refactor a Monoli...|  76.47058823529412|
|Starting a Grails...|               75.0|
|Using Storytellin...|               75.0|
|Introduction to H...|               75.0|
|Git Fundamentals ...|               75.0|
|   Python Epiphanies|  74.18300653594771|
|Mastering Python ...|               74.0|
+----------