# W205 - Spring 2020 - Project 2: Tracking User Activity

### By: Ali Asadi Nikooyan

## The most important commands used in the project

Go to the project directory:
```
cd project-2-aanikooyan/
```
Create an _assignment_ branch (just once):
```
git branch assignment 
```
Checkout the _assignment_ branch:
```
git checkout assignment
```
Download the _assessments_ (.json file) into the repo (just once):
```
curl -L -o assessment-attempts-20180128-121051-nested.json https://goo.gl/ME6hjp
```
Take a quick look at the _assessments_ file (count its unique lines):
```
cat assessment-attempts-20180128-121051-nested.json |sort|uniq|wc -l
```
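For readers without a shell at hand, the pipeline above can be mirrored in Python. This is a minimal sketch for ordinary newline-terminated text: `sort | uniq` keeps one copy of each distinct line and `wc -l` counts them, which is what a `set` of lines does. The helper name `count_unique_lines` is hypothetical.

```python
def count_unique_lines(text: str) -> int:
    """Count the distinct lines in `text`, like `sort | uniq | wc -l`."""
    # splitlines() also returns a final line that has no trailing newline;
    # sort adds the missing newline to its output, so wc -l counts that line too
    return len(set(text.splitlines()))


# Usage (the file name is the one downloaded above):
# with open("assessment-attempts-20180128-121051-nested.json", encoding="utf-8") as f:
#     print(count_unique_lines(f.read()))
```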
Copy the .yml file from the W205 course content repo into the project 2 repo (just once):
```
cp ~/w205/course-content/08-Querying-Data/docker-compose.yml ~/w205/project-2-aanikooyan
```
Edit the copied ```docker-compose.yml``` file so that Spark can be used from Jupyter Notebook (just once):
```
vi docker-compose.yml
```
In the editor, add the following to the end of the _Spark_ section (it exposes Jupyter's port 8888 and maps it to port 8888 on the host):
```
    expose:
      - "8888"
    ports:
      - "8888:8888"
```

Spin up the cluster and check:
```
docker-compose up -d
docker-compose ps
docker ps -a
```
Check Kafka (in a separate SSH window):
```
docker-compose logs -f kafka
```
Check out Hadoop:
```
docker-compose exec cloudera hadoop fs -ls /tmp/
```
Create a new topic called _assessments_:
```
docker-compose exec kafka kafka-topics --create --topic assessments --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```
Use ```kafkacat``` to publish the assessments to the created topic (```jq``` splits the JSON array so that each assessment becomes one message):
```
docker-compose exec mids bash -c "cat /w205/project-2-aanikooyan/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments"
```
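The `jq '.[]' -c` step is what turns the single JSON array into individual Kafka messages: it emits each array element as compact JSON on its own line, and `kafkacat -P` publishes each line as one message. A rough Python equivalent, assuming the file holds one top-level JSON array (the function name `to_kafka_lines` is hypothetical, and number formatting may differ slightly from jq's):

```python
import json
from typing import List


def to_kafka_lines(json_text: str) -> List[str]:
    # Parse the top-level JSON array: one element per assessment
    records = json.loads(json_text)
    # separators=(",", ":") drops the spaces, like jq -c; ensure_ascii=False keeps
    # non-ASCII characters as-is instead of \u escapes
    return [json.dumps(r, separators=(",", ":"), ensure_ascii=False) for r in records]


# Each returned string corresponds to one message that kafkacat -P would publish
lines = to_kafka_lines('[{"exam_name": "Learning Git", "max_attempts": "1.0"}, {"exam_name": "Learning Eclipse"}]')
for line in lines:
    print(line)
```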
Create a symbolic link inside the Spark container (so that the project files are reachable from Jupyter Notebook):
```
docker-compose exec spark bash
```
In the container's ```bash``` shell:
```sh
df -h
ln -s /w205 w205
ls -l
exit
```
Run Spark:
 - _Method 1_ - using the Spark container directly:
```
docker-compose exec spark pyspark
```
 - _Method 2_ - using Jupyter Notebook (used in this project):
   - First, run:
```
docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark
```
   - Then, copy the URL with the token that Jupyter prints and replace 0.0.0.0 with the VM's external IP address, for example:
```
http://34.83.17.75:8888/?token=90745ddab8c1472d99ba9b3cb6fab17fe9410d1f7f99c17e
```
   - Finally, open an incognito tab and paste the modified address into it.
   - In Jupyter Notebook, create a new ```Python 3``` notebook called ```Project_2``` (just once).
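The URL edit can also be done programmatically. This minimal sketch uses only `urllib.parse` and swaps the host while keeping the port, path and token; the function name `with_external_host`, the token `abc123` and the IP `203.0.113.5` (a documentation address) are placeholders.

```python
from urllib.parse import urlsplit, urlunsplit


def with_external_host(jupyter_url: str, external_ip: str) -> str:
    parts = urlsplit(jupyter_url)
    # Keep the port (8888 here) if the original URL has one
    netloc = external_ip if parts.port is None else f"{external_ip}:{parts.port}"
    # Only the host changes; scheme, path and the ?token=... query are preserved
    return urlunsplit(parts._replace(netloc=netloc))


print(with_external_host("http://0.0.0.0:8888/?token=abc123", "203.0.113.5"))
```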
   
When done with ```Project_2``` in Jupyter Notebook:
 - ```File > Save and Checkpoint```
 - ```File > Close and Halt```
 - Log out of Jupyter Notebook

Back in the SSH window:
 - Press ```Ctrl+C``` and confirm with ```y``` to shut down the notebook server and Spark
 
Check out saved parquet files (in the terminal window):
```
docker-compose exec cloudera hadoop fs -ls /tmp/
docker-compose exec cloudera hadoop fs -ls /tmp/assessments/
docker-compose exec cloudera hadoop fs -ls /tmp/extracted_assessments/
```

Bring the cluster down:
```
docker-compose down
```

Make sure that everything is down:
```
docker-compose ps
docker ps -a
docker network ls
```


Save the history file:
```
history > history.txt
```
Finally, ```exit``` from all windows and stop the AI Platform Notebooks instance.

### Check PySpark is running

```python
spark  # the SparkSession created by the pyspark launcher
```

```python
sc  # the SparkContext
```

### Import required packages

```python
import json
import pprint
import sys
from pyspark.sql import Row
import pandas as pd
```

### Review the structure of the _assessments_ JSON file

```python
p = pprint.PrettyPrinter(indent=1)
```

```python
# Load the whole JSON array (one dict per assessment) into memory
with open("assessment-attempts-20180128-121051-nested.json", "r") as f:
    s = f.read()
json_data = json.loads(s)
```

```python
# Pretty-print the first assessment; pprint sorts the keys alphabetically,
# which may or may not match the order in the file
p.pprint(json_data[0])
```

```
{'base_exam_id': '37f0a30a-7464-11e6-aa92-a8667f27e5dc',
 'certification': 'false',
 'exam_name': 'Normal Forms and All That Jazz Master Class',
 'keen_created_at': '1516717442.735266',
 'keen_id': '5a6745820eb8ab00016be1f1',
 'keen_timestamp': '1516717442.735266',
 'max_attempts': '1.0',
 'sequences': {'attempt': 1,
               'counts': {'all_correct': False,
                          'correct': 2,
                          'incomplete': 1,
                          'incorrect': 1,
                          'submitted': 4,
                          'total': 4,
                          'unanswered': 0},
               'id': '5b28a462-7a3b-42e0-b508-09f3906d1703',
               'questions': [{'id': '7a2ed6d3-f492-49b3-b8aa-d080a8aad986',
                              'options': [{'at': '2018-01-23T14:23:24.670Z',
                                           'checked': True,
                                           'correct': True,
                                           'id': '
```
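Because `pprint` sorts dictionary keys by default, the printout above does not show the file order. Since Python 3.7, `json.loads` keeps keys in file order, so on Python 3.8+ passing `sort_dicts=False` (for example `pprint.PrettyPrinter(indent=1, sort_dicts=False)`) should print a record in its file order. A small sketch with a hypothetical record:

```python
import pprint

# Hypothetical record; keys inserted in non-alphabetical order, as json.loads
# would keep them if they appeared in this order in the file
record = {"keen_timestamp": "t", "exam_name": "e", "base_exam_id": "b"}

alphabetical = pprint.pformat(record)                  # default: keys sorted
file_order = pprint.pformat(record, sort_dicts=False)  # Python 3.8+: insertion order

print(alphabetical)
print(file_order)
```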

### Unrolling the _assessments_ data

```python
# Read the raw messages from Kafka (all of them, from the earliest to the latest offset):
raw_assessments = spark.read.format("kafka").option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "assessments").option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest").load()
```

```python
# Cache the raw messages (to prevent warnings later):
raw_assessments.cache()
```

```
DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]
```

```python
# Show the schema of the raw messages:
raw_assessments.printSchema()
```

```
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
```



```python
# Cast the raw (binary) message values as strings:
assessments = raw_assessments.select(raw_assessments.value.cast('string'))
assessments.show()
```

```
+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 20 rows
```
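Conceptually, the cast above and the `json.loads(x.value)` call used in the next cells do two things per message: decode the binary Kafka `value` to text, then parse that text as JSON. A plain-Python sketch on hypothetical message bytes, assuming UTF-8 payloads (the second record deliberately lacks `keen_id` to show what a hole looks like):

```python
import json

# Hypothetical Kafka message values as raw bytes (what the binary `value` column holds)
raw_values = [
    b'{"exam_name": "Learning Git", "keen_id": "k1"}',
    b'{"exam_name": "Learning Eclipse"}',
]

# Step 1: bytes -> str, analogous to value.cast('string') for UTF-8 payloads
as_strings = [v.decode("utf-8") for v in raw_values]

# Step 2: str -> dict, analogous to json.loads(x.value) inside rdd.map(...)
records = [json.loads(s) for s in as_strings]

for r in records:
    print(r.get("exam_name"), r.get("keen_id"))  # a missing key ("hole") prints None
```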



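```python
# Write the string messages to HDFS as parquet
# (this fails if /tmp/assessments already exists; add .mode("overwrite") when re-running)
assessments.write.parquet("/tmp/assessments")
```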

```python
# Parse each JSON string into a dict and let Spark infer a DataFrame from the dicts:
extracted_assessments = assessments.rdd.map(lambda x: json.loads(x.value)).toDF()
extracted_assessments.show()
```

```
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|Map(questions -> ...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|Map(questions -> ...|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false
```

```python
# Show the schema of the extracted assessments:
extracted_assessments.printSchema()
```

```
root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)
```



```python
# Save as parquet file:
extracted_assessments.write.parquet("/tmp/extracted_assessments")
```

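```python
# Register the DataFrame as a temporary table named "assessments" so it can be queried with Spark SQL
# (registerTempTable is deprecated since Spark 2.0 in favor of createOrReplaceTempView)
extracted_assessments.registerTempTable('assessments')
```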

### Perform some Queries

#### 1 - How many assessments are in the dataset?

```python
assessments_count = spark.sql("select count(*) as total_assessments from assessments")
assessments_count.show()
```

```
+-----------------+
|total_assessments|
+-----------------+
|             3280|
+-----------------+
```



#### 2 - How many people took _Learning Apache Hadoop_?

```python
learning_hadoop = spark.sql("select exam_name, count(*) as number_of_people_taken from assessments \
                            where exam_name like 'Learn%Apache H%' group by exam_name")
learning_hadoop.show()
```

```
+--------------------+----------------------+
|           exam_name|number_of_people_taken|
+--------------------+----------------------+
|Learning Apache H...|                    16|
+--------------------+----------------------+
```
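The `LIKE` pattern `'Learn%Apache H%'` is broader than a single title: `%` matches any run of characters, so any title that starts with `Learn` and later contains `Apache H` would match. The single `exam_name` row in the result indicates that only one title matched in this dataset. A minimal sketch that translates a `LIKE` pattern into a Python regular expression (the helper names are hypothetical; `ESCAPE` clauses are not handled; matching is case-sensitive, as Spark SQL's `LIKE` is):

```python
import re


def like_to_regex(pattern: str):
    # % matches any run of characters (including none), _ exactly one character;
    # every other character is matched literally. ESCAPE clauses are not handled.
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    return re.compile("".join(parts), re.DOTALL)


def sql_like(value: str, pattern: str) -> bool:
    # LIKE has to match the whole value, hence fullmatch() rather than search()
    return like_to_regex(pattern).fullmatch(value) is not None


print(sql_like("Learning Apache Hadoop", "Learn%Apache H%"))         # True
print(sql_like("Introduction to Apache Hadoop", "Learn%Apache H%"))  # False
```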



#### 3 - What were the top 10 most common courses taken?

```python
# group by already yields one row per exam, so no distinct is needed
top_10_exams = spark.sql("select exam_name, count(*) as total \
           from assessments \
           group by exam_name \
           order by total desc \
           limit 10")
top_10_exams.show()
```

```
+--------------------+-----+
|           exam_name|total|
+--------------------+-----+
|        Learning Git|  394|
|Introduction to P...|  162|
|Introduction to J...|  158|
|Intermediate Pyth...|  158|
|Learning to Progr...|  128|
|Introduction to M...|  119|
|Software Architec...|  109|
|Beginning C# Prog...|   95|
|    Learning Eclipse|   85|
|Learning Apache M...|   80|
+--------------------+-----+
```



#### 4 - What were the bottom 10 least common courses taken?

```python
# group by already yields one row per exam, so no distinct is needed
bottom_10_exams = spark.sql("select exam_name, count(*) as total \
           from assessments \
           group by exam_name \
           order by total \
           limit 10")
bottom_10_exams.show()
```

```
+--------------------+-----+
|           exam_name|total|
+--------------------+-----+
|Operating Red Hat...|    1|
|Learning to Visua...|    1|
|Nulls, Three-valu...|    1|
|Native Web Apps f...|    1|
|What's New in Jav...|    2|
|Hibernate and JPA...|    2|
|Arduino Prototypi...|    2|
|The Closed World ...|    2|
|Learning Spring P...|    2|
|Understanding the...|    2|
+--------------------+-----+
```
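Several exams tie at a count of 2 in this result, and `order by total` alone does not fix the order among ties, so which of the tied exams make the cut is not guaranteed. Adding `exam_name` as a secondary sort key makes the result deterministic. The sketch below demonstrates this with Python's built-in `sqlite3` as a stand-in for Spark SQL on hypothetical toy data; the same `ORDER BY total, exam_name` clause should carry over to the `spark.sql` query.

```python
import sqlite3

# Hypothetical toy data standing in for the `assessments` temp table
rows = [("Exam A",), ("Exam B",), ("Exam B",), ("Exam C",), ("Exam D",), ("Exam D",)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE assessments (exam_name TEXT)")
conn.executemany("INSERT INTO assessments VALUES (?)", rows)

# The secondary sort key exam_name orders exams that have the same total
query = """
    SELECT exam_name, COUNT(*) AS total
    FROM assessments
    GROUP BY exam_name
    ORDER BY total, exam_name
    LIMIT 3
"""
bottom = conn.execute(query).fetchall()
conn.close()

print(bottom)
```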



#### Dealing with nested multi-valued fields and holes in the data

```python
# A custom function that handles holes in the JSON data: it returns a one-element
# list holding a Row when every required field is present, and an empty list
# otherwise, so that flatMap() silently drops incomplete records
def my_lambda_sequences_count(x):

    raw_dict = json.loads(x.value)
    my_list = []
    count_keys = ["correct", "incomplete", "incorrect", "submitted", "unanswered", "total"]

    if "sequences" in raw_dict and "exam_name" in raw_dict:
        counts = raw_dict["sequences"].get("counts", {})
        if all(key in counts for key in count_keys):
            my_dict = {key: counts[key] for key in count_keys}
            my_dict["exam_name"] = raw_dict["exam_name"]
            my_list.append(Row(**my_dict))

    return my_list
```
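Because `my_lambda_sequences_count` needs a Spark `Row`, its filtering rule is easiest to check with a Spark-free copy. In this sketch, `sequences_count_plain` is a hypothetical helper that takes the JSON string directly and returns plain dicts; a record with a hole yields an empty list, which is what makes `flatMap` drop it. The counts in the example are taken from the first assessment printed earlier.

```python
import json

COUNT_KEYS = ("correct", "incomplete", "incorrect", "submitted", "unanswered", "total")


def sequences_count_plain(raw_json: str) -> list:
    raw = json.loads(raw_json)
    counts = raw.get("sequences", {}).get("counts", {})
    if "exam_name" not in raw or not all(key in counts for key in COUNT_KEYS):
        return []  # a hole in the data: nothing is emitted for this record
    row = {key: counts[key] for key in COUNT_KEYS}
    row["exam_name"] = raw["exam_name"]
    return [row]


complete = json.dumps({"exam_name": "Normal Forms and All That Jazz Master Class",
                       "sequences": {"counts": {"correct": 2, "incomplete": 1, "incorrect": 1,
                                                "submitted": 4, "unanswered": 0, "total": 4}}})
print(sequences_count_plain(complete))                               # one dict
print(sequences_count_plain('{"exam_name": "x", "sequences": {}}'))  # []
```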

#### 5 - How did the people who took _Learning Apache Hadoop_ perform?

```python
# Because of the holes in the data, the custom my_lambda_sequences_count is used here
course_stat = assessments.rdd.flatMap(my_lambda_sequences_count).toDF()
course_stat.registerTempTable('cs')

# Total number of people who took the course
spark.sql("select exam_name, count(*) as number_of_people_taken from cs \
            where exam_name like 'Learn%Apache H%' group by exam_name").show()

# The performance of individuals in the exam
spark.sql("select exam_name,correct,incorrect,incomplete,submitted,unanswered, total from cs  \
            where exam_name like 'Learn%Apache H%' order by correct desc").show()
```

+--------------------+----------------------+
|           exam_name|number_of_people_taken|
+--------------------+----------------------+
|Learning Apache H...|                    16|
+--------------------+----------------------+

+--------------------+-------+---------+----------+---------+----------+-----+
|           exam_name|correct|incorrect|incomplete|submitted|unanswered|total|
+--------------------+-------+---------+----------+---------+----------+-----+
|Learning Apache H...|      4|        0|         0|        4|         0|    4|
|Learning Apache H...|      4|        0|         0|        4|         0|    4|
|Learning Apache H...|      4|        0|         0|        4|         0|    4|
|Learning Apache H...|      4|        0|         0|        4|         0|    4|
|Learning Apache H...|      4|        0|         0|        4|         0|    4|
|Learning Apache H...|      3|        1|         0|        4|         0|    4|
|Learning Apache H...|      3|        0|         1|       