In [2]:
from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark import SparkFiles


spark = SparkSession \
       .builder \
       .appName("Analysing IoT Data with Spark Sql") \
       .getOrCreate()

spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/09 06:35:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
#Read the data into a Dataframe.
iot_df = spark.read.json("iot_devices.json")

                                                                                

In [15]:
#top 10 records to check data
iot_df.select("*").show(10)

+---------------+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|_corrupt_record|battery_level|c02_level|cca2|cca3|           cn|device_id|         device_name|humidity|             ip|latitude|   lcd|longitude|  scale|temp|    timestamp|
+---------------+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|           NULL|            8|      868|  US| USA|United States|        1|meter-gauge-1xbYRYcj|      51|   68.161.225.1|    38.0| green|    -97.0|Celsius|  34|1458444054093|
|           NULL|            7|     1473|  NO| NOR|       Norway|        2|   sensor-pad-2n2Pea|      70|  213.161.254.1|   62.47|   red|     6.15|Celsius|  11|1458444054119|
|           NULL|            2|     1556|  IT| ITA|        Italy|        3| device-mac-36TWSKiT|      44|      88.36.5.1|   4

In [4]:
#Convert the Dataframe into a temporary view called iot.
iot_df.createOrReplaceTempView("iot")

24/06/09 06:35:27 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [5]:
columns_list = iot_df.columns
print("List of columns:")
for col in columns_list:
    print(col)

List of columns:
_corrupt_record
battery_level
c02_level
cca2
cca3
cn
device_id
device_name
humidity
ip
latitude
lcd
longitude
scale
temp
timestamp


In [6]:
# Count Devices by Country
count_by_country = spark.sql("""
    SELECT Cca3 AS Country, COUNT(*) AS DeviceCount 
    FROM iot 
    GROUP BY Cca3
""")
count_by_country.show()

[Stage 1:>                                                          (0 + 2) / 2]

+-------+-----------+
|Country|DeviceCount|
+-------+-----------+
|    HTI|         11|
|    PSE|         23|
|    POL|       2583|
|    LVA|        322|
|    BRB|         36|
|    JAM|         42|
|    ZMB|          7|
|    BRA|       3024|
|    ARM|         33|
|    MOZ|         21|
|    JOR|         43|
|    CUB|         15|
|    FRA|       4989|
|    ABW|          6|
|    TCA|          1|
|    BRN|         17|
|    COD|          7|
|    FSM|          2|
|    BOL|         98|
|    URY|        105|
+-------+-----------+
only showing top 20 rows



                                                                                

In [7]:
#High CO2 Level Countries
high_co2_countries = spark.sql("""
    SELECT DISTINCT c02_level, Cca3 AS Country 
    FROM iot 
    WHERE c02_level > 1400 
    ORDER BY c02_level DESC
""")
high_co2_countries.show()

[Stage 4:>                                                          (0 + 2) / 2]

+---------+-------+
|c02_level|Country|
+---------+-------+
|     1599|    ESP|
|     1599|    NOR|
|     1599|    KAZ|
|     1599|    CHN|
|     1599|    DEU|
|     1599|    BMU|
|     1599|    KOR|
|     1599|    FRA|
|     1599|    POL|
|     1599|    NLD|
|     1599|    CZE|
|     1599|    IND|
|     1599|    MEX|
|     1599|    FIN|
|     1599|    HUN|
|     1599|    JPN|
|     1599|    ROU|
|     1599|    BGR|
|     1599|    CAN|
|     1599|    KNA|
+---------+-------+
only showing top 20 rows



                                                                                

In [12]:
# Select Devices with High CO2 and Group by Country
high_co2_devices = spark.sql("""
    SELECT Cca3 AS Country, Device_ID, LCD
    FROM iot 
    WHERE LCD = 'red' 
    GROUP BY Cca3, Device_ID, LCD 
    ORDER BY Device_ID
""")
high_co2_devices.show()



+-------+---------+---+
|Country|Device_ID|LCD|
+-------+---------+---+
|    NOR|        2|red|
|    ITA|        3|red|
|    JPN|        8|red|
|    USA|       10|red|
|    ITA|       11|red|
|    USA|       16|red|
|    USA|       17|red|
|    USA|       19|red|
|    JPN|       22|red|
|    CAN|       24|red|
|    KOR|       27|red|
|    KOR|       28|red|
|    UKR|       47|red|
|    SWE|       53|red|
|    USA|       54|red|
|    USA|       57|red|
|    USA|       64|red|
|    CZE|       66|red|
|    IND|       77|red|
|    KOR|       78|red|
+-------+---------+---+
only showing top 20 rows



                                                                                

In [9]:
#Find Devices Needing Battery Replacements
devices_battery_replacement = spark.sql("""
    SELECT * 
    FROM iot 
    WHERE Battery_Level < 20
""")
devices_battery_replacement.show()


+---------------+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|_corrupt_record|battery_level|c02_level|cca2|cca3|           cn|device_id|         device_name|humidity|             ip|latitude|   lcd|longitude|  scale|temp|    timestamp|
+---------------+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|           NULL|            8|      868|  US| USA|United States|        1|meter-gauge-1xbYRYcj|      51|   68.161.225.1|    38.0| green|    -97.0|Celsius|  34|1458444054093|
|           NULL|            7|     1473|  NO| NOR|       Norway|        2|   sensor-pad-2n2Pea|      70|  213.161.254.1|   62.47|   red|     6.15|Celsius|  11|1458444054119|
|           NULL|            2|     1556|  IT| ITA|        Italy|        3| device-mac-36TWSKiT|      44|      88.36.5.1|   4

                                                                                