In [None]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

# Initialize findspark
import findspark
findspark.init()

# Create a PySpark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

0% [Working]            Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.36)] [Waiting for headers] [Waiting for headers] [                                                                                                    Hit:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.36)] [Waiting for headers] [Connected to ppa.launc0% [Waiting for headers] [Waiting for headers] [Connected to ppa.launchpadcontent.net (185.125.190.8                                                                                                    Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc

In [None]:
# Create a Spark session
spark = SparkSession.builder.appName("IoT Data Analysis").getOrCreate()

In [None]:
# Load the JSON data into a DataFrame
iot_df = spark.read.json("/content/iot_devices.json")


In [None]:
# Create a temporary view called 'iot'
iot_df.createOrReplaceTempView("iot")

In [None]:
# Task 1: Count how many devices are there from each country and display the output
devices_per_country = spark.sql("""
    SELECT Cn AS Country, COUNT(Device_ID) AS Device_Count
    FROM iot
    GROUP BY Cn
    ORDER BY Device_Count DESC
""")
devices_per_country.show()

+-----------------+------------+
|          Country|Device_Count|
+-----------------+------------+
|    United States|        4559|
|            China|         964|
|            Japan|         806|
|Republic of Korea|         782|
|          Germany|         561|
|   United Kingdom|         480|
|           Canada|         429|
|           Russia|         408|
|           France|         378|
|           Brazil|         249|
|            Italy|         210|
|        Australia|         208|
|           Sweden|         197|
|      Netherlands|         159|
|           Poland|         154|
|            Spain|         149|
|           Taiwan|         145|
|            India|         139|
|          Austria|         139|
|                 |         128|
+-----------------+------------+
only showing top 20 rows



In [None]:
# Check the schema of the DataFrame
iot_df.printSchema()


root
 |-- _corrupt_record: string (nullable = true)
 |-- battery_level: long (nullable = true)
 |-- c02_level: long (nullable = true)
 |-- cca2: string (nullable = true)
 |-- cca3: string (nullable = true)
 |-- cn: string (nullable = true)
 |-- device_id: long (nullable = true)
 |-- device_name: string (nullable = true)
 |-- humidity: long (nullable = true)
 |-- ip: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- lcd: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- scale: string (nullable = true)
 |-- temp: long (nullable = true)
 |-- timestamp: long (nullable = true)



In [None]:
# Task 2: Display all the countries whose carbon dioxide level is more than 1400. Sort the output in descending order.
high_co2_countries = spark.sql("""
    SELECT Country
    FROM (
        SELECT DISTINCT cn AS Country, c02_level
        FROM iot
        WHERE c02_level > 1400
        ORDER BY c02_level DESC
    )
""")
high_co2_countries.show()


+-----------------+
|          Country|
+-----------------+
|   Czech Republic|
|           Sweden|
|    United States|
|        Australia|
|            Japan|
|           France|
|Republic of Korea|
|           Canada|
|      Philippines|
|          Germany|
|    United States|
|            Japan|
|           Brazil|
|            China|
|          Romania|
|Republic of Korea|
|    United States|
|           Taiwan|
|           Canada|
|      Philippines|
+-----------------+
only showing top 20 rows



In [None]:
# Task 3: Select all countries' devices with high levels of CO2 and group by cca3 and order by device_ids
# (For high CO2 level, the LCD status will be RED)
high_co2_devices = spark.sql("""
    SELECT cca3, COUNT(device_id) AS Device_Count
    FROM iot
    WHERE lcd = 'red'
    GROUP BY cca3
    ORDER BY Device_Count DESC
""")
high_co2_devices.show()

+----+------------+
|cca3|Device_Count|
+----+------------+
| USA|        1212|
| CHN|         247|
| JPN|         224|
| KOR|         206|
| DEU|         157|
| CAN|         118|
| RUS|         109|
| FRA|         105|
| GBR|         105|
| BRA|          73|
| AUS|          61|
| ITA|          44|
| SWE|          44|
| NLD|          41|
| ESP|          40|
| POL|          35|
| IND|          34|
| TWN|          33|
| AUT|          33|
| SGP|          30|
+----+------------+
only showing top 20 rows



In [None]:
# Task 4: Find out all devices in countries whose batteries need replacements
# Assuming "batteries need replacements" means Battery Level is less than a certain threshold, e.g., 20%
low_battery_devices = spark.sql("""
    SELECT *
    FROM iot
    WHERE Battery_Level < 20
""")
low_battery_devices.show()


+---------------+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|_corrupt_record|battery_level|c02_level|cca2|cca3|           cn|device_id|         device_name|humidity|             ip|latitude|   lcd|longitude|  scale|temp|    timestamp|
+---------------+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|           null|            8|      868|  US| USA|United States|        1|meter-gauge-1xbYRYcj|      51|   68.161.225.1|    38.0| green|    -97.0|Celsius|  34|1458444054093|
|           null|            7|     1473|  NO| NOR|       Norway|        2|   sensor-pad-2n2Pea|      70|  213.161.254.1|   62.47|   red|     6.15|Celsius|  11|1458444054119|
|           null|            2|     1556|  IT| ITA|        Italy|        3| device-mac-36TWSKiT|      44|      88.36.5.1|   4