# **IoT Devices Clustering Using PySpark**


##### Load the data


In [None]:
# Source file with the raw IoT device readings and the format used to parse it.
file_location = "/FileStore/tables/iot_devices.json"
file_type = "json"

# Reader settings aimed at CSV input.
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# Gather the CSV-oriented settings in one mapping and hand them to the reader
# in a single call (the original notebook notes they are ignored for other
# file types).
csv_reader_options = {
    "inferSchema": infer_schema,
    "header": first_row_is_header,
    "sep": delimiter,
}
df = (
    spark.read
    .format(file_type)
    .options(**csv_reader_options)
    .load(file_location)
)

display(df)

battery_level,c02_level,cca2,cca3,cn,device_id,device_name,humidity,ip,latitude,lcd,longitude,scale,temp,timestamp
8,868,US,USA,United States,1,meter-gauge-1xbYRYcj,51,68.161.225.1,38.0,green,-97.0,Celsius,34,1458444054093
7,1473,NO,NOR,Norway,2,sensor-pad-2n2Pea,70,213.161.254.1,62.47,red,6.15,Celsius,11,1458444054119
2,1556,IT,ITA,Italy,3,device-mac-36TWSKiT,44,88.36.5.1,42.83,red,12.83,Celsius,19,1458444054120
6,1080,US,USA,United States,4,sensor-pad-4mzWkz,32,66.39.173.154,44.06,yellow,-121.32,Celsius,28,1458444054121
4,931,PH,PHL,Philippines,5,therm-stick-5gimpUrBB,62,203.82.41.9,14.58,green,120.97,Celsius,25,1458444054122
3,1210,US,USA,United States,6,sensor-pad-6al7RTAobR,51,204.116.105.67,35.93,yellow,-85.46,Celsius,27,1458444054122
3,1129,CN,CHN,China,7,meter-gauge-7GeDoanM,26,220.173.179.1,22.82,yellow,108.32,Celsius,18,1458444054123
0,1536,JP,JPN,Japan,8,sensor-pad-8xUD6pzsQI,35,210.173.177.1,35.69,red,139.69,Celsius,27,1458444054123
3,807,JP,JPN,Japan,9,device-mac-9GcjZ2pw,85,118.23.68.227,35.69,green,139.69,Celsius,13,1458444054124
7,1470,US,USA,United States,10,sensor-pad-10BsywSYUF,56,208.109.163.218,33.61,red,-111.89,Celsius,26,1458444054125


In [None]:
# Register the DataFrame as a temporary view so it can be referenced by name.
temp_table_name = "iot_devices_json"
df.createOrReplaceTempView(name=temp_table_name)

In [None]:
# Pull the whole Spark DataFrame into a pandas DataFrame on the driver, then
# print its per-column dtype / non-null summary.
devices_pd = df.toPandas()
devices_pd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 198164 entries, 0 to 198163
Data columns (total 15 columns):
 #   Column         Non-Null Count   Dtype  
---  ------         --------------   -----  
 0   battery_level  198164 non-null  int64  
 1   c02_level      198164 non-null  int64  
 2   cca2           198164 non-null  object 
 3   cca3           198164 non-null  object 
 4   cn             198164 non-null  object 
 5   device_id      198164 non-null  int64  
 6   device_name    198164 non-null  object 
 7   humidity       198164 non-null  int64  
 8   ip             198164 non-null  object 
 9   latitude       198164 non-null  float64
 10  lcd            198164 non-null  object 
 11  longitude      198164 non-null  float64
 12  scale          198164 non-null  object 
 13  temp           198164 non-null  int64  
 14  timestamp      198164 non-null  int64  
dtypes: float64(2), int64(6), object(7)
memory usage: 22.7+ MB



##### Count how many sensor pads are from Poland --> 1413

In [None]:
from pyspark.sql.functions import col

# A row qualifies when its country name is Poland AND its device name
# contains the text "sensor-pad".
is_in_poland = col("cn") == "Poland"
is_sensor_pad = col("device_name").contains("sensor-pad")

sensor_pads_in_poland = df.where(is_in_poland & is_sensor_pad)

display(sensor_pads_in_poland)

battery_level,c02_level,cca2,cca3,cn,device_id,device_name,humidity,ip,latitude,lcd,longitude,scale,temp,timestamp
7,1036,PL,POL,Poland,170,sensor-pad-1703fywiW,63,212.87.11.82,52.25,yellow,21.0,Celsius,32,1458444054223
0,1572,PL,POL,Poland,378,sensor-pad-378SgqAQkyHZG,71,83.238.122.21,52.23,red,21.02,Celsius,15,1458444054306
3,1067,PL,POL,Poland,566,sensor-pad-566TPTBfbc08,31,149.156.8.73,50.08,yellow,19.92,Celsius,20,1458444054360
8,1556,PL,POL,Poland,790,sensor-pad-790HqVUEJ7aUj,61,79.139.16.1,50.26,red,19.03,Celsius,24,1458444054409
2,835,PL,POL,Poland,794,sensor-pad-7944KFOZraAwh,66,83.142.138.254,52.23,green,21.02,Celsius,31,1458444054409
4,975,PL,POL,Poland,892,sensor-pad-892TMLKM,45,86.111.204.1,52.23,green,21.02,Celsius,24,1458444054427
9,1462,PL,POL,Poland,1220,sensor-pad-1220FrKtAyl0,46,87.204.248.97,52.23,red,21.02,Celsius,17,1458444054477
6,1074,PL,POL,Poland,1520,sensor-pad-1520qJvCcerX8,64,62.121.129.133,52.23,yellow,21.02,Celsius,24,1458444054531
4,841,PL,POL,Poland,1602,sensor-pad-1602OBB01C,37,212.191.229.61,51.75,green,19.47,Celsius,21,1458444054550
8,981,PL,POL,Poland,1684,sensor-pad-1684gz1MPZD6w,56,80.52.227.177,52.23,green,21.02,Celsius,31,1458444054563


In [None]:
# Number of rows in the Poland sensor-pad selection.
count_sensor_pads = sensor_pads_in_poland.count()
print("Number of sensor pads from Poland: " + str(count_sensor_pads))

Number of sensor pads from Poland: 1413


In [None]:
# Keep one row per device_name and recount; an unchanged total means the
# selection held no repeated device names.
unique_sensor_pads_in_poland = sensor_pads_in_poland.dropDuplicates(subset=["device_name"])
count_unique_sensor_pads = unique_sensor_pads_in_poland.count()
print("Number of unique sensor pads from Poland: " + str(count_unique_sensor_pads))

Number of unique sensor pads from Poland: 1413



##### Find out how many different lcd values are in the dataset --> 3 (green, yellow, and red)

In [None]:
# Distinct values of the `lcd` column and how many there are.
distinct_lcd = df.select("lcd").distinct()
count_distinct_lcd = distinct_lcd.count()

# DataFrame.show() prints the table itself and returns None, so it must not be
# passed as an argument to print(): doing so printed the table first and then
# the label followed by a stray "None". Print the label, then show the table.
print("Unique values in 'lcd' column:")
distinct_lcd.show()
print(f"Number of unique values in 'lcd' column: {count_distinct_lcd}")

+------+
|   lcd|
+------+
| green|
|yellow|
|   red|
+------+

Unique values in 'lcd' column:  None
Number of unique values in 'lcd' column: 3



##### Find 5 countries that have the largest number of MAC devices used:
###### United States (11509), China (2300), Japan (2002), Republic of Korea (1999), and Germany (1314)

In [None]:
# Select the MAC devices by their name prefix rather than by a bare "mac"
# substring: a case-sensitive substring match over the whole name can also hit
# the random alphanumeric suffix of other device types (e.g. a sensor-pad whose
# suffix happens to contain "mac").
# NOTE(review): assumes every MAC device is named `device-mac-<...>`, as in the
# sample rows shown by this notebook — confirm against the full dataset. The
# per-country counts may differ slightly from the substring-based ones.
mac_devices = df.filter(col("device_name").startswith("device-mac-"))
mac_devices.show()

+-------------+---------+----+----+-----------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|battery_level|c02_level|cca2|cca3|               cn|device_id|         device_name|humidity|             ip|latitude|   lcd|longitude|  scale|temp|    timestamp|
+-------------+---------+----+----+-----------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|            2|     1556|  IT| ITA|            Italy|        3| device-mac-36TWSKiT|      44|      88.36.5.1|   42.83|   red|    12.83|Celsius|  19|1458444054120|
|            3|      807|  JP| JPN|            Japan|        9| device-mac-9GcjZ2pw|      85|  118.23.68.227|   35.69| green|   139.69|Celsius|  13|1458444054124|
|            9|     1259|  US| USA|    United States|       15|  device-mac-15se6mZ|      70|    67.185.72.1|   47.41|yellow|   -122.0|Celsius|  13|1458444054128|
|            5|      9

In [None]:
# Tally the MAC devices per country name, then keep the five largest tallies.
mac_counts_by_country = mac_devices.groupBy("cn").count()
top_countries_with_mac = mac_counts_by_country.orderBy("count", ascending=False).limit(5)
top_countries_with_mac.show()

+-----------------+-----+
|               cn|count|
+-----------------+-----+
|    United States|11509|
|            China| 2300|
|            Japan| 2002|
|Republic of Korea| 1999|
|          Germany| 1314|
+-----------------+-----+



## Classify countries based on environmental attributes - CO2 levels, humidity, and temperature, using PCA for reducing dimensions and K-means for clustering.
###### Result:
 - PCA Analysis: The PCA plot demonstrates distinct cluster separations, implying effective capture of environmental variations through dimension reduction.
 - Geographical Mapping: The choropleth map shows many countries sharing similar environmental traits. This observation suggests a need for further investigation into regional environmental differences, which can inform tailored environmental policy-making.

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
import matplotlib.pyplot as plt
import plotly.express as px
from pyspark.sql import functions as F

In [None]:
# Readings used to describe each country.
feature_columns = ["c02_level", "humidity", "temp"]

# One row per country code (cca3) holding the mean of every reading; each mean
# keeps the name of the column it was computed from.
country_means = [F.avg(name).alias(name) for name in feature_columns]
grouped_data = df.groupBy("cca3").agg(*country_means)

# Pack the three means into a single vector column called "features".
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
featured_data = assembler.transform(grouped_data)

# Fit a StandardScaler (default settings) on the assembled vectors and write
# the rescaled vectors to "scaledFeatures".
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scaler_model = scaler.fit(featured_data)
scaled_data = scaler_model.transform(featured_data)

In [None]:
from pyspark.ml.functions import vector_to_array

# Cluster the per-country scaled feature vectors into k groups.
# An explicit seed pins the K-Means initialization so repeated runs of the
# notebook start from the same state.
k = 10
kmeans = KMeans(featuresCol='scaledFeatures', k=k, seed=42)
model = kmeans.fit(scaled_data)

# Attach the cluster id ("prediction") once and reuse the result for both the
# PCA fit and the PCA transform instead of re-running model.transform().
clustered_data = model.transform(scaled_data)

# Project the scaled features onto three principal components for plotting.
# Note: scaledFeatures is built from three input columns, so k=3 re-expresses
# the data on the principal axes rather than dropping any dimension.
pca = PCA(k=3, inputCol="scaledFeatures", outputCol="pcaFeatures")
pca_model = pca.fit(clustered_data)
transformed_data = pca_model.transform(clustered_data)

# Convert to a pandas DataFrame for visualization.
# toPandas() cannot ship an ML vector (VectorUDT) column through Arrow and
# falls back to the slow path with a warning, so turn the vector into a plain
# array<double> in Spark first and pull the three components out as ordinary
# numeric columns there as well.
pca_array = vector_to_array(F.col("pcaFeatures"))
pandas_df = transformed_data.select(
    "cca3",
    "prediction",
    pca_array.alias("pcaFeatures"),
    pca_array[0].alias("pca_x"),
    pca_array[1].alias("pca_y"),
    pca_array[2].alias("pca_z"),
).toPandas()

# Cluster ids are labels, not magnitudes: a string copy makes Plotly Express
# assign one discrete color per cluster instead of a continuous color scale.
# "prediction" itself is left untouched.
pandas_df["cluster"] = pandas_df["prediction"].astype(str)

# 3D scatter plot of the countries in principal-component space.
fig = px.scatter_3d(pandas_df, x='pca_x', y='pca_y', z='pca_z',
                    color='cluster',
                    category_orders={'cluster': [str(i) for i in range(k)]},
                    labels={'pca_x': 'PCA Feature 1', 'pca_y': 'PCA Feature 2', 'pca_z': 'PCA Feature 3',
                            'cluster': 'Cluster'},
                    title='3D Visualization of Country Clustering')
fig.show()

  Unable to convert the field pcaFeatures. If this column is not necessary, you may consider dropping it or converting to primitive type before the conversion.
Direct cause: Unsupported type in conversion to Arrow: VectorUDT()
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


In [None]:
# Draw each country's cluster on a world map with Plotly.
# pandas_df holds one row per country code, so every country has exactly one
# cluster id. The ids are labels, not magnitudes: cast them to strings so
# Plotly Express gives each cluster its own discrete color instead of placing
# the ids on a continuous color scale.
choropleth_df = pandas_df.assign(cluster=pandas_df["prediction"].astype(str))

fig = px.choropleth(
    choropleth_df,
    locations='cca3',  # ISO Alpha-3 country codes
    color='cluster',  # One discrete color per cluster label
    hover_name='cca3',  # cca3 code appears when hovering over a country
    labels={'cluster': 'Cluster'},
    projection='natural earth',  # Map projection style
    title='Country Clustering Based on Environmental Factors'
)

fig.show()