# Five Spark SQL Utility Functions to Extract and Explore Complex Data Types

**This notebook is a PySpark version of [this notebook](https://docs.databricks.com/_static/notebooks/complex-nested-structured.html?_gl=1*12k1b7v*_gcl_aw*R0NMLjE2MTMzODg5MjAuQ2owS0NRaUExS2lCQmhDY0FSSXNBUFdxb1NvWUkxN1JrcHNJZU9odElCMVhXaHdZbml6cDRmc0RTMEJTRkJ6YlRNY3hjd0gxTkZBT2lqb2FBaTcwRUFMd193Y0I.&_ga=2.41186337.2035966158.1617098598-1848054323.1612919861&_gac=1.19439050.1613388971.Cj0KCQiA1KiBBhCcARIsAPWqoSoYI17RkpsIeOhtIB1XWhwYnizp4fsDS0BSFBzbTMcxcwH1NFAOijoaAi70EALw_wcB). I came across the Databricks notebook while trying out PySpark 3.0 on my local machine, and ported it to Python.**

##### Links
- [Five Spark SQL Helper Utility Functions to Extract and Explore Complex Data Types](https://databricks.com/blog/2017/06/13/five-spark-sql-utility-functions-extract-explore-complex-data-types.html)
- [Notebook](https://docs.databricks.com/_static/notebooks/complex-nested-structured.html?_gl=1*12k1b7v*_gcl_aw*R0NMLjE2MTMzODg5MjAuQ2owS0NRaUExS2lCQmhDY0FSSXNBUFdxb1NvWUkxN1JrcHNJZU9odElCMVhXaHdZbml6cDRmc0RTMEJTRkJ6YlRNY3hjd0gxTkZBT2lqb2FBaTcwRUFMd193Y0I.&_ga=2.41186337.2035966158.1617098598-1848054323.1612919861&_gac=1.19439050.1613388971.Cj0KCQiA1KiBBhCcARIsAPWqoSoYI17RkpsIeOhtIB1XWhwYnizp4fsDS0BSFBzbTMcxcwH1NFAOijoaAi70EALw_wcB)
- [Working with Nested Data Using Higher Order Functions in SQL on Databricks](https://databricks.com/blog/2017/05/24/working-with-nested-data-using-higher-order-functions-in-sql-on-databricks.html)
- [Working with Complex Data Formats with Structured Streaming in Apache Spark 2.1](https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html)

## Overview

While the in-depth blog post linked above lays out the concepts and motivations for processing and handling complex data types and formats, this notebook shows how you can apply them, with a few concrete examples, to data types that you might encounter in your own use cases. This short tutorial explores a number of helper Spark SQL utility functions and APIs, most of them part of the `org.apache.spark.sql.functions` package (`pyspark.sql.functions` in PySpark). They come in handy for streaming ETL, in which the data are JSON objects with complex, nested structures such as Maps and Structs embedded as JSON:

- `get_json_object()`
- `from_json()`
- `to_json()`
- `explode()`
- `selectExpr()`

The takeaway from this short tutorial is that there are myriad ways to slice and dice nested JSON structures with Spark SQL utility functions.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, column
from pyspark.sql.types import StructField, StructType, StringType, LongType, TimestampType, MapType, DoubleType, BooleanType
import pandas as pd

In [2]:
pd.set_option('display.max_columns', None)  
pd.set_option('display.expand_frame_repr', False)
pd.set_option('max_colwidth', None)

In [3]:
spark = (SparkSession
        .builder
        .appName("Handling-Complex-Data-Types")
        .getOrCreate()
        )

Let's create a simple JSON schema with attributes and values, without any nested structures.

In [4]:
jsonSchema = (
    StructType()
        .add("battery_level", LongType())
        .add("c02_level", LongType())
        .add("cca3",StringType())
        .add("cn", StringType())
        .add("device_id", LongType())
        .add("device_type", StringType())
        .add("signal", LongType())
        .add("ip", StringType())
        .add("temp", LongType())
        .add("timestamp", TimestampType())
)

Create sample data representing device events. In practice, this JSON could just as well be a stream of device events read off a Kafka topic. Note that each record has two fields: an integer (the device id) and a string (a JSON string representing the device event).

In [5]:
schema = "`id` INT, `device` STRING"

In [6]:
data = [
    [0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }"""],
    [1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }"""],
    [2, """{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }"""],
    [3, """{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 }"""], 
    [4, """{"device_id": 4, "device_type": "sensor-ipad", "ip": "203.82.41.9", "cca3": "PHL", "cn": "Philippines", "temp": 29, "signal": 11, "battery_level": 0, "c02_level": 983, "timestamp" :1475600504 }"""],
    [5, """{"device_id": 5, "device_type": "sensor-istick", "ip": "204.116.105.67", "cca3": "USA", "cn": "United States", "temp": 50, "signal": 16, "battery_level": 8, "c02_level": 1574, "timestamp" :1475600506 }"""],
    [6, """{"device_id": 6, "device_type": "sensor-ipad", "ip": "220.173.179.1", "cca3": "CHN", "cn": "China", "temp": 21, "signal": 18, "battery_level": 9, "c02_level": 1249, "timestamp" :1475600508 }"""],
    [7, """{"device_id": 7, "device_type": "sensor-ipad", "ip": "118.23.68.227", "cca3": "JPN", "cn": "Japan", "temp": 27, "signal": 15, "battery_level": 0, "c02_level": 1531, "timestamp" :1475600512 }"""],
    [8, """ {"device_id": 8, "device_type": "sensor-inest", "ip": "208.109.163.218", "cca3": "USA", "cn": "United States", "temp": 40, "signal": 16, "battery_level": 9, "c02_level": 1208, "timestamp" :1475600514 }"""],
    [9, """{"device_id": 9, "device_type": "sensor-ipad", "ip": "88.213.191.34", "cca3": "ITA", "cn": "Italy", "temp": 19, "signal": 11, "battery_level": 0, "c02_level": 1171, "timestamp" :1475600516 }"""],
    [10, """{"device_id": 10, "device_type": "sensor-igauge", "ip": "68.28.91.22", "cca3": "USA", "cn": "United States", "temp": 32, "signal": 26, "battery_level": 7, "c02_level": 886, "timestamp" :1475600518 }"""],
    [11, """{"device_id": 11, "device_type": "sensor-ipad", "ip": "59.144.114.250", "cca3": "IND", "cn": "India", "temp": 46, "signal": 25, "battery_level": 4, "c02_level": 863, "timestamp" :1475600520 }"""],
    [12, """{"device_id": 12, "device_type": "sensor-igauge", "ip": "193.156.90.200", "cca3": "NOR", "cn": "Norway", "temp": 18, "signal": 26, "battery_level": 8, "c02_level": 1220, "timestamp" :1475600522 }"""],
    [13, """{"device_id": 13, "device_type": "sensor-ipad", "ip": "67.185.72.1", "cca3": "USA", "cn": "United States", "temp": 34, "signal": 20, "battery_level": 8, "c02_level": 1504, "timestamp" :1475600524 }"""],
    [14, """{"device_id": 14, "device_type": "sensor-inest", "ip": "68.85.85.106", "cca3": "USA", "cn": "United States", "temp": 39, "signal": 17, "battery_level": 8, "c02_level": 831, "timestamp" :1475600526 }"""],
    [15, """{"device_id": 15, "device_type": "sensor-ipad", "ip": "161.188.212.254", "cca3": "USA", "cn": "United States", "temp": 27, "signal": 26, "battery_level": 5, "c02_level": 1378, "timestamp" :1475600528 }"""],
    [16, """{"device_id": 16, "device_type": "sensor-igauge", "ip": "221.3.128.242", "cca3": "CHN", "cn": "China", "temp": 10, "signal": 24, "battery_level": 6, "c02_level": 1423, "timestamp" :1475600530 }"""],
    [17, """{"device_id": 17, "device_type": "sensor-ipad", "ip": "64.124.180.215", "cca3": "USA", "cn": "United States", "temp": 38, "signal": 17, "battery_level": 9, "c02_level": 1304, "timestamp" :1475600532 }"""],
    [18, """{"device_id": 18, "device_type": "sensor-igauge", "ip": "66.153.162.66", "cca3": "USA", "cn": "United States", "temp": 26, "signal": 10, "battery_level": 0, "c02_level": 902, "timestamp" :1475600534 }"""],
    [19, """{"device_id": 19, "device_type": "sensor-ipad", "ip": "193.200.142.254", "cca3": "AUT", "cn": "Austria", "temp": 32, "signal": 27, "battery_level": 5, "c02_level": 1282, "timestamp" :1475600536 }"""]
]

**Create a DataFrame from the above data and schema**

In [7]:
eventsDF = spark.createDataFrame(data, schema)

In [8]:
eventsDF.toPandas().head()

Unnamed: 0,id,device
0,0,"{""device_id"": 0, ""device_type"": ""sensor-ipad"", ""ip"": ""68.161.225.1"", ""cca3"": ""USA"", ""cn"": ""United States"", ""temp"": 25, ""signal"": 23, ""battery_level"": 8, ""c02_level"": 917, ""timestamp"" :1475600496 }"
1,1,"{""device_id"": 1, ""device_type"": ""sensor-igauge"", ""ip"": ""213.161.254.1"", ""cca3"": ""NOR"", ""cn"": ""Norway"", ""temp"": 30, ""signal"": 18, ""battery_level"": 6, ""c02_level"": 1413, ""timestamp"" :1475600498 }"
2,2,"{""device_id"": 2, ""device_type"": ""sensor-ipad"", ""ip"": ""88.36.5.1"", ""cca3"": ""ITA"", ""cn"": ""Italy"", ""temp"": 18, ""signal"": 25, ""battery_level"": 5, ""c02_level"": 1372, ""timestamp"" :1475600500 }"
3,3,"{""device_id"": 3, ""device_type"": ""sensor-inest"", ""ip"": ""66.39.173.154"", ""cca3"": ""USA"", ""cn"": ""United States"", ""temp"": 47, ""signal"": 12, ""battery_level"": 1, ""c02_level"": 1447, ""timestamp"" :1475600502 }"
4,4,"{""device_id"": 4, ""device_type"": ""sensor-ipad"", ""ip"": ""203.82.41.9"", ""cca3"": ""PHL"", ""cn"": ""Philippines"", ""temp"": 29, ""signal"": 11, ""battery_level"": 0, ""c02_level"": 983, ""timestamp"" :1475600504 }"


In [9]:
eventsDF.printSchema()

root
 |-- id: integer (nullable = true)
 |-- device: string (nullable = true)



## How to use `get_json_object()`
This function extracts a JSON object from a JSON string based on the specified JSON path, and returns the extracted object as a JSON string.
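
To make the path lookup concrete, here is a plain-Python analogue built on the standard `json` module. It is not Spark's implementation: the helper name `extract_path` is hypothetical, and it only understands dotted object keys such as `$.device_type`, not array indices or wildcards. It mirrors the behaviour visible in the output below, where string values come back unquoted, and assumes that an invalid string or missing path yields no value.

```python
import json


def extract_path(json_string, path):
    """Hypothetical helper: look up a simple `$.a.b` path in a JSON string."""
    if not path.startswith("$"):
        return None
    try:
        node = json.loads(json_string)
    except ValueError:  # invalid JSON -> no result
        return None
    # Walk the dotted keys that follow the leading "$".
    for key in filter(None, path[1:].split(".")):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    if node is None:
        return None
    # Strings come back unquoted; anything else is re-serialized as JSON text.
    return node if isinstance(node, str) else json.dumps(node)


event = '{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA"}'
device_type = extract_path(event, "$.device_type")
missing = extract_path(event, "$.no_such_field")
print(device_type, missing)
```

Every extracted value is a string, so numeric fields such as `device_id` would still need a cast before doing arithmetic on them.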

In [10]:
from pyspark.sql.functions import get_json_object, col

jsDF = (eventsDF
 .select(
     col("id"), 
     get_json_object(col("device"), "$.device_type").alias('device_type'),
     get_json_object(col("device"), "$.ip").alias('ip'),
     get_json_object(col("device"), "$.cca3").alias('cca3')
 )
)

jsDF.toPandas().head()

Unnamed: 0,id,device_type,ip,cca3
0,0,sensor-ipad,68.161.225.1,USA
1,1,sensor-igauge,213.161.254.1,NOR
2,2,sensor-ipad,88.36.5.1,ITA
3,3,sensor-inest,66.39.173.154,USA
4,4,sensor-ipad,203.82.41.9,PHL


## How to use `from_json()`

A variation of `get_json_object()`, this function uses a schema to extract individual columns. Using the `from_json()` helper function within the `select()` DataFrame API call, I can extract or decode a JSON string's attributes and values into DataFrame columns, as dictated by a schema. Using the schema, I also group all the associated attributes and values within this JSON under a single entity, `devices`. As a result, you can retrieve an individual value with the `devices.attribute` notation, or all values at once with the `*` notation.

The example below:

- Uses the schema above to extract the attributes and values from the JSON string and represent them as individual columns under `devices`
- Filters on the desired attributes using the `.` notation
- Selects all of its columns with `select()`
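
The idea of decoding against a schema and then filtering can be sketched in plain Python. This is an illustration only, not how Spark parses JSON: `SCHEMA` is a trimmed-down, hypothetical stand-in for `jsonSchema`, `decode_with_schema` is a made-up helper, and the three events are shortened copies of sample records.

```python
import json

# Hypothetical stand-in for jsonSchema: field name -> Python type.
SCHEMA = {"device_id": int, "device_type": str, "temp": int, "signal": int}


def decode_with_schema(json_string, schema):
    """Project a JSON string onto the schema's fields; absent fields become None."""
    raw = json.loads(json_string)
    return {
        name: (cast(raw[name]) if raw.get(name) is not None else None)
        for name, cast in schema.items()
    }


events = [
    '{"device_id": 0, "device_type": "sensor-ipad", "temp": 25, "signal": 23}',
    '{"device_id": 3, "device_type": "sensor-inest", "temp": 47, "signal": 12}',
    '{"device_id": 16, "device_type": "sensor-igauge", "temp": 10, "signal": 24}',
]

devices = [decode_with_schema(e, SCHEMA) for e in events]
# Same predicates as the notebook cell: temp > 10 and signal > 15.
kept = [d for d in devices if d["temp"] > 10 and d["signal"] > 15]
print([d["device_id"] for d in kept])
```

Because the fields are decoded into typed values, the filter can compare numbers directly instead of comparing strings.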

In [11]:
from pyspark.sql.functions import from_json, lit

devicesDF = (eventsDF
             .select(from_json(col("device"), jsonSchema).alias('devices'))
             .filter(col('devices.temp') > 10)
             .filter(col('devices.signal') > 15)
             .select('devices.*')
            )

In [12]:
devicesDF.printSchema()

root
 |-- battery_level: long (nullable = true)
 |-- c02_level: long (nullable = true)
 |-- cca3: string (nullable = true)
 |-- cn: string (nullable = true)
 |-- device_id: long (nullable = true)
 |-- device_type: string (nullable = true)
 |-- signal: long (nullable = true)
 |-- ip: string (nullable = true)
 |-- temp: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [13]:
devicesDF.show()

+-------------+---------+----+-------------+---------+-------------+------+---------------+----+-------------------+
|battery_level|c02_level|cca3|           cn|device_id|  device_type|signal|             ip|temp|          timestamp|
+-------------+---------+----+-------------+---------+-------------+------+---------------+----+-------------------+
|            8|      917| USA|United States|        0|  sensor-ipad|    23|   68.161.225.1|  25|2016-10-05 04:01:36|
|            6|     1413| NOR|       Norway|        1|sensor-igauge|    18|  213.161.254.1|  30|2016-10-05 04:01:38|
|            5|     1372| ITA|        Italy|        2|  sensor-ipad|    25|      88.36.5.1|  18|2016-10-05 04:01:40|
|            8|     1574| USA|United States|        5|sensor-istick|    16| 204.116.105.67|  50|2016-10-05 04:01:46|
|            9|     1249| CHN|        China|        6|  sensor-ipad|    18|  220.173.179.1|  21|2016-10-05 04:01:48|
|            9|     1208| USA|United States|        8| sensor-in

In [14]:
devicesDF.select('*').where(col('cca3') == 'USA').sort(['signal', 'temp']).toPandas().head()

Unnamed: 0,battery_level,c02_level,cca3,cn,device_id,device_type,signal,ip,temp,timestamp
0,9,1208,USA,United States,8,sensor-inest,16,208.109.163.218,40,2016-10-05 04:01:54
1,8,1574,USA,United States,5,sensor-istick,16,204.116.105.67,50,2016-10-05 04:01:46
2,9,1304,USA,United States,17,sensor-ipad,17,64.124.180.215,38,2016-10-05 04:02:12
3,8,831,USA,United States,14,sensor-inest,17,68.85.85.106,39,2016-10-05 04:02:06
4,8,1504,USA,United States,13,sensor-ipad,20,67.185.72.1,34,2016-10-05 04:02:04
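
The `timestamp` field arrives as epoch seconds (for example `1475600496`) and is displayed above as `2016-10-05 04:01:36`. A small standard-library sketch shows the conversion. The fixed UTC+11 offset is an inference from those two values, not something the notebook states, so treat it as an assumption.

```python
from datetime import datetime, timedelta, timezone

epoch_seconds = 1475600496  # "timestamp" of device 0 in the sample data

utc_time = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
# Assumption: a fixed UTC+11 offset, inferred from the displayed value.
local_time = utc_time.astimezone(timezone(timedelta(hours=11)))

print(utc_time.strftime("%Y-%m-%d %H:%M:%S"))
print(local_time.strftime("%Y-%m-%d %H:%M:%S"))
```

Running the notebook with a different time zone setting would show the same instants with different wall-clock values.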


## How to use `to_json()`

Now let's do the reverse: encode rows as JSON strings using `to_json()`, that is, convert a struct into a JSON string. The cell below wraps every column of `eventsDF` in a struct and serializes it. The result can be republished, for instance, to Kafka, or saved to disk as Parquet files. To learn how to write to Kafka and other sinks, see the Databricks blog posts on Structured Streaming.

In [15]:
from pyspark.sql.functions import to_json, struct

stringJsonDF = eventsDF.select(to_json(struct('*')).alias('devices'))
stringJsonDF.toPandas().head(5)

Unnamed: 0,devices
0,"{""id"":0,""device"":""{\""device_id\"": 0, \""device_type\"": \""sensor-ipad\"", \""ip\"": \""68.161.225.1\"", \""cca3\"": \""USA\"", \""cn\"": \""United States\"", \""temp\"": 25, \""signal\"": 23, \""battery_level\"": 8, \""c02_level\"": 917, \""timestamp\"" :1475600496 }""}"
1,"{""id"":1,""device"":""{\""device_id\"": 1, \""device_type\"": \""sensor-igauge\"", \""ip\"": \""213.161.254.1\"", \""cca3\"": \""NOR\"", \""cn\"": \""Norway\"", \""temp\"": 30, \""signal\"": 18, \""battery_level\"": 6, \""c02_level\"": 1413, \""timestamp\"" :1475600498 }""}"
2,"{""id"":2,""device"":""{\""device_id\"": 2, \""device_type\"": \""sensor-ipad\"", \""ip\"": \""88.36.5.1\"", \""cca3\"": \""ITA\"", \""cn\"": \""Italy\"", \""temp\"": 18, \""signal\"": 25, \""battery_level\"": 5, \""c02_level\"": 1372, \""timestamp\"" :1475600500 }""}"
3,"{""id"":3,""device"":""{\""device_id\"": 3, \""device_type\"": \""sensor-inest\"", \""ip\"": \""66.39.173.154\"", \""cca3\"": \""USA\"", \""cn\"": \""United States\"", \""temp\"": 47, \""signal\"": 12, \""battery_level\"": 1, \""c02_level\"": 1447, \""timestamp\"" :1475600502 }""}"
4,"{""id"":4,""device"":""{\""device_id\"": 4, \""device_type\"": \""sensor-ipad\"", \""ip\"": \""203.82.41.9\"", \""cca3\"": \""PHL\"", \""cn\"": \""Philippines\"", \""temp\"": 29, \""signal\"": 11, \""battery_level\"": 0, \""c02_level\"": 983, \""timestamp\"" :1475600504 }""}"
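
The backslash-escaped quotes in the output above appear because the `device` column already holds JSON text, so serializing the row treats it as an ordinary string and escapes it. A plain-Python analogue using the standard `json` module (not Spark) shows the same effect. The two-field `row` is a shortened stand-in for one `eventsDF` record.

```python
import json

# One simplified row: an integer id plus a device column that already holds JSON text.
row = {"id": 0, "device": '{"device_id": 0, "device_type": "sensor-ipad"}'}

# Compact separators, similar to the output shown above.
encoded = json.dumps(row, separators=(",", ":"))
print(encoded)

decoded = json.loads(encoded)          # first decode: back to id + device string
inner = json.loads(decoded["device"])  # second decode: reach the inner fields
print(inner["device_type"])
```

Reaching the inner fields again takes two decoding steps, which is why the earlier section parsed `device` with a schema before filtering.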


## How to use `selectExpr()`
Another way to convert or encode a column into a string is to use the `selectExpr()` method, which accepts SQL expressions. For instance, I can cast the `device` column of the DataFrame above to a string. Both columns already have the target types here, so the schema printed below is unchanged.

In [16]:
stringsDF = eventsDF.selectExpr("CAST(id AS INT)", "CAST(device AS STRING)")

In [17]:
stringsDF.printSchema()

root
 |-- id: integer (nullable = true)
 |-- device: string (nullable = true)



In [18]:
stringsDF.toPandas().head(4)

Unnamed: 0,id,device
0,0,"{""device_id"": 0, ""device_type"": ""sensor-ipad"", ""ip"": ""68.161.225.1"", ""cca3"": ""USA"", ""cn"": ""United States"", ""temp"": 25, ""signal"": 23, ""battery_level"": 8, ""c02_level"": 917, ""timestamp"" :1475600496 }"
1,1,"{""device_id"": 1, ""device_type"": ""sensor-igauge"", ""ip"": ""213.161.254.1"", ""cca3"": ""NOR"", ""cn"": ""Norway"", ""temp"": 30, ""signal"": 18, ""battery_level"": 6, ""c02_level"": 1413, ""timestamp"" :1475600498 }"
2,2,"{""device_id"": 2, ""device_type"": ""sensor-ipad"", ""ip"": ""88.36.5.1"", ""cca3"": ""ITA"", ""cn"": ""Italy"", ""temp"": 18, ""signal"": 25, ""battery_level"": 5, ""c02_level"": 1372, ""timestamp"" :1475600500 }"
3,3,"{""device_id"": 3, ""device_type"": ""sensor-inest"", ""ip"": ""66.39.173.154"", ""cca3"": ""USA"", ""cn"": ""United States"", ""temp"": 47, ""signal"": 12, ""battery_level"": 1, ""c02_level"": 1447, ""timestamp"" :1475600502 }"


Another use of `selectExpr()` is its ability, as the name suggests, to take SQL expressions as arguments and turn each one into a column. For instance, say I want to compute the ratio of c02 level to temperature.

In [19]:
(devicesDF
 .selectExpr('c02_level', "round(c02_level/temp) as ratio_c02_temperature")
 .sort("ratio_c02_temperature", ascending=False)
).show(5)

+---------+---------------------+
|c02_level|ratio_c02_temperature|
+---------+---------------------+
|     1372|                 76.0|
|     1220|                 68.0|
|     1249|                 59.0|
|     1378|                 51.0|
|     1413|                 47.0|
+---------+---------------------+
only showing top 5 rows
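
As a sanity check on the `ratio_c02_temperature` column above, the same arithmetic can be reproduced in plain Python. The `(c02_level, temp)` pairs are copied from sample records that pass the `devicesDF` filters. One caveat: Python's `round()` rounds exact halves to the nearest even number, which can differ from SQL rounding. None of these values is an exact half, so it makes no difference here.

```python
# (c02_level, temp) pairs copied from the sample data.
readings = [(1413, 30), (1372, 18), (1249, 21), (1220, 18), (1378, 27)]

# Round each ratio to a whole number and sort in descending order.
ratios = sorted((float(round(c02 / temp)) for c02, temp in readings), reverse=True)
print(ratios)
```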



The above query can be expressed as easily in Spark SQL as in the DataFrame API. The power of `selectExpr()` lies in working with numerical values. Let's create a temporary view and express the same query, this time in SQL.

In [20]:
devicesDF.createOrReplaceTempView("devicesDFT")

In [21]:
spark.sql("""
select c02_level, 
        round(c02_level/temp) as ratio_c02_temperature 
        from devicesDFT
        order by ratio_c02_temperature desc
""").show(5)

+---------+---------------------+
|c02_level|ratio_c02_temperature|
+---------+---------------------+
|     1372|                 76.0|
|     1220|                 68.0|
|     1249|                 59.0|
|     1378|                 51.0|
|     1413|                 47.0|
+---------+---------------------+
only showing top 5 rows



**To verify that all the string conversions are preserved in the `stringJsonDF` DataFrame above, let's save it as Parquet, here to the local path `/tmp/iot`.**

In [22]:
(stringJsonDF
  .write
  .mode("overwrite")
  .format("parquet")
  .save("/tmp/iot")
)

In [23]:
! ls -l /tmp/iot

total 64
-rw-r--r--  1 rk  wheel     0 15 Apr 19:14 _SUCCESS
-rw-r--r--  1 rk  wheel  1681 15 Apr 19:14 part-00000-a6aaad7d-0757-4c99-b0da-0a42d5986d24-c000.snappy.parquet
-rw-r--r--  1 rk  wheel  1665 15 Apr 19:14 part-00001-a6aaad7d-0757-4c99-b0da-0a42d5986d24-c000.snappy.parquet
-rw-r--r--  1 rk  wheel  1694 15 Apr 19:14 part-00002-a6aaad7d-0757-4c99-b0da-0a42d5986d24-c000.snappy.parquet
-rw-r--r--  1 rk  wheel  1830 15 Apr 19:14 part-00003-a6aaad7d-0757-4c99-b0da-0a42d5986d24-c000.snappy.parquet
-rw-r--r--  1 rk  wheel  1689 15 Apr 19:14 part-00004-a6aaad7d-0757-4c99-b0da-0a42d5986d24-c000.snappy.parquet
-rw-r--r--  1 rk  wheel  1683 15 Apr 19:14 part-00005-a6aaad7d-0757-4c99-b0da-0a42d5986d24-c000.snappy.parquet
-rw-r--r--  1 rk  wheel  1694 15 Apr 19:14 part-00006-a6aaad7d-0757-4c99-b0da-0a42d5986d24-c000.snappy.parquet
-rw-r--r--  1 rk  wheel  1839 15 Apr 19:14 part-00007-a6aaad7d-0757-4c99-b0da-0a42d5986d24-c000.snappy.parquet


Now let's verify that what was saved (each device encoded as an individual string above) is actually a string.

In [24]:
parquetDF = spark.read.parquet("/tmp/iot")

Let's check the schema to ensure that what is read back matches what was written, namely a JSON string.

In [25]:
parquetDF.printSchema()

root
 |-- devices: string (nullable = true)



In [26]:
parquetDF.toPandas().head(5)

Unnamed: 0,devices
0,"{""id"":16,""device"":""{\""device_id\"": 16, \""device_type\"": \""sensor-igauge\"", \""ip\"": \""221.3.128.242\"", \""cca3\"": \""CHN\"", \""cn\"": \""China\"", \""temp\"": 10, \""signal\"": 24, \""battery_level\"": 6, \""c02_level\"": 1423, \""timestamp\"" :1475600530 }""}"
1,"{""id"":17,""device"":""{\""device_id\"": 17, \""device_type\"": \""sensor-ipad\"", \""ip\"": \""64.124.180.215\"", \""cca3\"": \""USA\"", \""cn\"": \""United States\"", \""temp\"": 38, \""signal\"": 17, \""battery_level\"": 9, \""c02_level\"": 1304, \""timestamp\"" :1475600532 }""}"
2,"{""id"":18,""device"":""{\""device_id\"": 18, \""device_type\"": \""sensor-igauge\"", \""ip\"": \""66.153.162.66\"", \""cca3\"": \""USA\"", \""cn\"": \""United States\"", \""temp\"": 26, \""signal\"": 10, \""battery_level\"": 0, \""c02_level\"": 902, \""timestamp\"" :1475600534 }""}"
3,"{""id"":19,""device"":""{\""device_id\"": 19, \""device_type\"": \""sensor-ipad\"", \""ip\"": \""193.200.142.254\"", \""cca3\"": \""AUT\"", \""cn\"": \""Austria\"", \""temp\"": 32, \""signal\"": 27, \""battery_level\"": 5, \""c02_level\"": 1282, \""timestamp\"" :1475600536 }""}"
4,"{""id"":6,""device"":""{\""device_id\"": 6, \""device_type\"": \""sensor-ipad\"", \""ip\"": \""220.173.179.1\"", \""cca3\"": \""CHN\"", \""cn\"": \""China\"", \""temp\"": 21, \""signal\"": 18, \""battery_level\"": 9, \""c02_level\"": 1249, \""timestamp\"" :1475600508 }""}"


So far this tutorial has explored ways to use the `get_json_object()`, `from_json()`, `to_json()`, and `selectExpr()` helper functions on less complex JSON objects. The remaining one, `explode()`, comes up next.

Let's turn our focus to more deeply nested structures and examine how these same APIs apply to a complex JSON object as readily as to a simple one.

## Nested Structures
It's not unreasonable to assume that your nested JSON structures may have Maps as well as nested JSON. For illustration, let's use a single string composed of complex and nested data types, including a Map. In a real-life scenario, this could be a reading from a device event, with dangerous levels of C02 emissions or high temperature readings, that requires notifying the Network Operations Center (NOC) for immediate action.

In [27]:
schema = (StructType()
          .add("dc_id", StringType())
          .add("source",
               MapType(
                   StringType(),
                   StructType()
                   .add("description", StringType())
                   .add("ip", StringType())
                   .add("id", LongType())
                   .add("temp", LongType())
                   .add("c02_level", LongType())
                   .add("geo", StructType()
                        .add("lat", DoubleType())
                        .add("long", DoubleType())
                       )
               )
              )
         )

In [28]:
# Create static data
data = ["""{

    "dc_id": "dc-101",
    "source": {
        "sensor-igauge": {
        "id": 10,
        "ip": "68.28.91.22",
        "description": "Sensor attached to the container ceilings",
        "temp":35,
        "c02_level": 1475,
        "geo": {"lat":38.00, "long":97.00}                        
      },
      "sensor-ipad": {
        "id": 13,
        "ip": "67.185.72.1",
        "description": "Sensor ipad attached to carbon cylinders",
        "temp": 34,
        "c02_level": 1370,
        "geo": {"lat":47.41, "long":-122.00}
      },
      "sensor-inest": {
        "id": 8,
        "ip": "208.109.163.218",
        "description": "Sensor attached to the factory ceilings",
        "temp": 40,
        "c02_level": 1346,
        "geo": {"lat":33.61, "long":-111.89}
      },
      "sensor-istick": {
        "id": 5,
        "ip": "204.116.105.67",
        "description": "Sensor embedded in exhaust pipes in the ceilings",
        "temp": 40,
        "c02_level": 1574,
        "geo": {"lat":35.93, "long":-85.46}
      }
    }
  }"""]

sc = spark.sparkContext  # public accessor, instead of the private spark._sc
dataRDD = sc.parallelize(data)

In [29]:
dataRDD

ParallelCollectionRDD[49] at readRDDFromFile at PythonRDD.scala:262

In [30]:
dataRDD.collect()

['{\n\n    "dc_id": "dc-101",\n    "source": {\n        "sensor-igauge": {\n        "id": 10,\n        "ip": "68.28.91.22",\n        "description": "Sensor attached to the container ceilings",\n        "temp":35,\n        "c02_level": 1475,\n        "geo": {"lat":38.00, "long":97.00}                        \n      },\n      "sensor-ipad": {\n        "id": 13,\n        "ip": "67.185.72.1",\n        "description": "Sensor ipad attached to carbon cylinders",\n        "temp": 34,\n        "c02_level": 1370,\n        "geo": {"lat":47.41, "long":-122.00}\n      },\n      "sensor-inest": {\n        "id": 8,\n        "ip": "208.109.163.218",\n        "description": "Sensor attached to the factory ceilings",\n        "temp": 40,\n        "c02_level": 1346,\n        "geo": {"lat":33.61, "long":-111.89}\n      },\n      "sensor-istick": {\n        "id": 5,\n        "ip": "204.116.105.67",\n        "description": "Sensor embedded in exhaust pipes in the ceilings",\n        "temp": 40,\n        "c0

In [31]:
df = spark.read.schema(schema).json(dataRDD)

In [32]:
df.printSchema()

root
 |-- dc_id: string (nullable = true)
 |-- source: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- ip: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- temp: long (nullable = true)
 |    |    |-- c02_level: long (nullable = true)
 |    |    |-- geo: struct (nullable = true)
 |    |    |    |-- lat: double (nullable = true)
 |    |    |    |-- long: double (nullable = true)



In [33]:
df.toPandas()

Unnamed: 0,dc_id,source
0,dc-101,"{'sensor-ipad': ('Sensor ipad attached to carbon cylinders', '67.185.72.1', 13, 34, 1370, (47.41, -122.0)), 'sensor-inest': ('Sensor attached to the factory ceilings', '208.109.163.218', 8, 40, 1346, (33.61, -111.89)), 'sensor-istick': ('Sensor embedded in exhaust pipes in the ceilings', '204.116.105.67', 5, 40, 1574, (35.93, -85.46)), 'sensor-igauge': ('Sensor attached to the container ceilings', '68.28.91.22', 10, 35, 1475, (38.0, 97.0))}"


### How to use `explode()`

The `explode()` function is used here to show how to extract nested structures. It is most instructive when seen working alongside the `to_json()` and `from_json()` functions to extract attributes and values from complex JSON structures. On occasion, you will want to use `explode()` alongside `to_json()` and `from_json()`, and here is one such case.

The `explode()` function creates a new row for each element in the given map column. In this case, the map column is `source`. Note that each key-value pair in the map gets its own row, four in this case.
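
The row-per-entry behaviour can be pictured with a plain-Python analogue. This is not Spark code: `explode_map` is a hypothetical helper, and `record` is a shortened version of the nested sample with only two sensors.

```python
record = {
    "dc_id": "dc-101",
    "source": {
        "sensor-igauge": {"id": 10, "temp": 35},
        "sensor-ipad": {"id": 13, "temp": 34},
    },
}


def explode_map(row, map_field):
    """Hypothetical helper: yield one flat row per key-value pair of row[map_field]."""
    for key, value in row[map_field].items():
        out = {k: v for k, v in row.items() if k != map_field}
        out["key"] = key      # the map key becomes its own column
        out["value"] = value  # the map value stays a nested structure
        yield out


exploded = list(explode_map(record, "source"))
for r in exploded:
    print(r["dc_id"], r["key"], r["value"]["temp"])
```

The other columns (`dc_id` here) are repeated on every generated row, while the nested value can still be addressed field by field.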

In [34]:
from pyspark.sql.functions import explode

explodedDF = df.select("dc_id", explode("source"))

In [35]:
explodedDF.printSchema()

root
 |-- dc_id: string (nullable = true)
 |-- key: string (nullable = false)
 |-- value: struct (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- ip: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- temp: long (nullable = true)
 |    |-- c02_level: long (nullable = true)
 |    |-- geo: struct (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- long: double (nullable = true)



Looking at the schema, notice that `source` has now been expanded: `key` and `value` are now top-level columns of the root schema.

In [36]:
explodedDF.toPandas()

Unnamed: 0,dc_id,key,value
0,dc-101,sensor-igauge,"(Sensor attached to the container ceilings, 68.28.91.22, 10, 35, 1475, (38.0, 97.0))"
1,dc-101,sensor-ipad,"(Sensor ipad attached to carbon cylinders, 67.185.72.1, 13, 34, 1370, (47.41, -122.0))"
2,dc-101,sensor-inest,"(Sensor attached to the factory ceilings, 208.109.163.218, 8, 40, 1346, (33.61, -111.89))"
3,dc-101,sensor-istick,"(Sensor embedded in exhaust pipes in the ceilings, 204.116.105.67, 5, 40, 1574, (35.93, -85.46))"


In [37]:
notifyDeviceDF = (explodedDF
                   .select(col('dc_id').alias('dcId'), 
                           col('key').alias('deviceType'),
                           'value.ip',
                           col('value.id').alias('deviceId'),
                           'value.c02_level',
                           'value.temp',
                           'value.geo.lat',
                           'value.geo.long'
                          )
                   )

notifyDeviceDF.toPandas()

Unnamed: 0,dcId,deviceType,ip,deviceId,c02_level,temp,lat,long
0,dc-101,sensor-igauge,68.28.91.22,10,1475,35,38.0,97.0
1,dc-101,sensor-ipad,67.185.72.1,13,1370,34,47.41,-122.0
2,dc-101,sensor-inest,208.109.163.218,8,1346,40,33.61,-111.89
3,dc-101,sensor-istick,204.116.105.67,5,1574,40,35.93,-85.46


## Nest Device Data
Let's look at another complex, real-life data set: Nest readings. A Nest device emits many JSON events to its collector. That collector could be at a nearby data center, a neighborhood-central data collector or aggregator, or a device installed at home that sends device readings at regular intervals to a central data center over a secure internet connection. For illustration, I have trimmed some of the attributes, but the example still shows how complex data can be processed and relevant attributes extracted.

Let's define its complicated schema first. On close inspection, you will notice it's not dissimilar to the schema we defined above, except that it has three maps instead of one: thermostats, cameras, and smoke alarms.

In [38]:
nestSchema2 = (
    StructType()
    .add("devices", 
         StructType()
         .add("thermostats", 
              MapType(StringType(),
                      StructType()
                      .add("device_id", StringType())
                      .add("locale", StringType())
                      .add("software_version", StringType())
                      .add("structure_id", StringType())
                      .add("where_name", StringType())
                      .add("last_connection", StringType())
                      .add("is_online", BooleanType())
                      .add("can_cool", BooleanType())
                      .add("can_heat", BooleanType())
                      .add("is_using_emergency_heat", BooleanType())
                      .add("has_fan", BooleanType())
                      .add("fan_timer_active", BooleanType())
                      .add("fan_timer_timeout", StringType())
                      .add("temperature_scale", StringType())
                      .add("target_temperature_f", DoubleType())
                      .add("target_temperature_high_f", DoubleType())
                      .add("target_temperature_low_f", DoubleType())
                      .add("eco_temperature_high_f", DoubleType())
                      .add("eco_temperature_low_f", DoubleType())
                      .add("away_temperature_high_f", DoubleType())
                      .add("away_temperature_low_f", DoubleType())
                      .add("hvac_mode", StringType())
                      .add("humidity", LongType())
                      .add("hvac_state", StringType())
                      .add("is_locked", StringType())
                      .add("locked_temp_min_f", DoubleType())
                      .add("locked_temp_max_f", DoubleType())
                     )
             )
         .add("smoke_co_alarms", 
              MapType(StringType(),
                      StructType()
                      .add("device_id", StringType())
                      .add("locale", StringType())
                      .add("software_version", StringType())
                      .add("structure_id", StringType())
                      .add("where_name", StringType())
                      .add("last_connection", StringType())
                      .add("is_online", BooleanType())
                      .add("battery_health", StringType())
                      .add("co_alarm_state", StringType())
                      .add("smoke_alarm_state", StringType())
                      .add("is_manual_test_active", BooleanType())
                      .add("last_manual_test_time", StringType())
                      .add("ui_color_state", StringType())
                     )
             )
         .add("cameras", 
              MapType(StringType(),
                      StructType()
                      .add("device_id", StringType())
                      .add("software_version", StringType())
                      .add("structure_id", StringType())
                      .add("where_name", StringType())
                      .add("is_online", BooleanType())
                      .add("is_streaming", BooleanType())
                      .add("is_audio_input_enabled", BooleanType())
                      .add("last_is_online_change", StringType())
                      .add("is_video_history_enabled", BooleanType())
                      .add("web_url", StringType())
                      .add("app_url", StringType())
                      .add("is_public_share_enabled", BooleanType())
                      .add("activity_zones",
                           StructType()
                           .add("name", StringType())
                           .add("id", LongType())
                          )
                      .add("last_event", StringType()))
             )
        )
)

In [39]:
nestDataDS2 = [
"""{
    "devices": {
       "thermostats": {
          "peyiJNo0IldT2YlIVtYaGQ": {
            "device_id": "peyiJNo0IldT2YlIVtYaGQ",
            "locale": "en-US",
            "software_version": "4.0",
            "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
            "where_name": "Hallway Upstairs",
            "last_connection": "2016-10-31T23:59:59.000Z",
            "is_online": true,
            "can_cool": true,
            "can_heat": true,
            "is_using_emergency_heat": true,
            "has_fan": true,
            "fan_timer_active": true,
            "fan_timer_timeout": "2016-10-31T23:59:59.000Z",
            "temperature_scale": "F",
            "target_temperature_f": 72,
            "target_temperature_high_f": 80,
            "target_temperature_low_f": 65,
            "eco_temperature_high_f": 80,
            "eco_temperature_low_f": 65,
            "away_temperature_high_f": 80,
            "away_temperature_low_f": 65,
            "hvac_mode": "heat",
            "humidity": 40,
            "hvac_state": "heating",
            "is_locked": true,
            "locked_temp_min_f": 65,
            "locked_temp_max_f": 80
            }
          },
          "smoke_co_alarms": {
            "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs": {
              "device_id": "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs",
              "locale": "en-US",
              "software_version": "1.01",
              "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
              "where_name": "Jane's Room",
              "last_connection": "2016-10-31T23:59:59.000Z",
              "is_online": true,
              "battery_health": "ok",
              "co_alarm_state": "ok",
              "smoke_alarm_state": "ok",
              "is_manual_test_active": true,
              "last_manual_test_time": "2016-10-31T23:59:59.000Z",
              "ui_color_state": "gray"
              }
            },
         "cameras": {
          "awJo6rH0IldT2YlIVtYaGQ": {
            "device_id": "awJo6rH",
            "software_version": "4.0",
            "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
            "where_name": "Foyer",
            "is_online": true,
            "is_streaming": true,
            "is_audio_input_enabled": true,
            "last_is_online_change": "2016-12-29T18:42:00.000Z",
            "is_video_history_enabled": true,
            "web_url": "https://home.nest.com/cameras/device_id?auth=access_token",
            "app_url": "nestmobile://cameras/device_id?auth=access_token",
            "is_public_share_enabled": true,
            "activity_zones": { "name": "Walkway", "id": 244083 },
            "last_event": "2016-10-31T23:59:59.000Z"
            }
          }
        }
}"""
]

In [40]:
dataRDD = sc.parallelize(nestDataDS2)
dataRDD.collect()

['{\n    "devices": {\n       "thermostats": {\n          "peyiJNo0IldT2YlIVtYaGQ": {\n            "device_id": "peyiJNo0IldT2YlIVtYaGQ",\n            "locale": "en-US",\n            "software_version": "4.0",\n            "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",\n            "where_name": "Hallway Upstairs",\n            "last_connection": "2016-10-31T23:59:59.000Z",\n            "is_online": true,\n            "can_cool": true,\n            "can_heat": true,\n            "is_using_emergency_heat": true,\n            "has_fan": true,\n            "fan_timer_active": true,\n            "fan_timer_timeout": "2016-10-31T23:59:59.000Z",\n            "temperature_scale": "F",\n            "target_temperature_f": 72,\n            "target_temperature_high_f": 80,\n            "target_temperature_low_f": 65,\n            "eco_temperature_high_f": 80,\n            "eco_temperature_low_f": 65,\n            "away_temperature_high_f": 80,\n            "away_temper

In [41]:
nestDF2 = spark.read.schema(nestSchema2).json(dataRDD)
nestDF2.printSchema()

root
 |-- devices: struct (nullable = true)
 |    |-- thermostats: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: struct (valueContainsNull = true)
 |    |    |    |-- device_id: string (nullable = true)
 |    |    |    |-- locale: string (nullable = true)
 |    |    |    |-- software_version: string (nullable = true)
 |    |    |    |-- structure_id: string (nullable = true)
 |    |    |    |-- where_name: string (nullable = true)
 |    |    |    |-- last_connection: string (nullable = true)
 |    |    |    |-- is_online: boolean (nullable = true)
 |    |    |    |-- can_cool: boolean (nullable = true)
 |    |    |    |-- can_heat: boolean (nullable = true)
 |    |    |    |-- is_using_emergency_heat: boolean (nullable = true)
 |    |    |    |-- has_fan: boolean (nullable = true)
 |    |    |    |-- fan_timer_active: boolean (nullable = true)
 |    |    |    |-- fan_timer_timeout: string (nullable = true)
 |    |    |    |-- temperature_scale: string (nullable

In [42]:
nestDF2.toPandas()

Unnamed: 0,devices
0,"({'peyiJNo0IldT2YlIVtYaGQ': ('peyiJNo0IldT2YlIVtYaGQ', 'en-US', '4.0', 'VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw', 'Hallway Upstairs', '2016-10-31T23:59:59.000Z', True, True, True, True, True, True, '2016-10-31T23:59:59.000Z', 'F', 72.0, 80.0, 65.0, 80.0, 65.0, 80.0, 65.0, 'heat', 40, 'heating', 'true', 65.0, 80.0)}, {'RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs': ('RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs', 'en-US', '1.01', 'VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw', 'Jane's Room', '2016-10-31T23:59:59.000Z', True, 'ok', 'ok', 'ok', True, '2016-10-31T23:59:59.000Z', 'gray')}, {'awJo6rH0IldT2YlIVtYaGQ': ('awJo6rH', '4.0', 'VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw', 'Foyer', True, True, True, '2016-12-29T18:42:00.000Z', True, 'https://home.nest.com/cameras/device_id?auth=access_token', 'nestmobile://cameras/device_id?auth=access_token', True, Row(name='Walkway', id=244083), '2016-10-31T23:59:59.000Z')})"


**Converting the entire JSON object above into a JSON string**

In [43]:
stringJsonDF = nestDF2.select(to_json(struct("*"))).toDF("nestDevice")

In [44]:
stringJsonDF.printSchema()

root
 |-- nestDevice: string (nullable = true)



In [45]:
stringJsonDF.toPandas()

Unnamed: 0,nestDevice
0,"{""devices"":{""thermostats"":{""peyiJNo0IldT2YlIVtYaGQ"":{""device_id"":""peyiJNo0IldT2YlIVtYaGQ"",""locale"":""en-US"",""software_version"":""4.0"",""structure_id"":""VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw"",""where_name"":""Hallway Upstairs"",""last_connection"":""2016-10-31T23:59:59.000Z"",""is_online"":true,""can_cool"":true,""can_heat"":true,""is_using_emergency_heat"":true,""has_fan"":true,""fan_timer_active"":true,""fan_timer_timeout"":""2016-10-31T23:59:59.000Z"",""temperature_scale"":""F"",""target_temperature_f"":72.0,""target_temperature_high_f"":80.0,""target_temperature_low_f"":65.0,""eco_temperature_high_f"":80.0,""eco_temperature_low_f"":65.0,""away_temperature_high_f"":80.0,""away_temperature_low_f"":65.0,""hvac_mode"":""heat"",""humidity"":40,""hvac_state"":""heating"",""is_locked"":""true"",""locked_temp_min_f"":65.0,""locked_temp_max_f"":80.0}},""smoke_co_alarms"":{""RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs"":{""device_id"":""RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs"",""locale"":""en-US"",""software_version"":""1.01"",""structure_id"":""VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw"",""where_name"":""Jane's 
Room"",""last_connection"":""2016-10-31T23:59:59.000Z"",""is_online"":true,""battery_health"":""ok"",""co_alarm_state"":""ok"",""smoke_alarm_state"":""ok"",""is_manual_test_active"":true,""last_manual_test_time"":""2016-10-31T23:59:59.000Z"",""ui_color_state"":""gray""}},""cameras"":{""awJo6rH0IldT2YlIVtYaGQ"":{""device_id"":""awJo6rH"",""software_version"":""4.0"",""structure_id"":""VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw"",""where_name"":""Foyer"",""is_online"":true,""is_streaming"":true,""is_audio_input_enabled"":true,""last_is_online_change"":""2016-12-29T18:42:00.000Z"",""is_video_history_enabled"":true,""web_url"":""https://home.nest.com/cameras/device_id?auth=access_token"",""app_url"":""nestmobile://cameras/device_id?auth=access_token"",""is_public_share_enabled"":true,""activity_zones"":{""name"":""Walkway"",""id"":244083},""last_event"":""2016-10-31T23:59:59.000Z""}}}}"
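As a rough analogy for what the cell above does, the following plain-Python sketch (standard library `json`, not Spark) serializes a nested record into one compact string and parses it back. The `row` dictionary is a hypothetical, trimmed-down stand-in for a single row of `nestDF2`, not the full device payload.

```python
import json

# Hypothetical, trimmed-down stand-in for one row of nestDF2.
row = {
    "devices": {
        "cameras": {
            "awJo6rH0IldT2YlIVtYaGQ": {
                "device_id": "awJo6rH",
                "activity_zones": {"name": "Walkway", "id": 244083},
            }
        }
    }
}

# Compact separators mimic the whitespace-free JSON string shown above.
nest_device = json.dumps(row, separators=(",", ":"))
print(nest_device)

# Parsing the string recovers the original nested structure.
restored = json.loads(nest_device)
print(restored == row)
```

In Spark the string lives in a single column (`nestDevice` here), which is convenient when the whole record has to be written to a sink that expects one text value per message.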


Given the nested JSON object with three maps, you can fetch each individual map as a column, and then access its attributes using explode().

In [46]:
mapColumnsDF = (nestDF2
                .select(col("devices.smoke_co_alarms").alias("smoke_alarms"),
                        col("devices.cameras").alias("cameras"),
                        col("devices.thermostats").alias("thermostats")
                       )
               )

mapColumnsDF.toPandas()

Unnamed: 0,smoke_alarms,cameras,thermostats
0,"{'RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs': ('RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs', 'en-US', '1.01', 'VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw', 'Jane's Room', '2016-10-31T23:59:59.000Z', True, 'ok', 'ok', 'ok', True, '2016-10-31T23:59:59.000Z', 'gray')}","{'awJo6rH0IldT2YlIVtYaGQ': ('awJo6rH', '4.0', 'VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw', 'Foyer', True, True, True, '2016-12-29T18:42:00.000Z', True, 'https://home.nest.com/cameras/device_id?auth=access_token', 'nestmobile://cameras/device_id?auth=access_token', True, ('Walkway', 244083), '2016-10-31T23:59:59.000Z')}","{'peyiJNo0IldT2YlIVtYaGQ': ('peyiJNo0IldT2YlIVtYaGQ', 'en-US', '4.0', 'VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw', 'Hallway Upstairs', '2016-10-31T23:59:59.000Z', True, True, True, True, True, True, '2016-10-31T23:59:59.000Z', 'F', 72.0, 80.0, 65.0, 80.0, 65.0, 80.0, 65.0, 'heat', 40, 'heating', 'true', 65.0, 80.0)}"


In [47]:
explodedThermostatsDF = mapColumnsDF.select(explode("thermostats"))
explodedCamerasDF = mapColumnsDF.select(explode("cameras"))
explodedSmokedAlarmsDF = nestDF2.select(explode("devices.smoke_co_alarms"))

In [48]:
explodedThermostatsDF.toPandas()

Unnamed: 0,key,value
0,peyiJNo0IldT2YlIVtYaGQ,"(peyiJNo0IldT2YlIVtYaGQ, en-US, 4.0, VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw, Hallway Upstairs, 2016-10-31T23:59:59.000Z, True, True, True, True, True, True, 2016-10-31T23:59:59.000Z, F, 72.0, 80.0, 65.0, 80.0, 65.0, 80.0, 65.0, heat, 40, heating, true, 65.0, 80.0)"


In [49]:
explodedSmokedAlarmsDF.toPandas()

Unnamed: 0,key,value
0,RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs,"(RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs, en-US, 1.01, VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw, Jane's Room, 2016-10-31T23:59:59.000Z, True, ok, ok, ok, True, 2016-10-31T23:59:59.000Z, gray)"
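The explode() calls above turn every entry of a map column into its own row with `key` and `value` columns. To build intuition, here is a minimal plain-Python model of that behavior (not Spark code). The helper name `explode_map` and the trimmed-down sample rows are hypothetical and exist only for illustration.

```python
def explode_map(rows, column):
    """Return one {"key", "value"} row per entry in each row's map."""
    exploded = []
    for row in rows:
        mapping = row.get(column) or {}  # a null or empty map contributes no rows
        for key, value in mapping.items():
            exploded.append({"key": key, "value": value})
    return exploded


# Hypothetical, trimmed-down rows shaped like mapColumnsDF.
rows = [
    {
        "thermostats": {
            "peyiJNo0IldT2YlIVtYaGQ": {"where_name": "Hallway Upstairs", "humidity": 40},
        },
        "cameras": {
            "awJo6rH0IldT2YlIVtYaGQ": {"where_name": "Foyer"},
        },
    }
]

exploded_thermostats = explode_map(rows, "thermostats")
for r in exploded_thermostats:
    print(r["key"], r["value"])
```

Because each device map in the sample data holds a single entry, every exploded DataFrame above has exactly one row; a map with several devices would yield one row per device.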


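To extract specific individual fields from the exploded map values, you can select them by dotted path, such as value.device_id, as the cells below do; the getItem() method is an alternative for looking up a map entry by its key.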

In [50]:
thermostateDF = (
    explodedThermostatsDF
    .select("value.device_id", 
            "value.locale",
            col('value.where_name').alias('location'),
            col('value.last_connection').alias('last_connected'),
            'value.humidity',
            'value.target_temperature_f',
            col('value.hvac_mode').alias('mode'),
            col('value.software_version').alias('version')
           )
)

thermostateDF.toPandas()

Unnamed: 0,device_id,locale,location,last_connected,humidity,target_temperature_f,mode,version
0,peyiJNo0IldT2YlIVtYaGQ,en-US,Hallway Upstairs,2016-10-31T23:59:59.000Z,40,72.0,heat,4.0


In [51]:
cameraDF = (
    explodedCamerasDF
    .select('value.device_id',
            col('value.where_name').alias('location'),
            col('value.software_version').alias('version'),
            col('value.activity_zones.name').alias('name'),
            col('value.activity_zones.id').alias('id')
           )
)
 
cameraDF.toPandas()

Unnamed: 0,device_id,location,version,name,id
0,awJo6rH,Foyer,4.0,Walkway,244083


In [52]:
smokedAlarmsDF = (
    explodedSmokedAlarmsDF
    .select("value.device_id",
            col('value.where_name').alias('location'),
            col('value.software_version').alias('version'),
            col('value.last_connection').alias('last_connected'),
            col('value.battery_health').alias('battery_health')
           )
)

smokedAlarmsDF.toPandas()

Unnamed: 0,device_id,location,version,last_connected,battery_health
0,RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs,Jane's Room,1.01,2016-10-31T23:59:59.000Z,ok
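The three cells above all pull nested fields out of the exploded `value` struct by dotted path and rename some of them. The lookup can be sketched in plain Python, assuming nested dictionaries stand in for structs. The helper `get_path` and the sample record are hypothetical; this is not how Spark resolves columns internally.

```python
def get_path(record, dotted_path):
    """Follow a dotted path such as 'value.activity_zones.name' through nested dicts.

    Returns None as soon as a step is missing, mirroring how a schema field
    that is absent from the JSON comes back as null.
    """
    current = record
    for part in dotted_path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


# Hypothetical exploded row, shaped like one row of explodedCamerasDF.
exploded_row = {
    "key": "awJo6rH0IldT2YlIVtYaGQ",
    "value": {
        "device_id": "awJo6rH",
        "where_name": "Foyer",
        "software_version": "4.0",
        "activity_zones": {"name": "Walkway", "id": 244083},
    },
}

# Output column name -> source path, like the col(...).alias(...) pairs above.
selection = {
    "device_id": "value.device_id",
    "location": "value.where_name",
    "version": "value.software_version",
    "name": "value.activity_zones.name",
    "id": "value.activity_zones.id",
}

camera = {alias: get_path(exploded_row, path) for alias, path in selection.items()}
print(camera)
```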


Let's join the thermostat and camera DataFrames on their shared version column.

In [53]:
joinDF = thermostateDF.join(cameraDF, "version")
joinDF.toPandas()

Unnamed: 0,version,device_id,locale,location,last_connected,humidity,target_temperature_f,mode,device_id.1,location.1,name,id
0,4.0,peyiJNo0IldT2YlIVtYaGQ,en-US,Hallway Upstairs,2016-10-31T23:59:59.000Z,40,72.0,heat,awJo6rH,Foyer,Walkway,244083
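Note that both inputs carry `device_id` and `location` columns, and only the join key `version` is merged, so the joined output above contains each of those names twice (shown as `device_id.1` and `location.1`). The plain-Python sketch below models an inner equi-join that keeps such columns apart by prefixing them. The `inner_join` helper, its prefix parameters, and the trimmed-down rows are hypothetical illustrations, not Spark APIs.

```python
def inner_join(left_rows, right_rows, on, left_prefix, right_prefix):
    """Inner equi-join of two lists of dicts on column `on`.

    The join key is emitted once; every other column is prefixed with its
    side's name so identically named columns stay distinguishable.
    """
    # Index the right side by join key so each left row finds its matches quickly.
    right_by_key = {}
    for row in right_rows:
        right_by_key.setdefault(row[on], []).append(row)

    joined = []
    for left in left_rows:
        for right in right_by_key.get(left[on], []):
            out = {on: left[on]}
            out.update({f"{left_prefix}_{k}": v for k, v in left.items() if k != on})
            out.update({f"{right_prefix}_{k}": v for k, v in right.items() if k != on})
            joined.append(out)
    return joined


# Hypothetical, trimmed-down rows shaped like the DataFrames above.
thermostats = [{"version": "4.0", "device_id": "peyiJNo0IldT2YlIVtYaGQ", "location": "Hallway Upstairs"}]
cameras = [{"version": "4.0", "device_id": "awJo6rH", "location": "Foyer"}]
smoke_alarms = [{"version": "1.01", "device_id": "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs", "location": "Jane's Room"}]

joined = inner_join(thermostats, cameras, "version", "thermostat", "camera")
print(joined)

# Versions differ, so an inner join with the smoke alarms matches nothing.
print(inner_join(thermostats, smoke_alarms, "version", "thermostat", "smoke_alarm"))
```

In PySpark, one comparable approach would be to rename or alias the overlapping columns on each side before calling join, so that later selections are unambiguous.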


## Summary

This short tutorial has demonstrated how utility functions make it easy to extract JSON attributes from a complex, nested structure. Once you have exploded, flattened, or parsed the desired values into their own DataFrames or Datasets, you can extract and query them as easily as you would any other DataFrame or Dataset, using the respective APIs. See part 3 of the Databricks Structured Streaming series, which shows how to read Nest device logs from Apache Kafka and do some ETL on them.