# Project 2: Tracking User Activity

## Abstract

This project builds a data pipeline for assessment data from an education tech firm's assessment delivery service. The resulting data is ready for further queries according to the customer's requirements.

Main tasks of this project:

- Publish and consume messages with Kafka.
- Use Spark to transform the messages.
- Land the transformed messages in HDFS.

## 1. Data & Docker Set-up

- The original data was acquired by running 

```
curl -L -o assessment-attempts-20180128-121051-nested.json https://goo.gl/ME6hjp
```

- The data is contained in the file `assessment-attempts-20180128-121051-nested.json`, which is included in this repository.

- The application services are configured in `docker-compose.yml`, which is included in this repository. Docker images for cloudera, kafka, mids, spark, and zookeeper were used in this project.

In [1]:
! docker-compose up -d

Starting project-2-haoyuzhang89_cloudera_1 ... 
Starting project-2-haoyuzhang89_zookeeper_1 ... 
Starting project-2-haoyuzhang89_mids_1      ... 
Starting project-2-haoyuzhang89_spark_1     ... done
Starting project-2-haoyuzhang89_kafka_1     ... done

- List the state of the Docker containers:

In [2]:
! docker-compose ps

         Name                   Command           State           Ports         
--------------------------------------------------------------------------------
project-2-haoyuzhang89   cdh_startup_script.sh    Up      11000/tcp, 11443/tcp, 
_cloudera_1                                               19888/tcp, 50070/tcp, 
                                                          8020/tcp, 8088/tcp,   
                                                          8888/tcp, 9090/tcp    
project-2-haoyuzhang89   /etc/confluent/docker/   Up      29092/tcp, 9092/tcp   
_kafka_1                 run                                                    
project-2-haoyuzhang89   /bin/bash                Up      8888/tcp              
_mids_1                                                                         
project-2-haoyuzhang89   docker-entrypoint.sh     Up      0.0.0.0:8888->8888/tcp
_spark_1                 bash                                                   
project-2-haoyuzhang89   /et

## 2. Publish & Consume with Kafka

### 2.1 Publish Messages

- The Kafka topic is named __assessment__ because the JSON file contains assessment results of users from an education tech firm.

In [3]:
! docker-compose exec kafka \
  kafka-topics \
    --create \
    --topic assessment \
    --partitions 1 \
    --replication-factor 1 \
    --if-not-exists \
    --zookeeper zookeeper:32181

- Describe the Kafka topic __assessment__ to check that it was created successfully.

In [4]:
! docker-compose exec kafka \
  kafka-topics \
    --describe \
    --topic assessment \
    --zookeeper zookeeper:32181

Topic: assessment	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: assessment	Partition: 0	Leader: 1	Replicas: 1	Isr: 1


- The assessment information of `assessment-attempts-20180128-121051-nested.json` was published into the kafka topic __assessment__.

In [5]:
! docker-compose exec mids bash -c "cat project-2-haoyuzhang89/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessment"
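
The `jq '.[]' -c` step in the command above unpacks the top-level JSON array and emits one compact JSON object per line, so that kafkacat can publish each assessment as a separate message. A minimal Python sketch of that step, for illustration only; the function name `array_to_lines` and the sample records are hypothetical and not part of the project:

```python
import json


def array_to_lines(json_text):
    """Turn a JSON array into one compact JSON document per line.

    Roughly what `jq '.[]' -c` does for a top-level array.
    """
    records = json.loads(json_text)
    if not isinstance(records, list):
        raise ValueError("expected a top-level JSON array")
    # Compact separators mimic jq's -c output (no spaces after ',' or ':').
    return [
        json.dumps(r, separators=(",", ":"), ensure_ascii=False)
        for r in records
    ]


# Hypothetical two-record sample standing in for the assessment file.
sample = '[{"exam_name": "Learning Git", "max_attempts": "1.0"}, {"exam_name": "Great Bash"}]'
lines = array_to_lines(sample)
print("\n".join(lines))
```

Each element of `lines` corresponds to one Kafka message in the real pipeline.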

### 2.2 Consume Messages

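- Read the messages from Kafka and count the output lines. The count is 3281 lines for the Kafka topic __assessment__. This is one more than the 3280 records counted in Section 4.1; the extra line may be kafkacat's end-of-topic notice, which is printed when the `-e` flag is used.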

In [8]:
! docker-compose exec mids bash -c "kafkacat -C -b kafka:29092 -t assessment -o beginning -e "|wc -l

3281


- Display the final messages in the topic by starting from offset 3278. At this point the messages are raw nested JSON, which is hard to read directly. The output does confirm, however, that the publishing step in the previous section succeeded.

In [9]:
! docker-compose exec mids bash -c "kafkacat -C -b kafka:29092 -t assessment -o 3278 -e"

{"keen_timestamp":"1513766763.6051481","max_attempts":"1.0","started_at":"2017-12-20T10:44:09.162Z","base_exam_id":"f80366d9-db60-41c3-a1c4-6c7789b478f8","user_exam_id":"b1896278-669f-4346-80fd-21d0ba898d5d","sequences":{"questions":[{"user_incomplete":false,"user_correct":true,"options":[{"checked":false,"id":"2e31babb-5a1c-47bd-bcb7-d4fa3d43794f"},{"checked":true,"at":"2017-12-20T10:44:53.863Z","id":"68e4f6aa-2adb-4402-b5b7-b7610993edc6","submitted":1,"correct":true},{"checked":true,"at":"2017-12-20T10:44:49.319Z","id":"bbe2135d-cc21-493b-b4e1-8a182e6211d4","submitted":1,"correct":true},{"checked":true,"at":"2017-12-20T10:44:37.080Z","id":"feba554f-1b5e-422d-93ac-b9202a91014b","submitted":1,"correct":true}],"user_submitted":true,"id":"bf8306dd-889f-4e10-b305-ef446fa4cec4","user_result":"correct"},{"user_incomplete":false,"user_correct":true,"options":[{"checked":false,"id":"17d6282d-8c2f-4d67-b15e-c563b38c6a99"},{"checked":true,"at":"2017-12-20T10:45:10.020Z","id":"5ab90b76-f93a-4078

## 3. Transform in Spark

### 3.1 Pyspark Set-up

- The Spark transformations are run from a PySpark driver inside a Jupyter notebook. The driver is started in notebook mode as follows.

In [None]:
! docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root --notebook-dir=/w205/' pyspark

[I 21:43:02.862 NotebookApp] Serving notebooks from local directory: /w205
[I 21:43:02.862 NotebookApp] 0 active kernels 
[I 21:43:02.863 NotebookApp] The Jupyter Notebook is running at: http://0.0.0.0:8888/?token=89558d308879801675a8098e2939216f6ff3a2967764ac34
[I 21:43:02.863 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 21:43:02.865 NotebookApp] 
    
    Copy/paste this URL into your browser when you connect for the first time,
    to login with a token:
        http://0.0.0.0:8888/?token=89558d308879801675a8098e2939216f6ff3a2967764ac34
[I 21:43:50.385 NotebookApp] 302 GET /?token=89558d308879801675a8098e2939216f6ff3a2967764ac34 (198.54.105.27) 0.81ms
[W 21:43:56.231 NotebookApp] Notebook project-2-haoyuzhang89/Project_2.ipynb is not trusted
[I 21:43:57.467 NotebookApp] Kernel started: 59a8c130-93e5-423b-95b5-abf465e03c39
Using Spark's default log

- Load the packages required for the transformation work.

In [1]:
import json
import pandas as pd
from pyspark.sql.functions import explode, split
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
import warnings

- Check the contents of `/tmp/` in HDFS before writing any data.

In [18]:
! docker-compose exec cloudera hadoop fs -ls /tmp/

Found 2 items
drwxrwxrwt   - mapred mapred              0 2018-02-06 18:27 /tmp/hadoop-yarn
drwx-wx-wx   - root   supergroup          0 2020-10-24 21:38 /tmp/hive


### 3.2 Read from Kafka topic: assessment

- Reading the raw assessment messages from kafka topic __assessment__. 

In [None]:
raw_assessment = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka:29092") \
  .option("subscribe","assessment") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load() 

- Cache the raw assessment messages. 

In [3]:
raw_assessment.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

- Check the schema of the raw assessment messages. 

In [4]:
raw_assessment.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



- Cast the raw assessment data as string type. Then save the string data in HDFS.

In [5]:
assessment = raw_assessment.select(raw_assessment.value.cast('string'))
assessment.write.parquet("/tmp/assessment")

- Display the raw assessment data as string type.

In [7]:
assessment.show(4)

+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 4 rows



### 3.3 Spark Inferred Schema

- Parse each JSON message by mapping `json.loads` over the RDD, then convert the result to a DataFrame and let Spark infer the schema.

In [8]:
extracted_assessment = assessment.rdd.map(lambda x: json.loads(x.value)).toDF()
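
The two steps so far, casting the binary Kafka `value` to a string and mapping `json.loads` over it, can be sketched in plain Python without Spark. This is only an illustration, under the assumption that each message is UTF-8 encoded JSON; `parse_values` and the sample bytes are hypothetical.

```python
import json


def parse_values(raw_values):
    """Decode Kafka message values (bytes) and parse each one as JSON."""
    parsed = []
    for raw in raw_values:
        text = raw.decode("utf-8")       # analogous to value.cast('string')
        parsed.append(json.loads(text))  # analogous to the json.loads map
    return parsed


# Hypothetical message values, shaped like the records in the topic.
raw_values = [
    b'{"exam_name": "Learning Git", "sequences": {"counts": {"total": 4}}}',
    b'{"exam_name": "Great Bash", "sequences": {"counts": {"total": 5}}}',
]
records = parse_values(raw_values)
print(records[0]["sequences"]["counts"]["total"])
```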



- Below is the schema inferred by Spark. Note that `sequences` is inferred as a map of arrays rather than as a nested struct. The extracted DataFrame is then registered as a Spark temporary table named __assessment__.

In [9]:
extracted_assessment.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)



In [11]:
extracted_assessment.registerTempTable('assessment')

### 3.4 Forced Schema

- Some columns in the Spark inferred schema from the previous section are not needed for the queries that follow. A forced schema is therefore defined that keeps only the user and exam identifiers (*user_exam_id*, *base_exam_id*, *exam_name*, *keen_id*, etc.) and the assessment performance counts (*incomplete*, *submitted*, *incorrect*, etc.). This schema, named __final_schema__, is defined below.

In [12]:
final_schema = StructType(
    [StructField('user_exam_id', StringType(), True),
     StructField('base_exam_id', StringType(), True),
     StructField('keen_id', StringType(), True),
     StructField('exam_name', StringType(), True),
     StructField('certification', StringType(), True),
     StructField('keen_timestamp', StringType(), True),
     StructField('sequences',StructType(
         [StructField('counts',StructType([
             StructField('incomplete',IntegerType(), True),
             StructField('submitted',IntegerType(), True),
             StructField('incorrect',IntegerType(), True),
             StructField('all_correct',StringType(), True),
             StructField('correct',IntegerType(), True),
             StructField('total',IntegerType(), True),
             StructField('unanswered',IntegerType(), True)
         ]))]))
    ])


- The JSON data extracted with __final_schema__ is saved to HDFS.

In [13]:
select_extracted_assessment = assessment.rdd.map(lambda x: json.loads(x.value)).toDF(schema=final_schema)
select_extracted_assessment.write.parquet("/tmp/select_extracted_assessment")
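
One way to picture what the forced schema does: only the fields named in the schema are kept, nested structures are followed level by level, and everything else in the record is dropped. The plain-Python sketch below illustrates that idea. It is not Spark's implementation, and `FIELDS`, `project`, and the sample record are hypothetical. It assumes a missing field becomes `None`, in line with the `nullable = true` columns.

```python
# Nested field specification: a dict value means "descend into this struct",
# None means "keep this leaf value as-is".
FIELDS = {
    "user_exam_id": None,
    "exam_name": None,
    "sequences": {"counts": {"correct": None, "total": None}},
}


def project(record, fields):
    """Keep only the fields named in `fields`; missing ones become None."""
    out = {}
    for name, sub in fields.items():
        value = record.get(name) if isinstance(record, dict) else None
        if sub is None:
            out[name] = value
        else:
            out[name] = project(value, sub)
    return out


# Hypothetical record with extra fields that the projection drops.
record = {
    "user_exam_id": "abc",
    "exam_name": "Learning Git",
    "started_at": "2017-12-20T10:44:09.162Z",
    "sequences": {
        "questions": [],
        "counts": {"correct": 3, "total": 4, "unanswered": 0},
    },
}
print(project(record, FIELDS))
```

In this sketch `started_at`, `questions`, and `unanswered` disappear because they are not listed in `FIELDS`.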

- A Spark temporary table named __select_assessment__ is registered for the Spark SQL queries in the following section.

In [14]:
select_extracted_assessment.registerTempTable('select_assessment')

- Check that the schema of the extracted data matches the design.

In [15]:
select_extracted_assessment.printSchema()

root
 |-- user_exam_id: string (nullable = true)
 |-- base_exam_id: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- sequences: struct (nullable = true)
 |    |-- counts: struct (nullable = true)
 |    |    |-- incomplete: integer (nullable = true)
 |    |    |-- submitted: integer (nullable = true)
 |    |    |-- incorrect: integer (nullable = true)
 |    |    |-- all_correct: string (nullable = true)
 |    |    |-- correct: integer (nullable = true)
 |    |    |-- total: integer (nullable = true)
 |    |    |-- unanswered: integer (nullable = true)



- The extracted data is shown below and is ready for queries.

In [16]:
select_extracted_assessment.show(4)

+--------------------+--------------------+--------------------+--------------------+-------------+------------------+--------------------+
|        user_exam_id|        base_exam_id|             keen_id|           exam_name|certification|    keen_timestamp|           sequences|
+--------------------+--------------------+--------------------+--------------------+-------------+------------------+--------------------+
|6d4089e4-bde5-4a2...|37f0a30a-7464-11e...|5a6745820eb8ab000...|Normal Forms and ...|        false| 1516717442.735266|[[1,4,1,false,2,4...|
|2fec1534-b41f-441...|37f0a30a-7464-11e...|5a674541ab6b0a000...|Normal Forms and ...|        false| 1516717377.639827|[[2,4,1,false,1,4...|
|8edbc8a8-4d26-429...|4beeac16-bb83-4d5...|5a67999d3ed3e3000...|The Principles of...|        false| 1516738973.653394|[[0,4,1,false,3,4...|
|c0ee680e-8892-4e6...|4beeac16-bb83-4d5...|5a6799694fc7c7000...|The Principles of...|        false|1516738921.1137421|[[2,4,0,false,2,4...|
+-------------------

## 4. Querying Data with Spark SQL

### 4.1 How many assessments are in the dataset?

- Overall, there are 3280 assessments in the dataset. However, some records are duplicated. Counting by distinct *keen_id* or by distinct *user_exam_id* gives 3242 distinct assessments.

In [40]:
spark.sql("select count(keen_id) dataset_size from select_assessment").show()

+------------+
|dataset_size|
+------------+
|        3280|
+------------+



In [41]:
spark.sql("select count(distinct keen_id) as keen_id_num from select_assessment").show()

+-----------+
|keen_id_num|
+-----------+
|       3242|
+-----------+



In [38]:
spark.sql("select count(distinct user_exam_id) as user_exam_id_num from select_assessment").show()

+----------------+
|user_exam_id_num|
+----------------+
|            3242|
+----------------+
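
The gap between the 3280 rows and the 3242 distinct IDs comes from records that appear more than once. A minimal sketch of how such duplicates can be located, written in plain Python over a hypothetical list of `keen_id` values (the IDs below are invented for illustration):

```python
from collections import Counter


def duplicate_summary(ids):
    """Return (total rows, distinct ids, {id: count} for ids seen more than once)."""
    counts = Counter(ids)
    repeated = {key: n for key, n in counts.items() if n > 1}
    return len(ids), len(counts), repeated


# Hypothetical keen_id values; "k2" is recorded three times.
keen_ids = ["k1", "k2", "k2", "k3", "k2"]
total, distinct, repeated = duplicate_summary(keen_ids)
print(total, distinct, repeated)
```

The difference `total - distinct` is the number of surplus copies. For the dataset above that difference is 3280 - 3242 = 38 rows.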



- In addition, there are 103 distinct exam names in this dataset but 107 distinct exam IDs, which means some courses have more than one exam ID. In other words, some courses have multiple exam versions. The query below shows that Introduction to Python, Being a Better Introvert, Great Bash, and Architectural Considerations for Hadoop Applications each have 2 exam IDs.

In [37]:
spark.sql("select count(distinct exam_name) as exam_num from select_assessment").show()

+--------+
|exam_num|
+--------+
|     103|
+--------+



In [34]:
spark.sql("select count(distinct base_exam_id) as base_exam_id_num from select_assessment").show()

+----------------+
|base_exam_id_num|
+----------------+
|             107|
+----------------+



In [36]:
spark.sql("select count(distinct base_exam_id) as ID_num_exam, exam_name from select_assessment group by exam_name order by ID_num_exam DESC").show(10,False)

+-----------+----------------------------------------------------+
|ID_num_exam|exam_name                                           |
+-----------+----------------------------------------------------+
|2          |Introduction to Python                              |
|2          |Being a Better Introvert                            |
|2          |Great Bash                                          |
|2          |Architectural Considerations for Hadoop Applications|
|1          |Learning Apache Hadoop                              |
|1          |Learning C# Best Practices                          |
|1          |Introduction to Java 8                              |
|1          |Introduction to Architecting Amazon Web Services    |
|1          |Learning Spring Programming                         |
|1          |Learning iPython Notebook                           |
+-----------+----------------------------------------------------+
only showing top 10 rows
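
The grouping behind this query, counting distinct `base_exam_id` values per `exam_name`, can be sketched in plain Python with a dictionary of sets. The pairs below are hypothetical stand-ins for the real rows, and `ids_per_exam` is an illustrative helper, not part of the project.

```python
from collections import defaultdict


def ids_per_exam(rows):
    """Map each exam name to the set of distinct base_exam_id values seen for it."""
    grouped = defaultdict(set)
    for exam_name, base_exam_id in rows:
        grouped[exam_name].add(base_exam_id)
    return dict(grouped)


# Hypothetical (exam_name, base_exam_id) pairs; the IDs are invented.
rows = [
    ("Great Bash", "id-a"),
    ("Great Bash", "id-b"),
    ("Great Bash", "id-a"),
    ("Learning Git", "id-c"),
]
grouped = ids_per_exam(rows)
# Exam names that map to more than one exam ID.
multi = sorted(name for name, ids in grouped.items() if len(ids) > 1)
print(multi)
```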



### 4.2 How many people took Learning Git?

- 390 people took Learning Git, counted by distinct *user_exam_id*.

In [29]:
spark.sql("select count(distinct user_exam_id) as People_Taking_Learning_Git from select_assessment WHERE exam_name = 'Learning Git'").show()

+--------------------------+
|People_Taking_Learning_Git|
+--------------------------+
|                       390|
+--------------------------+



### 4.3 What is the least common course taken? And the most common?

- The least common courses are Learning to Visualize Data with D3.js, Native Web Apps for Android, Nulls, Three-valued Logic and Missing Information, and Operating Red Hat Enterprise Linux Servers. Each of these courses was taken only once.

In [31]:
spark.sql("select count(user_exam_id) as Popularity, exam_name from select_assessment group by exam_name order by popularity").show(20,False)

+----------+---------------------------------------------------+
|Popularity|exam_name                                          |
+----------+---------------------------------------------------+
|1         |Learning to Visualize Data with D3.js              |
|1         |Native Web Apps for Android                        |
|1         |Nulls, Three-valued Logic and Missing Information  |
|1         |Operating Red Hat Enterprise Linux Servers         |
|2         |The Closed World Assumption                        |
|2         |Client-Side Data Storage for Web Developers        |
|2         |Arduino Prototyping Techniques                     |
|2         |Understanding the Grails 3 Domain Model            |
|2         |Hibernate and JPA Fundamentals                     |
|2         |What's New in JavaScript                           |
|2         |Learning Spring Programming                        |
|3         |Mastering Web Views                                |
|3         |Using Web Com

- The most common course is Learning Git, which was taken 390 times. The count is the same whether it is based on distinct *user_exam_id* or distinct *keen_id*.

In [32]:
spark.sql("select count(distinct user_exam_id) as Popularity, exam_name from select_assessment group by exam_name order by popularity DESC").show(10, False)

+----------+-----------------------------------------------------------+
|Popularity|exam_name                                                  |
+----------+-----------------------------------------------------------+
|390       |Learning Git                                               |
|162       |Introduction to Python                                     |
|158       |Introduction to Java 8                                     |
|156       |Intermediate Python Programming                            |
|128       |Learning to Program with R                                 |
|119       |Introduction to Machine Learning                           |
|109       |Software Architecture Fundamentals Understanding the Basics|
|85        |Learning Eclipse                                           |
|83        |Beginning C# Programming                                   |
|80        |Learning Apache Maven                                      |
+----------+---------------------------------------

In [33]:
spark.sql("select count(distinct keen_id) as Popularity, exam_name from select_assessment group by exam_name order by popularity DESC").show(10, False)

+----------+-----------------------------------------------------------+
|Popularity|exam_name                                                  |
+----------+-----------------------------------------------------------+
|390       |Learning Git                                               |
|162       |Introduction to Python                                     |
|158       |Introduction to Java 8                                     |
|156       |Intermediate Python Programming                            |
|128       |Learning to Program with R                                 |
|119       |Introduction to Machine Learning                           |
|109       |Software Architecture Fundamentals Understanding the Basics|
|85        |Learning Eclipse                                           |
|83        |Beginning C# Programming                                   |
|80        |Learning Apache Maven                                      |
+----------+---------------------------------------