# Project 2: Tracking User Activity

### Set up

This notebook is designed to be run in a pyspark container. The environment is set up using the `docker-compose.yml` file in this repo and by running the following steps in the terminal:

Pull the images and start the containers:  
`docker-compose up -d`

Create topic called assessments to store assessment data:  
`docker-compose exec kafka kafka-topics --create --topic assessments --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181`

Create a symlink in the spark container that points to `/w205`:  
`docker-compose exec spark ln -s /w205 w205`

Create data folder and download data to it:   
`mkdir data`  
`curl -L -o data/assessment-attempts-20180128-121051-nested.json https://goo.gl/ME6hjp`

Publish data to our assessments topic:  
`docker-compose exec mids bash -c "cat /w205/project-2-courtney-smith-97/data/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments"`

Connect to pyspark and launch a jupyter notebook:  
`docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark`    

To access the notebook, copy the URL and replace '0.0.0.0' with the external IP address of the GCP VM instance.
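
The publish step above relies on `jq '.[]' -c` to turn the downloaded JSON array into one compact JSON object per line, so that `kafkacat -P` sends each assessment as its own message. A minimal sketch of that transformation in plain Python follows; the function name and the two-record sample are hypothetical and only stand in for the real file.

```python
import json

def array_to_json_lines(array_text):
    """Split a JSON array into one compact JSON document per element."""
    records = json.loads(array_text)
    # separators=(",", ":") drops the default spaces, similar to jq's compact (-c) output
    return [json.dumps(record, separators=(",", ":")) for record in records]

# Hypothetical two-record array; the real file holds the full assessment records
sample = '[{"exam_name": "Learning Git", "max_attempts": "1.0"}, {"exam_name": "Intro"}]'
messages = array_to_json_lines(sample)

for message in messages:
    print(message)
```

Each printed line corresponds to one message that would be written to the `assessments` topic.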

### Processing and Querying the Data

Next we read the data from our assessments topic into a Spark DataFrame.

In [1]:
raw_assessments = spark.read.format("kafka").option("kafka.bootstrap.servers", "kafka:29092").option("subscribe","assessments").option("startingOffsets", "earliest").option("endingOffsets", "latest").load()
raw_assessments.cache()
raw_assessments.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



The `value` column holds each assessment as a binary-encoded JSON string. The next step is to cast it to a string, parse the JSON, and register the result as a temporary table.

In [2]:
import json
from pyspark.sql import Row
#select only value component
val_assessments = raw_assessments.select(raw_assessments.value.cast('string'))
#create temporary table from df
extract_assessments = val_assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()
#createOrReplaceTempView replaces the deprecated registerTempTable
extract_assessments.createOrReplaceTempView('assessments')
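
To make the extraction step concrete, here is a rough sketch of what happens to a single message, written without Spark. The sample bytes are a hypothetical, heavily shortened message; the real records carry many more fields.

```python
import json

# Hypothetical Kafka message value (assumption: real messages are much larger)
raw_value = b'{"keen_id": "abc123", "exam_name": "Learning Git", "sequences": {"counts": {"correct": 2, "total": 4}}}'

# Casting the binary column to string corresponds to decoding the bytes as UTF-8
text_value = raw_value.decode("utf-8")

# json.loads yields a dict; Row(**record) would use its top-level keys as column names
record = json.loads(text_value)

# Sorted key names, for comparison with the column order of the resulting DataFrame
columns = sorted(record)
print(columns)
print(record["sequences"]["counts"])
```

Only the top-level keys become columns. Nested structures such as `sequences` stay nested inside a single column, which is why the count fields need their own extraction step before they can be queried directly.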

Now we can run queries against the temporary table `assessments`.

In [3]:
#look at the first few rows of data
spark.sql('select * from assessments limit 5').show()

+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|Map(questions -> ...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|Map(questions -> ...|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false

In [4]:
#count the number of distinct exams taken
spark.sql('select count(distinct(base_exam_id)) from assessments').show()

+----------------------------+
|count(DISTINCT base_exam_id)|
+----------------------------+
|                         107|
+----------------------------+



In [5]:
#show the 5 most common exams
spark.sql('select exam_name, count(*) as num_taken from assessments group by exam_name order by num_taken desc limit 5').show()

+--------------------+---------+
|           exam_name|num_taken|
+--------------------+---------+
|        Learning Git|      394|
|Introduction to P...|      162|
|Intermediate Pyth...|      158|
|Introduction to J...|      158|
|Learning to Progr...|      128|
+--------------------+---------+



In [None]:
#show the average score for the 5 most common exams

#create function to extract total number of questions and number of correct answers from sequences nested dict
def map_correct(assessments):
    raw_dict = json.loads(assessments.value)
    assessment_counts = []
    if "sequences" in raw_dict:        
        if "counts" in raw_dict["sequences"]:   
            if "correct" in raw_dict["sequences"]["counts"] and "total" in raw_dict["sequences"]["counts"]:   
                seq_dict = {"keen_id" : raw_dict["keen_id"],
                           "num_correct" : raw_dict["sequences"]["counts"]["correct"], 
                           "total" : raw_dict["sequences"]["counts"]["total"]}
                assessment_counts.append(Row(**seq_dict))
    return assessment_counts
#create df with id, num correct, and total questions
correct_total = val_assessments.rdd.flatMap(map_correct).toDF()
correct_total.createOrReplaceTempView('correct_counts')

#join new df to assessments on id, get avg exam score for the 5 most common exams
spark.sql("select assessments.exam_name, " +
          "count(*) as num_taken, "+
          "round(avg(correct_counts.num_correct*100 / correct_counts.total),2) as avg_score " +
          "from assessments, correct_counts where assessments.keen_id = correct_counts.keen_id " +
          "group by assessments.exam_name " +
          "order by num_taken desc " +
          "limit 5").show()

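The scoring logic in the cell above can be checked on a few records without Spark. The sketch below is not the notebook's implementation: `extract_counts` and `average_score` are hypothetical helpers, the three sample records are made up, and skipping records with a zero total is an assumption added here to avoid division by zero.

```python
import json

def extract_counts(value):
    """Return (keen_id, num_correct, total), or None if any field is missing."""
    record = json.loads(value)
    # "or {}" guards against a missing or null "sequences"/"counts" entry
    counts = (record.get("sequences") or {}).get("counts") or {}
    if "keen_id" not in record or "correct" not in counts or "total" not in counts:
        return None
    return record["keen_id"], counts["correct"], counts["total"]

def average_score(values):
    """Average the per-attempt percentage score, rounded to 2 decimals."""
    scores = []
    for value in values:
        extracted = extract_counts(value)
        if extracted is None:
            continue
        _, correct, total = extracted
        if total:  # assumption: skip attempts with a zero total
            scores.append(correct * 100 / total)
    return round(sum(scores) / len(scores), 2) if scores else None

# Hypothetical messages: two scored attempts and one without counts
samples = [
    '{"keen_id": "a", "sequences": {"counts": {"correct": 3, "total": 4}}}',
    '{"keen_id": "b", "sequences": {"counts": {"correct": 1, "total": 4}}}',
    '{"keen_id": "c", "sequences": {}}',
]
result = average_score(samples)
print(result)
```

As in the query, the average is taken over per-attempt percentages rather than over pooled correct and total counts, so every attempt carries equal weight regardless of exam length.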
### Save data and shut down

In [24]:
#write extracted assessment data to HDFS in parquet format
extract_assessments.write.parquet("/tmp/extract_assessments")

Run `docker-compose exec cloudera hadoop fs -ls /tmp/extract_assessments` in the terminal to check that the parquet files were written, then shut down the kernel and run `docker-compose down` to stop the containers.