# W205 Project 2 Tracking User Activity

### Background:

I work as a data scientst at an ed tech firm. In order to find business insights, I am going to explore assessments that lots of our clients are using and try to find the patterns of the data.  

### Read the data file

Identify my current work directory

In [6]:
! pwd

/home/jupyter/project-2-fengyaoluo


Set my work directory as /project-2-fengyaoluo/

In [7]:
! cd ~/project-2-fengyaoluo/

Read the file from the link and output it as a json file

In [8]:
! curl -L -o assessment-nested.json https://goo.gl/ME6hjp

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 9096k  100 9096k    0     0  19.9M      0 --:--:-- --:--:-- --:--:-- 57.3M


### Turn on Docker Compose

Turn up the docker compose 

In [10]:
! docker-compose up -d

Creating network "project2fengyaoluo_default" with the default driver
Creating project2fengyaoluo_mids_1
Creating project2fengyaoluo_zookeeper_1
Creating project2fengyaoluo_cloudera_1
Creating project2fengyaoluo_kafka_1
Creating project2fengyaoluo_spark_1


Check what is in Cloudera hadoop

In [12]:
! docker-compose exec cloudera hadoop fs -ls /tmp/

Found 1 items
drwxrwxrwt   - mapred mapred          0 2018-02-06 18:27 /tmp/hadoop-yarn


### Push data into kafka

Create the kafka topic "assessment"

In [13]:
! docker-compose exec kafka kafka-topics --create --topic assessment --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181

Created topic assessment.


If showing the error as "ERROR: No container found for kafka_1", try execute the following command. If not, skip.

`! docker-compose down`

### Parse Json file line by line to kafka

Use jq command to read the json file and -c to concatnate records in one line, then push into kafka, under the topic of assessment

In [14]:
! docker-compose exec mids bash -c "cat /project-2-fengyaoluo/assessment-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessment"

### Run Pyspark

Use docker compose to execute pyspark (This will take a while)

In [None]:
! docker-compose exec spark pyspark

Python 3.6.1 |Anaconda 4.4.0 (64-bit)| (default, May 11 2017, 13:09:58) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/10/21 21:52:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/10/21 21:53:04 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
20/10/21 21:53:04 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
20/10/21 21:53:06 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _

Use spark to read the data from the top to the end in the topic "assessment" and name it as "raw_assessment"

In [None]:
raw_assessment = spark.read.format("kafka").option("kafka.bootstrap.servers", "kafka:29092").option("subscribe","assessment").option("startingOffsets", "earliest").option("endingOffsets", "latest").load()  

Cache the raw_assessment therefore we won't encounter an error unless we evaluate it

In [None]:
raw_assessment.cache()

In [None]:
output: DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

Check the schema of the data

In [None]:
raw_assessment.printSchema()

In [None]:
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

The schema shows the variables and their data type. The information that we are interested are stored inside the "value".

The schema shows that there is nested structure under the "sequences" column. We can use the spark sql to unwrap it and store it in HDFS for future querying. 

In [None]:
raw_assessment.write.parquet("/tmp/raw_assessment")

I wrote the raw_assessment file into the hdfs just in case any data scientists would like to use the raw data to do analysis in the future. 

### Cast raw data value to string and turn it into RDD

Cast the data's value as string and save it under the name as "assessment1"

In [None]:
assessment1 = raw_assessment.select(raw_assessment.value.cast('string'))  

import sys package and then take the string and encode it as 'utf8' and output as write a file 

In [None]:
import sys
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1)

import json package and then load the value of raw_assessment to RDD dataframe, and then apply a map to it. In the end, show the table. 

In [None]:
import json
assessment1.rdd.map(lambda x: json.loads(x.value)).toDF().show()

In [None]:
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|Map(questions -> ...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|Map(questions -> ...|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false|The Principles of...| 1516738973.653394|5a67999d3ed3e3000...| 1516738973.653394|         1.0|Map(questions -> ...|2018-01-23T20:22:...|8edbc8a8-4d26-429...|
|4beeac16-bb83-4d5...|        false|The Principles of...|1516738921.1137421|5a6799694fc7c7000...|1516738921.1137421|         1.0|Map(questions -> ...|2018-01-23T20:21:...|c0ee680e-8892-4e6...|
|6442707e-7488-11e...|        false|Introduction to B...| 1516737000.212122|5a6791e824fccd000...| 1516737000.212122|         1.0|Map(questions -> ...|2018-01-23T19:48:...|e4525b79-7904-405...|
|8b4488de-43a5-4ff...|        false|        Learning Git| 1516740790.309757|5a67a0b6852c2a000...| 1516740790.309757|         1.0|Map(questions -> ...|2018-01-23T20:51:...|3186dafa-7acf-47e...|
|e1f07fac-5566-4fd...|        false|Git Fundamentals ...|1516746279.3801291|5a67b627cc80e6000...|1516746279.3801291|         1.0|Map(questions -> ...|2018-01-23T22:24:...|48d88326-36a3-4cb...|
|7e2e0b53-a7ba-458...|        false|Introduction to P...| 1516743820.305464|5a67ac8cb0a5f4000...| 1516743820.305464|         1.0|Map(questions -> ...|2018-01-23T21:43:...|bb152d6b-cada-41e...|
|1a233da8-e6e5-48a...|        false|Intermediate Pyth...|  1516743098.56811|5a67a9ba060087000...|  1516743098.56811|         1.0|Map(questions -> ...|2018-01-23T21:31:...|70073d6f-ced5-4d0...|
|7e2e0b53-a7ba-458...|        false|Introduction to P...| 1516743764.813107|5a67ac54411aed000...| 1516743764.813107|         1.0|Map(questions -> ...|2018-01-23T21:42:...|9eb6d4d6-fd1f-4f3...|
|4cdf9b5f-fdb7-4a4...|        false|A Practical Intro...|1516744091.3127241|5a67ad9b2ff312000...|1516744091.3127241|         1.0|Map(questions -> ...|2018-01-23T21:45:...|093f1337-7090-457...|
|e1f07fac-5566-4fd...|        false|Git Fundamentals ...|1516746256.5878439|5a67b610baff90000...|1516746256.5878439|         1.0|Map(questions -> ...|2018-01-23T22:24:...|0f576abb-958a-4c0...|
|87b4b3f9-3a86-435...|        false|Introduction to M...|  1516743832.99235|5a67ac9837b82b000...|  1516743832.99235|         1.0|Map(questions -> ...|2018-01-23T21:40:...|0c18f48c-0018-450...|
|a7a65ec6-77dc-480...|        false|   Python Epiphanies|1516743332.7596769|5a67aaa4f21cc2000...|1516743332.7596769|         1.0|Map(questions -> ...|2018-01-23T21:34:...|b38ac9d8-eef9-495...|
|7e2e0b53-a7ba-458...|        false|Introduction to P...| 1516743750.097306|5a67ac46f7bce8000...| 1516743750.097306|         1.0|Map(questions -> ...|2018-01-23T21:41:...|bbc9865f-88ef-42e...|
|e5602ceb-6f0d-11e...|        false|Python Data Struc...|1516744410.4791961|5a67aedaf34e85000...|1516744410.4791961|         1.0|Map(questions -> ...|2018-01-23T21:51:...|8a0266df-02d7-44e...|
|e5602ceb-6f0d-11e...|        false|Python Data Struc...|1516744446.3999851|5a67aefef5e149000...|1516744446.3999851|         1.0|Map(questions -> ...|2018-01-23T21:53:...|95d4edb1-533f-445...|
|f432e2e3-7e3a-4a7...|        false|Working with Algo...| 1516744255.840405|5a67ae3f0c5f48000...| 1516744255.840405|         1.0|Map(questions -> ...|2018-01-23T21:50:...|f9bc1eff-7e54-42a...|
|76a682de-6f0c-11e...|        false|Learning iPython ...| 1516744023.652257|5a67ad579d5057000...| 1516744023.652257|         1.0|Map(questions -> ...|2018-01-23T21:46:...|dc4b35a7-399a-4bd...|
|a7a65ec6-77dc-480...|        false|   Python Epiphanies|1516743398.6451161|5a67aae6753fd6000...|1516743398.6451161|         1.0|Map(questions -> ...|2018-01-23T21:35:...|d0f8249a-597e-4e1...|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
only showing top 20 rows

This is the df which displayed the top layer of the Json file. From the first glance, the column "sequences" has the nested structure. 

From the pyspark.sql package import Row function;
create the row objects from json;
this method is more common compared to the last one, therefore, we are going to stick with it and use it to create a temperature table to do some analysis 

In [None]:
from pyspark.sql import Row
extracted_assessment1 = assessment1.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()

In [None]:
extracted_assessment1.printSchema()

In [None]:
root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)

The schema proves our thinking that the sequences column has the nested data and we are going to unwrap the sequences column and save it in hdfs, for future usage. However, this sql schema does not give us the name of the elements inside the nested structure. In order to have better ideas about the data, we are going to use the following line in another terminal to take a look at the Json file. 

### Check Json file structure (in another terminal)

In [None]:
! docker-compose exec mids bash -c "cat /project-2-fengyaoluo/assessment-nested.json | jq '.[]'"

create a spark temporary table and the use SQL query to unwrap the data and answer our business questions

In [None]:
extracted_assessment1.registerTempTable('assessment')

### Unnest the Json file and save to HDFS

use the spark sql to query the user exam id (for future join if needed) and query the questions from the exam

In [None]:
questions_from_exam = spark.sql("select user_exam_id, sequences.questions from assessment ")

use the spark sql to query the user exam id, and whether the user gets the first question correct or wrong

In [None]:
correct_from_question_1 = spark.sql("select user_exam_id, sequences.questions[1].user_correct from assessment")

put both tables into hdfs. Ideally, we can user the unique key to join different tables to get the full picture of how many questions did the use answer correctly. 

In [None]:
questions_from_exam.write.parquet("/tmp/questions_from_exam")
correct_from_question_1.write.parquet("/temp/correct_from_question_1")

### Analysis the data in spark

Question 1: How many assesstments are in the dataset?

In [None]:
spark.sql("select count(*) from assessment ").show()

In [None]:
+--------+
|count(1)|
+--------+
|    3280|
+--------+

Question 2: What's the name of your Kafka topic? How did you come up with that name?

The name of my kafka topic for this data is "assessment". The topic should be related to the content of the dataset. Because the data set is about the assessment of different exams, I chose the topic name as "topic". 

Question 3: How many people took Learning Git?

In [None]:
spark.sql("select exam_name, count(*)  from assessment where exam_name = 'Learning Git' group by exam_name").show()

In [None]:
+------------+--------+                                                         
|   exam_name|count(1)|
+------------+--------+
|Learning Git|     394|
+------------+--------+

Question 4: What is the least common course taken? And the most common?

In [None]:
spark.sql("select exam_name, count(*)  from assessment group by exam_name order by count(*) ").show(truncate = False)

In [None]:
+---------------------------------------------------+--------+                  
|exam_name                                          |count(1)|
+---------------------------------------------------+--------+
|Learning to Visualize Data with D3.js              |1       |
|Native Web Apps for Android                        |1       |
|Nulls, Three-valued Logic and Missing Information  |1       |
|Operating Red Hat Enterprise Linux Servers         |1       |
|The Closed World Assumption                        |2       |
|Client-Side Data Storage for Web Developers        |2       |
|Arduino Prototyping Techniques                     |2       |
|Understanding the Grails 3 Domain Model            |2       |
|Hibernate and JPA Fundamentals                     |2       |
|What's New in JavaScript                           |2       |
|Learning Spring Programming                        |2       |
|Mastering Web Views                                |3       |
|Using Web Components                               |3       |
|Service Based Architectures                        |3       |
|Getting Ready for Angular 2                        |3       |
|Building Web Services with Java                    |3       |
|View Updating                                      |4       |
|Using Storytelling to Effectively Communicate Data |4       |
|An Introduction to Set Theory                      |5       |
|Example Exam For Development and Testing oh yeahsdf|5       |
+---------------------------------------------------+--------+
only showing top 20 rows

In [None]:
spark.sql("select exam_name, count(*)  from assessment group by exam_name order by count(*) desc ").show(truncate = False)

In [None]:
+--------------------------------------------------------------+--------+       
|exam_name                                                     |count(1)|
+--------------------------------------------------------------+--------+
|Learning Git                                                  |394     |
|Introduction to Python                                        |162     |
|Introduction to Java 8                                        |158     |
|Intermediate Python Programming                               |158     |
|Learning to Program with R                                    |128     |
|Introduction to Machine Learning                              |119     |
|Software Architecture Fundamentals Understanding the Basics   |109     |
|Beginning C# Programming                                      |95      |
|Learning Eclipse                                              |85      |
|Learning Apache Maven                                         |80      |
|Beginning Programming with JavaScript                         |79      |
|Mastering Git                                                 |77      |
|Introduction to Big Data                                      |75      |
|Advanced Machine Learning                                     |67      |
|Learning Linux System Administration                          |59      |
|JavaScript: The Good Parts Master Class with Douglas Crockford|58      |
|Learning SQL                                                  |57      |
|Practical Java Programming                                    |53      |
|HTML5 The Basics                                              |52      |
|Python Epiphanies                                             |51      |
+--------------------------------------------------------------+--------+
only showing top 20 row

### Export the extracted table as Parquet to HDFS

In [None]:
extracted_assessment1.write.parquet("/tmp/extracted_assessment")

### Exit from the spark

use ctrl +d to exit spark 

### Save spark history 

In [None]:
! docker-compose exec spark cat /root/.python_history > hist_spark.text

### Check whether the tables have been saved in HDFS

In [None]:
! docker-compose exec cloudera hadoop fs -ls /tmp/

In [None]:
drwxr-xr-x   - root   supergroup          0 2020-10-25 22:46 /tmp/extracted_assessment
drwxrwxrwt   - mapred mapred              0 2018-02-06 18:27 /tmp/hadoop-yarn
drwx-wx-wx   - root   supergroup          0 2020-10-25 20:31 /tmp/hive
drwxr-xr-x   - root   supergroup          0 2020-10-25 22:37 /tmp/questions_from_exam

### Turn down docker compose

In [None]:
! docker-compose down

### Save Terminal history

In [None]:
! history > fengyaoluo_hist.txt