# Week 6: Publish and Consume Messages with Kafka

#### Spin up Cluster
```
docker-compose up -d
```
#### Make sure it is Up
```
docker-compose ps
```
#### Start Kafka Logs
```
docker-compose logs -f kafka
```
#### Create Topic
```
docker-compose exec kafka kafka-topics --create --topic assessments --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```
#### Check out the Topic
```
docker-compose exec kafka kafka-topics --describe --topic assessments --zookeeper zookeeper:32181
```
#### Check out Messages
```
docker-compose exec mids bash -c "cat /w205/project-2-frankbruni/assessment-attempts-20180128-121051-nested.json"
```
    
```
docker-compose exec mids bash -c "cat /w205/project-2-frankbruni/assessment-attempts-20180128-121051-nested.json | jq '.'"
```
    
```
docker-compose exec mids bash -c "cat /w205/project-2-frankbruni/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c"
```
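The `jq '.[]' -c` filter unpacks the top-level JSON array and prints each element as one compact line. kafkacat then reads that one-message-per-line shape from stdin. Below is a rough Python equivalent. It is a sketch that assumes the file holds a single top-level array, and the sample string is hypothetical rather than taken from the real assessments file:

```python
import json

def array_to_json_lines(text):
    """Mimic `jq '.[]' -c`: emit each array element as one compact JSON line."""
    records = json.loads(text)  # assumes the document is a top-level JSON array
    # separators=(",", ":") drops the spaces json.dumps adds by default, like jq -c
    return [json.dumps(record, separators=(",", ":")) for record in records]

# Hypothetical two-record sample
sample = '[{"keen_id": "a", "max_attempts": "1.0"}, {"keen_id": "b"}]'
lines = array_to_json_lines(sample)
for line in lines:
    print(line)
```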
#### Publish Test Messages to Topic Assessments
```
docker-compose exec mids bash -c "cat /w205/project-2-frankbruni/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments && echo 'Produced messages.'"
```
#### Consume Messages
```
docker-compose exec kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic assessments --from-beginning --max-messages 42
```
#### Tear Down Cluster
```
docker-compose down
```

# Week 7: Use Spark to Transform the Messages

#### Spin up Cluster
```
docker-compose up -d
```
#### Make sure it is Up
```
docker-compose ps
```
#### Start Kafka Logs
```
docker-compose logs -f kafka
```
#### Create Topic
```
docker-compose exec kafka kafka-topics --create --topic assessments --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```
#### Check out the Topic
```
docker-compose exec kafka kafka-topics --describe --topic assessments --zookeeper zookeeper:32181
```
#### Check out Messages
```
docker-compose exec mids bash -c "cat /w205/project-2-frankbruni/assessment-attempts-20180128-121051-nested.json"
```
    
```
docker-compose exec mids bash -c "cat /w205/project-2-frankbruni/assessment-attempts-20180128-121051-nested.json | jq '.'"
```
    
```
docker-compose exec mids bash -c "cat /w205/project-2-frankbruni/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c"
```
#### Publish Test Messages to Topic Assessments
```
docker-compose exec mids bash -c "cat /w205/project-2-frankbruni/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments && echo 'Produced messages.'"
```
#### Run Spark
```
docker-compose exec spark pyspark
```
#### Read from Kafka

```python
messages = (spark.read.format("kafka")
            .option("kafka.bootstrap.servers", "kafka:29092")
            .option("subscribe", "assessments")
            .option("startingOffsets", "earliest")
            .option("endingOffsets", "latest")
            .load())
```

```python
messages.printSchema()
```

```
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
```

```python
messages.show()
```

```
+----+--------------------+-----------+---------+------+--------------------+-------------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null|[7B 22 6B 65 65 6...|assessments|        0|     0|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     1|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     2|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     3|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     4|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     5|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     6|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     7|1969-12-31 23:59:...|
```
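The `value` column holds raw bytes, so `show()` prints it as hex. Decoding the visible prefix by hand confirms that these bytes are the start of the JSON text. `CAST(value AS STRING)` does the same for the whole column, assuming the payload is UTF-8:

```python
# The first five bytes visible in the truncated `value` cells above
prefix = bytes.fromhex("7B 22 6B 65 65")
decoded = prefix.decode("utf-8")
print(decoded)  # {"kee
```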

```python
messages_as_strings = messages.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```

```python
messages_as_strings.show()

messages_as_strings.printSchema()

messages_as_strings.count()
```

```
+----+--------------------+
| key|               value|
+----+--------------------+
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
|null|{"keen_timestamp"...|
+----+--------------------+
only showing top 20 rows

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)

3280
```

```python
messages_as_strings.select('value').take(1)
```

```
[Row(value='{"keen_timestamp":"1516717442.735266","max_attempts":"1.0","started_at":"2018-01-23T14:23:19.082Z","base_exam_id":"37f0a30a-7464-11e6-aa92-a8667f27e5dc","user_exam_id":"6d4089e4-bde5-4a22-b65f-18bce9ab79c8","sequences":{"questions":[{"user_incomplete":true,"user_correct":false,"options":[{"checked":true,"at":"2018-01-23T14:23:24.670Z","id":"49c574b4-5c82-4ffd-9bd1-c3358faf850d","submitted":1,"correct":true},{"checked":true,"at":"2018-01-23T14:23:25.914Z","id":"f2528210-35c3-4320-acf3-9056567ea19f","submitted":1,"correct":true},{"checked":false,"correct":true,"id":"d1bf026f-554f-4543-bdd2-54dcf105b826"}],"user_submitted":true,"id":"7a2ed6d3-f492-49b3-b8aa-d080a8aad986","user_result":"missed_some"},{"user_incomplete":false,"user_correct":false,"options":[{"checked":true,"at":"2018-01-23T14:23:30.116Z","id":"a35d0e80-8c49-415d-b8cb-c21a02627e2b","submitted":1},{"checked":false,"correct":true,"id":"bccd6e2e-2cef-4c72-8bfa-317db0ac48bb"},{"checked":true,"at":"2018-01-23T14:23:41
```

```python
messages_as_strings.select('value').take(1)[0].value
```

```
'{"keen_timestamp":"1516717442.735266","max_attempts":"1.0","started_at":"2018-01-23T14:23:19.082Z","base_exam_id":"37f0a30a-7464-11e6-aa92-a8667f27e5dc","user_exam_id":"6d4089e4-bde5-4a22-b65f-18bce9ab79c8","sequences":{"questions":[{"user_incomplete":true,"user_correct":false,"options":[{"checked":true,"at":"2018-01-23T14:23:24.670Z","id":"49c574b4-5c82-4ffd-9bd1-c3358faf850d","submitted":1,"correct":true},{"checked":true,"at":"2018-01-23T14:23:25.914Z","id":"f2528210-35c3-4320-acf3-9056567ea19f","submitted":1,"correct":true},{"checked":false,"correct":true,"id":"d1bf026f-554f-4543-bdd2-54dcf105b826"}],"user_submitted":true,"id":"7a2ed6d3-f492-49b3-b8aa-d080a8aad986","user_result":"missed_some"},{"user_incomplete":false,"user_correct":false,"options":[{"checked":true,"at":"2018-01-23T14:23:30.116Z","id":"a35d0e80-8c49-415d-b8cb-c21a02627e2b","submitted":1},{"checked":false,"correct":true,"id":"bccd6e2e-2cef-4c72-8bfa-317db0ac48bb"},{"checked":true,"at":"2018-01-23T14:23:41.791Z","id"
```

```python
import json

first_message = json.loads(messages_as_strings.select('value').take(1)[0].value)

first_message
```

```
{'base_exam_id': '37f0a30a-7464-11e6-aa92-a8667f27e5dc',
 'certification': 'false',
 'exam_name': 'Normal Forms and All That Jazz Master Class',
 'keen_created_at': '1516717442.735266',
 'keen_id': '5a6745820eb8ab00016be1f1',
 'keen_timestamp': '1516717442.735266',
 'max_attempts': '1.0',
 'sequences': {'attempt': 1,
  'counts': {'all_correct': False,
   'correct': 2,
   'incomplete': 1,
   'incorrect': 1,
   'submitted': 4,
   'total': 4,
   'unanswered': 0},
  'id': '5b28a462-7a3b-42e0-b508-09f3906d1703',
  'questions': [{'id': '7a2ed6d3-f492-49b3-b8aa-d080a8aad986',
    'options': [{'at': '2018-01-23T14:23:24.670Z',
      'checked': True,
      'correct': True,
      'id': '49c574b4-5c82-4ffd-9bd1-c3358faf850d',
      'submitted': 1},
     {'at': '2018-01-23T14:23:25.914Z',
      'checked': True,
      'correct': True,
      'id': 'f2528210-35c3-4320-acf3-9056567ea19f',
      'submitted': 1},
     {'checked': False,
      'correct': True,
      'id': 'd1bf026f-554f-4543-bdd2-54dcf10
```
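Each attempt is already summarized in the `counts` sub-object, so a per-attempt score can be read directly from it as `correct / total`. The minimal sketch below uses only the values printed above for the first message. The dict is trimmed to the fields the sketch needs and is not the full message:

```python
# Trimmed copy of first_message, keeping only fields printed in the output above
first_message = {
    "keen_id": "5a6745820eb8ab00016be1f1",
    "exam_name": "Normal Forms and All That Jazz Master Class",
    "sequences": {"counts": {"all_correct": False, "correct": 2, "incomplete": 1,
                             "incorrect": 1, "submitted": 4, "total": 4,
                             "unanswered": 0}},
}

counts = first_message["sequences"]["counts"]
score = counts["correct"] / counts["total"]  # 2 correct out of 4 questions
print(first_message["exam_name"], score)
```

The result, 0.5, matches the score reported for keen_id `5a6745820eb8ab000...` in the Week 8 `ct` table.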

#### Exit Pyspark
```
exit()
```
#### Tear Down Cluster
```
docker-compose down
```

# Week 8: Use Spark to Transform the Messages and Land them in HDFS

#### Spin up Cluster
```
docker-compose up -d
```
#### Make sure it is Up
```
docker-compose ps
```
#### Start Kafka Logs
```
docker-compose logs -f kafka
```
#### Create Topic
```
docker-compose exec kafka kafka-topics --create --topic assessments --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```
#### Check out the Topic
```
docker-compose exec kafka kafka-topics --describe --topic assessments --zookeeper zookeeper:32181
```
#### Publish to Kafka Topic Assessments
```
docker-compose exec mids bash -c "cat /w205/project-2-frankbruni/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments"
```
#### Exec a Bash Shell into Spark Container
```
docker-compose exec spark bash
```
#### Symbolic Link from Spark to W205 Directory
```
ln -s /w205 w205
```
#### Exit Container
```
exit
```
#### Start Jupyter Notebook with Pyspark Kernel
```
docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark
```

## Explore the JSON Data and Answer:
 - What were the top 3 <b>most</b> taken courses?
 - What were the top 3 <b>least</b> taken courses?
 - What is the <b>average score</b> on each test?

```python
import json
from pyspark.sql import Row

raw_assessments = (spark.read.format("kafka")
                   .option("kafka.bootstrap.servers", "kafka:29092")
                   .option("subscribe", "assessments")
                   .option("startingOffsets", "earliest")
                   .option("endingOffsets", "latest")
                   .load())
```

```python
raw_assessments.cache()
```

```
DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]
```

```python
raw_assessments.printSchema()
```

```
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
```

```python
assessments = raw_assessments.select(raw_assessments.value.cast('string'))
```

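```python
# Parse each JSON string into a Row whose fields are the message's top-level keys
extracted_assessments = assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()

# createOrReplaceTempView replaces the deprecated registerTempTable
extracted_assessments.createOrReplaceTempView('assessments')

extracted_assessments.write.mode('overwrite').parquet("/tmp/assessments")
```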

```python
spark.sql("select keen_id from assessments limit 10").show()
```

```
+--------------------+
|             keen_id|
+--------------------+
|5a6745820eb8ab000...|
|5a674541ab6b0a000...|
|5a67999d3ed3e3000...|
|5a6799694fc7c7000...|
|5a6791e824fccd000...|
|5a67a0b6852c2a000...|
|5a67b627cc80e6000...|
|5a67ac8cb0a5f4000...|
|5a67a9ba060087000...|
|5a67ac54411aed000...|
+--------------------+
```


Now that the data is imported, we can begin unrolling the nested JSON. First, I'll look at the exam names and explore how often each exam was taken.
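`Row(**json.loads(...))` splits a message only at the top level, so nested objects such as `sequences` stay packed inside a single column. This notebook hand-picks the fields it needs. A common alternative is to flatten nested dictionaries into dot-separated keys. The plain-Python sketch below shows that technique. It is not the approach used in the cells that follow, and it leaves lists such as `questions` as single values:

```python
def flatten(record, prefix=""):
    """Flatten nested dicts into {"a.b.c": value}; lists are kept as single values."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))  # recurse into nested objects
        else:
            flat[name] = value
    return flat

# Abbreviated record with the same nesting as the assessment messages
record = {"keen_id": "5a6745820eb8ab00016be1f1",
          "sequences": {"attempt": 1, "counts": {"correct": 2, "total": 4}}}
print(flatten(record))
```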

```python
def my_lambda_sequences_id(x):
    """Pull the exam id, exam name, and keen_id out of one Kafka message."""
    raw_dict = json.loads(x.value)
    my_dict = {"base_exam_id": raw_dict["base_exam_id"],
               "exam_name": raw_dict["exam_name"],
               "keen_id": raw_dict["keen_id"]}
    return Row(**my_dict)

my_sequences = assessments.rdd.map(my_lambda_sequences_id).toDF()

my_sequences.createOrReplaceTempView('sequences')

my_sequences.write.mode('overwrite').parquet("/tmp/my_sequences")
```

```python
spark.sql("select * from sequences limit 10").show()
```

```
+--------------------+--------------------+--------------------+
|        base_exam_id|           exam_name|             keen_id|
+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|Normal Forms and ...|5a6745820eb8ab000...|
|37f0a30a-7464-11e...|Normal Forms and ...|5a674541ab6b0a000...|
|4beeac16-bb83-4d5...|The Principles of...|5a67999d3ed3e3000...|
|4beeac16-bb83-4d5...|The Principles of...|5a6799694fc7c7000...|
|6442707e-7488-11e...|Introduction to B...|5a6791e824fccd000...|
|8b4488de-43a5-4ff...|        Learning Git|5a67a0b6852c2a000...|
|e1f07fac-5566-4fd...|Git Fundamentals ...|5a67b627cc80e6000...|
|7e2e0b53-a7ba-458...|Introduction to P...|5a67ac8cb0a5f4000...|
|1a233da8-e6e5-48a...|Intermediate Pyth...|5a67a9ba060087000...|
|7e2e0b53-a7ba-458...|Introduction to P...|5a67ac54411aed000...|
+--------------------+--------------------+--------------------+
```



```python
spark.sql("select exam_name, count(*) as num from sequences group by exam_name order by num desc limit 10").show()
```

```
+--------------------+---+
|           exam_name|num|
+--------------------+---+
|        Learning Git|394|
|Introduction to P...|162|
|Introduction to J...|158|
|Intermediate Pyth...|158|
|Learning to Progr...|128|
|Introduction to M...|119|
|Software Architec...|109|
|Beginning C# Prog...| 95|
|    Learning Eclipse| 85|
|Learning Apache M...| 80|
+--------------------+---+
```



Learning Git was taken 394 times and Introduction to Python 162 times. Third place is a tie at 158 between Introduction to Java and Intermediate Python, so the top three is not unique.

```python
spark.sql("select exam_name, count(*) as num from sequences group by exam_name order by num limit 10").show()
```

```
+--------------------+---+
|           exam_name|num|
+--------------------+---+
|Nulls, Three-valu...|  1|
|Native Web Apps f...|  1|
|Learning to Visua...|  1|
|Operating Red Hat...|  1|
|Client-Side Data ...|  2|
|Hibernate and JPA...|  2|
|Learning Spring P...|  2|
|Arduino Prototypi...|  2|
|The Closed World ...|  2|
|Understanding the...|  2|
+--------------------+---+
```



The least taken courses were Nulls, Native Web Apps, Learning to Visualize, and Operating Red Hat, each taken only once. Because four courses tie at one attempt, there is no unique bottom three.
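Ties occur at both ends of these rankings, and `LIMIT` cuts them off arbitrarily. One way to keep every course tied at the cutoff is sketched below in plain Python. The exam names are hypothetical stand-ins, not the real data:

```python
from collections import Counter

def ranked_with_ties(names, k, most=True):
    """Return every (name, count) at or beyond the k-th place count,
    so ties at the cutoff are kept instead of being cut off arbitrarily."""
    counts = Counter(names)
    if not counts:
        return []
    # sorted() is stable, so tied names keep their first-seen order
    ordered = sorted(counts.items(), key=lambda kv: kv[1], reverse=most)
    cutoff = ordered[min(k, len(ordered)) - 1][1]  # count at k-th place
    if most:
        return [(n, c) for n, c in ordered if c >= cutoff]
    return [(n, c) for n, c in ordered if c <= cutoff]

# Hypothetical sample, not the real exam data
names = ["Git"] * 4 + ["Python"] * 3 + ["Java"] * 2 + ["C#"] * 2 + ["Nulls", "Red Hat"]
print(ranked_with_ties(names, 3, most=True))
print(ranked_with_ties(names, 3, most=False))
```

In Spark SQL the same idea can be expressed with a `rank()` window over the counts and a filter on the rank, instead of `LIMIT`.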

```python
def my_lambda_correct_total(x):
    """Return [Row(correct, total, score, keen_id)] when the message has
    sequences.counts.correct and .total, else [] (so it can be used with flatMap)."""
    raw_dict = json.loads(x.value)
    counts = raw_dict.get("sequences", {}).get("counts", {})

    # Skip messages without counts, and guard against division by zero
    if "correct" in counts and "total" in counts and counts["total"] > 0:
        my_dict = {"correct": counts["correct"],
                   "total": counts["total"],
                   "score": counts["correct"] / counts["total"],
                   "keen_id": raw_dict["keen_id"]}
        return [Row(**my_dict)]

    return []
```

```python
my_correct_total = assessments.rdd.flatMap(my_lambda_correct_total).toDF()

my_correct_total.createOrReplaceTempView('ct')

my_correct_total.write.mode('overwrite').parquet("/tmp/my_correct_total")

spark.sql("select keen_id, correct, total, score from ct limit 10").show()

# avg_score is scaled to a percentage; standard_deviation is left as a fraction
spark.sql("select avg(correct / total)*100 as avg_score from ct").show()

spark.sql("select stddev(correct / total) as standard_deviation from ct").show()
```

```
+--------------------+-------+-----+-----+
|             keen_id|correct|total|score|
+--------------------+-------+-----+-----+
|5a6745820eb8ab000...|      2|    4|  0.5|
|5a674541ab6b0a000...|      1|    4| 0.25|
|5a67999d3ed3e3000...|      3|    4| 0.75|
|5a6799694fc7c7000...|      2|    4|  0.5|
|5a6791e824fccd000...|      3|    4| 0.75|
|5a67a0b6852c2a000...|      5|    5|  1.0|
|5a67b627cc80e6000...|      1|    1|  1.0|
|5a67ac8cb0a5f4000...|      5|    5|  1.0|
|5a67a9ba060087000...|      4|    4|  1.0|
|5a67ac54411aed000...|      0|    5|  0.0|
+--------------------+-------+-----+-----+

+-----------------+
|        avg_score|
+-----------------+
|62.65699745547047|
+-----------------+

+-------------------+
| standard_deviation|
+-------------------+
|0.31086692286170553|
+-------------------+
```
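`avg_score` is scaled by 100 and `standard_deviation` is not. On the same percentage scale, the spread is about 31 points. Spark SQL's `stddev` is the sample standard deviation, which corresponds to Python's `statistics.stdev`. The sketch below uses only the ten scores shown above, so its numbers will differ from the full-table results:

```python
import statistics

# The ten per-attempt scores printed in the table above (not the full dataset)
scores = [0.5, 0.25, 0.75, 0.5, 0.75, 1.0, 1.0, 1.0, 1.0, 0.0]

avg_pct = statistics.mean(scores) * 100   # scaled like avg_score
sd_frac = statistics.stdev(scores)        # sample std dev, like Spark's stddev
sd_pct = sd_frac * 100                    # put on the same percentage scale
print(f"avg = {avg_pct:.1f}%, stddev = {sd_pct:.1f} points")
```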



```python
spark.sql("CREATE TABLE test_score AS select a.exam_name, b.score from sequences as a, ct as b where a.keen_id = b.keen_id")
```

```
DataFrame[]
```

```python
spark.sql("select exam_name, AVG(score) as avg_score, count(*) as num from test_score group by exam_name order by num desc limit 10").show()
```

```
+--------------------+------------------+---+
|           exam_name|         avg_score|num|
+--------------------+------------------+---+
|        Learning Git|0.6827586206896553|406|
|Introduction to P...|0.5666666666666665|162|
|Intermediate Pyth...|0.5092592592592593|162|
|Introduction to J...|0.8759493670886075|158|
|Beginning C# Prog...|0.5629770992366412|131|
|Learning to Progr...| 0.544642857142857|128|
|Introduction to M...|0.6869747899159664|119|
|Software Architec...|0.4793577981651376|109|
|    Learning Eclipse|0.7058823529411765| 85|
|Introduction to B...|0.6450617283950617| 81|
+--------------------+------------------+---+
```



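This gives the average score (as a fraction) for the 10 most popular tests, ordered by popularity. For some exams, the counts in `num` are higher than the earlier per-exam counts: Learning Git has 406 vs 394, Intermediate Python 162 vs 158, and Beginning C# 131 vs 95. An inner join can only return more rows for an exam than that exam has messages if some `keen_id` values repeat. The data therefore contains duplicate messages, and the join counts them more than once, which affects both `num` and `avg_score`.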
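Joining on a key that is not unique multiplies rows: a `keen_id` that appears twice on each side produces four joined rows. The plain-Python sketch below uses hypothetical rows to show the effect and one possible fix. The fix deduplicates by `keen_id` before joining, which assumes that repeated `keen_id`s are copies of the same message:

```python
from collections import Counter

# Hypothetical rows: keen_id "k1" was published twice
sequences = [("k1", "Learning Git"), ("k1", "Learning Git"), ("k2", "Learning Git")]
ct = [("k1", 0.5), ("k1", 0.5), ("k2", 1.0)]

def inner_join(left, right):
    """Naive inner join on the first tuple element, like the SQL join on keen_id."""
    return [(name, score) for lk, name in left for rk, score in right if lk == rk]

def dedupe(rows):
    """Keep the first row seen for each key."""
    seen = {}
    for row in rows:
        seen.setdefault(row[0], row)
    return list(seen.values())

joined = inner_join(sequences, ct)
fixed = inner_join(dedupe(sequences), dedupe(ct))
print(Counter(name for name, _ in joined))  # 5 joined rows from 3 source rows
print(Counter(name for name, _ in fixed))   # 2 rows, one per distinct keen_id
```

In PySpark, the analogous step would be calling `dropDuplicates(["keen_id"])` on each DataFrame before creating the temp views. This has not been run against this data.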