## Project 2 Final Report

### Pipeline Description

Before conducting data transformations and analysis, a general understanding of how we got the data to this point is required. In our command line we spun up the data pipeline described in the docker-compose.yml file. It involved creating a zookeeper, kafka, cloudera, spark, and mids base container. After spinning that up using docker-compose, we checked our HDFS instance through cloudera in the /tmp/ folder. This check confirmed that we would have a place in HDFS to land our data. 

After also confirming that our Kafka and Zookeeper containers were up and running, the next step was to create our Kafka topic. A Kafka topic is basically a category you declare to organize data. This data is sent to and received from the topic in the form of messages (or packets of data). For this project we named the topic 'assessments' because the JSON data that we're reading in is assessments data. After topic creation we can confirm that the topic exists before we begin exploring our JSON file.

To help determine how we should read in the data, we used several jq commands to inspect different parts of the JSON file. From these commands we determined many useful facts about the data:
1. The JSON file has 3280 elements, numbered 0-3279, that each represent an assessment attempt. We will want each of these elements to be passed through the Kafka topic as a message. In total, 3280 messages will be published to our topic.
2. Each element has nested features specific to the individual attempt being described. The fields all seem uniform across elements apart from the "questions" array. We cannot assume that each assessment has the same number of questions, so we will have to figure out how to dynamically read that data in for each attempt. 
3. Inside the "questions" array, there's a nested "options" array. It's not clear what this data is really representing, so I'm assuming that it does not need to be included in our transformed data.
4. I also noticed from doing a pretty print of the whole JSON file that some of the values in the exam_name field are padded with whitespace. This means we'll either have to ensure they're trimmed when transforming OR use the base_exam_id field as a unique id for each assessment.
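
The same checks can be reproduced in Python as a cross-check on the jq output. This is only a minimal sketch on a small hypothetical sample, not the real file: the field names follow the ones named in the list above, while the sample values and the helper name `summarize_attempts` are made up for illustration.

```python
import json

# Hypothetical two-element sample standing in for the real assessments file.
SAMPLE = json.loads("""
[
  {"exam_name": "Learning Git ", "base_exam_id": "id-1",
   "sequences": {"questions": [{"id": "q1", "options": []},
                               {"id": "q2", "options": []}]}},
  {"exam_name": "Learning Git", "base_exam_id": "id-1",
   "sequences": {"questions": [{"id": "q1", "options": []}]}}
]
""")


def summarize_attempts(attempts):
    """Return element count, distinct question counts, and padded exam names."""
    question_counts = {len(a["sequences"]["questions"]) for a in attempts}
    padded = [a["exam_name"] for a in attempts
              if a["exam_name"] != a["exam_name"].strip()]
    return {
        "num_attempts": len(attempts),
        "question_counts": sorted(question_counts),
        "padded_names": padded,
    }


summary = summarize_attempts(SAMPLE)
print(summary)
```

More than one value in `question_counts` confirms that the number of questions varies per attempt, and a non-empty `padded_names` list confirms the whitespace issue.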

After running the exploratory data analysis in our command line, we publish these messages (JSON elements) to our Kafka topic. We can then consume them from the topic and run the wc command to ensure that all 3280 messages were passed correctly. However, we don't just want to consume the messages at the command line; we want to land the data in Spark.
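
Publishing one message per element amounts to writing each array element as a single compact JSON line, so that counting lines is the same as counting messages. A rough Python sketch of that splitting step follows; the sample data and the helper name `to_messages` are hypothetical, and the actual publishing to Kafka is not shown.

```python
import json


def to_messages(json_array_text):
    """Split a JSON array into one compact JSON string per element."""
    elements = json.loads(json_array_text)
    # Compact separators keep each message on a single line with no padding.
    return [json.dumps(e, separators=(",", ":")) for e in elements]


# Hypothetical three-element file content.
raw = '[{"exam_name": "A"}, {"exam_name": "B"}, {"exam_name": "C"}]'
messages = to_messages(raw)

# The message count plays the role of the wc check on the consumed output.
print(len(messages))
print(messages[0])
```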

### Transformations in Spark

In [1]:
raw_assess = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka:29092") \
  .option("subscribe","assessments") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load() 

In [2]:
raw_assess.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

When data is read from Kafka into Spark, it arrives as a DataFrame in which each row is one Kafka message, with key and value columns plus message metadata. The value column holds raw bytes, which show() displays in hexadecimal, so the data is not very meaningful until we perform some basic transformations.

In [4]:
raw_assess.show()

+----+--------------------+-----------+---------+------+--------------------+-------------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null|[7B 22 6B 65 65 6...|assessments|        0|     0|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     1|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     2|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     3|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     4|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     5|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     6|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     7|1969-12-31 23:59:...|   

Another thing to note is the number of rows in the DataFrame coming into Spark from Kafka. This count corresponds to the number of messages that we produced to our Kafka topic and then consumed, which should be 3280. In the context of the data, this represents 3280 assessment attempts.

In [5]:
raw_assess.count()

3280

Going back to the raw_assess DataFrame, none of the columns apart from 'value' (key, topic, partition, offset, timestamp, and timestampType) are required for our transformations or analysis. So, our first step is to drop these unnecessary columns and cast the value column to a string so that we can start to see our data entries in a readable format.

In [3]:
assessments = raw_assess.select(raw_assess.value.cast('string'))

In [7]:
assessments.show()

+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 20 rows
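
The effect of the cast can be checked by hand on the bytes printed in the earlier `raw_assess.show()` output. This is a small sketch in plain Python (outside Spark), assuming the messages are UTF-8 encoded, which is consistent with the readable strings above.

```python
# First five complete bytes of the `value` column as displayed by show().
hex_prefix = "7B 22 6B 65 65"

# Convert the hex pairs back to bytes, then decode them as text.
decoded = bytes.fromhex(hex_prefix).decode("utf-8")
print(decoded)
```

The decoded prefix is the start of the `{"keen_timestamp"...` strings shown in the cast column.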



From this string casting, we can start to see the data inside each of the elements in the JSON file we read from. However, before we continue our transformations, we write this raw data to HDFS (Hadoop Distributed File System) as a parquet file. Once we have this version of the data saved, we can read it back in and continue applying transformations to it.

In [4]:
assessments.write.parquet("/tmp/assessments")

In [5]:
read_assess = spark.read.parquet('/tmp/assessments')

In [6]:
import json
import warnings

import pandas as pd
from pyspark.sql.functions import col, explode, from_json, regexp_replace, size, split
from pyspark.sql.types import (ArrayType, IntegerType, StringType, StructField,
                               StructType, TimestampType)

Now, using the json library, we can parse each row's JSON string and convert the result into a DataFrame with one column per field for easier manipulation. After that, we can print the schema to see what Spark inferred about the format of each of the assessment elements from our nested JSON file. Remember that this creates one row for each assessment attempt (3280 total, as confirmed by the row count of the initial DataFrame), and each column represents an element of the base hierarchy within an assessment.

In [56]:
extracted_assessments = read_assess.rdd.map(lambda x: json.loads(x.value)).toDF()



In [57]:
extracted_assessments.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)



We can see that the schema inferred after json.loads correctly identified all of the fields at the top of the hierarchy. However, it could not correctly unnest the objects within 'sequences', which it collapsed into a generic map. From pretty printing the original JSON file using jq commands, we know that sequences contains important nested values: an array representing the questions in the assessment and a 'counts' object, which provides data on things like number of submitted attempts, incomplete attempts, etc. To get all of the data that the inferred schema did not pick up, we have to define the schema explicitly, as shown below.

In [7]:
final_schema = StructType([StructField('exam_name', StringType(), True),
                     StructField('base_exam_id', StringType(), True),
                     StructField('max_attempts', StringType(), True),
                     StructField('sequences', StructType([
                         StructField('questions', ArrayType(StringType()), True),
                         StructField('counts', StructType([
                             StructField('incomplete', StringType(), True),
                             StructField('submitted', StringType(), True),
                             StructField('incorrect', StringType(), True),
                             StructField('all_correct', StringType(), True),
                             StructField('correct', StringType(), True),
                             StructField('total', StringType(), True),
                             StructField('unanswered', StringType(), True),
                         ]))]))])

This is the final schema we chose to transform the assessment data into. To handle the varying number of questions inside the "sequences" element, we declared "questions" as an ArrayType so that we can take its size to get the number of questions in that particular attempt. For the purposes of new customers querying the data, we believe that is the only relevant portion of the question objects. We also included all the fields within the "counts" object because they provide interesting insights regarding the attempt.
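
To make the reasoning concrete, here is a plain-Python sketch of what the schema lets us read from one message. The message content is hypothetical (real messages carry more fields), and this illustrates the idea rather than what Spark does internally.

```python
import json

# Hypothetical single message; the values are made up for illustration.
message = json.dumps({
    "exam_name": "Learning Git",
    "base_exam_id": "id-1",
    "max_attempts": "1.0",
    "sequences": {
        "questions": [{"id": "q1"}, {"id": "q2"}, {"id": "q3"}],
        "counts": {"correct": 2, "total": 3},
    },
})

record = json.loads(message)

# The number of questions is just the array length, whatever each question holds.
num_questions = len(record["sequences"]["questions"])
# Counts are read by key, so attempts with different question counts look alike.
counts = record["sequences"]["counts"]
print(num_questions, counts["correct"], counts["total"])
```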

In [8]:
final_extracted_assessments = read_assess.rdd.map(lambda x: json.loads(x.value)).toDF(schema=final_schema)

In [9]:
final_extracted_assessments.printSchema()

root
 |-- exam_name: string (nullable = true)
 |-- base_exam_id: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: struct (nullable = true)
 |    |-- questions: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- counts: struct (nullable = true)
 |    |    |-- incomplete: string (nullable = true)
 |    |    |-- submitted: string (nullable = true)
 |    |    |-- incorrect: string (nullable = true)
 |    |    |-- all_correct: string (nullable = true)
 |    |    |-- correct: string (nullable = true)
 |    |    |-- total: string (nullable = true)
 |    |    |-- unanswered: string (nullable = true)



After converting the original RDD into a dataframe (and forcing the finalized schema), we can see that the nested fields now appear correctly when we run .printSchema().

In [10]:
final_extracted_assessments.registerTempTable('final_ex_assessments_tb')

In [11]:
transf_assessments = spark.sql("select trim(exam_name) as exam_name, base_exam_id, max_attempts, sequences.questions, \
                                sequences.counts.incomplete, sequences.counts.submitted, sequences.counts.incorrect, \
                                sequences.counts.all_correct, sequences.counts.correct, sequences.counts.total, \
                                sequences.counts.unanswered from final_ex_assessments_tb")

To utilize Spark SQL features, we first have to register our dataframe as a temporary table. From there, we can access the nested fields of the dataframe using the dot operator as shown above. This essentially flattens the schema, as shown when we print the schema out again below. We also generate the num_questions column described previously by taking the size of the questions column. The resulting column represents the number of questions for a given exam, and taking the *size* works because we declared that column to be an ArrayType.

In [26]:
transf_assessments = transf_assessments.withColumn("num_questions", size(col("questions")))

In [27]:
transf_assessments.printSchema()

root
 |-- exam_name: string (nullable = true)
 |-- base_exam_id: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- questions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- incomplete: string (nullable = true)
 |-- submitted: string (nullable = true)
 |-- incorrect: string (nullable = true)
 |-- all_correct: string (nullable = true)
 |-- correct: string (nullable = true)
 |-- total: string (nullable = true)
 |-- unanswered: string (nullable = true)
 |-- num_questions: integer (nullable = false)
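
The flattening and the num_questions derivation above can be mimicked in plain Python to check the logic on a single record. This is a sketch under assumptions: the record is hypothetical, `flatten_attempt` is a made-up helper, and `strip(" ")` stands in for SQL `trim`, which removes leading and trailing spaces.

```python
def flatten_attempt(record):
    """Flatten one nested attempt into a single-level dict."""
    flat = {
        "exam_name": record["exam_name"].strip(" "),  # like trim(exam_name)
        "base_exam_id": record["base_exam_id"],
        "max_attempts": record["max_attempts"],
        # like size(col("questions"))
        "num_questions": len(record["sequences"]["questions"]),
    }
    # Lift every key under sequences.counts to the top level.
    flat.update(record["sequences"]["counts"])
    return flat


# Hypothetical nested record with a space-padded exam name.
nested = {
    "exam_name": "  Learning Git ",
    "base_exam_id": "id-1",
    "max_attempts": "1.0",
    "sequences": {
        "questions": ["q1", "q2", "q3", "q4"],
        "counts": {"correct": "2", "total": "4"},
    },
}

flat = flatten_attempt(nested)
print(flat)
```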



In [28]:
transf_assessments.registerTempTable('transf_assessments_tb')

In [29]:
trans_assessment_out = spark.sql("select exam_name, base_exam_id, max_attempts, \
                                num_questions, incomplete, submitted, incorrect, \
                                all_correct, correct as num_correct, total, unanswered \
                                from transf_assessments_tb")

Now that the schema has been flattened, we no longer have to use the dot operator to reference fields. See a sample output of the final transformed table below.

In [30]:
trans_assessment_out.show(5)

+--------------------+--------------------+------------+-------------+----------+---------+---------+-----------+-----------+-----+----------+
|           exam_name|        base_exam_id|max_attempts|num_questions|incomplete|submitted|incorrect|all_correct|num_correct|total|unanswered|
+--------------------+--------------------+------------+-------------+----------+---------+---------+-----------+-----------+-----+----------+
|Normal Forms and ...|37f0a30a-7464-11e...|         1.0|            4|         1|        4|        1|      false|          2|    4|         0|
|Normal Forms and ...|37f0a30a-7464-11e...|         1.0|            4|         2|        4|        1|      false|          1|    4|         0|
|The Principles of...|4beeac16-bb83-4d5...|         1.0|            4|         0|        4|        1|      false|          3|    4|         0|
|The Principles of...|4beeac16-bb83-4d5...|         1.0|            4|         2|        4|        0|      false|          2|    4|         0|

After transforming the data using Spark, we again write this table to a parquet file, which saves it onto HDFS. Once we read it back in, we can query it the way a new customer's data science team would.

In [32]:
trans_assessment_out.write.parquet("/tmp/queryable_assessments")

Business Questions
1. How many assessments are on this platform?
2. What are the top 5 most frequently taken assessments?
3. What is the average number of questions per assessment?
4. What is the average score (num correct / total) for each assessment?

Below, we are reading in the parquet file, registering a temporary table, and printing the schema to check that it was read in from HDFS correctly.

In [33]:
read_trans_assess = spark.read.parquet('/tmp/queryable_assessments')

In [34]:
read_trans_assess.registerTempTable('questions_tb')

In [35]:
read_trans_assess.printSchema()

root
 |-- exam_name: string (nullable = true)
 |-- base_exam_id: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- num_questions: integer (nullable = true)
 |-- incomplete: string (nullable = true)
 |-- submitted: string (nullable = true)
 |-- incorrect: string (nullable = true)
 |-- all_correct: string (nullable = true)
 |-- num_correct: string (nullable = true)
 |-- total: string (nullable = true)
 |-- unanswered: string (nullable = true)



### Sample Business Questions

Now that the transformed data is saved in HDFS, we run some sample queries to give new customers a taste of the kinds of business questions the assessments data can answer.

#### 1. How many assessments are on this platform?

In [36]:
q1 = spark.sql("select count (distinct base_exam_id), count(distinct exam_name) from questions_tb")
q1.show()

+----------------------------+-------------------------+
|count(DISTINCT base_exam_id)|count(DISTINCT exam_name)|
+----------------------------+-------------------------+
|                         107|                      103|
+----------------------------+-------------------------+



This question brought to light an issue with the data. As mentioned earlier in the report, some of the values in the "exam_name" column were padded with white space, which is why we trimmed that column during the transformation. Even so, the two counts do not match: there are 107 distinct base_exam_id values but only 103 distinct exam_name values. Leftover padding would produce more distinct names than ids, not fewer, so this result suggests that some exam names are shared by more than one base_exam_id. This is still an open issue with the data, and base_exam_id remains the safer key for identifying an assessment.
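
To see where the two counts diverge, one option is to list every exam name that appears with more than one base_exam_id (the same grouping can be run in the other direction to list ids with more than one name). Below is a minimal plain-Python sketch on hypothetical pairs; the helper name `ids_per_name` is made up, and none of the rows come from the real data.

```python
from collections import defaultdict

# Hypothetical (exam_name, base_exam_id) pairs, not taken from the real data.
rows = [
    ("Learning Git", "id-1"),
    ("Learning Git", "id-2"),
    ("Learning SQL", "id-3"),
    ("Learning SQL ", "id-3"),
]


def ids_per_name(pairs):
    """Map each space-trimmed exam name to the set of ids it appears with."""
    mapping = defaultdict(set)
    for name, exam_id in pairs:
        mapping[name.strip(" ")].add(exam_id)
    return mapping


# Keep only the names that map to more than one id.
shared = {n: sorted(ids) for n, ids in ids_per_name(rows).items() if len(ids) > 1}
print(shared)
```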

#### 2. What are the top 5 most attempted assessments?

In [21]:
q2 = spark.sql("select exam_name, base_exam_id, count(*) as num_attempts from questions_tb \
            group by exam_name, base_exam_id \
            order by num_attempts DESC \
            limit 5")
q2.show(5,False)

+-------------------------------+------------------------------------+------------+
|exam_name                      |base_exam_id                        |num_attempts|
+-------------------------------+------------------------------------+------------+
|Learning Git                   |8b4488de-43a5-4ffa-bf82-af1e19ee1b64|394         |
|Introduction to Java 8         |41858ac3-1394-451b-bf7c-c10f52034a9a|158         |
|Intermediate Python Programming|1a233da8-e6e5-48a6-8c3c-806e312cce12|158         |
|Learning to Program with R     |b114e4a4-a192-4dff-a5cd-8e7782bb1623|128         |
|Introduction to Python         |7e2e0b53-a7ba-458d-8bc6-356f8dea8815|122         |
+-------------------------------+------------------------------------+------------+



#### 3. What is the average number of questions for an assessment on our platform?

In [24]:
q3 = spark.sql("with input as (select avg(num_questions) as avg_num_questions \
               from questions_tb \
                group by base_exam_id) \
                select avg(avg_num_questions) as avg_num_questions from input")
q3.show()

+-----------------+
|avg_num_questions|
+-----------------+
|4.317757009345795|
+-----------------+
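
The query averages within each base_exam_id first and then averages across exams, so a heavily attempted exam counts once rather than once per attempt. A small sketch with hypothetical numbers shows how the two ways of averaging can differ:

```python
from statistics import mean

# Hypothetical (base_exam_id, num_questions) rows, one per attempt.
attempts = [("a", 4), ("a", 4), ("a", 4), ("b", 6)]

# Per-attempt average: exam "a" is counted three times.
per_attempt_avg = mean(n for _, n in attempts)

# Per-exam average first, then the average across exams (the shape of q3).
by_exam = {}
for exam_id, n in attempts:
    by_exam.setdefault(exam_id, []).append(n)
per_exam_avg = mean(mean(ns) for ns in by_exam.values())

print(per_attempt_avg, per_exam_avg)
```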



#### 4. What's the average score for each exam?

Score for an exam will be calculated using sequences.counts.correct / sequences.counts.total
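
Because num_correct and total are stored as strings, the division relies on them being converted to numbers, and attempts with a missing or zero total need care. Here is a plain-Python sketch of the per-attempt score and its average, using hypothetical values; the helper `attempt_score` is made up, and skipping unusable attempts is an assumption about the desired behavior, not a description of the query.

```python
def attempt_score(num_correct, total):
    """Score for one attempt; None when a value is missing or total is zero."""
    try:
        correct, total = float(num_correct), float(total)
    except (TypeError, ValueError):
        return None
    return correct / total if total else None


# Hypothetical (num_correct, total) string pairs for one exam.
pairs = [("2", "4"), ("4", "4"), ("0", "0"), (None, "4")]

# Drop attempts without a usable score before averaging.
scores = [s for s in (attempt_score(c, t) for c, t in pairs) if s is not None]
avg_score = round(sum(scores) / len(scores), 2)
print(avg_score)
```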

In [149]:
q4 = spark.sql("select trim(exam_name), base_exam_id, round(avg(num_correct / total),2) as avg_score \
                from questions_tb \
                group by exam_name, base_exam_id")
q4.show(20)

+--------------------+--------------------+---------+
|     trim(exam_name)|        base_exam_id|avg_score|
+--------------------+--------------------+---------+
|Mastering Python ...|dd9e3175-45a4-491...|     0.74|
|Learning Linux Se...|76f39b28-76be-11e...|     0.62|
|Introduction to A...|f9de34a3-748d-11e...|     0.83|
|Learning C# Desig...|a8dedd1d-0f67-4f4...|     0.72|
|Learning Data Mod...|479f39cc-70a9-11e...|      0.5|
|An Introduction t...|9ff51f14-7525-11e...|     0.59|
|Git Fundamentals ...|e1f07fac-5566-4fd...|     0.75|
|Hadoop Fundamenta...|b4da3808-7474-11e...|     0.67|
|Amazon Web Servic...|f0633ed7-748d-11e...|      0.5|
|Using R for Big D...|526908f1-9c67-4ab...|     0.56|
|Introduction to A...|0161dadc-748e-11e...|     0.55|
|The Closed World ...|f224b886-745e-11e...|      1.0|
|Data Visualizatio...|5a1a0072-47c5-478...|     0.49|
|Starting a Grails...|b0011798-80dd-4cb...|     0.75|
|Modeling for Soft...|f83fae20-8d97-40d...|     0.65|
|        Learning SQL|a3cf5a