# Project 2 Notebook
### Class: W205.5 - Data Engineering
Student: Blair Jones

Date: 23 Oct 2020

## Context
The company has created a service that delivers assessments. Many different customers (e.g., Pearson) now want to publish their assessments on the service. This work prepares the data so that data scientists working for these customers can run queries on it.

## Objective / Problem Statement
- Create a data pipeline to receive data from customers, transform it into a form suitable for usage by data scientists, and save it into storage that will be accessible to the data scientists.

### Sample customer usage

Typical business queries expected by our customers include:
- Are a majority of students consistently achieving high scores on certain exams?  If so, it may be time to adjust the complexity level of the exam.
- Which are the least popular exams?  Should they be redesigned, replaced or retired?
- Which exam questions have the most wrong responses?  Should they be redesigned, replaced or retired?

## Solution Approach
- The architecture is intended to receive data from assessment companies, then transform and publish it so that each customer's data scientists can analyze it.
- A critical consideration is that our solution must protect each customer's data.  Each customer's data must be logically partitioned to prevent inadvertent access to the wrong information.
- For now, we will consider that a user presenting credentials granting access to a specific customer's data will have unlimited access to that data.  Role-based access controls may be considered in the future.


## Notebook setup

```python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import SVG
```


## Solution Overview

### Logical component view

<div style='background-color: gray'>
    <img src='./images/pipeline-overall.svg' />
</div>

## Technology Choices (for in-scope components)

#### Driving principles
The solution requires low latency, scalability, and portability.  It must also be flexible enough to adapt to new functional requirements as customers add more datasets and require additional types of queries.


#### Technologies
- Data Sources (event logs)
    - Customer API not in scope.  To be designed and implemented in another project.
    
    
- Streaming Context
    - Topic Queues: Kafka
    - Data Transformation: Spark
    
    
- Distributed Storage
    - Files:  Hadoop (HDFS)
    

- Runtime Platform
    - Containers: Docker, Docker-Compose.  All other technologies will be run inside Docker containers.  The use of Docker ensures portability to different hosting options.
    - Hosting: Google Cloud Platform (GCP).  All components will be deployed to GCP for development and testing.  The choice of production environment can be made later.  


- Query Tools
    - Not in scope
    

## Solution Design

We will approach the design simultaneously from two directions:
* From the customer data input side (event logs) to identify the input data that is currently available to us.
* From the customer usage side (queries), to identify how the output data will be used and whether any additional data should be requested or specified in the input.

### Exploratory Analysis of Incoming Data

A sample dataset was provided to aid in the design and implementation of the solution.  We explored some preliminary questions to understand the structure and content of the incoming data.

- How many assessments are in the dataset?
- How many people took Learning Git?
- What is the least common course taken? And the most common?


How many assessments are in the dataset?

```
%cat assessment-attempts-20180128-121051-nested.json | jq '.[].exam_name' | wc -l
```

```
3280
```


How many unique exams are in the dataset?

```
%cat assessment-attempts-20180128-121051-nested.json | jq '.[].exam_name' | sort | uniq | wc -l
```

```
103
```


Note

- While inspecting the list of exam names, we found an entry entitled "Example Exam For Development and Testing oh yeahsdf".  This looks like a special case not intended for production.  Through the use of additional queries we determined that there are a total of 5 entries with this name in the dataset, and they have a different data structure from the other entries.

<span style='font-size: 1.2em; color: blue;'> Recommendation </span>
- A policy should be developed for content publishers so that data quality is not degraded by inadvertent inclusion of non-production data.

```
%cat assessment-attempts-20180128-121051-nested.json | jq '.[].exam_name' | grep 'yeahsdf' | wc -l
```

```
5
```


How many assessments were there for "Learning Git"?

```
%cat assessment-attempts-20180128-121051-nested.json | jq '.[] | select(.exam_name=="Learning Git") | .exam_name' | wc -l
```

```
394
```


What is the least common course taken? And the most common?

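```python
import subprocess

# Count assessments per exam name, sorted from least to most frequent
cmd = "cat assessment-attempts-20180128-121051-nested.json | jq '.[].exam_name' | sort | uniq -c | sort -n"
exam_count = subprocess.getoutput(cmd)
exam_count = exam_count.split("\n")
# Each line looks like '   12 "Exam Name"'; the name is the second-to-last piece when split on '"'.
# Several exams may share the lowest (or highest) count; only the first (or last) line is reported.
print('The least popular exam is ',exam_count[0].split('"')[-2])
print('The MOST popular exam is ',exam_count[-1].split('"')[-2])
```

```
The least popular exam is  Learning to Visualize Data with D3.js
The MOST popular exam is  Learning Git
```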
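The same exploratory counts can also be computed in plain Python, without shelling out to `jq`. The sketch below is an illustration, not the method used above; it assumes the file is a single JSON array of objects with an `exam_name` key (which is what the `jq '.[].exam_name'` filter implies), and the helper names are hypothetical. Unlike the `sort -n` pipeline, it reports every exam tied for the lowest or highest count.

```python
import json
from collections import Counter


def load_assessments(path):
    """Load the sample file, assumed to be one JSON array of assessment objects."""
    with open(path) as f:
        return json.load(f)


def summarize_exams(records):
    """Answer the exploratory questions for a list of assessment dicts."""
    counts = Counter(record["exam_name"] for record in records)
    lowest = min(counts.values(), default=None)
    highest = max(counts.values(), default=None)
    return {
        "assessments": len(records),
        "unique_exams": len(counts),
        "learning_git": counts["Learning Git"],  # Counter returns 0 for missing keys
        # Report all exams tied at the extremes, sorted for a stable order
        "least_common": sorted(name for name, n in counts.items() if n == lowest),
        "most_common": sorted(name for name, n in counts.items() if n == highest),
    }
```

Usage would look like `summarize_exams(load_assessments("assessment-attempts-20180128-121051-nested.json"))`.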


**Key findings from data exploration**
* The sample dataset may contain corrupt data.  On first inspection (conducted by loading the dataset into Pandas in a separate notebook), the detailed exam question responses do not appear to tally with the summary statistics given for each test.
* There are duplicate assessment IDs in the sample file, but the detailed questions and responses differ between the duplicates.  This affects 170 of the 3280 entries in the dataset, spread over 38 assessment IDs (`keen_id`).
* The sample dataset contains 5 development/test entries (exam name "Example Exam For Development and Testing oh yeahsdf").  A policy is needed on whether these types of entries will be accepted in incoming data feeds and, if so, how they will be treated.
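The second and third findings can be checked with a short plain-Python sketch. It is illustrative only: the helper names are hypothetical, and it assumes each record is a dict with `keen_id` and `exam_name` keys. Summing the values returned by `find_duplicate_ids` gives the number of entries involved, and its length gives the number of distinct duplicated IDs, which is one way figures like "170 entries over 38 IDs" could be reproduced.

```python
from collections import Counter

# Marker seen in the sample data's development/test exam name
TEST_EXAM_MARKER = "yeahsdf"


def find_duplicate_ids(records, key="keen_id"):
    """Return {id: occurrences} for every id that appears more than once."""
    counts = Counter(record.get(key) for record in records)
    return {value: n for value, n in counts.items() if n > 1}


def split_test_entries(records, marker=TEST_EXAM_MARKER):
    """Split records into (production, test) by looking for the marker in exam_name."""
    production, test = [], []
    for record in records:
        if marker in (record.get("exam_name") or ""):
            test.append(record)
        else:
            production.append(record)
    return production, test
```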

### Topics 

A Topic is a queue that, in its simplest form, takes in event data and makes that data available to various consumers.  The stored events can be of any level of granularity, retained for as long as the topic's retention settings allow, and read by multiple independent consumers.

The selection of Topic(s) for this project is driven by business needs.  Specifically, we want to represent meaningful business entities that data scientists then analyze.

This initial design uses a single Topic for all customers.  In the future, dedicated topics may be designed for specific customers.

**Input-side Perspective**

> From the initial data exploration, some natural candidate entities are:
>   * Exams - each Exam is represented with summary metadata and statistics.
>   * Assessments - each individual Assessment across all Exams is represented with summary metadata and statistics.  The sample dataset is organized with 1 entry for each assessment completed.
>   * Questions - each individual Question across all Assessments is represented, along with the detailed responses from exam-takers.  This entity contains some potentially interesting data related to timings and number of times a student selected an option, which enables analysis of user interactions with the assessment system.


**Query-side Perspective**

> Users are likely to need a view onto these entities:
>    * Assessments - for purpose of analysing across exams, types of exams and assessments, within and across different families.  Users can summarize this entity to view Exams.
>    * Questions - as described above.
>    * Students - to test hypotheses related to student profiles, learning needs and assessment performance.  Non-identifiable data.

These entities do cover the example customer queries described in the Problem Statement section of this document.


<span style='font-size: 1.2em; color: blue;'> Recommendation </span>

- Based on the format and contents of the sample file, we recommend moving forward with Assessments as the primary Input Topic.  This will be implemented in Kafka.

- For the data that will be used to support queries, we will use Assessments and Questions as the primary Entities.  These will be stored in Hadoop.

- We also recommend including *non-identifiable* Student Profile data in the future specification of minimum incoming data.



### Data Transformation

The Exams / Assessments data is extracted easily from the incoming data stream.

Questions are more challenging.  Preliminary data exploration indicates that the data is deeply nested within each assessment, and can contain a variable number of questions.

The objective is to extract each question as a single row while maintaining a link back to the original exam from which it came.  Other metadata could be extracted as well; once the basic task of extracting questions and their link to the exam is accomplished, extracting other fields should be straightforward.

For the purpose of this project, only the question and the link to the main exam are used.
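A minimal plain-Python sketch of this transformation, for illustration only: it parses one raw assessment, follows the `sequences.questions` path used later in the Spark implementation, and emits one row per question tagged with `base_exam_id`. The function name is hypothetical, and returning no rows for a record without questions (rather than raising an error) is a choice made for this sketch, not a property of the pipeline.

```python
import json


def explode_questions(raw_assessment):
    """Turn one raw assessment (JSON string) into one row per question.

    Each row keeps base_exam_id so the question can be linked back to its exam.
    """
    assessment = json.loads(raw_assessment)
    exam_id = assessment.get("base_exam_id")
    sequences = assessment.get("sequences") or {}
    questions = sequences.get("questions") or []
    return [{"base_exam_id": exam_id, "question": question} for question in questions]
```

Because the number of questions varies per assessment, the output length varies too; the Spark version achieves the same effect with `explode`.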

### Data Storage

Once Spark has performed the transformations, storage in Hadoop is simple: we save the resulting DataFrame as Parquet files in an appropriately named HDFS directory.


## Solution Implementation

### Environment Launch

```
!docker-compose up -d
```

### 1) Kafka Topic

**Topic Setup**

```
!docker-compose exec kafka kafka-topics --create --topic assessments --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```

```
Created topic assessments.
```


```
!docker-compose exec kafka kafka-topics --describe --topic assessments --zookeeper zookeeper:32181
```

```
Topic: assessments	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: assessments	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
```


**Sample dataset load into Kafka Topic**

```
!docker-compose exec mids bash -c "cat /w205/project-2-bjonesneu/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments && echo 'Produced messages.'"
```

```
Produced messages.
```


```
!docker-compose exec mids bash -c "kafkacat -C -b kafka:29092 -t assessments -o beginning -e" | wc -l
```

```
3281
```
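The consumer count (3281) is one higher than the 3280 assessments loaded. One plausible explanation, not verified here, is that a non-message status line from the consumer is being counted by `wc -l` along with the messages. A sketch that counts only lines that parse as JSON objects avoids that ambiguity; the function name is hypothetical, and the status line in the test example is illustrative rather than captured output.

```python
import json


def count_json_messages(lines):
    """Count lines that parse as JSON objects, skipping blank lines and status text."""
    count = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            parsed = json.loads(line)
        except json.JSONDecodeError:
            continue  # not JSON, e.g. a consumer status message
        if isinstance(parsed, dict):
            count += 1
    return count
```

It could be fed the consumer's output (for instance by iterating over `sys.stdin`) in place of `wc -l`.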


### 2) Hadoop Storage

**Verify Status**

```
!docker-compose exec cloudera hadoop fs -ls /tmp/
```

```
Found 2 items
drwxrwxrwt   - mapred mapred              0 2018-02-06 18:27 /tmp/hadoop-yarn
drwx-wx-wx   - root   supergroup          0 2020-10-19 15:39 /tmp/hive
```


### 3) Spark Stream

This section is executed using the CLI.  The code used is copied here for reference.

First, we launch the PySpark shell in the Spark container.

```code
CLI:   docker-compose exec spark pyspark
```

Next, we run these commands inside PySpark to read data from the Kafka topic, cast each message value from binary to string, parse each string as JSON, and convert the result into a DataFrame.

```
PYSPARK:  
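    # Batch-read every message currently in the 'assessments' topic
    # (`spark` is the SparkSession predefined by the pyspark shell)
    raw_assessments = spark \
      .read \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "kafka:29092") \
      .option("subscribe","assessments") \
      .option("startingOffsets", "earliest") \
      .option("endingOffsets", "latest") \
      .load() 

    # Cache the raw messages; they are reused for both the Assessments and Questions entities
    raw_assessments.cache()

    raw_assessments.printSchema()

    # Kafka delivers the message value as binary; cast it to a string holding the JSON
    assessments = raw_assessments.select(raw_assessments.value.cast('string'))

    from pyspark.sql import Row
    import json
    # Parse each JSON string into a Row; toDF() then infers the DataFrame schema from the Rows
    extracted_assessments = assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()
    extracted_assessments.printSchema()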
```

Output from extracted_assessments.printSchema()
```
        root
         |-- base_exam_id: string (nullable = true)
         |-- certification: string (nullable = true)
         |-- exam_name: string (nullable = true)
         |-- keen_created_at: string (nullable = true)
         |-- keen_id: string (nullable = true)
         |-- keen_timestamp: string (nullable = true)
         |-- max_attempts: string (nullable = true)
         |-- sequences: map (nullable = true)
         |    |-- key: string
         |    |-- value: array (valueContainsNull = true)
         |    |    |-- element: map (containsNull = true)
         |    |    |    |-- key: string
         |    |    |    |-- value: boolean (valueContainsNull = true)
         |-- started_at: string (nullable = true)
         |-- user_exam_id: string (nullable = true)
```

As shown by the *printSchema* output, and as confirmed by the initial exploration of the dataset, the data contains a nested structure.
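Because the Row-based conversion settles on a single type per field, records whose nested structure differs (such as the development entries noted earlier) may not fit the inferred schema. A quick way to see how many structural variants exist before loading into Spark is to compute a type signature per record. This is a minimal plain-Python sketch; the function names and the signature format are arbitrary choices for illustration, and lists are summarized by their first element only.

```python
import json
from collections import Counter


def type_signature(value, depth=2):
    """Describe the shape of a JSON value down to `depth` levels of nesting."""
    if isinstance(value, dict):
        if depth == 0:
            return "object"
        inner = ",".join(f"{k}:{type_signature(v, depth - 1)}" for k, v in sorted(value.items()))
        return "{" + inner + "}"
    if isinstance(value, list):
        if depth == 0 or not value:
            return "array"
        # Use the first element as a representative of the list's contents
        return "[" + type_signature(value[0], depth - 1) + "]"
    return type(value).__name__


def survey_structures(raw_lines, depth=2):
    """Count how many records share each structural signature."""
    return Counter(type_signature(json.loads(line), depth) for line in raw_lines)
```

Running `survey_structures` over the one-JSON-object-per-line output of `jq '.[]' -c` would show whether more than one shape is present.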

#### Assessments Entity

Next, we register the DataFrame as a temporary table so that we can query it with Spark SQL.

```
PYSPARK:  extracted_assessments.createOrReplaceTempView('assessments')
```

We select key fields and exclude the test data identified in the sample dataset.

```code
PYSPARK:  
    assessment_info = spark.sql("select base_exam_id, exam_name, certification, started_at from assessments WHERE NOT (exam_name LIKE '%yeahsdf%')")

    assessment_info.select('base_exam_id').distinct().count()
```

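There are 106 unique exams (distinct `base_exam_id` values) within the filtered sample dataset, compared with the 103 distinct exam names counted earlier, which suggests that some exam names are associated with more than one `base_exam_id`.  We could save only the data for these exams to Hadoop, along with the last time each exam was taken, but that would discard information about how often the exams are used, which can be useful.  So we save the full set of exam-related assessment data to Hadoop.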

At this point it would be useful to extract the "counts" metadata and store it at the overall assessment level.  `counts` is nested within the `sequences` field at the root level.  It contains the summary results for a single assessment (for example, the number of correct, incorrect, and incomplete answers).  Selecting `sequences.counts` with spark.sql yields null values, apparently because the schema inferred by the Row-based conversion above (`sequences` as a map of arrays of boolean maps, as shown in the schema) does not accommodate the structure of `counts`.  The technique used in the next section (parsing the raw JSON string directly) could extract this information, but it is not applied in this section for brevity.
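One way to sidestep schema inference for `counts` is to parse the raw JSON string directly, as the Questions section does for question lists. Below is a minimal sketch of the per-record logic in plain Python; the function name is hypothetical, and since the exact keys inside `counts` are not listed here, it simply returns whatever keys are present. In PySpark, a function like this could be wrapped as a UDF; Spark's built-in `pyspark.sql.functions.get_json_object`, which extracts a value from a JSON string by path (e.g. `'$.sequences.counts'`), is another option.

```python
import json


def extract_counts(raw_assessment):
    """Return the sequences.counts object of one raw assessment, plus its base_exam_id.

    Missing or non-object counts yield only the base_exam_id rather than an error.
    """
    assessment = json.loads(raw_assessment)
    sequences = assessment.get("sequences")
    counts = sequences.get("counts") if isinstance(sequences, dict) else None
    if not isinstance(counts, dict):
        counts = {}
    # base_exam_id is placed last so it cannot be overwritten by a key inside counts
    return {**counts, "base_exam_id": assessment.get("base_exam_id")}
```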

We next save the data to Hadoop.
```code
PYSPARK:  assessment_info.write.parquet("/tmp/assessment_info")
```

And then verify the operation completed successfully.

```code
CLI:  docker-compose exec cloudera hadoop fs -ls /tmp/
CLI:  docker-compose exec cloudera hadoop fs -ls /tmp/assessment_info
```

This shows that an **assessment_info** directory was created and that it contains data files.  We will demonstrate use of the data in Hadoop in another section of this notebook.

Please note that, to meet the requirement that each customer's data be logically partitioned, a unique client identifier should be added to the dataset for future production usage so that access can be controlled per customer.
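If such an identifier were added, Spark's `DataFrameWriter.partitionBy` would write one subdirectory per value, named `column=value` (for instance `assessment_info.write.partitionBy("client_id").parquet("/tmp/assessment_info")`). The plain-Python sketch below makes that layout concrete. The `client_id` field, the function name, and the example client names are hypothetical; the sample dataset contains no such field. Directory layout on its own is not an access control, so permissions would still need to be enforced per partition.

```python
from collections import defaultdict


def partition_by_client(records, base_path, key="client_id"):
    """Group records under Hive-style partition paths: <base_path>/<key>=<value>.

    NOTE: `client_id` is a hypothetical field; the sample dataset does not contain it.
    """
    partitions = defaultdict(list)
    for record in records:
        partitions[f"{base_path}/{key}={record[key]}"].append(record)
    return dict(partitions)
```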

#### Questions Entity

In this section we extract all questions from each assessment, and store each as an individual row with a reference to the base_exam_id.  This allows data scientists to run analysis, such as determining level of difficulty, time spent per question, frequency of changing answers, etc.

Please note that for this section we use the entire sample dataset, including the 5 "development" assessments.

```code
PYSPARK:  
    from pyspark.sql.functions import udf
    from pyspark.sql.types import *
    from pyspark.sql.functions import explode

    @udf(ArrayType(StringType()))
    def flatten_questions(assessment):
        # Parse the raw assessment JSON and return its nested questions,
        # each serialized back to a JSON string to match the declared StringType elements
        assessment_json = json.loads(assessment)
        return [json.dumps(question) for question in assessment_json['sequences']['questions']]

    # This line was used to test the "python" version (without @udf) of the flatten function
    # test = flatten_questions('{"base_exam_id":999, "sequences": {"questions": ["a", "b", "c"], "counts": 123} }')

    @udf('string')
    def flatten_ids(assessment):
        assessment_json = json.loads(assessment)
        return assessment_json['base_exam_id']

    flattened_questions = raw_assessments \
        .select(raw_assessments.value.cast('string').alias('raw')) \
        .withColumn('q_array', flatten_questions('raw')) \
        .withColumn('base_exam_id', flatten_ids('raw')) \
        .withColumn("question", explode('q_array')) \
        .drop('q_array')
    flattened_questions.show(2)
    flattened_questions.count()
```

The 3280 assessments in the sample dataset were converted into 14,717 questions.

The output was then written to Hadoop and verified.

```code
PYSPARK:  flattened_questions.write.parquet("/tmp/assessment_questions")
CLI:      docker-compose exec cloudera hadoop fs -ls /tmp/assessment_questions
```

<span style='font-size: 1.5em; color:blue'><b>Special Note</b></span>

The solution provided for the "Questions" entity works and addresses the problem of storing the list of questions and their relationship to the exams from which they came.
    
The nested structure of "questions" within the assessments presented special challenges for breaking out those questions individually in a usable form.  The solution addresses several issues:
    
- Flattening the "deeply" nested question structure
- Retaining top-level key data (base_exam_id) to preserve the link between individual questions and their base exam
- Data type conversions to make use of Spark native features
    
My solution was inspired by Stack Overflow research, a very helpful conversation with Mark and Taylor in office hours, and a code snippet they pointed out from the week 11 sync-slides.md.
    
In office hours we discussed some alternatives, including using jq or Pandas to achieve the flattening.  I chose to stay with Spark for learning purposes and to mimic the need to implement this as a single, simplified pipeline.
    
However, I did not examine the performance characteristics of this solution.  Because the dataset used here is small, I would test this approach for scalability before using it in production.


## Sample Query

Here is a sample query that may be helpful for future data science team users of this platform.

### Query - What is the last date each exam was taken?

```python
last_used_exams = spark.sql("SELECT a.base_exam_id, a.exam_name, a.started_at FROM assessments a INNER JOIN (SELECT base_exam_id, max(started_at) AS maxdate FROM assessments GROUP BY base_exam_id) am ON a.base_exam_id = am.base_exam_id AND a.started_at = am.maxdate")
last_used_exams.show(5)
```

```sql
SELECT a.base_exam_id, a.exam_name, a.started_at
FROM assessments a 
INNER JOIN (
    SELECT base_exam_id, max(started_at) AS maxdate 
    FROM assessments 
    GROUP BY base_exam_id
) am ON a.base_exam_id = am.base_exam_id AND a.started_at = am.maxdate
```

Output

```
+--------------------+--------------------+--------------------+                
|        base_exam_id|           exam_name|          started_at|
+--------------------+--------------------+--------------------+
|76a682de-6f0c-11e...|Learning iPython ...|2018-01-23T21:46:...|
|a8dedd1d-0f67-4f4...|Learning C# Desig...|2018-01-27T04:01:...|
|0fed9e6e-6438-464...|          Great Bash|2018-01-26T18:37:...|
|b2264d14-7699-11e...|        Learning DNS|2018-01-27T18:06:...|
|479f39cc-70a9-11e...|Learning Data Mod...|2018-01-16T10:15:...|
+--------------------+--------------------+--------------------+
only showing top 5 rows
```
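Because the query joins on the maximum `started_at`, an exam appears more than once if two attempts share that latest timestamp. A plain-Python sketch of the same logic can help check results on a small sample; the function name is hypothetical. Like the SQL (where `started_at` is a string column), it compares timestamps as text, which matches chronological order only if every value uses the same ISO 8601 format and time zone.

```python
def last_taken(records):
    """Return, per base_exam_id, the record(s) with the latest started_at.

    Mirrors the SQL join above: all records tied on the maximum timestamp are kept.
    """
    latest = {}
    for record in records:
        exam_id = record["base_exam_id"]
        current = latest.get(exam_id)
        if current is None or record["started_at"] > current[0]["started_at"]:
            latest[exam_id] = [record]
        elif record["started_at"] == current[0]["started_at"]:
            current.append(record)
    return latest
```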

## Closing Thoughts

I struggled quite a bit with the way that Spark reads data and converts data types, sometimes converting in a way that I was unable to override.  Ultimately this made the data much harder to manipulate than in languages such as Java or Python.  While Spark brings the possibility of massive scalability, I found working around its type-casting behavior extremely time-consuming.  There were many examples on Stack Overflow of solutions to a myriad of casting issues; however, they seemed highly specialized, more like "tricks" than general techniques.  From a maintainability perspective, I'm unsure that solutions like these should be used in production systems.

I wanted to store some fields in Hadoop as Maps so that I could later perform Reduce functions for simple calculations such as sums and counts.  In the end, I was unable to achieve this to the degree that I originally planned.
