# Part-I

# Objective: Analysing IoT Data with Spark SQL

The objective of this case study is to analyse sensor data, which is presented in JSON format, using Spark SQL.


Dataset (iot_devices.json): The dataset has the following attributes:

* Device ID
* Device Name
* IP Address
* Cca2 - two-letter country code
* Cca3 - three-letter country code
* Cn - full country name
* Latitude
* Longitude
* Scale
* Temperature
* Humidity
* Battery Level
* CO2 level
* LCD Status
* Timestamp
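
For orientation, the attributes above correspond to JSON keys in the file. The sketch below assumes the file holds one JSON object per line (the layout `spark.read.json` reads by default) and rebuilds a single record from the first row of the DataFrame output shown later in this notebook. The key order and spacing of `sample_line` are assumptions; only the key names and values are taken from that output.

```python
import json

# One record, reconstructed from the first row of the DataFrame output.
# The on-disk key order and whitespace are assumed, not copied from the file.
sample_line = (
    '{"device_id": 1, "device_name": "meter-gauge-1xbYRYcj", '
    '"ip": "68.161.225.1", "cca2": "US", "cca3": "USA", "cn": "United States", '
    '"latitude": 38.0, "longitude": -97.0, "scale": "Celsius", "temp": 34, '
    '"humidity": 51, "battery_level": 8, "c02_level": 868, "lcd": "green", '
    '"timestamp": 1458444054093}'
)

record = json.loads(sample_line)

# The DataFrame output lists its columns alphabetically, so sorting the keys
# reproduces the column order seen there (battery_level first, timestamp last).
columns = sorted(record)
print(columns)
```

Note that the CO2 key is spelled `c02_level` with a zero, which is the spelling every query in this notebook has to use.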

Task:

1. Read the data into a DataFrame.
2. Convert the DataFrame into a temporary view called iot.
3. Count the number of devices in each country and display the output.
4. Display all the countries whose carbon dioxide level is more than 1400. Sort the output in descending order.
5. Select all countries' devices with high levels of CO2, group by cca3 and order by device_ids (Hint: For a high CO2 level, the LCD status will be RED).
6. Find all devices in countries whose batteries need replacement.


In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz

[33m0% [Working][0m            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease
[33m0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn[0m                                                                               Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
[33m0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.18[0m                                                                               Hit:3 http://security.ubuntu.com/ubuntu focal-security InRelease
Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
Hit:5 http://archive.ubuntu.com/ubuntu focal InRelease
Get:6 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
Hit:7 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Hit:8 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu focal InRelease
Get:9 http://archive.ubuntu.com/ubuntu focal-backports

In [None]:
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark



In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()
findspark.find()

'/content/spark-3.1.1-bin-hadoop3.2'

In [None]:
# Import necessary libraries

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark import SparkFiles

In [None]:
spark = SparkSession.builder.appName("Analysing Iot Data").getOrCreate()
spark

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# 1. Read the data into a DataFrame

# Set the input file path
input_file_path = '/content/drive/MyDrive/Colab Notebooks/iot_devices.json'

# Read the JSON file into a Spark DataFrame
df = spark.read.json(input_file_path)

# Display the DataFrame
df.show(10)

+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|battery_level|c02_level|cca2|cca3|           cn|device_id|         device_name|humidity|             ip|latitude|   lcd|longitude|  scale|temp|    timestamp|
+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|            8|      868|  US| USA|United States|        1|meter-gauge-1xbYRYcj|      51|   68.161.225.1|    38.0| green|    -97.0|Celsius|  34|1458444054093|
|            7|     1473|  NO| NOR|       Norway|        2|   sensor-pad-2n2Pea|      70|  213.161.254.1|   62.47|   red|     6.15|Celsius|  11|1458444054119|
|            2|     1556|  IT| ITA|        Italy|        3| device-mac-36TWSKiT|      44|      88.36.5.1|   42.83|   red|    12.83|Celsius|  19|1458444054120|
|            6|     1080|  US| USA|United Stat

In [None]:
df.printSchema()

root
 |-- battery_level: long (nullable = true)
 |-- c02_level: long (nullable = true)
 |-- cca2: string (nullable = true)
 |-- cca3: string (nullable = true)
 |-- cn: string (nullable = true)
 |-- device_id: long (nullable = true)
 |-- device_name: string (nullable = true)
 |-- humidity: long (nullable = true)
 |-- ip: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- lcd: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- scale: string (nullable = true)
 |-- temp: long (nullable = true)
 |-- timestamp: long (nullable = true)
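
The schema reports `timestamp` as a `long`. The 13-digit values suggest epoch milliseconds, although the dataset description does not say so; treating them that way is an assumption. Under that assumption, a minimal sketch for turning one value into a readable UTC time looks like this (`epoch_millis_to_utc` is a hypothetical helper name):

```python
from datetime import datetime, timedelta, timezone


def epoch_millis_to_utc(millis: int) -> datetime:
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    seconds, remainder = divmod(millis, 1000)
    # Integer arithmetic keeps the millisecond part exact (no float rounding).
    base = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return base + timedelta(milliseconds=remainder)


# Timestamp of device_id 1 in the DataFrame output above.
print(epoch_millis_to_utc(1458444054093).isoformat())
```

If the assumption holds, the first reading falls on 20 March 2016.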



In [None]:
# 2. Convert the DataFrame into a temporary view called iot.
# (With Spark's default case-insensitive name resolution, IOT and iot refer to the same view.)

df.createOrReplaceTempView("IOT")

In [None]:
df1 = spark.sql("SELECT * FROM IOT")
df1.show()

+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|battery_level|c02_level|cca2|cca3|           cn|device_id|         device_name|humidity|             ip|latitude|   lcd|longitude|  scale|temp|    timestamp|
+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|            8|      868|  US| USA|United States|        1|meter-gauge-1xbYRYcj|      51|   68.161.225.1|    38.0| green|    -97.0|Celsius|  34|1458444054093|
|            7|     1473|  NO| NOR|       Norway|        2|   sensor-pad-2n2Pea|      70|  213.161.254.1|   62.47|   red|     6.15|Celsius|  11|1458444054119|
|            2|     1556|  IT| ITA|        Italy|        3| device-mac-36TWSKiT|      44|      88.36.5.1|   42.83|   red|    12.83|Celsius|  19|1458444054120|
|            6|     1080|  US| USA|United Stat

In [None]:
df1.columns

['battery_level',
 'c02_level',
 'cca2',
 'cca3',
 'cn',
 'device_id',
 'device_name',
 'humidity',
 'ip',
 'latitude',
 'lcd',
 'longitude',
 'scale',
 'temp',
 'timestamp']

In [None]:
# 3. Count the number of devices in each country and display the output

spark.sql("SELECT cn, COUNT(device_name) AS no_of_devices FROM IOT GROUP BY cn ORDER BY cn").show(50)

+--------------------+-------------+
|                  cn|no_of_devices|
+--------------------+-------------+
|                    |         1810|
|         Afghanistan|           19|
|             Albania|           32|
|             Algeria|           34|
|      American Samoa|            3|
|             Andorra|            4|
|              Angola|           28|
|            Anguilla|            7|
| Antigua and Barbuda|           60|
|           Argentina|          978|
|             Armenia|           34|
|               Aruba|            8|
|           Australia|         3119|
|             Austria|         1462|
|          Azerbaijan|           49|
|             Bahamas|           18|
|             Bahrain|           55|
|          Bangladesh|          153|
|            Barbados|           38|
|             Belarus|          123|
|             Belgium|          666|
|              Belize|           13|
|               Benin|            3|
|             Bermuda|           53|
|

In [None]:
spark.sql("SELECT cn, COUNT(cn) FROM IOT GROUP BY cn").show(5)

+-------------------+---------+
|                 cn|count(cn)|
+-------------------+---------+
|             Russia|     5989|
|           Paraguay|       32|
|           Anguilla|        7|
|              Macao|       33|
|U.S. Virgin Islands|       51|
+-------------------+---------+
only showing top 5 rows
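
The first count above contains a group with a blank country name (1810 devices). If those rows should be left out of a per-country count, one option is to filter them before grouping. The sketch below runs the query against Python's built-in `sqlite3` purely as a stand-in, so that it executes without a Spark session. It assumes the blank names are empty or whitespace-only strings. Device ids 1 to 3 come from the output above; the row with id 9001 is hypothetical.

```python
import sqlite3

# (device_id, cn) pairs. Ids 1-3 are taken from the DataFrame output;
# id 9001 is a made-up row standing in for the blank-country devices.
rows = [(1, "United States"), (2, "Norway"), (3, "Italy"), (9001, "")]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE iot (device_id INTEGER, cn TEXT)")
conn.executemany("INSERT INTO iot VALUES (?, ?)", rows)

# Plain SQL: drop blank names, then count devices per country.
query = """
    SELECT cn, COUNT(*) AS no_of_devices
    FROM iot
    WHERE TRIM(cn) <> ''
    GROUP BY cn
    ORDER BY cn
"""
counts = conn.execute(query).fetchall()
conn.close()

print(counts)
```

The query uses only standard SQL, so the same text should be usable with `spark.sql(...)` against the `IOT` view, although that has not been run here.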



In [None]:
spark.sql("select cca3, count(distinct device_id) as device_id from IOT group by cca3 order by device_id desc limit 50").show()


+----+---------+
|cca3|device_id|
+----+---------+
| USA|    70405|
| CHN|    14455|
| JPN|    12100|
| KOR|    11879|
| DEU|     7942|
| GBR|     6486|
| CAN|     6041|
| RUS|     5989|
| FRA|     5305|
| BRA|     3224|
| AUS|     3119|
| ITA|     2915|
| SWE|     2880|
| POL|     2744|
| NLD|     2488|
| ESP|     2310|
| TWN|     2128|
| IND|     1867|
| CZE|     1507|
| NOR|     1487|
+----+---------+
only showing top 20 rows



In [None]:
# 4. Display all the countries whose carbon dioxide level is more than 1400. Sort the output in descending order.

# Count of devices with a CO2 level above 1400, per country

df3 = spark.sql("SELECT cn, COUNT(distinct device_id) AS count FROM IOT WHERE c02_level > 1400 GROUP BY cn ORDER BY count DESC")

df3.show()


+-----------------+-----+
|               cn|count|
+-----------------+-----+
|    United States|17050|
|            China| 3616|
|Republic of Korea| 2942|
|            Japan| 2935|
|          Germany| 1966|
|   United Kingdom| 1660|
|           Canada| 1564|
|           Russia| 1508|
|           France| 1353|
|           Brazil|  856|
|        Australia|  769|
|           Sweden|  724|
|            Italy|  713|
|           Poland|  664|
|      Netherlands|  646|
|            Spain|  586|
|           Taiwan|  542|
|            India|  446|
|                 |  427|
|           Norway|  399|
+-----------------+-----+
only showing top 20 rows



In [None]:
spark.sql("SELECT cn,c02_level FROM IOT WHERE c02_level > 1400 ORDER BY c02_level DESC").show(20)

+--------------+---------+
|            cn|c02_level|
+--------------+---------+
|        Poland|     1599|
|         Spain|     1599|
| United States|     1599|
|         Japan|     1599|
|       Germany|     1599|
|   Philippines|     1599|
|        Canada|     1599|
|        Canada|     1599|
| United States|     1599|
| United States|     1599|
| United States|     1599|
|        France|     1599|
| United States|     1599|
|Czech Republic|     1599|
|         China|     1599|
|         Japan|     1599|
|        Turkey|     1599|
| United States|     1599|
|Czech Republic|     1599|
| United States|     1599|
+--------------+---------+
only showing top 20 rows



In [None]:
# 5. Select all countries' devices with high levels of CO2, group by cca3 and order by the number of device_ids
# (Hint: For a high CO2 level, the LCD status will be red).

In [None]:
spark.sql("SELECT cca3, count(distinct device_id) as device_id FROM IOT WHERE lcd == 'red' GROUP BY cca3 ORDER BY device_id").show()

+----+---------+
|cca3|device_id|
+----+---------+
| LBR|        1|
| TLS|        1|
| AND|        1|
| CIV|        1|
| MHL|        1|
| SLB|        1|
| TJK|        1|
| FSM|        1|
| IOT|        1|
| TKM|        1|
| GGY|        1|
| GIB|        1|
| FLK|        1|
| REU|        1|
| GAB|        1|
| TGO|        2|
| LCA|        2|
| FJI|        2|
| YEM|        2|
| IRQ|        2|
+----+---------+
only showing top 20 rows
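
The query above relies on the hint that a red LCD status marks a high CO2 level. A quick sanity check of that hint is sketched below on the three rows that are fully visible in the `df.show(10)` output. Using 1400 (the threshold from task 4) as the cut-off between red and non-red is an assumption, and `hint_holds` is a hypothetical helper.

```python
# (device_id, c02_level, lcd) for the three rows fully visible in df.show(10).
sample = [(1, 868, "green"), (2, 1473, "red"), (3, 1556, "red")]

CO2_THRESHOLD = 1400  # value from task 4; its link to the red status is assumed


def hint_holds(rows, threshold=CO2_THRESHOLD):
    """Return True if each row is red exactly when c02_level exceeds threshold."""
    return all((lcd == "red") == (c02 > threshold) for _, c02, lcd in rows)


print(hint_holds(sample))
```

Three rows cannot confirm the rule for the whole dataset; the same comparison would need to run over every row before relying on `lcd` as a proxy for `c02_level`.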



In [None]:
spark.sql("SELECT cn,device_name,c02_level,lcd FROM IOT WHERE lcd == 'red' ").show(50)

+-----------------+--------------------+---------+---+
|               cn|         device_name|c02_level|lcd|
+-----------------+--------------------+---------+---+
|           Norway|   sensor-pad-2n2Pea|     1473|red|
|            Italy| device-mac-36TWSKiT|     1556|red|
|            Japan|sensor-pad-8xUD6p...|     1536|red|
|    United States|sensor-pad-10Bsyw...|     1470|red|
|            Italy|meter-gauge-11dlM...|     1544|red|
|    United States|sensor-pad-16aXmI...|     1425|red|
|    United States|meter-gauge-17zb8...|     1466|red|
|    United States|meter-gauge-19eg1...|     1531|red|
|            Japan|  sensor-pad-22oWV2D|     1522|red|
|           Canada|sensor-pad-24Pytz...|     1511|red|
|Republic of Korea|  device-mac-27P5wf2|     1597|red|
|Republic of Korea|sensor-pad-28Tsud...|     1502|red|
|          Ukraine|meter-gauge-47WsF9s8|     1454|red|
|           Sweden|meter-gauge-534fD...|     1446|red|
|    United States|sensor-pad-5410CW...|     1436|red|
|    Unite

In [None]:
# 6. Find all devices in countries whose batteries need replacement.

df.select("battery_level").show(10)

+-------------+
|battery_level|
+-------------+
|            8|
|            7|
|            2|
|            6|
|            4|
|            3|
|            3|
|            0|
|            3|
|            7|
+-------------+
only showing top 10 rows



In [None]:
spark.sql("SELECT DISTINCT battery_level FROM IOT ORDER BY battery_level ASC").show()

+-------------+
|battery_level|
+-------------+
|            0|
|            1|
|            2|
|            3|
|            4|
|            5|
|            6|
|            7|
|            8|
|            9|
+-------------+



In [None]:
spark.sql("SELECT cn, COUNT(DISTINCT device_id) as device_id FROM IOT where battery_level == 0 group by cn order by device_id desc").show(50)


+--------------------+---------+
|                  cn|device_id|
+--------------------+---------+
|       United States|     6858|
|               China|     1415|
|   Republic of Korea|     1217|
|               Japan|     1210|
|             Germany|      760|
|      United Kingdom|      650|
|              Canada|      612|
|              Russia|      600|
|              France|      582|
|              Brazil|      374|
|           Australia|      322|
|              Sweden|      293|
|               Italy|      287|
|              Poland|      278|
|         Netherlands|      251|
|               Spain|      223|
|              Taiwan|      207|
|               India|      189|
|                    |      177|
|           Hong Kong|      149|
|             Ukraine|      149|
|         Switzerland|      143|
|            Bulgaria|      143|
|             Austria|      132|
|      Czech Republic|      130|
|           Argentina|      129|
|              Norway|      122|
|         

In [None]:
spark.sql("SELECT cn,device_name,battery_level FROM IOT WHERE battery_level == 0").show()

+-----------------+--------------------+-------------+
|               cn|         device_name|battery_level|
+-----------------+--------------------+-------------+
|            Japan|sensor-pad-8xUD6p...|            0|
|    United States|sensor-pad-12Y2kIm0o|            0|
|    United States|meter-gauge-17zb8...|            0|
|          Germany| sensor-pad-448DeWGL|            0|
|           Canada|sensor-pad-80TY4d...|            0|
|Republic of Korea| sensor-pad-92vxuq7e|            0|
|    United States|sensor-pad-98mJQA...|            0|
|            Japan|meter-gauge-1075K...|            0|
|        Australia|device-mac-111WYt...|            0|
|    United States| sensor-pad-11663yUf|            0|
|            China|device-mac-117mcc...|            0|
|    United States|device-mac-123zvY...|            0|
|    United States|meter-gauge-131Lb...|            0|
|    United States|sensor-pad-1344UG...|            0|
|          Germany|sensor-pad-140tq1s6t|            0|
|         