## PySpark Notebook for Project 2

In [1]:
spark

### Import libraries for data transformation

In [2]:
# Imports
import sys
import json
from pyspark.sql import Row
import pprint

p = pprint.PrettyPrinter(indent=1)


### Consume data from our Kafka topic `userAct`

Read the data into the `messages` Spark dataframe. `spark.read.format("kafka")` tells Spark that we are reading from Kafka. With `option`, we set the bootstrap server and the Kafka port number defined in our docker-compose file (`kafka:29092`), and we subscribe to the topic `userAct`. Setting `startingOffsets` to `earliest` and `endingOffsets` to `latest` reads the topic from its first message to its last, i.e. the entire data. Finally, `load()` loads this data into the `messages` dataframe.

In [3]:
messages = (spark.read.format("kafka")
            .option("kafka.bootstrap.servers", "kafka:29092")
            .option("subscribe", "userAct")
            .option("startingOffsets", "earliest")
            .option("endingOffsets", "latest")
            .load())


The object `messages` is a Spark dataframe. We cache it because we reuse it several times below. Caching is lazy: the data is kept in memory the first time an action (such as `show()`) computes it.

In [4]:
type(messages)

pyspark.sql.dataframe.DataFrame

In [5]:
messages.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

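Print the schema of the `messages` data we just read in. Each Kafka record has a `key` and a `value` (both binary) plus metadata such as `topic`, `partition`, `offset`, and `timestamp`. The `value` holds the message payload, which is the item of interest for us.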

In [6]:
messages.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



Show the first 20 rows of `messages` (`show()` displays 20 rows by default).

In [7]:
messages.show()

+----+--------------------+-------+---------+------+--------------------+-------------+
| key|               value|  topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-------+---------+------+--------------------+-------------+
|null|[7B 22 6B 65 65 6...|userAct|        0|     0|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|userAct|        0|     1|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|userAct|        0|     2|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|userAct|        0|     3|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|userAct|        0|     4|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|userAct|        0|     5|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|userAct|        0|     6|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|userAct|        0|     7|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|userA

Create a new dataframe `messages_as_strings` from `messages`, selecting only the `value` column cast to a string data type.

In [8]:
messages_as_strings=messages.selectExpr("CAST(value AS STRING)")
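`CAST(value AS STRING)` reinterprets the raw Kafka bytes as text. As a small pure-Python illustration (plain `bytes.decode`, assuming the payload is UTF-8; this is not a Spark call), the first bytes visible in the binary `value` column above, `7B 22 6B 65 65`, decode to the opening of the JSON record that `messages_as_strings.show()` prints as `{"keen_timestamp"...`:

```python
# First five bytes visible in the binary `value` column of messages.show()
raw = bytes.fromhex("7B 22 6B 65 65")

# Decoding as UTF-8 (assumed encoding) mirrors what CAST(value AS STRING) does
text = raw.decode("utf-8")
print(text)  # {"kee

# Going the other way, the JSON prefix seen in messages_as_strings encodes to the same bytes
prefix = '{"keen_timestamp"'.encode("utf-8")
print(prefix.startswith(raw))  # True
```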

In [9]:
messages_as_strings.show()

+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 20 rows



In [10]:
messages_as_strings.cache()

DataFrame[value: string]

In [11]:
messages_as_strings.printSchema()

root
 |-- value: string (nullable = true)



The `.count()` method returns the number of rows in this dataframe.

In [12]:
messages_as_strings.count()

3280

#### Examine Data: Selecting one value row

From `messages_as_strings`, select the `value` column and call `take(1)`, which returns a list containing the first row (`take(200)` would return the first 200 rows). `[0]` picks the first (and only) row from that list, and `.value` extracts that row's `value` field: the raw JSON string of the message, not yet a dict.

In [13]:
messages_as_strings.select('value').take(1)[0].value

'{"keen_timestamp":"1516717442.735266","max_attempts":"1.0","started_at":"2018-01-23T14:23:19.082Z","base_exam_id":"37f0a30a-7464-11e6-aa92-a8667f27e5dc","user_exam_id":"6d4089e4-bde5-4a22-b65f-18bce9ab79c8","sequences":{"questions":[{"user_incomplete":true,"user_correct":false,"options":[{"checked":true,"at":"2018-01-23T14:23:24.670Z","id":"49c574b4-5c82-4ffd-9bd1-c3358faf850d","submitted":1,"correct":true},{"checked":true,"at":"2018-01-23T14:23:25.914Z","id":"f2528210-35c3-4320-acf3-9056567ea19f","submitted":1,"correct":true},{"checked":false,"correct":true,"id":"d1bf026f-554f-4543-bdd2-54dcf105b826"}],"user_submitted":true,"id":"7a2ed6d3-f492-49b3-b8aa-d080a8aad986","user_result":"missed_some"},{"user_incomplete":false,"user_correct":false,"options":[{"checked":true,"at":"2018-01-23T14:23:30.116Z","id":"a35d0e80-8c49-415d-b8cb-c21a02627e2b","submitted":1},{"checked":false,"correct":true,"id":"bccd6e2e-2cef-4c72-8bfa-317db0ac48bb"},{"checked":true,"at":"2018-01-23T14:23:41.791Z","id"

`json.loads` parses that JSON string into a Python dict. We then use `json.dumps(..., indent=4, sort_keys=True)` to pretty-print it so we can see all the fields, including the nested ones.

In [14]:
# print json for first assessment to see nested schema

first_message = json.loads(messages_as_strings.select('value').take(1)[0].value)
print(json.dumps(first_message, indent=4, sort_keys=True))

{
    "base_exam_id": "37f0a30a-7464-11e6-aa92-a8667f27e5dc",
    "certification": "false",
    "exam_name": "Normal Forms and All That Jazz Master Class",
    "keen_created_at": "1516717442.735266",
    "keen_id": "5a6745820eb8ab00016be1f1",
    "keen_timestamp": "1516717442.735266",
    "max_attempts": "1.0",
    "sequences": {
        "attempt": 1,
        "counts": {
            "all_correct": false,
            "correct": 2,
            "incomplete": 1,
            "incorrect": 1,
            "submitted": 4,
            "total": 4,
            "unanswered": 0
        },
        "id": "5b28a462-7a3b-42e0-b508-09f3906d1703",
        "questions": [
            {
                "id": "7a2ed6d3-f492-49b3-b8aa-d080a8aad986",
                "options": [
                    {
                        "at": "2018-01-23T14:23:24.670Z",
                        "checked": true,
                        "correct": true,
                        "id": "49c574b4-5c82-4ffd-9bd1-c3358faf850d",
    
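To make the indexing in the two cells above concrete: `take(1)` returns a Python list of `Row` objects, `[0]` picks the first `Row`, `.value` is that row's JSON string, and `json.loads` turns the string into a nested `dict`. The sketch below replays those steps without Spark; `FakeRow` is a hypothetical stand-in for `pyspark.sql.Row`, and the record is trimmed to a few of the fields printed above.

```python
import json
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row with a single `value` field
FakeRow = namedtuple("FakeRow", ["value"])

# Trimmed copy of the first message (field values taken from the output above)
value = json.dumps({
    "exam_name": "Normal Forms and All That Jazz Master Class",
    "keen_id": "5a6745820eb8ab00016be1f1",
    "sequences": {"attempt": 1,
                  "counts": {"correct": 2, "incomplete": 1, "incorrect": 1,
                             "submitted": 4, "total": 4, "unanswered": 0}},
})

taken = [FakeRow(value=value)]   # like messages_as_strings.select('value').take(1)
json_string = taken[0].value     # [0] -> first row, .value -> the JSON string
first_message = json.loads(json_string)

print(type(json_string).__name__, type(first_message).__name__)  # str dict
print(first_message["sequences"]["counts"]["total"])             # 4
```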

#### Creating a dataframe/table from our raw data

Create a new dataframe `assessments2` from the `messages_as_strings` dataframe above: take its underlying RDD (`.rdd`) and pass each row through a function with the `.map` method. Our function is a lambda that parses the row's `value` as JSON and passes the key/value pairs of the resulting dictionary as keyword arguments to a Spark `Row(...)`. Finally, `.toDF()` converts the mapped RDD back into a Spark dataframe.

In [15]:
assessments2 = messages_as_strings.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()

In [16]:
# p.pprint(assessments2.show())
assessments2.show()

+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|Map(questions -> ...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|Map(questions -> ...|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false

In [17]:
type(assessments2)

pyspark.sql.dataframe.DataFrame

In [18]:
assessments2.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)
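The columns in this schema are listed alphabetically (`base_exam_id` first), not in the order the keys appear in the JSON (`keen_timestamp` first). This is consistent with `Row(**kwargs)` in Spark 2.x, which sorts keyword-argument fields by name; Spark 3.0 and later keep the order as entered on Python 3.6+, so the column order may differ on a newer cluster. The pure-Python sketch below only mimics that sorting on a plain `dict` built from the first keys of the message shown earlier; it does not call Spark.

```python
import json

# Key order as it appears in the raw JSON (first keys of the message printed earlier)
raw = ('{"keen_timestamp": "1516717442.735266", "max_attempts": "1.0", '
       '"started_at": "2018-01-23T14:23:19.082Z", '
       '"base_exam_id": "37f0a30a-7464-11e6-aa92-a8667f27e5dc"}')
record = json.loads(raw)

json_order = list(record)          # order of keys in the JSON text
spark2_row_order = sorted(record)  # Spark 2.x Row(**kwargs) sorts field names

print(json_order)        # ['keen_timestamp', 'max_attempts', 'started_at', 'base_exam_id']
print(spark2_row_order)  # ['base_exam_id', 'keen_timestamp', 'max_attempts', 'started_at']
```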



## Spark SQL: Answering Questions

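`sequences` is a nested JSON object. Spark SQL can help us unpack data from this dataframe; we use it to answer the business questions and to build better tables for answering them. Note that the type Spark inferred for `sequences` in the schema above, `map<string, array<map<string, boolean>>>`, fits the `questions` list but not other nested fields such as `counts` (mostly integer counts), so those fields are easier to extract by parsing the JSON directly, as the extra work at the end of this notebook does.

First, register `assessments2` as a temporary view (a "temp table") so that we can run Spark SQL queries against it.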

In [19]:
assessments2.createOrReplaceTempView('assessments_tbl')

In [20]:
spark.sql("select exam_name, max_attempts from assessments_tbl order by max_attempts desc limit 5").show()

+--------------------+------------+
|           exam_name|max_attempts|
+--------------------+------------+
|Normal Forms and ...|         1.0|
|Normal Forms and ...|         1.0|
|The Principles of...|         1.0|
|The Principles of...|         1.0|
|Introduction to B...|         1.0|
+--------------------+------------+



### 1) How many assessments are in the dataset?

I count **3280 assessments**, determined by counting the rows in the dataset, one per assessment taken.

There is a limitation: `keen_id` looks like it should be a unique key for each assessment, but counting distinct `keen_id` values yields only 3242, i.e. 38 more rows than distinct `keen_id`s. Which number should we believe? Either some assessments were given duplicate `keen_id`s by accident, or `keen_id` is not a reliable primary key for assessments.
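The gap between the two counts is 3280 - 3242 = 38 extra rows; that could be 38 ids each duplicated once or a few ids repeated many times. On `assessments_tbl`, a `GROUP BY keen_id HAVING COUNT(*) > 1` query would list the repeated ids. The pure-Python sketch below shows the same logic with `collections.Counter` on a hypothetical list of ids (not the real data); `duplicate_summary` is an illustrative helper, not part of the notebook.

```python
from collections import Counter

def duplicate_summary(ids):
    """Return (extra_rows, duplicated_ids) for a list of ids.

    extra_rows = rows minus distinct ids, i.e. count(*) - count(distinct id).
    duplicated_ids maps each id that appears more than once to its count.
    """
    counts = Counter(ids)
    extra_rows = len(ids) - len(counts)
    duplicated_ids = {k: n for k, n in counts.items() if n > 1}
    return extra_rows, duplicated_ids

# Hypothetical keen_ids: "a" appears 3 times, "c" twice
ids = ["a", "b", "a", "c", "a", "c", "d"]
extra, dups = duplicate_summary(ids)
print(extra)  # 3
print(dups)   # {'a': 3, 'c': 2}
```

With the notebook's numbers, `extra_rows` would be 3280 - 3242 = 38.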

In [22]:
# by row count
spark.sql("select count(*) as total_assessments from assessments_tbl").show()

+-----------------+
|total_assessments|
+-----------------+
|             3280|
+-----------------+



In [23]:
# by keen id
spark.sql("select count(distinct keen_id) as total_assessments from assessments_tbl").show()

+-----------------+
|total_assessments|
+-----------------+
|             3242|
+-----------------+



### 2) How many distinct exams were taken by users?

There were **107 unique exams** taken by users in this dataset. We count distinct `base_exam_id` values rather than `exam_name`, since different exams could share the same name.

In [24]:
spark.sql("select count(distinct base_exam_id) as unique_exams from assessments_tbl").show()

+------------+
|unique_exams|
+------------+
|         107|
+------------+



### 3) How many people took Learning Git?

To answer this, we build a table of exams and the number of times each was taken, grouped by exam name, and run our queries on it. However, it is not obvious how to count the times an exam was taken.

`user_exam_id` should identify a distinct user attempt at an exam, so we could count distinct `user_exam_id` values per exam. However, this gives fewer times taken than simply counting the assessments recorded for each exam (390 vs. 394 for Learning Git). Neither count identifies people directly, since the schema has no separate user identifier, so we assume that the number of assessments for an exam equals the number of people who took it. Under that assumption, **394 people took Learning Git**.
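The two counts differ because `count(*)` counts every assessment row for an exam, while `count(distinct user_exam_id)` collapses rows that share a `user_exam_id` (for example, if the same attempt was recorded more than once). A pure-Python sketch of both aggregations on hypothetical rows (not the real data):

```python
from collections import defaultdict

# Hypothetical (exam_name, user_exam_id) rows; "u1" appears twice for Learning Git
rows = [
    ("Learning Git", "u1"),
    ("Learning Git", "u1"),
    ("Learning Git", "u2"),
    ("Learning Eclipse", "u3"),
]

row_counts = defaultdict(int)    # like count(*) ... group by exam_name
distinct_ids = defaultdict(set)  # like count(distinct user_exam_id) ... group by exam_name
for exam_name, user_exam_id in rows:
    row_counts[exam_name] += 1
    distinct_ids[exam_name].add(user_exam_id)

for exam_name in row_counts:
    print(exam_name, row_counts[exam_name], len(distinct_ids[exam_name]))
# Learning Git 3 2
# Learning Eclipse 1 1
```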

In [22]:
# based on the number of assessments for this exam
exams_taken_df = spark.sql("select exam_name, count(*) as times_taken from assessments_tbl group by exam_name order by times_taken desc limit 10")
exams_taken_df.show()

+--------------------+-----------+
|           exam_name|times_taken|
+--------------------+-----------+
|        Learning Git|        394|
|Introduction to P...|        162|
|Intermediate Pyth...|        158|
|Introduction to J...|        158|
|Learning to Progr...|        128|
|Introduction to M...|        119|
|Software Architec...|        109|
|Beginning C# Prog...|         95|
|    Learning Eclipse|         85|
|Learning Apache M...|         80|
+--------------------+-----------+



In [26]:
# based on the number of distinct user_exam_id for this exam
spark.sql("select exam_name, count(distinct user_exam_id) as times_taken from assessments_tbl group by exam_name order by times_taken desc limit 10").show()

+--------------------+-----------+
|           exam_name|times_taken|
+--------------------+-----------+
|        Learning Git|        390|
|Introduction to P...|        162|
|Introduction to J...|        158|
|Intermediate Pyth...|        156|
|Learning to Progr...|        128|
|Introduction to M...|        119|
|Software Architec...|        109|
|    Learning Eclipse|         85|
|Beginning C# Prog...|         83|
|Learning Apache M...|         80|
+--------------------+-----------+



Rebuild `exams_taken_df` without the `limit 10` used above, so that it includes every exam, and register it as a temp table for the next queries.

In [21]:
exams_taken_df = spark.sql("select exam_name, count(*) as times_taken from assessments_tbl group by exam_name order by times_taken desc")
exams_taken_df.createOrReplaceTempView('exams_taken_tbl')

### 4) What is the least common course taken? And the most common?
Least common course taken: **Learning to Visualize Data with D3.js** with 1 assessment. Most common course taken: **Learning Git** with 394 assessments.


In [274]:
spark.sql("select exam_name, times_taken from exams_taken_tbl order by times_taken desc limit 1").show(1, False)

+------------+-----------+
|exam_name   |times_taken|
+------------+-----------+
|Learning Git|394        |
+------------+-----------+



In [275]:
spark.sql("select exam_name, times_taken from exams_taken_tbl order by times_taken limit 1").show(1, False)

+-------------------------------------+-----------+
|exam_name                            |times_taken|
+-------------------------------------+-----------+
|Learning to Visualize Data with D3.js|1          |
+-------------------------------------+-----------+
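The `ORDER BY times_taken LIMIT 1` query above returns exactly one row, so if several exams were tied at the lowest count, only one of them would be shown. Listing every exam whose count equals the minimum avoids this; in Spark SQL a scalar subquery such as `where times_taken = (select min(times_taken) from exams_taken_tbl)` should work. The pure-Python sketch below shows the idea on hypothetical counts; `exams_with_extreme_count` is an illustrative helper.

```python
# Hypothetical exam -> times_taken counts (not the real data)
times_taken = {
    "Learning Git": 394,
    "Exam A": 1,
    "Exam B": 1,
    "Exam C": 7,
}

def exams_with_extreme_count(counts, use_max=False):
    """Return every exam tied at the minimum (or maximum) count, sorted by name."""
    target = max(counts.values()) if use_max else min(counts.values())
    return sorted(name for name, n in counts.items() if n == target)

print(exams_with_extreme_count(times_taken))                # ['Exam A', 'Exam B']
print(exams_with_extreme_count(times_taken, use_max=True))  # ['Learning Git']
```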



# Save Tables to HDFS

For each table, we take the Spark dataframe, call its `write` method, choose Parquet as the storage format, and pass the HDFS path where the data should be stored, ending in the table name.

In [23]:
# Table 1 for Q1), Q2), Q3)
assessments2.write.parquet("/tmp/assessments_tbl")

In [24]:
# Table 2 for Q4)
exams_taken_df.write.parquet("/tmp/exams_taken_tbl")

## Extra Work: Making Tables from Nested Columns
### Looking at Sequences

Make a table that combines each exam's name with its score and number of questions.

Each input row yields exactly one row of data extracted from `sequences` (a 1-to-1 relationship), so we use `map` instead of `flatMap`.
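In RDD terms, `map` produces exactly one output element per input, while `flatMap` expects the function to return an iterable and concatenates those iterables. That is why the `_flatMap` variant of the scoring function wraps its `Row` in a list: a bare `Row` is a tuple, so `flatMap` would spread its fields out as separate elements. A pure-Python sketch of the two semantics on plain lists (the helper names are hypothetical, not Spark APIs):

```python
from itertools import chain

def rdd_map(f, items):
    """One output per input, like RDD.map."""
    return [f(x) for x in items]

def rdd_flat_map(f, items):
    """Concatenate the iterables returned by f, like RDD.flatMap."""
    return list(chain.from_iterable(f(x) for x in items))

def as_pair(x):
    # returns a tuple, like a bare Row
    return (x, len(x))

def wrapped(x):
    # returns a one-element list, like [Row(...)]
    return [(x, len(x))]

records = ["r1", "r2", "r3"]
print(rdd_map(as_pair, records))       # [('r1', 2), ('r2', 2), ('r3', 2)]
print(rdd_flat_map(wrapped, records))  # [('r1', 2), ('r2', 2), ('r3', 2)]
print(rdd_flat_map(as_pair, records))  # ['r1', 2, 'r2', 2, 'r3', 2]
```

So `rdd.map(percent_score_from_json)` and `rdd.flatMap(percent_score_from_json_flatMap)` should produce the same rows.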

In [25]:
def percent_score_from_json(row):
    """Map one message (a JSON string in row.value) to one Row of exam details."""
    # grab the row for this exam
    exam = json.loads(row.value)

    # default values mark exams whose score cannot be computed;
    # -1.0 (a float) keeps the inferred `score` column a double
    score_calc = -1.0
    num_q_calc = -1

    # sequences and sequences.counts must exist, and the question data must not be weird
    counts = exam.get("sequences", {}).get("counts", {})
    if ("correct" in counts) and ("total" in counts) and (counts["total"] != 0):
        # 100.0 forces float division (also under Python 2)
        score_calc = 100.0 * counts["correct"] / counts["total"]
        num_q_calc = len(exam["sequences"]["questions"])

    exam_details = {"base_exam_id": exam["base_exam_id"],
                    "exam_name": exam["exam_name"],
                    "keen_id": exam["keen_id"],
                    "score": score_calc,
                    "num_questions": num_q_calc,
                    "user_exam_id": exam["user_exam_id"]}

    return Row(**exam_details)


def percent_score_from_json_flatMap(row):
    """Same as percent_score_from_json, wrapped in a list for use with rdd.flatMap."""
    return [percent_score_from_json(row)]

In [26]:
exams_and_scores = messages_as_strings.rdd.map(percent_score_from_json).toDF()
# equivalent flatMap version (the function must return a list of Rows)
# exams_and_scores = messages_as_strings.rdd.flatMap(percent_score_from_json_flatMap).toDF()

In [27]:
exams_and_scores.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- num_questions: long (nullable = true)
 |-- score: double (nullable = true)
 |-- user_exam_id: string (nullable = true)



In [28]:
# exams_and_scores.show(20, False)
exams_and_scores.show()

+--------------------+--------------------+--------------------+-------------+-----------------+--------------------+
|        base_exam_id|           exam_name|             keen_id|num_questions|            score|        user_exam_id|
+--------------------+--------------------+--------------------+-------------+-----------------+--------------------+
|37f0a30a-7464-11e...|Normal Forms and ...|5a6745820eb8ab000...|            4|             50.0|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|Normal Forms and ...|5a674541ab6b0a000...|            4|             25.0|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|The Principles of...|5a67999d3ed3e3000...|            4|             75.0|8edbc8a8-4d26-429...|
|4beeac16-bb83-4d5...|The Principles of...|5a6799694fc7c7000...|            4|             50.0|c0ee680e-8892-4e6...|
|6442707e-7488-11e...|Introduction to B...|5a6791e824fccd000...|            4|             75.0|e4525b79-7904-405...|
|8b4488de-43a5-4ff...|        Learning Git|5a67a0b6852c2
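As a sanity check on the scoring rule (score = 100 × correct / total), the `counts` of the first message printed earlier (`correct` = 2, `total` = 4) give 50.0, which matches the first row above. The function below is a standalone, Spark-free restatement of the score logic in `percent_score_from_json`; `score_from_counts` is a hypothetical name, and the question list holds four placeholder entries standing in for the real questions.

```python
def score_from_counts(sequences):
    """Return (score, num_questions) using the notebook's rule, or (-1.0, -1) if unusable."""
    counts = sequences.get("counts", {})
    if "correct" in counts and "total" in counts and counts["total"] != 0:
        score = 100.0 * counts["correct"] / counts["total"]
        return score, len(sequences["questions"])
    return -1.0, -1

# counts copied from the first message; questions are 4 placeholder entries
first_sequences = {
    "counts": {"correct": 2, "total": 4},
    "questions": [{}, {}, {}, {}],
}
print(score_from_counts(first_sequences))           # (50.0, 4)
print(score_from_counts({"counts": {"total": 0}}))  # (-1.0, -1)
```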

In [29]:
# drop rows with a null score, and rows where score or num_questions
# still holds the default value of -1 (the same filter could be written in Spark SQL)
exams_and_scores = exams_and_scores.filter(
    "score is not null and score != -1 and num_questions != -1")

In [30]:
# register temp table to run Spark SQL queries on
exams_and_scores.createOrReplaceTempView("examsScores_tbl")

### Total number of assessments in our filtered table

In [31]:
# count number of rows in table
spark.sql("SELECT COUNT(*) as total_num_exams FROM examsScores_tbl").show()

+---------------+
|total_num_exams|
+---------------+
|           3275|
+---------------+



In [32]:
# test query
spark.sql("select exam_name, count(*) as times_taken from examsScores_tbl group by exam_name limit 10").show()

+--------------------+-----------+
|           exam_name|times_taken|
+--------------------+-----------+
|Learning Data Mod...|          9|
|Networking for Pe...|         15|
|Introduction to J...|        158|
|Learning Apache H...|         16|
|Learning Spring P...|          2|
|Learning iPython ...|         17|
|Introduction to P...|        162|
|Learning C# Best ...|         35|
|Introduction to A...|         14|
|A Practical Intro...|          9|
+--------------------+-----------+



### Q5) Which exam had the most questions?

The exam **Operating Red Hat Enterprise Linux Servers** has the most questions: 20.


In [33]:
spark.sql("select distinct exam_name, num_questions from examsScores_tbl order by num_questions desc limit 10").show(10, False)

+------------------------------------------+-------------+
|exam_name                                 |num_questions|
+------------------------------------------+-------------+
|Operating Red Hat Enterprise Linux Servers|20           |
|Great Bash                                |10           |
|Learning Linux System Administration      |8            |
|Learning to Program with R                |7            |
|Introduction to Data Science with R       |7            |
|Being a Better Introvert                  |7            |
|Understanding the Grails 3 Domain Model   |7            |
|What's New in JavaScript                  |7            |
|Using Web Components                      |6            |
|Arduino Prototyping Techniques            |6            |
+------------------------------------------+-------------+



### Q6) What is the average score on Learning Git? 

The average score on Learning Git is **67.61%**.

In [34]:
spark.sql("select exam_name, count(*) as times_taken, round(avg(score), 2) as avg_score from examsScores_tbl where exam_name = 'Learning Git' group by exam_name").show()

+------------+-----------+---------+
|   exam_name|times_taken|avg_score|
+------------+-----------+---------+
|Learning Git|        394|    67.61|
+------------+-----------+---------+



# Write the table used to answer Q5) and Q6) to HDFS

In [35]:
# Table 3 for Q5), Q6)
exams_and_scores.write.parquet("/tmp/exams_and_scores_tbl")

# End of the Spark Session. Return to `proj2_writeup.md`, section `Checking saved tables in HDFS`.