# Project: Tracking User Activity
In this project we receive streams of testing/assessment data from education publishing and assessment companies (e.g. Pearson). So that data scientists can analyze it, the streamed data is captured using Apache Kafka and transformed using Spark for various purposes (e.g. stream analysis, aggregation, and storage within a distributed file system).

# 1. Getting Started
In this section we create the directory in which the project work will be stored. It will contain the docker-compose.yml file needed to create the Docker cluster that runs Kafka, Zookeeper, and Spark. We also download a sample file that will be used to publish messages to Kafka.

### 1.1. Create project2 directory for the work
```
mkdir ~/w205/project2
```

### 1.2. Change into the directory that was just created
```
cd ~/w205/project2
```

### 1.3. Copy .yml file to the project2 directory so that we can compose a cluster
```
cp ~/w205/course-content/08-Querying-Data/docker-compose.yml .
```

### 1.4. Edit the .yml file
Change the mount directory, and publish port 8888 on the spark container so that Jupyter Notebook can be reached from the host.

```
vi docker-compose.yml

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
    expose:
      - "2181"
      - "2888"
      - "32181"
      - "3888"
    extra_hosts:
      - "moby:127.0.0.1"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    expose:
      - "9092"
      - "29092"
    extra_hosts:
      - "moby:127.0.0.1"

  cloudera:
    image: midsw205/cdh-minimal:latest
    expose:
      - "8020" # nn
      - "50070" # nn http
      - "8888" # hue
    #ports:
    #- "8888:8888"
    extra_hosts:
      - "moby:127.0.0.1"

  spark:
    image: midsw205/spark-python:0.0.5
    stdin_open: true
    tty: true
    volumes:
      - /home/science/w205:/w205
    expose:
      - "8888"
    ports:
      - "8888:8888"
    command: bash
    depends_on:
      - cloudera
    environment:
      HADOOP_NAMENODE: cloudera
    extra_hosts:
      - "moby:127.0.0.1"

  mids:
    image: midsw205/base:latest
    stdin_open: true
    tty: true
    volumes:
      - /home/science/w205:/w205
    extra_hosts:
      - "moby:127.0.0.1"
```

### 1.5. Import sample data
```
curl -L -o assessment-attempts-20180128-121051-nested.json https://goo.gl/f5bRm4
```

### 1.6. Check that the .json is there
```
ls -l
```

# 2. Cluster setup
In this section, we will be starting up the cluster and creating a default topic within Kafka, so that messages can start being published and consumed prior to being transformed using Spark.

### 2.1. Start the docker cluster
```
docker-compose up -d
```

### 2.2. Create a topic named "commits"
```
docker-compose exec kafka \
  kafka-topics \
    --create \
    --topic commits \
    --partitions 1 \
    --replication-factor 1 \
    --if-not-exists \
    --zookeeper zookeeper:32181
```

### 2.3. Check the topic "commits" exists
```
docker-compose exec kafka \
  kafka-topics \
    --describe \
    --topic commits \
    --zookeeper zookeeper:32181
```

### 2.4. Check the /tmp/ directory in HDFS
```
docker-compose exec cloudera hadoop fs -ls /tmp/
```

### 2.5. Open the Kafka log
Open a log window for Kafka so that activity can be monitored. The -f flag keeps following the log until interrupted, so run this in a separate terminal.

```
docker-compose logs -f kafka
```

# 3. Quick investigation on the data
Now that Kafka is set up and ready to publish/consume messages, we run some preliminary quality checks on the sample file that will be used to test the data pipeline.

### 3.1. Check how many items are in the .json file (there are 3280)
```
docker-compose exec mids bash -c "cat /w205/project2/assessment-attempts-20180128-121051-nested.json | jq '. | length'"
```

### 3.2. Review the .json in a readable format
Execute a bash shell in the mids container, cat the .json file, and pipe it through jq so that it is printed in a readable format.
```
docker-compose exec mids bash -c "cat /w205/project2/assessment-attempts-20180128-121051-nested.json | jq '.'"
```

### 3.3. Check the keys in the .json
```
docker-compose exec mids bash -c "cat /w205/project2/assessment-attempts-20180128-121051-nested.json | jq '.[0] | keys[]'"
```

### 3.4. Check the first item in the .json
```
docker-compose exec mids bash -c "cat /w205/project2/assessment-attempts-20180128-121051-nested.json | jq '.[0]'"
```
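
The same checks can be sketched in Python for readers without jq at hand. This is only an illustration on a made-up two-record sample (the field values and the `summarize` helper are hypothetical, not taken from the real file). It assumes, as the jq commands above do, that the file is a single JSON array of objects.

```python
import json

# Hypothetical two-record sample standing in for the real file.
sample_text = (
    '[{"exam_name": "Learning Git", "keen_id": "a1"},'
    ' {"exam_name": "Intro to Python", "keen_id": "b2"}]'
)

def summarize(text):
    """Return (record count, sorted keys of the first record, first record)."""
    records = json.loads(text)       # parse the whole array, like jq '.'
    count = len(records)             # like jq '. | length'
    first = records[0]               # like jq '.[0]'
    keys = sorted(first.keys())      # like jq '.[0] | keys[]'
    return count, keys, first

count, keys, first = summarize(sample_text)
print(count, keys)
```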

# 4. Publishing messages
The sample data looks good, and we are now ready to start publishing messages to Kafka. In this section we publish the messages; Section 5 then reads them into Spark and applies some simple transformations to confirm that the pipeline works.

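### 4.1. Publish messages
Execute a bash shell in the mids container to run the pipeline below. The file is piped with cat into jq, where `.[]` unpacks the top-level array and `-c` prints each record as one compact line. kafkacat then publishes each line as a message: the -P flag puts it in producer mode, the -b flag gives the broker as kafka:29092 (the container name and port where Kafka is listening), and the -t flag sets the topic name to commits.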
```
docker-compose exec mids bash -c "cat /w205/project2/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t commits && echo 'Produced 3280 messages.'"
```
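
To make the `jq '.[]' -c` step concrete: it unpacks the top-level array and emits each element as one compact JSON document per line, which is what lets kafkacat treat each line as a separate message. A rough Python equivalent on a made-up sample (the `to_messages` helper is hypothetical and not part of the pipeline):

```python
import json

def to_messages(array_text):
    """Split a JSON array into one compact JSON string per element."""
    records = json.loads(array_text)
    # separators=(",", ":") drops the optional whitespace, similar to jq -c
    return [json.dumps(r, separators=(",", ":")) for r in records]

# Made-up input; the real file holds assessment records instead.
sample = '[{"id": 1, "tags": ["a", "b"]}, {"id": 2, "tags": []}]'
for line in to_messages(sample):
    print(line)
```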



# 5. Working with Spark

### 5.1. Open up jupyter notebook
Start Jupyter Notebook in the spark container.

```
docker-compose exec spark \
  env \
    PYSPARK_DRIVER_PYTHON=jupyter \
    PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' \
  pyspark
```

### 5.2. Create symlink in the spark container
Create a symlink to the mounted project directory so that the Jupyter notebook can be saved outside of the spark container.

```
docker-compose exec spark bash -c "ln -s /w205/project2"
```

### 5.3. Imports

In [17]:
import json
from pyspark.sql import Row
from pyspark.sql.types import StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, TimestampType
from pyspark.sql import functions as F

### 5.4. Raw Data Load

In [3]:
#load messages from the "commits" topic into a spark df
raw_commits_df = (spark.
                  read.
                  format("kafka").
                  option("kafka.bootstrap.servers", "kafka:29092").
                  option("subscribe","commits").
                  option("startingOffsets", "earliest").
                  option("endingOffsets", "latest").
                  load())

In [13]:
#print schema of the raw commits
raw_commits_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
#cache the dataframe
raw_commits_df.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

### 5.5. Data Transformation
In this section, the raw dataframe undergoes a series of transformations so that it can be queried. A schema is written out explicitly instead of inferred so that the data can live in one table instead of several. Note that the sequences column contains nested data.
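
Before the Spark cells, the idea can be shown on a single message in plain Python. Kafka hands the payload over as bytes, so it has to be decoded to text and then parsed as JSON before nested fields are reachable. The payload below is made up and much smaller than a real record.

```python
import json

# Made-up payload, shaped like a heavily trimmed assessment record.
raw_value = (
    b'{"exam_name": "Learning Git",'
    b' "sequences": {"attempt": 1, "counts": {"correct": 2, "total": 4}}}'
)

text = raw_value.decode("utf-8")   # plain-Python analogue of casting the binary column to string
record = json.loads(text)          # analogue of json.loads(x.value) in the RDD map

# Nested values are now ordinary dict lookups.
correct = record["sequences"]["counts"]["correct"]
print(record["exam_name"], correct)
```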

In [92]:
#cast the "value" column into string since it's in binary
commits_df = raw_commits_df.select(raw_commits_df.value.cast('string'))

In [None]:
#write parquet file to hdfs, make sure to check the parquet file is there
commits_df.write.parquet("/tmp/commits_df")

In [6]:
#nested schema for the data
#to-do in the future - create a recursive function to unroll the schema so that this manual step is not needed

options_schema = ArrayType(StructType([StructField('at', StringType()),
                                       StructField('checked', BooleanType()),
                                       StructField('correct', BooleanType()),
                                       StructField('id', StringType()),
                                       StructField('submitted', IntegerType())]))

questions_schema = ArrayType(StructType([StructField('id', StringType()),
                                         StructField('options', options_schema),
                                         StructField('user_correct', BooleanType()),
                                         StructField('user_incomplete',BooleanType()),
                                         StructField('user_result', StringType()),
                                         StructField('user_submitted', BooleanType())]))

counts_schema = StructType([StructField('all_correct', BooleanType()),
                            StructField('correct', IntegerType()),
                            StructField('incomplete', IntegerType()),
                            StructField('incorrect', IntegerType()),
                            StructField('submitted', IntegerType()),
                            StructField('total', IntegerType()),
                            StructField('unanswered', IntegerType())])

sequences_schema = StructType([StructField('attempt', IntegerType()),
                               StructField('counts', counts_schema),
                               StructField('id', StringType()),
                               StructField('questions', questions_schema)])
                                         
commits_schema = StructType([StructField('base_exam_id', StringType()),
                             StructField('certification', StringType()),
                             StructField('exam_name', StringType()),
                             StructField('keen_created_at', StringType()),
                             StructField('keen_id', StringType()),
                             StructField('keen_timestamp', StringType()),
                             StructField('max_attempts', StringType()),
                             StructField('sequences', sequences_schema),
                             StructField('started_at', StringType()),
                             StructField('user_exam_id', StringType())])

In [7]:
#parse through the json and put it into an rdd
extracted_commits_rdd = commits_df.rdd.map(lambda x: Row(**json.loads(x.value)))

#impose schema that was written above when creating dataframe
extracted_commits_df = spark.createDataFrame(extracted_commits_rdd, commits_schema)

In [18]:
#print schema on the df, to ensure that the nested structure that was imposed looks as expected
extracted_commits_df.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: struct (nullable = true)
 |    |-- attempt: integer (nullable = true)
 |    |-- counts: struct (nullable = true)
 |    |    |-- all_correct: boolean (nullable = true)
 |    |    |-- correct: integer (nullable = true)
 |    |    |-- incomplete: integer (nullable = true)
 |    |    |-- incorrect: integer (nullable = true)
 |    |    |-- submitted: integer (nullable = true)
 |    |    |-- total: integer (nullable = true)
 |    |    |-- unanswered: integer (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- questions: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |

In [19]:
#register temp table so it can be queried
extracted_commits_df.registerTempTable('commits')

### 5.6. Basic Data Exploration
In this section, the dataset is explored using spark.sql to confirm that the schema imposed on it is correct. Queries against the nested part of the data (i.e. the sequences column) are particularly important, because all the data lives in one table rather than in a separate table for each nesting level.

In [20]:
#return query results to a pandas dataframe so it can be displayed nicely
spark.sql('SELECT * FROM commits LIMIT 5').toPandas().head()

#if working in a shell use below
#spark.sql('SELECT * FROM commits LIMIT 5').show()

| | base_exam_id | certification | exam_name | keen_created_at | keen_id | keen_timestamp | max_attempts | sequences | started_at | user_exam_id |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 37f0a30a-7464-11e6-aa92-a8667f27e5dc | False | Normal Forms and All That Jazz Master Class | 1516717442.735266 | 5a6745820eb8ab00016be1f1 | 1516717442.735266 | 1.0 | (1, (False, 2, 1, 1, 4, 4, 0), 5b28a462-7a3b-4... | 2018-01-23T14:23:19.082Z | 6d4089e4-bde5-4a22-b65f-18bce9ab79c8 |
| 1 | 37f0a30a-7464-11e6-aa92-a8667f27e5dc | False | Normal Forms and All That Jazz Master Class | 1516717377.639827 | 5a674541ab6b0a0001c6e723 | 1516717377.639827 | 1.0 | (1, (False, 1, 2, 1, 4, 4, 0), 5b28a462-7a3b-4... | 2018-01-23T14:21:47.505Z | 2fec1534-b41f-4419-b741-79d372f05cbe |
| 2 | 4beeac16-bb83-4d58-83e4-26cdc38f0481 | False | The Principles of Microservices | 1516738973.653394 | 5a67999d3ed3e300016ef0f1 | 1516738973.653394 | 1.0 | (1, (False, 3, 0, 1, 4, 4, 0), b370a3aa-bf9e-4... | 2018-01-23T20:22:22.584Z | 8edbc8a8-4d26-4292-a5af-ae3f246cb09f |
| 3 | 4beeac16-bb83-4d58-83e4-26cdc38f0481 | False | The Principles of Microservices | 1516738921.113742 | 5a6799694fc7c70001034706 | 1516738921.113742 | 1.0 | (1, (False, 2, 2, 0, 4, 4, 0), b370a3aa-bf9e-4... | 2018-01-23T20:21:10.833Z | c0ee680e-8892-4e64-a7f2-bb576a665dc5 |
| 4 | 6442707e-7488-11e6-831b-a8667f27e5dc | False | Introduction to Big Data | 1516737000.212122 | 5a6791e824fccd00018c3ff9 | 1516737000.212122 | 1.0 | (1, (False, 3, 0, 1, 4, 4, 0), 04a192c1-4f5c-4... | 2018-01-23T19:48:42.477Z | e4525b79-7904-4050-a068-27969b01f6bd |


In [73]:
#basic queries to explore nested data structure
sql_query = '''
SELECT
    keen_id,
    sequences.id,
    sequences.attempt,
    sequences.counts.*
FROM
    commits
LIMIT
    10
'''

#show query results
spark.sql(sql_query).toPandas().head(10)

#if working in a shell use below
#spark.sql(sql_query).show()

| | keen_id | id | attempt | all_correct | correct | incomplete | incorrect | submitted | total | unanswered |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5a6745820eb8ab00016be1f1 | 5b28a462-7a3b-42e0-b508-09f3906d1703 | 1 | False | 2 | 1 | 1 | 4 | 4 | 0 |
| 1 | 5a674541ab6b0a0001c6e723 | 5b28a462-7a3b-42e0-b508-09f3906d1703 | 1 | False | 1 | 2 | 1 | 4 | 4 | 0 |
| 2 | 5a67999d3ed3e300016ef0f1 | b370a3aa-bf9e-4c10-848a-8ecacbd1d93e | 1 | False | 3 | 0 | 1 | 4 | 4 | 0 |
| 3 | 5a6799694fc7c70001034706 | b370a3aa-bf9e-4c10-848a-8ecacbd1d93e | 1 | False | 2 | 2 | 0 | 4 | 4 | 0 |
| 4 | 5a6791e824fccd00018c3ff9 | 04a192c1-4f5c-4ac1-91df-3f175996af99 | 1 | False | 3 | 0 | 1 | 4 | 4 | 0 |
| 5 | 5a67a0b6852c2a00018891fa | e7110aed-0d08-4cb3-9eca-3eb7caad0256 | 1 | True | 5 | 0 | 0 | 5 | 5 | 0 |
| 6 | 5a67b627cc80e60001343664 | 5251db24-2a6e-4247-ab89-c277fb255405 | 1 | True | 1 | 0 | 0 | 1 | 1 | 0 |
| 7 | 5a67ac8cb0a5f400017d9919 | 066b5326-e547-4dab-ad24-ae751f0059d3 | 1 | True | 5 | 0 | 0 | 5 | 5 | 0 |
| 8 | 5a67a9ba0600870001247a04 | 8ac691f8-8c1a-4033-b2e2-44e165775992 | 1 | True | 4 | 0 | 0 | 4 | 4 | 0 |
| 9 | 5a67ac54411aed0001da9129 | 066b5326-e547-4dab-ad24-ae751f0059d3 | 1 | False | 0 | 1 | 0 | 1 | 5 | 4 |


In [74]:
sql_query = '''
SELECT
    keen_id,
    sequences.id,
    sequences.attempt,
    sequences.questions,
    sequences.questions[0].options,
    sequences.questions[0].options.correct
FROM
    commits
LIMIT
    10
'''

#show query results
spark.sql(sql_query).toPandas().head(10)

#if working in a shell use below
#spark.sql(sql_query).show()

| | keen_id | id | attempt | questions | sequences.questions AS `questions`[0].options | sequences.questions AS `questions`[0].options.correct |
|---|---|---|---|---|---|---|
| 0 | 5a6745820eb8ab00016be1f1 | 5b28a462-7a3b-42e0-b508-09f3906d1703 | 1 | [(7a2ed6d3-f492-49b3-b8aa-d080a8aad986, [Row(a... | [(2018-01-23T14:23:24.670Z, True, True, 49c574... | [True, True, True] |
| 1 | 5a674541ab6b0a0001c6e723 | 5b28a462-7a3b-42e0-b508-09f3906d1703 | 1 | [(95194331-ac43-454e-83de-ea8913067055, [Row(a... | [(None, False, None, 62feee6e-9b76-4123-bd9e-c... | [None, True, None, None] |
| 2 | 5a67999d3ed3e300016ef0f1 | b370a3aa-bf9e-4c10-848a-8ecacbd1d93e | 1 | [(b9ff2e88-cf9d-4bd4-bcea-c2673779425c, [Row(a... | [(2018-01-23T20:22:39.089Z, True, None, c31572... | [None, True, True] |
| 3 | 5a6799694fc7c70001034706 | b370a3aa-bf9e-4c10-848a-8ecacbd1d93e | 1 | [(1f7c5def-904b-4834-8519-639a3b22a496, [Row(a... | [(None, False, None, 429b03ec-bda9-4d4a-bd7f-e... | [None, True, None, True] |
| 4 | 5a6791e824fccd00018c3ff9 | 04a192c1-4f5c-4ac1-91df-3f175996af99 | 1 | [(620c924f-6bd8-11e6-bcbd-a8667f27e5dc, [Row(a... | [(2018-01-23T19:48:59.018Z, True, None, 5a3ba5... | [None, True, True, None] |
| 5 | 5a67a0b6852c2a00018891fa | e7110aed-0d08-4cb3-9eca-3eb7caad0256 | 1 | [(fb07b16e-84a2-4654-a1d2-eafaabd9b747, [Row(a... | [(None, False, None, c464fd9f-923b-4b68-9eac-f... | [None, True, None] |
| 6 | 5a67b627cc80e60001343664 | 5251db24-2a6e-4247-ab89-c277fb255405 | 1 | [(247b4589-7f8c-4a46-905c-63e0fa4731af, [Row(a... | [(None, False, None, 9e54b46b-779c-40e6-9fbd-6... | [None, None, True, None] |
| 7 | 5a67ac8cb0a5f400017d9919 | 066b5326-e547-4dab-ad24-ae751f0059d3 | 1 | [(fc3bdc54-04a8-4b46-976b-43c15bb96e1b, [Row(a... | [(None, False, None, 47363862-3455-4975-ad3c-9... | [None, None, True] |
| 8 | 5a67a9ba0600870001247a04 | 8ac691f8-8c1a-4033-b2e2-44e165775992 | 1 | [(803fc93f-7eb2-4121-af8c-ff809e850f10, [Row(a... | [(None, False, None, ba4b7f92-44e1-49ff-8eec-c... | [None, None, True, None] |
| 9 | 5a67ac54411aed0001da9129 | 066b5326-e547-4dab-ad24-ae751f0059d3 | 1 | [(fc3bdc54-04a8-4b46-976b-43c15bb96e1b, [Row(a... | [(None, False, True, 81f2a8cc-8143-4d84-9d1b-4... | [True, None, None] |
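
The last column can look surprising: `sequences.questions[0].options.correct` returns a list rather than a single value. Indexing with `[0]` picks the first question, but `options` is itself an array, so selecting `.correct` yields one value per option. A plain-Python sketch of the same lookup on a made-up record (the ids are invented for illustration):

```python
# Made-up record with one question and three options.
record = {
    "sequences": {
        "questions": [
            {
                "id": "q1",
                "options": [
                    {"id": "o1", "correct": True},
                    {"id": "o2", "correct": None},  # missing values show up as None
                    {"id": "o3", "correct": True},
                ],
            }
        ]
    }
}

first_question = record["sequences"]["questions"][0]    # questions[0]
options = first_question["options"]                     # .options (still a list)
correct_flags = [opt["correct"] for opt in options]     # .correct, once per option
print(correct_flags)
```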


### 5.7. Writing out parquet files to HDFS
In this section the results of some queries are written out to parquet files in HDFS. We write out the transformed dataset (i.e. extracted_commits_df) as parquet so that it can be used by data scientists for their work. In addition, we run some aggregations and write those results to parquet files so that business intelligence analysts can use them for dashboards and/or reporting.

In [82]:
#write out the unaggregated transformed commits so that it can be used by data scientists 
extracted_commits_df.write.parquet('/tmp/transformed_commits')

In [94]:
sql_query = '''
SELECT
    exam_name,
    COUNT(*) number_of_exams
FROM
    commits
GROUP BY
    exam_name
ORDER BY
    number_of_exams DESC
'''

#write out the query results onto parquet file
agg_exam_count = spark.sql(sql_query)
agg_exam_count.write.parquet('/tmp/agg_exam_count')

#show top 10 rows of the aggregated data
spark.sql(sql_query).toPandas().head(10)

#if working in a shell use below
#spark.sql(sql_query).show()

| | exam_name | number_of_exams |
|---|---|---|
| 0 | Learning Git | 394 |
| 1 | Introduction to Python | 162 |
| 2 | Introduction to Java 8 | 158 |
| 3 | Intermediate Python Programming | 158 |
| 4 | Learning to Program with R | 128 |
| 5 | Introduction to Machine Learning | 119 |
| 6 | Software Architecture Fundamentals Understandi... | 109 |
| 7 | Beginning C# Programming | 95 |
| 8 | Learning Eclipse | 85 |
| 9 | Learning Apache Maven | 80 |


In [95]:
sql_query = '''
SELECT
    exam_name,
    COUNT(*) number_of_exams,
    ROUND(AVG(sequences.counts.incorrect), 2) avg_incorrects,
    ROUND(AVG(sequences.counts.correct), 2) avg_corrects,
    ROUND(AVG(sequences.counts.unanswered), 2) avg_unanswered
FROM
    commits
GROUP BY
    exam_name
ORDER BY
    number_of_exams DESC
'''

#write out the query results onto parquet file
agg_exam_performance = spark.sql(sql_query)
agg_exam_performance.write.parquet('/tmp/agg_exam_performance')

#show exams by count, avg number of incorrects, avg number of corrects
spark.sql(sql_query).toPandas().head(10)

#if working in a shell use below
#spark.sql(sql_query).show()

| | exam_name | number_of_exams | avg_incorrects | avg_corrects | avg_unanswered |
|---|---|---|---|---|---|
| 0 | Learning Git | 394 | 1.32 | 3.38 | 0.3 |
| 1 | Introduction to Python | 162 | 1.13 | 2.83 | 0.26 |
| 2 | Introduction to Java 8 | 158 | 0.56 | 4.38 | 0.06 |
| 3 | Intermediate Python Programming | 158 | 1.46 | 2.05 | 0.17 |
| 4 | Learning to Program with R | 128 | 1.81 | 3.81 | 0.18 |
| 5 | Introduction to Machine Learning | 119 | 0.83 | 2.75 | 0.01 |
| 6 | Software Architecture Fundamentals Understandi... | 109 | 1.41 | 1.92 | 0.29 |
| 7 | Beginning C# Programming | 95 | 1.02 | 2.22 | 0.08 |
| 8 | Learning Eclipse | 85 | 0.81 | 3.53 | 0.08 |
| 9 | Learning Apache Maven | 80 | 1.0 | 2.44 | 0.16 |
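
For readers less familiar with SQL, the aggregation above amounts to grouping records by `exam_name`, counting them, and averaging the nested counts. A small pure-Python sketch on made-up records follows. The numbers are illustrative and not taken from the dataset, and the sketch skips NULL handling (SQL's `AVG` ignores NULLs, which this code does not attempt to reproduce).

```python
from collections import defaultdict

# Made-up records; only the fields used by the aggregation are included.
records = [
    {"exam_name": "Learning Git", "sequences": {"counts": {"correct": 3, "incorrect": 1}}},
    {"exam_name": "Learning Git", "sequences": {"counts": {"correct": 4, "incorrect": 0}}},
    {"exam_name": "Intro to Python", "sequences": {"counts": {"correct": 2, "incorrect": 2}}},
]

groups = defaultdict(list)
for r in records:
    groups[r["exam_name"]].append(r["sequences"]["counts"])   # GROUP BY exam_name

summary = []
for name, counts in groups.items():
    n = len(counts)                                            # COUNT(*)
    avg_incorrect = round(sum(c["incorrect"] for c in counts) / n, 2)
    avg_correct = round(sum(c["correct"] for c in counts) / n, 2)
    summary.append((name, n, avg_incorrect, avg_correct))

summary.sort(key=lambda row: row[1], reverse=True)             # ORDER BY number_of_exams DESC
print(summary)
```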


# 6. Summary and Closing
So far we have created the cluster needed for the project work, imported the sample file for testing, published messages via Kafka, transformed the published messages using Spark, and written the transformed messages to parquet files in HDFS. We now close out of Spark and tear down the cluster before ending the session.

### 6.1. Check Files
Check that the parquet files were written out to HDFS.
```
docker-compose exec cloudera hadoop fs -ls /tmp/
docker-compose exec cloudera hadoop fs -ls /tmp/commits_df/
docker-compose exec cloudera hadoop fs -ls /tmp/agg_exam_count/
docker-compose exec cloudera hadoop fs -ls /tmp/agg_exam_performance/
```

### 6.2. Exit out of spark/jupyter notebook
```python
exit()
```

### 6.3. Tear down the docker cluster
```
docker-compose down
```

### 6.4. Check that the cluster is no longer running
```
docker ps -a
```

### 6.5. Future Enhancements
In this assignment I wrote the schema for the data structure by hand so that it could be imposed on the dataframe and everything could be stored in one table. That approach worked here, but it is tedious and will not scale once there is more than one data source. A function that generates the schema of a nested data structure automatically, ready to be imposed on the data source, would remove this manual step.
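
As a starting point for that enhancement, here is a minimal sketch of a recursive schema walker in plain Python. The `infer_type` function is hypothetical and was not part of this project. It emits a compact type string in the general style of Spark's struct/array notation, but whether the result can be passed directly to the Spark version in this cluster has not been checked. It also makes simplifying assumptions: arrays are treated as homogeneous (only the first element is inspected), integers map to `bigint` rather than the `IntegerType` used in the hand-written schema, and `None` falls back to `string`.

```python
def infer_type(value):
    """Return a compact type string describing a parsed JSON value."""
    if isinstance(value, bool):      # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, dict):
        # Recurse into each field; sort keys so the output is deterministic.
        fields = ",".join(f"{k}:{infer_type(v)}" for k, v in sorted(value.items()))
        return f"struct<{fields}>"
    if isinstance(value, list):
        # Assumption: all elements share the type of the first one.
        return f"array<{infer_type(value[0])}>" if value else "array<string>"
    return "string"                  # str and None both fall back to string

# Made-up record, trimmed to a few nested fields.
sample = {
    "exam_name": "Learning Git",
    "sequences": {
        "attempt": 1,
        "counts": {"all_correct": False, "correct": 2},
        "questions": [{"id": "q1", "user_correct": True}],
    },
}
schema_string = infer_type(sample)
print(schema_string)
```

A more robust version would merge the types seen across many records instead of trusting a single sample, since fields that are null or absent in one record may be populated in another.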