**Import the json module and the Row class from pyspark.sql**

In [1]:
import json
from pyspark.sql import Row

**Read the messages from the Kafka topic eduAssessment and cache them to cut down on warnings later**

In [2]:
messages = spark.read.format("kafka").option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "eduAssessment").option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest").load()
messages.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

**Print the schema**

In [3]:
messages.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



**Cast the binary message values to strings**

In [4]:
messages = messages.select(messages.value.cast('string'))
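Kafka delivers each record's value as raw bytes, and `cast('string')` turns those bytes into text. As a plain-Python sketch of the same idea (assuming the payloads are UTF-8 encoded JSON, which the `keen_timestamp` output below suggests), decoding one hypothetical record value looks like this:

```python
import json

# Hypothetical Kafka record value: raw bytes, like the binary `value` column.
raw_value = b'{"exam_name": "Learning Git"}'

# Plain-Python counterpart of value.cast('string'), assuming UTF-8 payloads.
text = raw_value.decode("utf-8")

# Once the value is a string, the JSON can be parsed into a dict.
record = json.loads(text)
print(record["exam_name"])  # Learning Git
```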

**Write the messages to HDFS as Parquet**

In [5]:
messages.write.parquet("/tmp/messages")

**Show messages**

In [7]:
messages.show()

+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 20 rows



**Get the underlying RDD and parse each message with json.loads**

In [8]:
messages.rdd.map(lambda x: json.loads(x.value)).toDF().show()



+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|Map(questions -> ...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|Map(questions -> ...|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false

**Map the JSON content and create Row objects from the JSON fields**

In [9]:
# Parse each JSON string and unpack its fields into a Row object
extracted_messages = messages.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()
extracted_messages.show()



+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|Map(questions -> ...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|Map(questions -> ...|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false
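Both In [8] and In [9] return the columns in alphabetical order. That is consistent with Spark versions before 3.0, where schema inference from a dict and `Row(**kwargs)` both sorted fields by name (Spark 3.0 keeps the keyword order on Python 3.6+). The plain-Python sketch below mimics that parse-then-sort step on a hypothetical message; `to_row_fields` is a hypothetical helper, not a Spark API.

```python
import json


def to_row_fields(value):
    """Hypothetical helper: parse one JSON message and order its fields
    alphabetically, mimicking Row(**kwargs) in Spark versions before 3.0."""
    parsed = json.loads(value)
    # dicts keep insertion order (Python 3.7+), so sorting the items fixes the order
    return dict(sorted(parsed.items()))


# Hypothetical message whose keys are not in alphabetical order.
message = '{"user_exam_id": "u-1", "exam_name": "Learning Git", "certification": "false"}'
fields = to_row_fields(message)
print(list(fields))  # ['certification', 'exam_name', 'user_exam_id']
```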

**The DataFrame produced by In [9] still looks messy. The schema of the converted messages, printed below, shows why: the data are nested, with sequences holding a map of arrays of maps.**

In [10]:
extracted_messages.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)
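To reach values inside the nesting, each level has to be unpacked in turn. The sketch below does this in plain Python for a hypothetical message shaped like the schema above; the `questions` key appears in the `Map(questions -> ...)` output, while the inner `correct` key and the `count_questions` helper are assumptions for illustration. In Spark SQL a similar lookup could be written as `size(sequences['questions'])`, though that query is not run in this notebook.

```python
import json

# Hypothetical message mirroring the schema above:
# sequences -> map of string -> array of maps (string -> boolean).
message = json.loads(
    '{"exam_name": "Learning Git",'
    ' "sequences": {"questions": [{"correct": true}, {"correct": false}]}}'
)


def count_questions(msg):
    """Hypothetical helper: number of entries under sequences['questions'],
    or 0 if either level of nesting is missing."""
    return len(msg.get("sequences", {}).get("questions", []))


print(count_questions(message))  # 2
```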



**Save the converted messages to HDFS**

In [11]:
extracted_messages.write.parquet("/tmp/extracted_message")

**We use Spark SQL to select the fields we want to analyze. First, we register the DataFrame as a temporary table called assessments.**

In [12]:
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView replaces it
extracted_messages.createOrReplaceTempView('assessments')
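Registering the DataFrame as a temporary table makes it queryable by name from SQL. As a rough analogue that runs without Spark, Python's built-in `sqlite3` module can load a few hypothetical rows into a table named `assessments` and answer Question 1 with the same SQL. SQLite only stands in for Spark SQL here; the notebook itself does not use it.

```python
import sqlite3

# In-memory SQLite database standing in for Spark's temporary table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE assessments (user_exam_id TEXT, exam_name TEXT)")

# Hypothetical rows; the real table has 3280 rows and more columns.
rows = [
    ("u-1", "Learning Git"),
    ("u-2", "Learning Git"),
    ("u-3", "Introduction to Python"),
]
conn.executemany("INSERT INTO assessments VALUES (?, ?)", rows)

# Same query text as Question 1.
(total,) = conn.execute("select count(*) from assessments").fetchone()
print(total)  # 3
```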

**The following are three business questions that I believe data scientists may want answered about these data, and may explore in more depth to develop new business ideas.**

Question 1: How many assessments are in the dataset?

In [13]:
spark.sql("select count(*) from assessments").show()

+--------+
|count(1)|
+--------+
|    3280|
+--------+



Answer: The dataset has 3280 assessments.


Question 2: What are the 10 most popular and the 10 least popular courses, and how many times has each been taken?

Answer:

In [15]:
spark.sql("select exam_name, count(exam_name) as freq from assessments group by exam_name order by freq desc limit 10").show(truncate=False)

+-----------------------------------------------------------+----+
|exam_name                                                  |freq|
+-----------------------------------------------------------+----+
|Learning Git                                               |394 |
|Introduction to Python                                     |162 |
|Intermediate Python Programming                            |158 |
|Introduction to Java 8                                     |158 |
|Learning to Program with R                                 |128 |
|Introduction to Machine Learning                           |119 |
|Software Architecture Fundamentals Understanding the Basics|109 |
|Beginning C# Programming                                   |95  |
|Learning Eclipse                                           |85  |
|Learning Apache Maven                                      |80  |
+-----------------------------------------------------------+----+



In [16]:
spark.sql("select exam_name, count(exam_name) as freq from assessments group by exam_name order by freq limit 10").show(truncate=False)

+-------------------------------------------------+----+
|exam_name                                        |freq|
+-------------------------------------------------+----+
|Nulls, Three-valued Logic and Missing Information|1   |
|Learning to Visualize Data with D3.js            |1   |
|Native Web Apps for Android                      |1   |
|Operating Red Hat Enterprise Linux Servers       |1   |
|Client-Side Data Storage for Web Developers      |2   |
|Arduino Prototyping Techniques                   |2   |
|What's New in JavaScript                         |2   |
|Understanding the Grails 3 Domain Model          |2   |
|Hibernate and JPA Fundamentals                   |2   |
|Learning Spring Programming                      |2   |
+-------------------------------------------------+----+
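Several courses tie at a frequency of 1 or 2, so `order by freq limit 10` does not guarantee which of the tied courses make the cut, and the list may differ between runs. Adding a secondary sort key, for example `order by freq, exam_name limit 10`, makes the result deterministic. A small sketch of that tie-breaking with `sqlite3` and hypothetical course names:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE assessments (exam_name TEXT)")

# Hypothetical data: three courses tied at one attempt, one course with two.
names = ["Course C", "Course A", "Course B", "Course D", "Course D"]
conn.executemany("INSERT INTO assessments VALUES (?)", [(n,) for n in names])

# exam_name as a secondary key breaks the tie among the freq = 1 courses.
query = """
    select exam_name, count(exam_name) as freq
    from assessments
    group by exam_name
    order by freq, exam_name
    limit 2
"""
result = conn.execute(query).fetchall()
print(result)  # [('Course A', 1), ('Course B', 1)]
```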




Question 3: How many users are loyal customers, that is, have taken more than one assessment?

In [17]:
spark.sql("select count(distinct user_exam_id) from assessments").show()

+----------------------------+
|count(DISTINCT user_exam_id)|
+----------------------------+
|                        3242|
+----------------------------+



In [18]:
spark.sql("select user_exam_id, count(exam_name) as num_exam_pp from assessments group by user_exam_id having num_exam_pp>1").show()

+--------------------+-----------+
|        user_exam_id|num_exam_pp|
+--------------------+-----------+
|d4ab4aeb-1368-486...|          3|
|bd96cfbe-1532-4ba...|          3|
|028ad26f-a89f-4a6...|          3|
|a7e6fc04-245f-4e3...|          3|
|a45b5ee6-a4ed-4b1...|          3|
|fa23b287-0d0a-468...|          3|
|b7ac6d15-97e1-4e9...|          3|
|c1eb4d4a-d6ef-43e...|          2|
|66d91177-c436-4ee...|          3|
|cdc5859d-b332-4fb...|          3|
|6132da16-2c0c-436...|          3|
|00745aef-f3af-412...|          3|
|37cf5b0c-4807-421...|          3|
|ac80a11a-2e79-40e...|          3|
|a244c11a-d890-4e3...|          3|
|c320d47f-60d4-49a...|          3|
|949aa36c-74c7-4fc...|          3|
|3d63ec69-8d97-4f9...|          3|
|1e325cc1-47a9-480...|          3|
|6e4889ab-5978-44b...|          2|
+--------------------+-----------+



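Answer: There are 3242 distinct user_exam_id values, fewer than the 3280 assessment records, so some IDs appear more than once. Treating each user_exam_id as a user, some users took more than one assessment. The second query returns 20 IDs with more than one record (its output has no "only showing top 20 rows" footer, so these are all of them), which means 20 loyal customers have taken more than one assessment.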
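As a consistency check, the gap between the row count and the distinct ID count should equal the number of extra rows contributed by repeated IDs, the sum of (count - 1) over those IDs. The counts below are copied from the In [18] output (eighteen IDs appearing 3 times and two appearing 2 times):

```python
# Counts copied from the In [18] output: 18 IDs seen 3 times, 2 IDs seen 2 times.
repeat_counts = [3] * 18 + [2] * 2

total_rows = 3280    # In [13]: count(*)
distinct_ids = 3242  # In [17]: count(distinct user_exam_id)

# Each ID seen c times contributes c - 1 rows beyond its first occurrence.
extra_rows = sum(c - 1 for c in repeat_counts)
print(extra_rows)                               # 38
print(total_rows - distinct_ids)                # 38
print(extra_rows == total_rows - distinct_ids)  # True
```

The two numbers agree, which supports reading the 20 rows of In [18] as the complete result rather than a truncated preview.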