In this notebook we slice and dice a nested JSON structure (the kind of document used by NoSQL databases) using the functions that Spark SQL provides for this purpose.

The higher order functions are primarily suited for manipulating the arrays in spark SQL

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

Now defining a Spark schema (attribute names and their data types) that matches the structure of the JSON document

In [19]:
#This is how a schema is defined in PySpark: each StructField takes (name, data type, nullable or not)
"""schema = StructType([StructField('Name', StringType(), True),
                     StructField('DateTime', TimestampType(), True),
                     StructField('Age', IntegerType(), True)])"""

#This is a basic JSON Schema, i.e. a JSON document that describes the format other JSON documents must follow (kept as an inert string for reference)
"""{
  "$schema": "http://json-schema.org/draft/2019-09/schema",
  "title": "Product",
  "type": "object",
  "required": ["id", "name", "price"],
  "properties": {
    "id": {
      "type": "number",
      "description": "Product identifier"
    },
    "name": {
      "type": "string",
      "description": "Name of the product"
    },
    "price": {
      "type": "number",
      "minimum": 0
    },
    "tags": {
      "type": "array",
      "items": {
        "type": "string"
      }
    },
    "stock": {
      "type": "object",
      "properties": {
        "warehouse": {
          "type": "number"
        },
        "retail": {
          "type": "number"
        }
      }
    }
  }
}
"""
#Dealing with JSON data format
#Spark schema for the sample document: a string "dc_id" column plus a "source" map
#whose keys are strings and whose values are structs describing one sensor each
schema = StructType([
    StructField("dc_id", StringType()),
    StructField("source", MapType(StringType(), StructType([
        StructField("description", StringType()),
        StructField("ip", StringType()),
        StructField("id", IntegerType()),
        StructField("temp", ArrayType(IntegerType())),
        StructField("c02_level", ArrayType(IntegerType())),
        StructField("geo", StructType([
            StructField("lat", DoubleType()),
            StructField("long", DoubleType()),
        ])),
    ]))),
])

In [20]:
#Converting a JSON string to a DataFrame by parallelizing it into a one-element RDD
def JSON_To_DataFrame(json, schema=None):
    """Parse a single JSON document string into a Spark DataFrame.

    Args:
        json: The JSON document as a string.
        schema: Optional schema to apply while reading; when None, the
            reader is used without an explicit schema.

    Returns:
        The DataFrame produced by reading the document.
    """
    # Obtain the active session here instead of relying on the shell-provided
    # globals `spark` and `sc`, which are not defined anywhere in this notebook.
    session = SparkSession.builder.getOrCreate()
    reader = session.read
    # Compare against None: an empty StructType is falsy but is still an explicit schema.
    if schema is not None:
        # Keep the returned reader rather than depending on in-place mutation.
        reader = reader.schema(schema)
    return reader.json(session.sparkContext.parallelize([json]))

In [21]:
#Parse a sample JSON document into a DataFrame using the function defined above
#Inputs: the JSON document as a string, and the schema structure we created
#The JSON can be given in whatever format we need
Data_Frame = JSON_To_DataFrame("""{

    "dc_id": "dc-101",
    "source": {
        "sensor-igauge": {
        "id": 10,
        "ip": "68.28.91.22",
        "description": "Sensor attached to the container ceilings",
        "temp":[35,35,35,36,35,35,32,35,30,35,32,35],
        "c02_level": [1475,1476,1473],
        "geo": {"lat":38.00, "long":97.00}                        
      },
      "sensor-ipad": {
        "id": 13,
        "ip": "67.185.72.1",
        "description": "Sensor ipad attached to carbon cylinders",
        "temp": [45,45,45,46,45,45,42,35,40,45,42,45],
        "c02_level": [1370,1371,1372],
        "geo": {"lat":47.41, "long":-122.00}
      },
      "sensor-inest": {
        "id": 8,
        "ip": "208.109.163.218",
        "description": "Sensor attached to the factory ceilings",
        "temp": [40,40,40,40,40,43,42,40,40,45,42,45],
        "c02_level": [1346,1345, 1343],
        "geo": {"lat":33.61, "long":-111.89}
      },
      "sensor-istick": {
        "id": 5,
        "ip": "204.116.105.67",
        "description": "Sensor embedded in exhaust pipes in the ceilings",
        "temp":[30,30,30,30,40,43,42,40,40,35,42,35],
        "c02_level": [1574,1570, 1576],
        "geo": {"lat":35.93, "long":-85.46}
      }
    }
  }""",schema)
#Displaying the DataFrame (the recorded output lists its column names and types)
display(Data_Frame)

DataFrame[dc_id: string, source: map<string,struct<description:string,ip:string,id:int,temp:array<int>,c02_level:array<int>,geo:struct<lat:double,long:double>>>]

In [22]:
#Show the rows of the DataFrame; long cell values appear truncated in the output
Data_Frame.show()

+------+--------------------+
| dc_id|              source|
+------+--------------------+
|dc-101|[sensor-igauge ->...|
+------+--------------------+



In [23]:
#Printing the schema of the DataFrame as a tree, to confirm the map/struct/array nesting was applied
Data_Frame.printSchema()

root
 |-- dc_id: string (nullable = true)
 |-- source: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- ip: string (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- temp: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- c02_level: array (nullable = true)
 |    |    |    |-- element: integer (containsNull = true)
 |    |    |-- geo: struct (nullable = true)
 |    |    |    |-- lat: double (nullable = true)
 |    |    |    |-- long: double (nullable = true)



In [24]:
#Explode the map column "source" so that every map entry becomes its own row.
#Because "source" is declared as a map in the schema, the result carries the
#entry name (sensor-igauge, etc.) in column "key" and its struct in column "value".
exploded_source_df = Data_Frame.select(col("dc_id"), explode(col("source")))
exploded_source_df.show()

+------+-------------+--------------------+
| dc_id|          key|               value|
+------+-------------+--------------------+
|dc-101|sensor-igauge|[Sensor attached ...|
|dc-101|  sensor-ipad|[Sensor ipad atta...|
|dc-101| sensor-inest|[Sensor attached ...|
|dc-101|sensor-istick|[Sensor embedded ...|
+------+-------------+--------------------+



In [26]:
#Flatten the "value" struct: pull selected fields out into top-level columns,
#renaming id -> device_id and c02_level -> c02_levels along the way
devices_df = exploded_source_df.select(
    col("dc_id"),
    col("key"),
    col("value.ip"),
    col("value.id").alias("device_id"),
    col("value.c02_level").alias("c02_levels"),
    col("value.temp"),
)
devices_df.show()

+------+-------------+---------------+---------+------------------+--------------------+
| dc_id|          key|             ip|device_id|        c02_levels|                temp|
+------+-------------+---------------+---------+------------------+--------------------+
|dc-101|sensor-igauge|    68.28.91.22|       10|[1475, 1476, 1473]|[35, 35, 35, 36, ...|
|dc-101|  sensor-ipad|    67.185.72.1|       13|[1370, 1371, 1372]|[45, 45, 45, 46, ...|
|dc-101| sensor-inest|208.109.163.218|        8|[1346, 1345, 1343]|[40, 40, 40, 40, ...|
|dc-101|sensor-istick| 204.116.105.67|        5|[1574, 1570, 1576]|[30, 30, 30, 30, ...|
+------+-------------+---------------+---------+------------------+--------------------+



In [27]:
#Print the flattened schema: the struct fields are now top-level columns
devices_df.printSchema()

root
 |-- dc_id: string (nullable = true)
 |-- key: string (nullable = false)
 |-- ip: string (nullable = true)
 |-- device_id: integer (nullable = true)
 |-- c02_levels: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- temp: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [29]:
#Now creating a temporary view of the table, registered under the name "iot_devices", so it can be queried with SQL
#If you instead want a temporary view that is shared among all sessions and kept alive
#until the Spark application terminates, you can create a global temporary view.
#A global temporary view is tied to the system-preserved database global_temp.
devices_df.createOrReplaceTempView("iot_devices")

In [31]:
#Plain SQL query against the temporary view, as a check before the higher-order functions are used
df1 = spark.sql("select * from iot_devices")
df1.show()

+------+-------------+---------------+---------+------------------+--------------------+
| dc_id|          key|             ip|device_id|        c02_levels|                temp|
+------+-------------+---------------+---------+------------------+--------------------+
|dc-101|sensor-igauge|    68.28.91.22|       10|[1475, 1476, 1473]|[35, 35, 35, 36, ...|
|dc-101|  sensor-ipad|    67.185.72.1|       13|[1370, 1371, 1372]|[45, 45, 45, 46, ...|
|dc-101| sensor-inest|208.109.163.218|        8|[1346, 1345, 1343]|[40, 40, 40, 40, ...|
|dc-101|sensor-istick| 204.116.105.67|        5|[1574, 1570, 1576]|[30, 30, 30, 30, ...|
+------+-------------+---------------+---------+------------------+--------------------+



In [32]:
#Summary description of the df1 dataframe
#describe() returns a DataFrame of 5 string columns (summary, dc_id, key, ip, device_id); the array columns are not included
#Only the DataFrame's column listing is displayed here because .show() is not called on the result
df1.describe()

DataFrame[summary: string, dc_id: string, key: string, ip: string, device_id: string]

Higher order functions and lambda expressions

In [38]:
#Using the transform function
#Basic structure of the transform function is "transform(values, value -> lambda expression)"
#Takes an array and an anonymous function as input (transform applies the function to each element of the array)
#Multiple arguments can be specified as a comma-separated list enclosed in parentheses, like (x, y) -> x + y.
df = spark.sql("select key, ip,temp,transform (temp, t -> ((t * 9) div 5) + 32 ) as f_tem from iot_devices")
#Converting temperature from Celsius to Fahrenheit; div is integer division, so results are whole degrees (e.g. 36 -> 96)
df.show()

+-------------+---------------+--------------------+--------------------+
|          key|             ip|                temp|               f_tem|
+-------------+---------------+--------------------+--------------------+
|sensor-igauge|    68.28.91.22|[35, 35, 35, 36, ...|[95, 95, 95, 96, ...|
|  sensor-ipad|    67.185.72.1|[45, 45, 45, 46, ...|[113, 113, 113, 1...|
| sensor-inest|208.109.163.218|[40, 40, 40, 40, ...|[104, 104, 104, 1...|
|sensor-istick| 204.116.105.67|[30, 30, 30, 30, ...|[86, 86, 86, 86, ...|
+-------------+---------------+--------------------+--------------------+



In [46]:
#Flagging each CO2 reading in iot_devices: transform with a boolean lambda yields an array of true/false values (here: reading < 500)
spark.sql("select key,device_id,c02_levels,transform(c02_levels,t->t<500) as low_c02_levels from iot_devices").show()

+-------------+---------+------------------+--------------------+
|          key|device_id|        c02_levels|      low_c02_levels|
+-------------+---------+------------------+--------------------+
|sensor-igauge|       10|[1475, 1476, 1473]|[false, false, fa...|
|  sensor-ipad|       13|[1370, 1371, 1372]|[false, false, fa...|
| sensor-inest|        8|[1346, 1345, 1343]|[false, false, fa...|
|sensor-istick|        5|[1574, 1570, 1576]|[false, false, fa...|
+-------------+---------+------------------+--------------------+



In [48]:
#Using filter (filter and transform have nearly the same syntax)
#Unlike transform() with a boolean expression, filter builds its output array from only those input elements for which the predicate function<T, Boolean> holds
spark.sql("select key,device_id,c02_levels,filter(c02_levels,t->t>1500) as high_c02_levels from iot_devices").show()

+-------------+---------+------------------+------------------+
|          key|device_id|        c02_levels|   high_c02_levels|
+-------------+---------+------------------+------------------+
|sensor-igauge|       10|[1475, 1476, 1473]|                []|
|  sensor-ipad|       13|[1370, 1371, 1372]|                []|
| sensor-inest|        8|[1346, 1345, 1343]|                []|
|sensor-istick|        5|[1574, 1570, 1576]|[1574, 1570, 1576]|
+-------------+---------+------------------+------------------+



The key point is the difference between the transform function and the filter function.
When we apply a condition such as > or <, transform returns a boolean array holding true or false for each element,
whereas filter returns an array of only the values that satisfy the condition (it is used to filter
the input on a condition and retrieve the matching values) -> this is the main distinction here

In [55]:
#Using exists function
#Returns true if at least one element in the array meets the required condition.
#exists(array<T>, function<T, Boolean>) -> Boolean: true if the predicate function<T, Boolean> holds for any element in the input array
spark.sql("select key,device_id,temp,c02_levels,exists(c02_levels,t->t>1500)as existing_values from iot_devices").show()

+-------------+---------+--------------------+------------------+---------------+
|          key|device_id|                temp|        c02_levels|existing_values|
+-------------+---------+--------------------+------------------+---------------+
|sensor-igauge|       10|[35, 35, 35, 36, ...|[1475, 1476, 1473]|          false|
|  sensor-ipad|       13|[45, 45, 45, 46, ...|[1370, 1371, 1372]|          false|
| sensor-inest|        8|[40, 40, 40, 40, ...|[1346, 1345, 1343]|          false|
|sensor-istick|        5|[30, 30, 30, 30, ...|[1574, 1570, 1576]|           true|
+-------------+---------+--------------------+------------------+---------------+



In [None]:
#Using the aggregate function (it allows aggregation over an array)
#"reduce" is an alias for aggregate that exists only in newer Spark releases, so aggregate is used here for portability
#aggregate(array<T>, B, function<B, T, B>, function<B, R>) -> R reduces the elements of array<T> into a single value R by merging the elements into a buffer B using function<B, T, B> and by applying a finish function<B, R> on the final buffer. The initial value B is determined by a zero expression
#The finish function is optional; if it is not specified, the identity function (id -> id) is used. This is the only higher-order function that takes two lambda functions.
#The merge lambda receives (buffer, element) in that order, so the accumulator is named first
#order by gives a global descending order, whereas sort by only orders rows within each partition
spark.sql("select key,ip,device_id,c02_levels,aggregate(c02_levels, 0, (acc, t)->acc + t,acc->acc div size(c02_levels)) as average_c02_levels from iot_devices order by average_c02_levels desc").show()

Some of the built in functions for arrays and strings

In [58]:
#Using array_distinct
#Returns the array with duplicate elements removed
spark.sql("select array_distinct(array(1,2,3,3,1,2,3,4,5,6))").show()

+---------------------------------------------------+
|array_distinct(array(1, 2, 3, 3, 1, 2, 3, 4, 5, 6))|
+---------------------------------------------------+
|                                 [1, 2, 3, 4, 5, 6]|
+---------------------------------------------------+



In [59]:
#Using array_intersect (returns the elements present in both arrays)
spark.sql("select array_intersect(array(1,2,3), array(1,3,5))").show()
#Similarly, array_union gives the union of two arrays
#Use array_except to return elements which are in array 1 but not in array 2

+-----------------------------------------------+
|array_intersect(array(1, 2, 3), array(1, 3, 5))|
+-----------------------------------------------+
|                                         [1, 3]|
+-----------------------------------------------+



In [None]:
#Using array_join
#Concatenates the elements of the given array using the delimiter and an optional string to replace nulls.
#If no value is set for null replacement, any null value is filtered.
#SQL string literals use single quotes here so they do not terminate the enclosing Python string
spark.sql("select array_join(array('a', 'b', 'c'), ' ', ',')").show()

In [63]:
#Similarly we can find the max, min, or position of an element, remove elements, sort, etc.
#arrays_overlap returns true if the two arrays have at least one element in common

In [64]:
#We can use concat to concatenate arrays or strings; we can also use flatten to transform an array of arrays into a single array

In [65]:
#Using the zip_with function: it pairs up the elements of two arrays and applies the lambda to each pair
spark.sql("select zip_with(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x))").show()
#The lambda reverses the positions of x and y, so each result struct holds the letter first and the number second

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|zip_with(array(1, 2, 3), array(a, b, c), lambdafunction(named_struct(y, namedlambdavariable(), x, namedlambdavariable()), namedlambdavariable(), namedlambdavariable()))|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                    [[a, 1], [b, 2], ...|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+



In [66]:
#zip_with again, this time concatenating each pair of strings element-wise (a+1, b+2, c+2)
spark.sql("select zip_with(array('a', 'b', 'c'), array('1', '2', '2'), (x, y) -> concat(x, y))").show()

+------------------------------------------------------------------------------------------------------------------------------------------------------------+
|zip_with(array(a, b, c), array(1, 2, 2), lambdafunction(concat(namedlambdavariable(), namedlambdavariable()), namedlambdavariable(), namedlambdavariable()))|
+------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                [a1, b2, c2]|
+------------------------------------------------------------------------------------------------------------------------------------------------------------+



In [67]:
#There are also several other array functions that make things easier