In [116]:
import json
from pyspark.sql import Row
from pyspark.sql import functions as F 

#### Read assessments data from kafka into a pyspark dataframe

In [2]:
raw_assessments = spark.read.format("kafka").option("kafka.bootstrap.servers", "kafka:29092").option("subscribe","assessments").option("startingOffsets", "earliest").option("endingOffsets", "latest").load() 

In [3]:
type(raw_assessments)

pyspark.sql.dataframe.DataFrame

#### Cache the dataframe

In [4]:
raw_assessments.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

#### Check count to make sure data was written properly

In [5]:
raw_assessments.count()

3280

#### Cast to strings

In [6]:
assessments = raw_assessments.select(raw_assessments.value.cast('string'))

#### Extract json fields

In [7]:
extracted_assessments = assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()


In [8]:
type(extracted_assessments)

pyspark.sql.dataframe.DataFrame

Take a look at the new dataframe

In [9]:
extracted_assessments.show()

+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|Map(questions -> ...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|Map(questions -> ...|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false

#### Still a mess, but less so

Let's look at the schema:

In [10]:
extracted_assessments.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)



#### Only sequences is really nested

#### Create a temp table to begin to unnest

In [11]:
extracted_assessments.registerTempTable('assessments')

#### Look at unnested columns first

In [12]:
spark.sql("select base_exam_id, certification, exam_name, keen_created_at from assessments limit 5").show()

+--------------------+-------------+--------------------+------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|
+--------------------+-------------+--------------------+------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|
|4beeac16-bb83-4d5...|        false|The Principles of...| 1516738973.653394|
|4beeac16-bb83-4d5...|        false|The Principles of...|1516738921.1137421|
|6442707e-7488-11e...|        false|Introduction to B...| 1516737000.212122|
+--------------------+-------------+--------------------+------------------+



In [13]:
spark.sql("select keen_id, keen_timestamp, max_attempts, started_at, user_exam_id from assessments limit 5").show()

+--------------------+------------------+------------+--------------------+--------------------+
|             keen_id|    keen_timestamp|max_attempts|          started_at|        user_exam_id|
+--------------------+------------------+------------+--------------------+--------------------+
|5a6745820eb8ab000...| 1516717442.735266|         1.0|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|5a674541ab6b0a000...| 1516717377.639827|         1.0|2018-01-23T14:21:...|2fec1534-b41f-441...|
|5a67999d3ed3e3000...| 1516738973.653394|         1.0|2018-01-23T20:22:...|8edbc8a8-4d26-429...|
|5a6799694fc7c7000...|1516738921.1137421|         1.0|2018-01-23T20:21:...|c0ee680e-8892-4e6...|
|5a6791e824fccd000...| 1516737000.212122|         1.0|2018-01-23T19:48:...|e4525b79-7904-405...|
+--------------------+------------------+------------+--------------------+--------------------+



In [32]:
spark.sql("select sequences from assessments limit 5").show()

+--------------------+
|           sequences|
+--------------------+
|Map(questions -> ...|
|Map(questions -> ...|
|Map(questions -> ...|
|Map(questions -> ...|
|Map(questions -> ...|
+--------------------+



There's no columns, at face value, I'd throw out. I could see how they could all have analytical value.

In [14]:
sequences_df = spark.sql("select sequences from assessments")

In [15]:
sequences_df.select('sequences').take(1)

[Row(sequences={'questions': [{'options': None, 'user_correct': False, 'user_incomplete': True, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': False, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': True, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': True, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}], 'id': None, 'attempt': None, 'counts': None})]

In [16]:
sequences_df.select('sequences').take(1)[0]

Row(sequences={'questions': [{'options': None, 'user_correct': False, 'user_incomplete': True, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': False, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': True, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': True, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}], 'id': None, 'attempt': None, 'counts': None})

In [19]:
sequences_df.select('sequences').take(1)[0][0]

{'attempt': None,
 'counts': None,
 'id': None,
 'questions': [{'id': None,
   'options': None,
   'user_correct': False,
   'user_incomplete': True,
   'user_result': None,
   'user_submitted': True},
  {'id': None,
   'options': None,
   'user_correct': False,
   'user_incomplete': False,
   'user_result': None,
   'user_submitted': True},
  {'id': None,
   'options': None,
   'user_correct': True,
   'user_incomplete': False,
   'user_result': None,
   'user_submitted': True},
  {'id': None,
   'options': None,
   'user_correct': True,
   'user_incomplete': False,
   'user_result': None,
   'user_submitted': True}]}

In [20]:
sequences_df.select('sequences').take(1)[0][0]['questions']

[{'id': None,
  'options': None,
  'user_correct': False,
  'user_incomplete': True,
  'user_result': None,
  'user_submitted': True},
 {'id': None,
  'options': None,
  'user_correct': False,
  'user_incomplete': False,
  'user_result': None,
  'user_submitted': True},
 {'id': None,
  'options': None,
  'user_correct': True,
  'user_incomplete': False,
  'user_result': None,
  'user_submitted': True},
 {'id': None,
  'options': None,
  'user_correct': True,
  'user_incomplete': False,
  'user_result': None,
  'user_submitted': True}]

In [21]:
sequences_df.select('sequences').take(1)[0][0]['questions'][0]

{'id': None,
 'options': None,
 'user_correct': False,
 'user_incomplete': True,
 'user_result': None,
 'user_submitted': True}

In [22]:
sequences_df.select('sequences').take(1)[0][0]['questions'][0]['user_incomplete']

True

In [26]:
assessments_df = spark.read.json(assessments.rdd.map(lambda x: x.value))

In [None]:
temp_df = .rdd.map(lambda x: Row(**json.loads(x.value))).toDF()

In [27]:
assessments_df.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: struct (nullable = true)
 |    |-- attempt: long (nullable = true)
 |    |-- counts: struct (nullable = true)
 |    |    |-- all_correct: boolean (nullable = true)
 |    |    |-- correct: long (nullable = true)
 |    |    |-- incomplete: long (nullable = true)
 |    |    |-- incorrect: long (nullable = true)
 |    |    |-- submitted: long (nullable = true)
 |    |    |-- total: long (nullable = true)
 |    |    |-- unanswered: long (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- questions: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- options: arra

In [52]:
assessments_df.select('sequences').take(1)[0][0]['questions'][0]['user_incomplete']

True

In [123]:
assessments_df.select('sequences.questions').show()

+--------------------+
|           questions|
+--------------------+
|[[7a2ed6d3-f492-4...|
|[[95194331-ac43-4...|
|[[b9ff2e88-cf9d-4...|
|[[1f7c5def-904b-4...|
|[[620c924f-6bd8-1...|
|[[fb07b16e-84a2-4...|
|[[247b4589-7f8c-4...|
|[[fc3bdc54-04a8-4...|
|[[803fc93f-7eb2-4...|
|[[fc3bdc54-04a8-4...|
|[[a6effaf7-94ba-4...|
|[[247b4589-7f8c-4...|
|[[0d12c14d-1abe-4...|
|[[26ddad33-aa1d-4...|
|[[7bdbbf4a-b5d8-4...|
|[[d2ac7f0d-82bd-4...|
|[[59d444b5-49fd-4...|
|[[e272a3d1-bd67-4...|
|[[dee14932-a24e-4...|
|[[861c3405-83fc-4...|
+--------------------+
only showing top 20 rows



In [124]:
assessments_df.select('sequences.counts.total', 'sequences.counts.correct').show()

+-----+-------+
|total|correct|
+-----+-------+
|    4|      2|
|    4|      1|
|    4|      3|
|    4|      2|
|    4|      3|
|    5|      5|
|    1|      1|
|    5|      5|
|    4|      4|
|    5|      0|
|    4|      3|
|    1|      1|
|    6|      4|
|    6|      4|
|    5|      4|
|    4|      3|
|    4|      3|
|    4|      4|
|    4|      2|
|    6|      6|
+-----+-------+
only showing top 20 rows



In [49]:
spark.sql("select sequences.questions[0].counts from assessments limit 10").show()

+----------------------------------------------+
|sequences[questions] AS `questions`[0][counts]|
+----------------------------------------------+
|                                          null|
|                                          null|
|                                          null|
|                                          null|
|                                          null|
|                                          null|
|                                          null|
|                                          null|
|                                          null|
|                                          null|
+----------------------------------------------+



In [34]:
spark.sql("select sequences.questions[0] from assessments limit 10").show()

+--------------------------------------+
|sequences[questions] AS `questions`[0]|
+--------------------------------------+
|                  Map(user_incomple...|
|                  Map(user_incomple...|
|                  Map(user_incomple...|
|                  Map(user_incomple...|
|                  Map(user_incomple...|
|                  Map(user_incomple...|
|                  Map(user_incomple...|
|                  Map(user_incomple...|
|                  Map(user_incomple...|
|                  Map(user_incomple...|
+--------------------------------------+



In [None]:
sequences_df.select('sequences')

In [None]:
sequences_df[0:0]

In [23]:
assessments.rdd.map(lambda x: json.loads(x.extracted_assessments.sequences)).toDF().show()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 1 times, most recent failure: Lost task 0.0 in stage 14.0 (TID 14, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark-2.2.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/types.py", line 1499, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'extracted_assessments' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/spark-2.2.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/spark-2.2.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/spark-2.2.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/spark-2.2.0-bin-hadoop2.6/python/pyspark/rdd.py", line 1339, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-23-614e436598e5>", line 1, in <lambda>
  File "/spark-2.2.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/types.py", line 1504, in __getattr__
    raise AttributeError(item)
AttributeError: extracted_assessments

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:446)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark-2.2.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/types.py", line 1499, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'extracted_assessments' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/spark-2.2.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/spark-2.2.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/spark-2.2.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/spark-2.2.0-bin-hadoop2.6/python/pyspark/rdd.py", line 1339, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-23-614e436598e5>", line 1, in <lambda>
  File "/spark-2.2.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/types.py", line 1504, in __getattr__
    raise AttributeError(item)
AttributeError: extracted_assessments

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
