# Project 2: Tracking User Activity — Course Assessment Data

## Read the `assess` topic from Kafka into Spark (batch read)

In [4]:
# Batch (non-streaming) read of the whole `assess` Kafka topic, from the earliest
# to the latest available offset, into a DataFrame of raw Kafka records.
raw_assess = (
    spark.read
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:29092",
        "subscribe": "assess",
        "startingOffsets": "earliest",
        "endingOffsets": "latest",
    })
    .load()
)

In [64]:
import json
import pandas as pd
from pyspark.sql.functions import explode, split, mean
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, BooleanType, FloatType
import warnings

from pyspark.sql.functions import from_json, col

In [5]:
# Mark the raw Kafka DataFrame for caching so that, once materialized, later actions
# reuse it rather than re-reading the topic. The cell displays the DataFrame repr.
raw_assess.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [6]:
# Show the Kafka source schema: `key`/`value` are binary payloads; topic, partition,
# offset and timestamp columns are record metadata.
raw_assess.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [7]:
# Kafka delivers `value` as binary; cast it to a string so each row holds one raw JSON message.
assess = raw_assess.select(col('value').cast('string'))
assess.show(n=4)

+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 4 rows



## Write the raw JSON messages to HDFS (Parquet)

In [8]:
# Persist the raw JSON strings to HDFS as Parquet.
# mode("overwrite") keeps the notebook re-runnable: the default save mode ("error")
# raises an AnalysisException when /tmp/assess already exists from a previous run.
assess.write.mode("overwrite").parquet("/tmp/assess")

## Show the first message

In [9]:
# Fetch only the first message. `first()` stops after one row, whereas
# `collect()[0]` pulls every message to the driver just to index the first one.
assess.first()

Row(value='{"keen_timestamp":"1516717442.735266","max_attempts":"1.0","started_at":"2018-01-23T14:23:19.082Z","base_exam_id":"37f0a30a-7464-11e6-aa92-a8667f27e5dc","user_exam_id":"6d4089e4-bde5-4a22-b65f-18bce9ab79c8","sequences":{"questions":[{"user_incomplete":true,"user_correct":false,"options":[{"checked":true,"at":"2018-01-23T14:23:24.670Z","id":"49c574b4-5c82-4ffd-9bd1-c3358faf850d","submitted":1,"correct":true},{"checked":true,"at":"2018-01-23T14:23:25.914Z","id":"f2528210-35c3-4320-acf3-9056567ea19f","submitted":1,"correct":true},{"checked":false,"correct":true,"id":"d1bf026f-554f-4543-bdd2-54dcf105b826"}],"user_submitted":true,"id":"7a2ed6d3-f492-49b3-b8aa-d080a8aad986","user_result":"missed_some"},{"user_incomplete":false,"user_correct":false,"options":[{"checked":true,"at":"2018-01-23T14:23:30.116Z","id":"a35d0e80-8c49-415d-b8cb-c21a02627e2b","submitted":1},{"checked":false,"correct":true,"id":"bccd6e2e-2cef-4c72-8bfa-317db0ac48bb"},{"checked":true,"at":"2018-01-23T14:23:41.

## Parse the JSON messages into a DataFrame (schema inferred by Spark)

In [12]:
# Parse each JSON message into a dict and let Spark infer the schema:
# top-level keys become columns, nested objects are inferred as maps.
extracted_assess = (
    assess.rdd
    .map(lambda row: row.value)
    .map(json.loads)
    .toDF()
)



In [13]:
# Preview the inferred DataFrame (first 20 rows, long values truncated).
extracted_assess.show(n=20, truncate=True)

+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|Map(questions -> ...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|Map(questions -> ...|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false

In [14]:
# Expose the inferred DataFrame to Spark SQL under the view name `assess`.
# registerTempTable has been deprecated since Spark 2.0; createOrReplaceTempView is its
# replacement and can be safely re-run (it replaces an existing view of the same name).
extracted_assess.createOrReplaceTempView('assess')

In [19]:
# Inspect the inferred schema: top-level fields come back as strings, and `sequences`
# is inferred as map<string, array<map<string, boolean>>> — too coarse for nested
# numeric fields, which is why an explicit schema is applied further down.
extracted_assess.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)



## Query the data with Spark SQL

In [17]:
# Query the `assess` temp view with Spark SQL and display the first 20 rows.
spark.sql("select keen_id, exam_name from assess").show()

+--------------------+--------------------+
|             keen_id|           exam_name|
+--------------------+--------------------+
|5a6745820eb8ab000...|Normal Forms and ...|
|5a674541ab6b0a000...|Normal Forms and ...|
|5a67999d3ed3e3000...|The Principles of...|
|5a6799694fc7c7000...|The Principles of...|
|5a6791e824fccd000...|Introduction to B...|
|5a67a0b6852c2a000...|        Learning Git|
|5a67b627cc80e6000...|Git Fundamentals ...|
|5a67ac8cb0a5f4000...|Introduction to P...|
|5a67a9ba060087000...|Intermediate Pyth...|
|5a67ac54411aed000...|Introduction to P...|
|5a67ad9b2ff312000...|A Practical Intro...|
|5a67b610baff90000...|Git Fundamentals ...|
|5a67ac9837b82b000...|Introduction to M...|
|5a67aaa4f21cc2000...|   Python Epiphanies|
|5a67ac46f7bce8000...|Introduction to P...|
|5a67aedaf34e85000...|Python Data Struc...|
|5a67aefef5e149000...|Python Data Struc...|
|5a67ae3f0c5f48000...|Working with Algo...|
|5a67ad579d5057000...|Learning iPython ...|
|5a67aae6753fd6000...|   Python 

## How many courses are in the dataset, and how popular is each one?

In [22]:
# Number of assessment records per course, as a rough popularity measure (unordered).
(extracted_assess
    .groupby("exam_name")
    .count()
    .show())

+--------------------+-----+
|           exam_name|count|
+--------------------+-----+
|Learning Data Mod...|    9|
|Networking for Pe...|   15|
|Introduction to J...|  158|
|Learning Apache H...|   16|
|Learning Spring P...|    2|
|Learning iPython ...|   17|
|Introduction to P...|  162|
|Learning C# Best ...|   35|
|Introduction to A...|   14|
|A Practical Intro...|    9|
|I'm a Software Ar...|   15|
|Introduction to B...|   75|
|       View Updating|    4|
|Mastering Python ...|   25|
|Intermediate C# P...|   43|
|Starting a Grails...|    5|
|Introduction to A...|    9|
|JavaScript Templa...|   21|
|Being a Better In...|   10|
|Mastering Advance...|   34|
+--------------------+-----+
only showing top 20 rows



In [88]:
# Per-course assessment counts, kept as a DataFrame so they can be written to HDFS.
# NOTE(review): the name `temp_commit_info` does not describe this data; it is kept
# because the write cell below refers to it.
temp_commit_info = (
    extracted_assess
    .groupby("exam_name")
    .count()
)

## Write the per-course counts to HDFS

In [89]:
# Persist the per-course counts as Parquet. mode("overwrite") keeps the cell
# re-runnable; the default save mode ("error") fails if the path already exists.
temp_commit_info.write.mode("overwrite").parquet("/tmp/temp_commit_info")

## Enforce an explicit schema to query nested fields

In [29]:
# Explicit schema for the fields we analyse. Everything is nullable; `sequences`
# is a struct carrying the attempt number and the per-assessment question counts.
final_schema = (
    StructType()
    .add('max_attempts', StringType(), True)
    .add('user_exam_id', StringType(), True)
    .add('certification', StringType(), True)
    .add('keen_id', StringType(), True)
    .add('exam_name', StringType(), True)
    .add(
        'sequences',
        StructType()
        .add('attempt', IntegerType(), True)
        .add(
            'counts',
            StructType()
            .add('total', IntegerType(), True)
            .add('correct', IntegerType(), True)
            .add('submitted', IntegerType(), True),
            True,
        ),
        True,
    )
)

## Transform to DataFrame

In [30]:
# Re-parse the raw JSON, this time applying `final_schema` so the nested
# `sequences.counts` fields are typed; keys not in the schema are not kept.
focused_extracted_assess = (
    assess.rdd
    .map(lambda row: row.value)
    .map(json.loads)
    .toDF(schema=final_schema)
)

In [31]:
# Preview five rows of the schema-enforced DataFrame.
focused_extracted_assess.show(n=5)

+------------+--------------------+-------------+--------------------+--------------------+-----------+
|max_attempts|        user_exam_id|certification|             keen_id|           exam_name|  sequences|
+------------+--------------------+-------------+--------------------+--------------------+-----------+
|         1.0|6d4089e4-bde5-4a2...|        false|5a6745820eb8ab000...|Normal Forms and ...|[1,[4,2,4]]|
|         1.0|2fec1534-b41f-441...|        false|5a674541ab6b0a000...|Normal Forms and ...|[1,[4,1,4]]|
|         1.0|8edbc8a8-4d26-429...|        false|5a67999d3ed3e3000...|The Principles of...|[1,[4,3,4]]|
|         1.0|c0ee680e-8892-4e6...|        false|5a6799694fc7c7000...|The Principles of...|[1,[4,2,4]]|
|         1.0|e4525b79-7904-405...|        false|5a6791e824fccd000...|Introduction to B...|[1,[4,3,4]]|
+------------+--------------------+-------------+--------------------+--------------------+-----------+
only showing top 5 rows



In [None]:
# Add a `flag_attempt` column holding each row's `max_attempts` value.
# Spark DataFrames are immutable and have no pandas-style `.apply` or item assignment
# (the original form always raised), so derive a new DataFrame with `withColumn`.
# A new name keeps `focused_extracted_assess`, and every result below, unchanged.
# NOTE(review): intended semantics of the flag are unclear; this copies max_attempts as written.
flagged_assess = focused_extracted_assess.withColumn('flag_attempt', col('max_attempts'))

## Query 1: Most popular course: Learning Git
#### Learning Git has 394 assessment records, the most of any course.

In [43]:
# Rank courses by number of assessment records, most-assessed first; show the top five.
(focused_extracted_assess
    .groupby('exam_name')
    .count()
    .orderBy(col('count').desc())
    .show(n=5))

+--------------------+-----+
|           exam_name|count|
+--------------------+-----+
|        Learning Git|  394|
|Introduction to P...|  162|
|Introduction to J...|  158|
|Intermediate Pyth...|  158|
|Learning to Progr...|  128|
+--------------------+-----+
only showing top 5 rows



## Query 2: How many distinct courses were assessed in this dataset?

In [50]:
# Number of distinct course names (a null exam_name, if present, counts as one value).
(focused_extracted_assess
    .select('exam_name')
    .dropDuplicates()
    .count())

103

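## Query 3: How many assessments are in the dataset?
#### There are 3,242 distinct `keen_id` values, while the DataFrame has 3,280 rows (row count below), so some events appear more than once.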

In [51]:
# Number of distinct assessment events, identified by keen_id.
(focused_extracted_assess
    .select('keen_id')
    .dropDuplicates()
    .count())

3242

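## Query 4: Percentage of questions answered correctly, on average
#### Computed as (average correct answers) ÷ (average questions per assessment), i.e. a pooled ratio rather than the mean of per-assessment percentages.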

In [75]:
# Percentage of questions answered correctly: 100 * mean(correct) / mean(total).
# This is a pooled ratio of means, not the mean of per-assessment percentages.
# Both means are computed in a single aggregation (one Spark job) instead of two
# separate select/collect passes; the arithmetic order (100*a)/b is unchanged.
focused_extracted_assess.agg(
    (100 * mean("sequences.counts.correct") / mean("sequences.counts.total")).alias("pct_correct")
).first()["pct_correct"]

62.65814174942185

In [69]:
# Row count of the projection, equal to the number of assessment rows.
# DataFrame.count() includes rows whose `total` is null, unlike aggregate
# functions such as `mean`, which skip nulls.
focused_extracted_assess.select(col('sequences.counts.total')).count()

3280

## How many students took the certification?
#### The `certification` column contains no "true" values (only `false` and null), so the data cannot answer this question definitively.

In [76]:
# Distinct values of the certification field (null is shown as its own value).
focused_extracted_assess.select(col('certification')).dropDuplicates().show()

+-------------+
|certification|
+-------------+
|         null|
|        false|
+-------------+



#### Distinct values in the 'max_attempts' column

In [77]:
# Distinct values of max_attempts (stored as a string in the schema).
focused_extracted_assess.select(col('max_attempts')).dropDuplicates().show()

+------------+
|max_attempts|
+------------+
|         1.0|
+------------+



In [81]:
# Distinct values of the nested sequences.attempt field.
focused_extracted_assess.select(col('sequences.attempt')).dropDuplicates().show()

+-------+
|attempt|
+-------+
|      1|
+-------+



## Query 5: Number of assessments with every question answered correctly

In [86]:
# Assessments where every question was answered correctly (correct == total).
# Rows where either count is null compare as null and are dropped by the filter.
focused_extracted_assess.where(
    col('sequences.counts.correct') == col('sequences.counts.total')
).count()

841

## Write the schema-enforced DataFrame to HDFS

In [90]:
# Persist the schema-enforced DataFrame as Parquet. mode("overwrite") keeps the
# notebook re-runnable; the default save mode ("error") fails if the path exists.
focused_extracted_assess.write.mode("overwrite").parquet("/tmp/swetabee_Forced_assess")