<center><h1> W205 Data Engineering </h1></center>
<center><h3> Assignment 2          </h3></center>
<center><h3> Dan Ortiz             </h3></center>
<center><h3> Thursday 4:00PM       </h3></center>
<center><h3> 10/25/2020            </h3></center>

<h2> Load and Transform Data From Kafka to Spark </h2>

In [5]:
# Load dependent packages into notebook

import json
import pandas as pd
from pyspark.sql.functions import explode, split
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, BooleanType
import warnings

In [6]:
# Enable Apache Arrow to speed up converting Spark DataFrames to pandas DataFrames (toPandas)

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [7]:
#Smash the subscribe button for the assessment topic and read all the messages in from it.

raw_assessments = spark \
   .read \
   .format("kafka") \
   .option("kafka.bootstrap.servers", "kafka:29092") \
   .option("subscribe", "assessments") \
   .option("startingOffsets", "earliest") \
   .option("endingOffsets", "latest") \
   .load()

In [8]:
# Mark the DataFrame for caching. cache() is lazy: the data is read and cached
# when the first action (count() below) runs.

raw_assessments.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [9]:
# Verify the expected number of messages/events came through. Expect 3280 messages.

raw_assessments.count()

3280

In [10]:
# Cast the binary Kafka value column to a string so the JSON can be parsed

assessments = raw_assessments.select(raw_assessments.value.cast('string'))

In [11]:
# Parse each JSON string into a Python dict, then convert the RDD back into a DataFrame
# so Spark infers a schema. Nested JSON objects and arrays show up as nested columns,
# which helps when defining the final schema in the next section.

extracted_assessments = assessments.rdd.map(lambda x: json.loads(x.value)).toDF()
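The cast-then-`json.loads` step above can be sketched in plain Python, outside Spark. This is a minimal sketch assuming each Kafka message value holds one UTF-8 encoded JSON object; the sample payload is hypothetical and trimmed to a few of the fields used later in the final schema.

```python
import json

# Hypothetical Kafka message value: raw bytes, like the binary `value` column.
raw_value = (b'{"user_exam_id": "abc-123", "exam_name": "Learning Git", '
             b'"certification": "false", '
             b'"sequences": {"attempt": 1, "counts": {"correct": 2, "total": 4}}}')

# Analogue of value.cast('string'): decode the bytes as UTF-8 text.
text = raw_value.decode("utf-8")

# Analogue of json.loads(x.value) inside the RDD map: parse into nested dicts.
record = json.loads(text)

# Nested JSON objects become nested dicts that can be addressed level by level.
print(record["exam_name"])                     # Learning Git
print(record["sequences"]["counts"]["total"])  # 4
```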



In [12]:
# Write the raw Kafka DataFrame (binary key/value plus metadata) to Parquet
# so the unprocessed data is preserved

raw_assessments.write.parquet("/tmp/assessments_results")

<h2> Build the Final Schema </h2>

#### Decisions with the Final Schema

I scoped the schema around the business questions. It keeps higher-level information that is unique to each submission (exam IDs, exam name, certification flag, maximum attempts, start time) and lower-level information about the assessment itself (attempt number and answer counts). One hypothesis is that students taking a class for certification behave differently from those who are not. The goal of this schema is to filter out fields that are not needed, improving efficiency and performance. The raw data is saved to Parquet, so another schema can be designed if and when it is needed.
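To illustrate the filtering idea, the sketch below keeps only the fields named in `final_schema` and drops everything else, such as the per-question detail. It is a plain-Python analogue, assuming that applying a `StructType` to an RDD of dicts picks fields by name and leaves missing keys as null; the field lists mirror `final_schema`, while the `project` helper and the sample record are hypothetical.

```python
# Field layout mirroring final_schema (top level, "sequences", and "counts").
TOP_LEVEL = ["user_exam_id", "base_exam_id", "exam_name", "certification",
             "max_attempts", "started_at"]
SEQUENCE_FIELDS = ["attempt"]
COUNT_FIELDS = ["submitted", "correct", "total", "unanswered", "all_correct"]


def project(record):
    """Hypothetical helper: keep only the schema's fields; absent keys become None."""
    out = {name: record.get(name) for name in TOP_LEVEL}
    seq = record.get("sequences") or {}
    counts = seq.get("counts") or {}
    out["sequences"] = {name: seq.get(name) for name in SEQUENCE_FIELDS}
    out["sequences"]["counts"] = {name: counts.get(name) for name in COUNT_FIELDS}
    return out


# Hypothetical parsed message, including a "questions" list the schema drops.
sample = {
    "user_exam_id": "abc-123",
    "exam_name": "Learning Git",
    "certification": "false",
    "sequences": {"attempt": 1, "questions": [{"id": "q1"}],
                  "counts": {"correct": 2, "total": 4}},
}

slim = project(sample)
print("questions" in slim["sequences"])  # False
print(slim["sequences"]["counts"])
```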


In [13]:
# Defines the final schema

final_schema = StructType([StructField('user_exam_id', StringType(), True),
                           StructField('base_exam_id', StringType(), True),
                           StructField('exam_name', StringType(), True),
                           StructField('certification', StringType(), True),
                           StructField('max_attempts', StringType(), True),
                           StructField('started_at', StringType(), True),
                           StructField('sequences', StructType([
                               StructField('attempt', IntegerType(), True),
                               StructField('counts', StructType([
                                   StructField('submitted', IntegerType(), True),
                                   StructField('correct', IntegerType(), True),
                                   StructField('total', IntegerType(), True),
                                   StructField('unanswered', IntegerType(), True),
                                   StructField('all_correct', BooleanType(), True)
                                   ]))
                               ]))
                          ])

In [14]:
# Parse each JSON message and build a DataFrame that conforms to final_schema;
# fields not named in the schema are dropped

focused_extracted_assessments = assessments.rdd.map(lambda x: json.loads(x.value)).toDF(schema=final_schema)

In [15]:
# Register the DataFrame as a temporary view so it can be queried with Spark SQL.
# (createOrReplaceTempView replaces the deprecated registerTempTable.)

focused_extracted_assessments.createOrReplaceTempView('focused_assessments')

<h2> Business Analysis </h2>

#### What the parts of the pipeline do

#### Docker and Docker Compose

Docker builds containers, which give an application an isolated space to run apart from the rest of the system. This isolation keeps the applications in the pipeline modular. Docker Compose defines the containers and the networks between them in a single .yml file, so the user can spin the cluster up or shut it down easily. This portability also improves repeatability and reproducibility, because the exact environment in which an experiment was run can be shared.

#### cURL

cURL is a command-line tool for transferring data to and from servers. The pipeline starts with a cURL command that downloads the assessments JSON file.

#### Kafka and JQ

jq is used to strip the outer document structure so that Kafka can publish one message/event per assessment instead of one message for the whole document.

Kafka is used to create a topic and publish messages to it. The topic for this document is "assessments". Because Kafka is fed the jq output, it publishes its messages at the assessment level. Creating the topic also sets the number of partitions and the replication factor for the data.
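As a plain-Python stand-in for the jq step, the sketch below emits one compact JSON line per element of the document; each line would become one Kafka message. It assumes the downloaded file is a single top-level JSON array and that the jq filter iterates over it with compact output (for example `.[]` with `-c`); the two-element document and the `split_into_messages` helper are hypothetical.

```python
import json

# Hypothetical downloaded document: one top-level array of assessments.
document = ('[{"exam_name": "Learning Git", "user_exam_id": "a1"}, '
            '{"exam_name": "Introduction to Python", "user_exam_id": "b2"}]')


def split_into_messages(doc_text):
    """Hypothetical helper: one compact JSON string per array element."""
    items = json.loads(doc_text)
    return [json.dumps(item, separators=(",", ":")) for item in items]


messages = split_into_messages(document)
for line in messages:
    print(line)
```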

#### Zookeeper

Zookeeper provides coordination services for distributed applications. Here it tracks the state of the Kafka cluster: its brokers, topics, partitions, and so on. It does not act as a broker between Kafka and Spark; Spark reads directly from the Kafka broker at kafka:29092.

#### Spark (PySpark)

Spark, through its Python API PySpark, subscribes to the Kafka topic and reads its messages. When read, the data arrives with Spark's default Kafka schema (key, value, topic, partition, offset, timestamp, timestampType). Spark is also used to apply a more tailored schema for the analysis and to build a queryable DataFrame.

#### Spark (SparkSql)

Spark SQL is the Spark module used to query the DataFrame with SQL once it has been registered as a temporary table.

#### Pandas

Pandas is used to convert the Spark SQL query results into pandas DataFrames, which display as readable tables in the notebook.

#### What I set up myself 

Many components of the pipeline were provided directly or reviewed in class. The following are the items I set up or modified myself:

   - Modified the week 8 docker-compose file, opening port 8888 allowing Jupyter notebooks to connect.
   - Implemented a shell command to automate the CLI portion of the pipeline
   - Modified and implemented the framework of the pipeline reviewed in lecture to work with this data
   - Named the topic "assessments"
   - Developed the final schema, allowing me to query the dataframe to answer the provided questions
   - Installed docker-compose from the CLI
   - Enabled Apache Arrow to speed up creating pandas DataFrames from spark.sql query results


#### Issues and Findings In the Data 

   - We have unique exam IDs but not unique student IDs. This limits our understanding of the relationship between students and exam IDs and prevents us from understanding how students interact with courses, or whether students repeat courses.
   - There are duplicate assessments in the data (3,280 records but only 3,242 unique user_exam_id values). Queries need to count distinct exam IDs so duplicates are not double-counted.
   - It is difficult to interpret the "questions" portion of the data without documentation. For example, in the first assessment, question 0 shows a status of incomplete, yet the submissions show the user submitted answers to the questions.


### How many assessments are in the dataset? 

#### Assumptions and Thinking 

##### Assumptions

   - The intent is to understand the number of tests, not the number of messages. There could be duplicate tests in the pipeline.
   - All students enrolled in the class took an assessment
   - Students only take the class once
   - There is only one assessment in the class
   
##### Thinking

   - The second query counts distinct (user_exam_id, exam_name) pairs. Because that count equals the number of unique user_exam_id values from the first query (3,242), each user_exam_id maps to exactly one exam; it identifies an exam submission, not the user.
   - This indicates we can identify individual exams, but we do not know whether an individual took more than one class.
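The pair-count argument can also be checked directly by listing any `user_exam_id` that maps to more than one `exam_name`; an empty result confirms the one-to-one mapping. This sketch uses Python's built-in `sqlite3` as a stand-in for Spark SQL, with a few hypothetical rows; the same `GROUP BY ... HAVING` query is standard SQL and should carry over to the `focused_assessments` view, though it is not a query verified on this dataset.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE focused_assessments (user_exam_id TEXT, exam_name TEXT)")
# Hypothetical rows: "a1" appears twice (a duplicate record) but always with the same exam.
conn.executemany(
    "INSERT INTO focused_assessments VALUES (?, ?)",
    [("a1", "Learning Git"), ("a1", "Learning Git"), ("b2", "Introduction to Python")],
)

# Any user_exam_id tied to more than one exam_name would show up here.
conflicts = conn.execute(
    """SELECT user_exam_id, COUNT(DISTINCT exam_name) AS n_exams
       FROM focused_assessments
       GROUP BY user_exam_id
       HAVING COUNT(DISTINCT exam_name) > 1"""
).fetchall()
print(conflicts)  # []
```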

##### Findings

There are 3,280 total records in the JSON file; the file index starts at 0 and ends at 3,279. However, there are only 3,242 unique user_exam_id values, so 3,280 - 3,242 = 38 records are repeats. The dataset therefore contains 3,242 unique assessments, and further analysis should use COUNT(DISTINCT ...) when counting records.
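Beyond counting distinct IDs, the repeats can be removed before analysis. In PySpark, `focused_extracted_assessments.dropDuplicates(['user_exam_id'])` is one way to do that, although which duplicate row Spark keeps is not guaranteed. The plain-Python sketch below shows the same idea on hypothetical records, keeping the first row seen for each `user_exam_id`; the `drop_duplicate_exams` helper is hypothetical.

```python
# Hypothetical parsed records; "a1" was delivered twice.
records = [
    {"user_exam_id": "a1", "exam_name": "Learning Git"},
    {"user_exam_id": "a1", "exam_name": "Learning Git"},
    {"user_exam_id": "b2", "exam_name": "Introduction to Python"},
]


def drop_duplicate_exams(rows):
    """Hypothetical helper: keep the first row for each user_exam_id, preserving order."""
    seen = set()
    kept = []
    for row in rows:
        key = row["user_exam_id"]
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


deduped = drop_duplicate_exams(records)
total = len(records)
n_unique = len({r["user_exam_id"] for r in records})
print(total, n_unique, total - n_unique)  # 3 2 1
print(len(deduped))                       # 2
```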

In [16]:
assessment_qty = spark.sql("SELECT COUNT(user_exam_id) AS total_exams, \
                            COUNT(DISTINCT user_exam_id) AS unique_exams \
                            FROM focused_assessments")

assessment_qty_pdf = assessment_qty.select("*").toPandas()
assessment_qty_pdf

|   | total_exams | unique_exams |
|---|---|---|
| 0 | 3280 | 3242 |


In [20]:
combo = spark.sql("SELECT COUNT(*) AS count\
                       FROM (SELECT DISTINCT user_exam_id, exam_name \
                       FROM focused_assessments)")

combo_pdf = combo.select("*").toPandas()
combo_pdf

|   | count |
|---|---|
| 0 | 3242 |


### What's the name of your Kafka topic? How did you come up with that name? 

I named my Kafka topic "assessments" for a few reasons. First, each message is a complete assessment submission, so the name accurately describes the messages. In addition, a name like "test" or "test-results" could cause confusion: programmers often use "test" for code or system checks, and I did not want users to mistake the topic for a verification check. Finally, the topic needed to be concise and easy to type. Unfortunately, there is no good way to abbreviate assessment without it looking silly.

### How many people took *Learning Git*? 

##### Assumptions

   - There is only one assessment per class. Since we know the assessment IDs are unique, multiple exams per class would inflate the number.
   - The class name is the name of the assessment. We do not have the name of the class, but we do have the name of the assessment.
   - A student takes a class only once. This is a limitation of our data: we do not know the relationship between an individual student and the exam IDs. Therefore, we have to assume that a student takes a class, and its assessment, only once.
   

##### Findings
 
390 students took the class "Learning Git".

In [14]:
learn_git = spark.sql("SELECT COUNT(DISTINCT user_exam_id) as unique_exam_id \
                       FROM focused_assessments \
                       WHERE exam_name = 'Learning Git'")

learn_git_pdf = learn_git.select("*").toPandas()
learn_git_pdf

|   | unique_exam_id |
|---|---|
| 0 | 390 |


### What is the least common course taken? And the most common?

##### Assumptions

   - All students enrolled in the class took an assessment
   - Students only take the class once
   - There is only one assessment in the class
   
##### Thinking

   - Spark SQL does not allow nesting one aggregate inside another (for example, MAX(COUNT(...))), so I sorted the grouped counts instead
   - I found it difficult to give data frames aliases; a common error was that the data frame did not exist
   - It is possible for multiple classes to have the same number of assessments taken

##### Findings

   - The most popular class is "Learning Git" with 390 assessments completed
   - The least popular classes are "Nulls, Three-valued Logic and Missing Information", "Native Web Apps for Android", "Learning to Visualize Data with D3.js", and "Operating Red Hat Enterprise Linux Servers", all with 1 completed assessment.

In [15]:
most_pop = spark.sql("SELECT exam_name, \
                      COUNT(DISTINCT user_exam_id) AS count \
                      FROM focused_assessments \
                      GROUP BY exam_name \
                      ORDER BY count DESC \
                      limit 5")

most_pop_pdf = most_pop.select("*").toPandas()
most_pop_pdf

|   | exam_name | count |
|---|---|---|
| 0 | Learning Git | 390 |
| 1 | Introduction to Python | 162 |
| 2 | Introduction to Java 8 | 158 |
| 3 | Intermediate Python Programming | 156 |
| 4 | Learning to Program with R | 128 |


In [16]:
least_pop = spark.sql("SELECT exam_name, \
                      COUNT(DISTINCT user_exam_id) AS count \
                      FROM focused_assessments \
                      GROUP BY exam_name \
                      ORDER BY count \
                      LIMIT 5")

least_pop_pdf = least_pop.select("*").toPandas()
least_pop_pdf

|   | exam_name | count |
|---|---|---|
| 0 | Nulls, Three-valued Logic and Missing Information | 1 |
| 1 | Native Web Apps for Android | 1 |
| 2 | Learning to Visualize Data with D3.js | 1 |
| 3 | Operating Red Hat Enterprise Linux Servers | 1 |
| 4 | Understanding the Grails 3 Domain Model | 2 |
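Because nested aggregates are not allowed and several courses can tie, a `LIMIT` on the sorted counts can cut off tied courses if there are more ties than the limit. One alternative is to compute the per-course counts once in a common table expression and keep every course whose count equals the minimum (or, with `MAX`, the maximum). This sketch runs the query with Python's built-in `sqlite3` on hypothetical rows as a stand-in; Spark SQL also supports `WITH` clauses and scalar subqueries, so the same SQL should carry over to `focused_assessments`, but it is a sketch rather than a query verified on this dataset.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE focused_assessments (user_exam_id TEXT, exam_name TEXT)")
# Hypothetical rows: Course B and Course C tie for the fewest assessments.
conn.executemany(
    "INSERT INTO focused_assessments VALUES (?, ?)",
    [("a1", "Course A"), ("a2", "Course A"), ("a3", "Course A"),
     ("b1", "Course B"), ("c1", "Course C")],
)

# Count per course once, then compare each count with the minimum instead of
# nesting MIN(COUNT(...)), which SQL does not allow.
query = """
WITH per_course AS (
    SELECT exam_name, COUNT(DISTINCT user_exam_id) AS n
    FROM focused_assessments
    GROUP BY exam_name
)
SELECT exam_name, n
FROM per_course
WHERE n = (SELECT MIN(n) FROM per_course)
ORDER BY exam_name
"""
least = conn.execute(query).fetchall()
print(least)  # [('Course B', 1), ('Course C', 1)]
```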


<h3> What are the most popular courses taken for certification? </h3>

##### Assumptions

   - All students enrolled in the class took an assessment
   - Students only take the class once
   - There is only one assessment in the class
   - Some educational platforms offer free content and monetize through certification. Knowing how many students convert to a certification path could increase our value to our customers
   
##### Thinking

   - The courses students most often take for certification (and which potentially generate higher revenue) may not be the most popular courses overall
   - It would be great for us to know and focus on the students who are willing to pay for a certification as a way to increase revenue.
   
##### Findings

   - No assessments in this data have certification set to 'true', so no students appear to be pursuing a certification. This is a huge gap that should be addressed, especially if the firm's monetization strategy depends on certifications.
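The zero-row result below depends on certified attempts holding exactly the string 'true' in the certification column. A quick sanity check is to list every distinct certification value with its count, so a spelling or case mismatch would not be mistaken for "no certifications". This sketch uses `sqlite3` as a stand-in for Spark SQL with hypothetical rows (including a differently cased value to show what the check would catch); the same `GROUP BY` query could be run against `focused_assessments`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE focused_assessments (user_exam_id TEXT, certification TEXT)")
# Hypothetical rows; certification is stored as a string, as in final_schema.
conn.executemany(
    "INSERT INTO focused_assessments VALUES (?, ?)",
    [("a1", "false"), ("b2", "false"), ("c3", "False")],
)

# List every distinct certification value and how many exams carry it.
values = conn.execute(
    """SELECT certification, COUNT(DISTINCT user_exam_id) AS n
       FROM focused_assessments
       GROUP BY certification
       ORDER BY certification"""
).fetchall()
print(values)
```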

In [17]:
cert = spark.sql("SELECT exam_name as course_name, \
                    COUNT(DISTINCT user_exam_id) as number_of_students \
                    FROM focused_assessments \
                    WHERE certification='true' \
                    GROUP BY course_name \
                    ORDER BY number_of_students DESC")

cert_pdf = cert.select("*").toPandas()
cert_pdf

|   | course_name | number_of_students |
|---|---|---|
