In [1]:
from pyspark.sql import DataFrame, SparkSession

# Build the notebook's Spark session under the application name "iot_devices";
# getOrCreate() hands back an already-running session when there is one.
spark = (
    SparkSession.builder
    .appName("iot_devices")
    .getOrCreate()
)

# Bare trailing expression so the notebook renders the session object.
spark

## 1. Read the data into a DataFrame.

In [2]:
# Location of the IoT device readings, relative to the notebook's working directory.
path = "iot_devices.json"

# Load the JSON file into a DataFrame; no explicit schema is supplied here.
iotDF = spark.read.json(path=path)

In [3]:
# Preview the first five readings with full, untruncated column values.
iotDF.show(n=5, truncate=False)

+-------------+---------+----+----+-------------+---------+---------------------+--------+-------------+--------+------+---------+-------+----+-------------+
|battery_level|c02_level|cca2|cca3|cn           |device_id|device_name          |humidity|ip           |latitude|lcd   |longitude|scale  |temp|timestamp    |
+-------------+---------+----+----+-------------+---------+---------------------+--------+-------------+--------+------+---------+-------+----+-------------+
|8            |868      |US  |USA |United States|1        |meter-gauge-1xbYRYcj |51      |68.161.225.1 |38.0    |green |-97.0    |Celsius|34  |1458444054093|
|7            |1473     |NO  |NOR |Norway       |2        |sensor-pad-2n2Pea    |70      |213.161.254.1|62.47   |red   |6.15     |Celsius|11  |1458444054119|
|2            |1556     |IT  |ITA |Italy        |3        |device-mac-36TWSKiT  |44      |88.36.5.1    |42.83   |red   |12.83    |Celsius|19  |1458444054120|
|6            |1080     |US  |USA |United States|4  

In [4]:
# Shape of the raw readings DataFrame: row total from count(), column total
# from the length of the column-name list.
row, col = iotDF.count(), len(iotDF.columns)

# Report the (rows, columns) pair first, then each figure on its own line.
print(
    f'Dimension of the Dataframe is: {(row,col)}',
    f'Number of Rows are: {row}',
    f'Number of Columns are: {col}',
    sep='\n',
)

Dimension of the Dataframe is: (198164, 15)
Number of Rows are: 198164
Number of Columns are: 15


In [5]:
# Print the column names, types and nullability of the loaded DataFrame.
iotDF.printSchema()

root
 |-- battery_level: long (nullable = true)
 |-- c02_level: long (nullable = true)
 |-- cca2: string (nullable = true)
 |-- cca3: string (nullable = true)
 |-- cn: string (nullable = true)
 |-- device_id: long (nullable = true)
 |-- device_name: string (nullable = true)
 |-- humidity: long (nullable = true)
 |-- ip: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- lcd: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- scale: string (nullable = true)
 |-- temp: long (nullable = true)
 |-- timestamp: long (nullable = true)



## 2. Convert the DataFrame into a temporary view called `iot`.

In [6]:
# Expose the readings to Spark SQL under the view name "iot"
# (an existing view with that name is replaced).
iotDF.createOrReplaceTempView(name="iot")

## 3. Count how many devices there are from each country and display the output.

In [7]:
# Per-country tally of rows in the "iot" view that carry a non-null device_name.
devicesfromcountryDF = spark.sql(
    sqlQuery="SELECT cn, COUNT(device_name) FROM iot GROUP BY cn"
)

In [8]:
# Shape of the per-country result: row total from count(), column total
# from the length of the column-name list.
row, col = devicesfromcountryDF.count(), len(devicesfromcountryDF.columns)

# Report the (rows, columns) pair first, then each figure on its own line.
print(
    f'Dimension of the Dataframe is: {(row,col)}',
    f'Number of Rows are: {row}',
    f'Number of Columns are: {col}',
    sep='\n',
)

Dimension of the Dataframe is: (209, 2)
Number of Rows are: 209
Number of Columns are: 2


In [9]:
# Print every per-country row with untruncated values. The row limit is taken
# from the DataFrame itself rather than a number copied from an earlier cell's
# printed output, so the listing stays complete if the input data changes.
devicesfromcountryDF.show(devicesfromcountryDF.count(), truncate=False)

+--------------------------------+------------------+
|cn                              |count(device_name)|
+--------------------------------+------------------+
|Russia                          |5989              |
|Paraguay                        |32                |
|Anguilla                        |7                 |
|Macao                           |33                |
|U.S. Virgin Islands             |51                |
|Yemen                           |19                |
|British Indian Ocean Territory  |2                 |
|Senegal                         |25                |
|Sweden                          |2880              |
|Republic of Korea               |11879             |
|Philippines                     |681               |
|Jersey                          |34                |
|Singapore                       |1076              |
|Malaysia                        |726               |
|Fiji                            |11                |
|Turkey                     

## 4. Display all the countries whose carbon dioxide level is more than 1400. Sort the output in descending order.

In [10]:
# Countries ranked by how many of their readings exceed a c02_level of 1400,
# largest tally first.
countriesCO2DF = spark.sql(
    sqlQuery="SELECT cn, COUNT(c02_level) FROM iot WHERE c02_level > 1400 GROUP BY cn ORDER BY COUNT(c02_level) DESC"
)

In [11]:
# Shape of the high-CO2 result: row total from count(), column total
# from the length of the column-name list.
row, col = countriesCO2DF.count(), len(countriesCO2DF.columns)

# Report the (rows, columns) pair first, then each figure on its own line.
print(
    f'Dimension of the Dataframe is: {(row,col)}',
    f'Number of Rows are: {row}',
    f'Number of Columns are: {col}',
    sep='\n',
)

Dimension of the Dataframe is: (190, 2)
Number of Rows are: 190
Number of Columns are: 2


In [12]:
# Print every ranked country with untruncated values. The row limit is taken
# from the DataFrame itself rather than a number copied from an earlier cell's
# printed output, so the listing stays complete if the input data changes.
countriesCO2DF.show(countriesCO2DF.count(), truncate=False)

+--------------------------------+----------------+
|cn                              |count(c02_level)|
+--------------------------------+----------------+
|United States                   |17050           |
|China                           |3616            |
|Republic of Korea               |2942            |
|Japan                           |2935            |
|Germany                         |1966            |
|United Kingdom                  |1660            |
|Canada                          |1564            |
|Russia                          |1508            |
|France                          |1353            |
|Brazil                          |856             |
|Australia                       |769             |
|Sweden                          |724             |
|Italy                           |713             |
|Poland                          |664             |
|Netherlands                     |646             |
|Spain                           |586             |
|Taiwan     

## 5. Select all countries' devices with high levels of CO2, group by cca3, and order by device_ids.

#### Hint: For a high CO2 level, the LCD status will be `red`.

In [13]:
# Readings whose LCD status is red (the exercise's marker for a high CO2 level),
# counted per three-letter country code and listed from most to fewest.
# Exact equality is used instead of a substring pattern so that only the status
# value 'red' itself matches, never a longer value that merely contains "red".
LCD = spark.sql("SELECT cca3, COUNT(device_id) FROM iot WHERE lcd = 'red' GROUP BY cca3 ORDER BY COUNT(device_id) DESC")

In [14]:
# Shape of the red-LCD result: row total from count(), column total
# from the length of the column-name list.
row, col = LCD.count(), len(LCD.columns)

# Report the (rows, columns) pair first, then each figure on its own line.
print(
    f'Dimension of the Dataframe is: {(row,col)}',
    f'Number of Rows are: {row}',
    f'Number of Columns are: {col}',
    sep='\n',
)

Dimension of the Dataframe is: (186, 2)
Number of Rows are: 186
Number of Columns are: 2


In [15]:
# Print every country-code row with untruncated values. The row limit is taken
# from the DataFrame itself rather than a number copied from an earlier cell's
# printed output, so the listing stays complete if the input data changes.
LCD.show(LCD.count(), truncate=False)

+----+----------------+
|cca3|count(device_id)|
+----+----------------+
|USA |17489           |
|CHN |3616            |
|KOR |2942            |
|JPN |2935            |
|DEU |1966            |
|GBR |1660            |
|CAN |1564            |
|RUS |1508            |
|FRA |1353            |
|BRA |856             |
|AUS |769             |
|SWE |724             |
|ITA |713             |
|POL |664             |
|NLD |646             |
|ESP |586             |
|TWN |542             |
|IND |446             |
|NOR |399             |
|UKR |373             |
|AUT |360             |
|CZE |359             |
|HKG |350             |
|CHE |343             |
|BGR |307             |
|ROU |306             |
|MEX |297             |
|DNK |288             |
|SGP |285             |
|THA |228             |
|IDN |226             |
|ARG |223             |
|HUN |210             |
|MYS |187             |
|TUR |171             |
|ISR |169             |
|FIN |157             |
|BEL |153             |
|PHL |145       

## 6. Find all devices, along with their countries, whose batteries need replacement.

In [16]:
# List the distinct battery_level values present in the data, to decide which
# value should count as "needs replacement".
(
    iotDF
    .select("battery_level")
    .distinct()
    .show()
)

+-------------+
|battery_level|
+-------------+
|            0|
|            7|
|            6|
|            9|
|            5|
|            1|
|            3|
|            8|
|            2|
|            4|
+-------------+



In [17]:
# Country, name and id of every reading whose battery_level is 0; this notebook
# treats a level of 0 as "battery needs replacement".
battery_replacement = spark.sql(
    sqlQuery="SELECT cn, device_name, device_id FROM iot WHERE battery_level = 0"
)

In [18]:
# Shape of the battery-replacement result: row total from count(), column total
# from the length of the column-name list.
row, col = battery_replacement.count(), len(battery_replacement.columns)

# Report the (rows, columns) pair first, then each figure on its own line.
print(
    f'Dimension of the Dataframe is: {(row,col)}',
    f'Number of Rows are: {row}',
    f'Number of Columns are: {col}',
    sep='\n',
)

Dimension of the Dataframe is: (19851, 3)
Number of Rows are: 19851
Number of Columns are: 3


In [19]:
# Print every device needing a battery replacement with untruncated values.
# The row limit is taken from the DataFrame itself rather than a number copied
# from an earlier cell's printed output, so the listing stays complete if the
# input data changes.
# NOTE(review): this prints the entire result (tens of thousands of rows on the
# current data); consider showing a sample or writing the result to a file.
battery_replacement.show(battery_replacement.count(), truncate=False)

+------------------------------+----------------------------+---------+
|cn                            |device_name                 |device_id|
+------------------------------+----------------------------+---------+
|Japan                         |sensor-pad-8xUD6pzsQI       |8        |
|United States                 |sensor-pad-12Y2kIm0o        |12       |
|United States                 |meter-gauge-17zb8Fghhl      |17       |
|Germany                       |sensor-pad-448DeWGL         |44       |
|Canada                        |sensor-pad-80TY4dWSMH       |80       |
|Republic of Korea             |sensor-pad-92vxuq7e         |92       |
|United States                 |sensor-pad-98mJQAfJpfW      |98       |
|Japan                         |meter-gauge-1075KSUDRjPa    |107      |
|Australia                     |device-mac-111WYtjxe1b      |111      |
|United States                 |sensor-pad-11663yUf         |116      |
|China                         |device-mac-117mccYyRo       |117