In [1]:
import pandas as pd
import numpy as np
import json
import pprint
import datetime as dt

### Questions
1. How many assesstments are in the dataset?
2. What's the name of your Kafka topic? How did you come up with that name?
3. How many people took *Learning Git*?
4. What is the least common course taken? And the most common?
5. Add any query(ies) you think will help the data science team

### Steps for Running the Project

#### 1. Setup Docker Compose, Get Data and Explore Data Structure:

- **Create a 'docker-compose.yml' file** including the following services: zookeeper, kafka, cloudera, spark, mids.  Please see details for yml file in **"Structure for docker-compose.yml file"** below.
  
- **Get JSON file with information**.  The name of the file is "assessment-attempts-20180128-121051-nested.json" - using the following command:

```
curl -L -o assessment-attempts-20180128-121051-nested.json https://goo.gl/ME6hjp`
```

- **Explore the JSON file with JQ** to get an idea of the structure - the following command was used to understand the fields included in the JSON file:

```
cat assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | head -1 | jq .
```

This indicates that there are 10 fields in the main level of the JSON file: (1) 'keen_timestamp', (2) 'max_attempts', (3) 'started_at', (4) 'base_exam_id', (5) 'user_exam_id', (6) 'sequences', (7) 'keen_created_at', (8) 'certification', (9) 'keen_id', (10) 'exam_name'.  
In addition, the field "sequences" include several nested levels.  Details are presented in section **"Top Line Structure of JSON File"** included later in this report.  


#### 2. Run Docker Compose, Define Kafka Topic and Produce Data to Kafka Topic
- **Once the docker-compose.yml file has been created, perform a pull to update the services and images:**

```
docker-compose pull
```

- **Run docker compose:**

```
docker-compose up -d
```

- **Check logs for kafka (use CTRL-C to stop):**

```
docker-compose logs -f kafka
```

- **Create a topic for Kafka.** The name of the topic is "examtaken" as the dataset relates to exams taken for courses from November 2017 to January 2018.  The topic was created with the following command:  

```
docker-compose exec kafka kafka-topics --create --topic examtaken --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```
  
- **Check for kafka topic:**

```
docker-compose exec kafka kafka-topics --describe --topic examtaken --zookeeper zookeeper:32181
```

- **Use kafkacat to produce test messages to the `examtaken` topic:**   

```
docker-compose exec mids bash -c "cat /w205/project-2-cmorenoUCB2021/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t examtaken"
```

#### 3. Use "pyspark" to read raw data from kafka topic:

- **Run pyspark process using the `spark` container:**

```
docker-compose exec spark pyspark
```

- **Import libraries to work with file:**  
```
import sys
import json
from pyspark.sql import Row
```  
  
  
- **Read from kafka (using the topic 'examtaken':**

```
exams = park.read.format("kafka").option("kafka.bootstrap.servers","kafka:29092").option("subscribe",“examtaken").option("startingOffsets","earliest").option("endingOffsets", "latest").load()
```

- **Cache this to cut back on warnings:**

```
exams.cache()
```

- **printSchema of what was read:** 

```
exams.printSchema()
```

- **Ensure UTF8 decoder is used:**

```
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1)
```

- **Read the `value`s as strings, and show them:**
```
examsall = exams.rdd.map(lambda x: json.loads(x.value)).toDF()
examsall.show



#### 4. Use SparkSQL to explore data and answer business questions:

- **Create a Spark "TempTable" (or "View")**

```
examsall.registerTempTable('examview')
```

- **Use SparkSQL to answer business questions:**
  
  
*1. How many assesstments are in the dataset?:*

```
spark.sql("select count(exam_name) from examview").show()
```
```
+----------------+
|count(exam_name)|
+----------------+
|            3280|
+----------------+
```

*3. How many people took *Learning Git*?*  

```
spark.sql("select count(exam_name) from examview WHERE  exam_name = 'Learning Git' ").show()
```
```
+----------------+
|count(exam_name)|
+----------------+
|             394|
+----------------+
```

*4. What is the least common course taken?*
```
spark.sql("select exam_name, count(keen_id) as num_exam FROM examview GROUP BY exam_name ORDER BY num_exam limit 5").show()
```
- The 10 least common courses are shown in the table below:

```
+--------------------+--------+                                                 
|           exam_name|num_exam|
+--------------------+--------+
|Nulls, Three-valu...|       1|
|Native Web Apps f...|       1|
|Learning to Visua...|       1|
|Operating Red Hat...|       1|
|Understanding the...|       2|
+--------------------+--------+
```


*5. What is the most common course taken?*
```
spark.sql("select exam_name, count(keen_id) as num_exam FROM examview GROUP BY exam_name ORDER BY num_exam DESC limit 5").show()
```
- The top ten couses taken are presented in the following table:

```
+--------------------+--------+                                                 
|           exam_name|num_exam|
+--------------------+--------+
|        Learning Git|     394|
|Introduction to P...|     162|
|Intermediate Pyth...|     158|
|Introduction to J...|     158|
|Learning to Progr...|     128|
+--------------------+--------+
```

### Structure for 'docker-compose.yml' file:

```
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
    expose:
      - "2181"
      - "2888"
      - "32181"
      - "3888"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    expose:
      - "9092"
      - "29092"

  cloudera:
    image: midsw205/cdh-minimal:latest
    expose:
      - "8020" # nn
      - "50070" # nn http
      - "8888" # hue
    #ports:
    #- "8888:8888"

  spark:
    image: midsw205/spark-python:0.0.5
    stdin_open: true
    tty: true
    volumes:
      - ~/w205:/w205
    command: bash
    depends_on:
      - cloudera
    environment:
      HADOOP_NAMENODE: cloudera

  mids:
    image: midsw205/base:latest
    stdin_open: true
    tty: true
    volumes:
      - ~/w205:/w205
```

### Top Line Structure of JSON File:

```
{
  "keen_timestamp": "1516717442.735266",
  "max_attempts": "1.0",
  "started_at": "2018-01-23T14:23:19.082Z",
  "base_exam_id": "37f0a30a-7464-11e6-aa92-a8667f27e5dc",
  "user_exam_id": "6d4089e4-bde5-4a22-b65f-18bce9ab79c8",
  "sequences": {
    "questions": [
      {
        "user_incomplete": true,
        "user_correct": false,
        "options": [
          {
            "checked": true,
            "at": "2018-01-23T14:23:24.670Z",
            "id": "49c574b4-5c82-4ffd-9bd1-c3358faf850d",
            "submitted": 1,
            "correct": true
          },
          {
            "checked": true,
            "at": "2018-01-23T14:23:25.914Z",
            "id": "f2528210-35c3-4320-acf3-9056567ea19f",
            "submitted": 1,
            "correct": true
          },
          {
            "checked": false,
            "correct": true,
            "id": "d1bf026f-554f-4543-bdd2-54dcf105b826"
          }
        ],
        "user_submitted": true,
        "id": "7a2ed6d3-f492-49b3-b8aa-d080a8aad986",
        "user_result": "missed_some"
      },
      {
        "user_incomplete": false,
        "user_correct": false,
        "options": [
          {
            "checked": true,
            "at": "2018-01-23T14:23:30.116Z",
            "id": "a35d0e80-8c49-415d-b8cb-c21a02627e2b",
            "submitted": 1
          },
          {
            "checked": false,
            "correct": true,
            "id": "bccd6e2e-2cef-4c72-8bfa-317db0ac48bb"
          },
          {
            "checked": true,
            "at": "2018-01-23T14:23:41.791Z",
            "id": "7e0b639a-2ef8-4604-b7eb-5018bd81a91b",
            "submitted": 1,
            "correct": true
          }
        ],
        "user_submitted": true,
        "id": "bbed4358-999d-4462-9596-bad5173a6ecb",
        "user_result": "incorrect"
      },
      {
        "user_incomplete": false,
        "user_correct": true,
        "options": [
          {
            "checked": false,
            "at": "2018-01-23T14:23:52.510Z",
            "id": "a9333679-de9d-41ff-bb3d-b239d6b95732"
          },
          {
            "checked": false,
            "id": "85795acc-b4b1-4510-bd6e-41648a3553c9"
          },
          {
            "checked": true,
            "at": "2018-01-23T14:23:54.223Z",
            "id": "c185ecdb-48fb-4edb-ae4e-0204ac7a0909",
            "submitted": 1,
            "correct": true
          },
          {
            "checked": true,
            "at": "2018-01-23T14:23:53.862Z",
            "id": "77a66c83-d001-45cd-9a5a-6bba8eb7389e",
            "submitted": 1,
            "correct": true
          }
        ],
        "user_submitted": true,
        "id": "e6ad8644-96b1-4617-b37b-a263dded202c",
        "user_result": "correct"
      },
      {
        "user_incomplete": false,
        "user_correct": true,
        "options": [
          {
            "checked": false,
            "id": "59b9fc4b-f239-4850-b1f9-912d1fd3ca13"
          },
          {
            "checked": false,
            "id": "2c29e8e8-d4a8-406e-9cdf-de28ec5890fe"
          },
          {
            "checked": false,
            "id": "62feee6e-9b76-4123-bd9e-c0b35126b1f1"
          },
          {
            "checked": true,
            "at": "2018-01-23T14:24:00.807Z",
            "id": "7f13df9c-fcbe-4424-914f-2206f106765c",
            "submitted": 1,
            "correct": true
          }
        ],
        "user_submitted": true,
        "id": "95194331-ac43-454e-83de-ea8913067055",
        "user_result": "correct"
      }
    ],
    "attempt": 1,
    "id": "5b28a462-7a3b-42e0-b508-09f3906d1703",
    "counts": {
      "incomplete": 1,
      "submitted": 4,
      "incorrect": 1,
      "all_correct": false,
      "correct": 2,
      "total": 4,
      "unanswered": 0
    }
  },
  "keen_created_at": "1516717442.735266",
  "certification": "false",
  "keen_id": "5a6745820eb8ab00016be1f1",
  "exam_name": "Normal Forms and All That Jazz Master Class"
```