# Project 2

In [37]:
import json
import pandas as pd
from pyspark.sql.functions import explode, split
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, TimestampType, ArrayType
import warnings

In [3]:
# Batch-read the whole "assessments" Kafka topic, from the earliest offset
# through the latest one available when the read runs.
raw_assessments = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "assessments")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

In [4]:
# Mark the Kafka batch for caching so the cells below can reuse it.
# cache() returns the DataFrame itself, which is what this cell echoes.
raw_assessments.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [5]:
# Inspect the columns of the Kafka source; note that key and value arrive as binary.
raw_assessments.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# Peek at a single raw Kafka record (value is still raw bytes at this point).
raw_assessments.show(1)

+----+--------------------+-----------+---------+------+--------------------+-------------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null|[7B 22 6B 65 65 6...|assessments|        0|     0|1969-12-31 23:59:...|            0|
+----+--------------------+-----------+---------+------+--------------------+-------------+
only showing top 1 row



In [7]:
# The Kafka payload column is binary; cast it to a string to expose the JSON text,
# keeping only that one column.
assessments = raw_assessments.select(
    raw_assessments["value"].cast("string")
)
assessments.show(1)

+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
+--------------------+
only showing top 1 row



In [8]:
# Look at the first record in full. first() fetches just one row, whereas
# collect()[0] would pull every row to the driver only to discard all but one.
assessments.first()

Row(value='{"keen_timestamp":"1516717442.735266","max_attempts":"1.0","started_at":"2018-01-23T14:23:19.082Z","base_exam_id":"37f0a30a-7464-11e6-aa92-a8667f27e5dc","user_exam_id":"6d4089e4-bde5-4a22-b65f-18bce9ab79c8","sequences":{"questions":[{"user_incomplete":true,"user_correct":false,"options":[{"checked":true,"at":"2018-01-23T14:23:24.670Z","id":"49c574b4-5c82-4ffd-9bd1-c3358faf850d","submitted":1,"correct":true},{"checked":true,"at":"2018-01-23T14:23:25.914Z","id":"f2528210-35c3-4320-acf3-9056567ea19f","submitted":1,"correct":true},{"checked":false,"correct":true,"id":"d1bf026f-554f-4543-bdd2-54dcf105b826"}],"user_submitted":true,"id":"7a2ed6d3-f492-49b3-b8aa-d080a8aad986","user_result":"missed_some"},{"user_incomplete":false,"user_correct":false,"options":[{"checked":true,"at":"2018-01-23T14:23:30.116Z","id":"a35d0e80-8c49-415d-b8cb-c21a02627e2b","submitted":1},{"checked":false,"correct":true,"id":"bccd6e2e-2cef-4c72-8bfa-317db0ac48bb"},{"checked":true,"at":"2018-01-23T14:23:41.

In [9]:
# Persist the raw JSON strings as Parquet. Overwrite any earlier export so the
# notebook can be re-run from a fresh kernel; the default save mode raises an
# error when the target path already exists.
assessments.write.mode("overwrite").parquet("/tmp/assessments")

In [10]:
# Load the Parquet export back in and preview it.
read_assessments = spark.read.format("parquet").load('/tmp/assessments')
read_assessments.show()

+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 20 rows



In [11]:
# Number of assessment records read back from the Parquet export.
read_assessments.count()

3280

In [12]:
# Transform to DataFrame
from pyspark.sql.functions import from_json, col

# Let Spark infer a schema from the raw JSON strings, then use that schema to
# parse each `value` into a struct column named `json`.
json_strings = read_assessments.rdd.map(lambda record: record.value)
json_schema = spark.read.json(json_strings).schema
read_assessments = read_assessments.withColumn('json', from_json(col('value'), json_schema))

# Preview a few of the parsed fields.
read_assessments.select(
    *[getattr(read_assessments.json, field)
      for field in ('max_attempts', 'sequences', 'certification', 'exam_name')]
).show()

+-----------------+--------------------+------------------+--------------------+
|json.max_attempts|      json.sequences|json.certification|      json.exam_name|
+-----------------+--------------------+------------------+--------------------+
|              1.0|[1,[false,2,1,1,4...|             false|Normal Forms and ...|
|              1.0|[1,[false,1,2,1,4...|             false|Normal Forms and ...|
|              1.0|[1,[false,3,0,1,4...|             false|The Principles of...|
|              1.0|[1,[false,2,2,0,4...|             false|The Principles of...|
|              1.0|[1,[false,3,0,1,4...|             false|Introduction to B...|
|              1.0|[1,[true,5,0,0,5,...|             false|        Learning Git|
|              1.0|[1,[true,1,0,0,1,...|             false|Git Fundamentals ...|
|              1.0|[1,[true,5,0,0,5,...|             false|Introduction to P...|
|              1.0|[1,[true,4,0,0,4,...|             false|Intermediate Pyth...|
|              1.0|[1,[false

In [69]:
# Explicit schema for one assessment record, assembled from the innermost
# struct outwards so each nesting level can be read on its own.

# A single answer option within a question.
option_schema = StructType([
    StructField('checked', BooleanType(), True),
    StructField('at', StringType(), True),
    StructField('id', StringType(), True),
    StructField('submitted', StringType(), True),
    StructField('correct', BooleanType(), True),
])

# A single question, carrying its list of options.
question_schema = StructType([
    StructField('user_incomplete', BooleanType(), True),
    StructField('user_correct', BooleanType(), True),
    StructField('user_submitted', StringType(), True),
    StructField('id', StringType(), True),
    StructField('user_result', StringType(), True),
    StructField('options', ArrayType(option_schema, False), True),
])

# Per-assessment tallies.
counts_schema = StructType([
    StructField('incomplete', IntegerType(), True),
    StructField('submitted', IntegerType(), True),
    StructField('incorrect', IntegerType(), True),
    StructField('all_correct', BooleanType(), True),
    StructField('correct', IntegerType(), True),
    StructField('total', IntegerType(), True),
    StructField('unanswered', IntegerType(), True),
])

# The `sequences` struct: attempt info, the questions array and the counts.
sequences_schema = StructType([
    StructField('attempt', StringType(), True),
    StructField('id', StringType(), True),
    StructField('questions', ArrayType(question_schema, False)),
    StructField('counts', counts_schema),
])

# Top level: nine string fields followed by the nested `sequences` struct.
assessments_schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in (
            'keen_timestamp',
            'max_attempts',
            'started_at',
            'base_exam_id',
            'user_exam_id',
            'keen_created_at',
            'certification',
            'keen_id',
            'exam_name',
        )
    ]
    + [StructField('sequences', sequences_schema)]
)

In [70]:
# Parse every JSON string into a Python dict, then build a DataFrame from those
# dicts using the explicit assessments_schema.
structured_assessments = (
    assessments.rdd
    .map(lambda row: json.loads(row.value))
    .toDF(schema=assessments_schema)
)

In [75]:
# Spot-check the nested schema: drill from the first record down to the
# `checked` flag of the first option of the first question.
structured_assessments.take(1)[0]['sequences']['questions'][0]['options'][0]['checked']

True

In [77]:
# Confirm the DataFrame carries the explicit nested schema.
structured_assessments.printSchema()

root
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- started_at: string (nullable = true)
 |-- base_exam_id: string (nullable = true)
 |-- user_exam_id: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- sequences: struct (nullable = true)
 |    |-- attempt: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- questions: array (nullable = true)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- user_incomplete: boolean (nullable = true)
 |    |    |    |-- user_correct: boolean (nullable = true)
 |    |    |    |-- user_submitted: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- user_result: string (nullable = true)
 |    |    |    |-- options: array (nullable = true)
 |    |    |    |    |-- element: st

In [84]:
# Expose the structured DataFrame to Spark SQL under the name `assessments`.
# createOrReplaceTempView supersedes the deprecated registerTempTable and is
# safe to re-run because it replaces any existing view of the same name.
structured_assessments.createOrReplaceTempView('assessments')

In [85]:
# Sanity-check the temp table: list the exam names of the first ten records.
spark.sql("SELECT exam_name FROM assessments").show(10)

+--------------------+
|           exam_name|
+--------------------+
|Normal Forms and ...|
|Normal Forms and ...|
|The Principles of...|
|The Principles of...|
|Introduction to B...|
|        Learning Git|
|Git Fundamentals ...|
|Introduction to P...|
|Intermediate Pyth...|
|Introduction to P...|
+--------------------+
only showing top 10 rows



In [86]:
# Query a nested field: show one exam whose counts mark every answer as correct.
spark.sql("SELECT exam_name FROM assessments WHERE sequences.counts.all_correct = True").show(1)

+------------+
|   exam_name|
+------------+
|Learning Git|
+------------+
only showing top 1 row



In [88]:
# Count how many distinct exams (by base_exam_id) appear in the data.
spark.sql("SELECT COUNT(DISTINCT base_exam_id) FROM assessments").show()

+----------------------------+
|count(DISTINCT base_exam_id)|
+----------------------------+
|                         107|
+----------------------------+

