# Spark and SQL Pipeline Code

In [1]:
# Run some import statements to work with the data
from pyspark.sql.types import MapType,StringType
from pyspark.sql.functions import json_tuple,from_json,explode,col
import json

#### Now that the JSON file has been published to Kafka via the run_kafka.sh script, we need to read that data into PySpark

In [2]:
# Batch-read the "assessments" Kafka topic from the broker at kafka:29092,
# covering everything from the earliest offset up to the latest one.
raw_assessments = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "assessments")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

# Keep the raw frame cached; per the original author's note this avoids
# some warnings in later cells.
raw_assessments.cache()

# Kafka delivers the message payload as binary; cast it to a string so the
# JSON text is human-readable.
assessments = raw_assessments.select(raw_assessments["value"].cast('string'))

# First pass: parse each message with json.loads and let Spark infer a schema
# from the resulting dicts, then print it to see what the data looks like.
first_extract = (
    assessments.rdd
    .map(lambda row: json.loads(row.value))
    .toDF()
)

first_extract.printSchema()



root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)



In [3]:
# The messages are nested JSON, so one layer of parsing is not enough.
# Hand the raw JSON strings (one per row) to Spark's JSON reader, which
# infers the nested structure (structs and arrays) for us.
extracted_assessments = spark.read.json(
    assessments.rdd.map(lambda row: row.value)
)

# Inspect the inferred schema.
extracted_assessments.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: struct (nullable = true)
 |    |-- attempt: long (nullable = true)
 |    |-- counts: struct (nullable = true)
 |    |    |-- all_correct: boolean (nullable = true)
 |    |    |-- correct: long (nullable = true)
 |    |    |-- incomplete: long (nullable = true)
 |    |    |-- incorrect: long (nullable = true)
 |    |    |-- submitted: long (nullable = true)
 |    |    |-- total: long (nullable = true)
 |    |    |-- unanswered: long (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- questions: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- options: arra

#### Now that we have unraveled our JSON file, let's run some queries to create a table with the necessary columns

In [4]:
# Register the parsed assessments as a temporary view named "exams" so that
# it can be queried with Spark SQL. createOrReplaceTempView is the supported
# replacement for registerTempTable, which is deprecated since Spark 2.0.
extracted_assessments.createOrReplaceTempView('exams')

# Flatten the fields that will be useful to the DS team into one table:
# exam identifiers, the per-attempt answer counts nested under
# sequences.counts, and the start timestamp.
# A triple-quoted string is used instead of backslash continuations, which
# silently break if trailing whitespace follows a backslash.
exams_table = spark.sql("""
    SELECT
        exam_name,
        user_exam_id,
        sequences.counts.all_correct,
        sequences.counts.correct,
        sequences.counts.incomplete,
        sequences.counts.incorrect,
        sequences.counts.submitted,
        sequences.counts.total,
        sequences.counts.unanswered,
        started_at,
        base_exam_id,
        certification
    FROM
        exams
""")


In [5]:
# Write this table to HDFS as Parquet - useful to the DS team as it is a
# well-structured dataset.
# The save mode is set explicitly: the default mode raises an error when the
# target path already exists, which made this cell fail on every re-run of
# the notebook. "overwrite" replaces the previous export instead.
exams_table.write.mode("overwrite").parquet("/tmp/exams_table")

#### Make some SQL example queries

In [6]:
# Example question from the data science team: how many times was the
# 'Learning Apache Hadoop' exam taken? Counting the user_exam_id values of
# the matching rows in the "exams" view answers it.
spark.sql(
    "SELECT count(user_exam_id) FROM exams "
    "WHERE exam_name == 'Learning Apache Hadoop'"
).show()


+-------------------+
|count(user_exam_id)|
+-------------------+
|                 16|
+-------------------+



In [7]:
# Average exam score per course, sorted from the highest average to the lowest.

# Step 1: compute a percentage score for every exam attempt
# (correct answers / total questions * 100).
# NOTE(review): under Spark's default (non-ANSI) SQL settings a zero or
# missing `total` should yield a NULL score, which AVG then ignores - confirm
# this is the desired treatment of such rows.
scores = spark.sql("""
    SELECT
        exam_name,
        user_exam_id,
        ((sequences.counts.correct / sequences.counts.total) * 100) AS score
    FROM exams
""")

# Expose the per-attempt scores to SQL under the name "avg_score".
# createOrReplaceTempView replaces registerTempTable, which is deprecated
# since Spark 2.0.
scores.createOrReplaceTempView('avg_score')

# Step 2: average the per-attempt scores within each exam.
score_table = spark.sql("""
    SELECT
        exam_name,
        AVG(score) AS average_score
    FROM avg_score
    GROUP BY exam_name
    ORDER BY average_score DESC
""")

score_table.show()

+--------------------+-----------------+
|           exam_name|    average_score|
+--------------------+-----------------+
|Learning to Visua...|            100.0|
|The Closed World ...|            100.0|
|Nulls, Three-valu...|            100.0|
|Learning SQL for ...|97.72727272727273|
|Introduction to J...|87.59493670886076|
|Introduction to A...|83.33333333333333|
|Introduction to A...|83.33333333333333|
|Getting Ready for...|             80.0|
|Cloud Native Arch...|             80.0|
|Understanding the...|78.57142857142857|
|Introduction to A...| 76.9230769230769|
|Beginning Program...|76.58227848101266|
|Learning Apache H...|          76.5625|
|Refactor a Monoli...|76.47058823529412|
|Introduction to H...|             75.0|
|Using Storytellin...|             75.0|
|Starting a Grails...|             75.0|
|Git Fundamentals ...|             75.0|
|   Python Epiphanies|74.18300653594771|
|Mastering Python ...|             74.0|
+--------------------+-----------------+
only showing top 20 rows

In [8]:
# Write the per-exam average scores to HDFS as Parquet so that the DS team
# can easily access them later.
# The save mode is set explicitly: the default mode raises an error when the
# target path already exists, which made this cell fail on every re-run of
# the notebook. "overwrite" replaces the previous export instead.
score_table.write.mode("overwrite").parquet("/tmp/score_table")

In [9]:
# Which courses were taken the most? Count the rows of the "exams" view per
# exam name and sort from the most to the least taken.
stud_count = spark.sql(
    "SELECT exam_name, COUNT(*) as students "
    "FROM exams "
    "GROUP BY exam_name "
    "ORDER BY students DESC"
)

# Display only the ten most-taken exams.
stud_count.show(10)

+--------------------+--------+
|           exam_name|students|
+--------------------+--------+
|        Learning Git|     394|
|Introduction to P...|     162|
|Intermediate Pyth...|     158|
|Introduction to J...|     158|
|Learning to Progr...|     128|
|Introduction to M...|     119|
|Software Architec...|     109|
|Beginning C# Prog...|      95|
|    Learning Eclipse|      85|
|Learning Apache M...|      80|
+--------------------+--------+
only showing top 10 rows



In [10]:
# Write the per-exam counts to HDFS as Parquet as well.
# The save mode is set explicitly: the default mode raises an error when the
# target path already exists, which made this cell fail on every re-run of
# the notebook. "overwrite" replaces the previous export instead.
stud_count.write.mode("overwrite").parquet("/tmp/stud_count")

#### Assumptions and other notes

###### Assumptions
* When counting the number of students per exam, the assumption is that each exam was taken by one student
* Assume that each exam attempt also indicates one student taking the class, and vice versa (i.e. no student took the course without taking the exam, and no student took the exam without also enrolling in the course)
* Data Science team knows how to work with Apache Hadoop and can pull the dataset from HDFS
* Assume the JSON file into our pipeline is at rest



##### Pipeline Flow 
* docker-compose houses all needed software services
* a Kafka topic is created
    * kafka topic is named assessments as this dataset is made up of assessments in online courses
    * note Zookeeper is included to manage the kafka brokers
* We then use kafkacat to publish the JSON file in our working directory to the `assessments` topic.
    * jq is also used in this command to clean up the json structure just a little bit 
* We then use pySpark to subscribe to the assessments topic and consume the data from the Kafka pipeline
* We then use pyspark to unravel the nested json and view the schema 
* A "view" or temp table is created for us to make queries off of
* we use sql through pyspark to make queries to that temp table
* After we have what we need in a nice structure, we can write these tables to HDFS for longer term storage

##### After completion
* save and close jupyter instance 
* Press Ctrl+C twice in the terminal window to end the Jupyter instance.
* run docker-compose down