# MSc in CSTE, CIDA option Machine learning & Big Data Assignment

### Analysis of data from an environmental sensor network using Hadoop/Spark

In [None]:
# Start a timer:
import time
start_time = time.time()

In [None]:
import numpy as np
import pandas as pd
from urllib.request import urlopen
from pyspark.sql.session import SparkSession
from pyspark import SparkFiles
from pyspark.sql.functions import *
import urllib.request, json, datetime
from pyspark.ml.clustering import KMeans
from itertools import groupby
from pyspark.ml.feature import VectorAssembler
import branca
from folium.plugins import HeatMap
import folium
import altair as alt

In [None]:
# SETTINGS=
MAX_MEMORY= "8g"

# Spark session builder:
spark = SparkSession.builder.config("spark.executor.memory", MAX_MEMORY).config("spark.driver.memory", MAX_MEMORY).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("OFF")
sc.uiWebUrl

In [None]:
# Locally save instances of the data:
def save_data(wait):
    # URLs of the data:
    url5min = 'https://data.sensor.community/static/v2/data.json'
    url24h = 'https://data.sensor.community/static/v2/data.24h.json'

    # If wait=True, sleep until specific time (e.g. 5pm) before running the next line
    if wait:
        today = datetime.datetime.now()
        exactImportTime = datetime.datetime(today.year, today.month, today.day, 17, 0, 0)
        awaitingTime = exactImportTime - today
        time.sleep(awaitingTime.total_seconds())

    # Download the data, saved as json files:
    today = datetime.datetime.now()
    with urllib.request.urlopen(url5min) as url:
        data5min = json.load(url)
    with open('output/5min/data5min_{}-{}-{}_{}h{}.json'.format(today.year, today.month, today.day, today.hour, str(today.minute).zfill(2)), 'w') as outfile:
        json.dump(data5min, outfile)
    with urllib.request.urlopen(url24h) as url:
        data24h = json.load(url)
    with open('output/24h/data24h_{}-{}-{}_{}h{}.json'.format(today.year, today.month, today.day, today.hour, str(today.minute).zfill(2)), 'w') as outfile:
        json.dump(data24h, outfile)

In [None]:
# Load data from local files and load them into Spark DataFrames:
def load_data(all):
    path = 'output/24h/'
    file1 = 'data24h_2022-11-3_17h00.json'
    file2 = 'data24h_2022-11-4_17h00.json'
    file3 = 'data24h_2022-11-5_17h00.json'
    file4 = 'data24h_2022-11-6_17h00.json'
    file5 = 'data24h_2022-11-7_17h00.json'
    file6 = 'data24h_2022-11-8_17h00.json'
    file7 = 'data24h_2022-11-9_17h00.json'
    file8 = 'data24h_2022-11-10_17h00.json'
    file9 = 'data24h_2022-11-11_17h00.json'
    file10 = 'data24h_2022-11-12_17h00.json'
    file11 = 'data24h_2022-11-13_17h00.json'
    file12 = 'data24h_2022-11-14_17h00.json'
    file13 = 'data24h_2022-11-15_17h00.json'
    file14 = 'data24h_2022-11-16_17h00.json'
    file15 = 'data24h_2022-11-17_17h00.json'

    files = [file1, file2, file3, file4, file5, file6, file7, file8, file9, file10, file11, file12, file13, file14, file15]
    dfs = []

    if all:
        for file in files:
            spark.sparkContext.addFile(path + file)
            filename = SparkFiles.get(file)
            df = spark.read.json(filename)
            dfs.append(df)
    else:
        for file in files[len(files)-2:]:
            spark.sparkContext.addFile(path + file)
            filename = SparkFiles.get(file)
            df = spark.read.json(filename)
            dfs.append(df)
    return dfs

# Spark implementation & tasks:

### Task 0: Data pre-processing, filtering and cleaning:

In [None]:
# Preprocessing (P1 and P2 filtering) (Common to all tasks, the returned dataframe will be used for all three tasks):
def preprocessing(dfs):
    for i in range(len(dfs)):
        # Remove columns that are not needed for all three tasks:
        dfs[i] = dfs[i].drop('sampling_rate', 'timestamp').withColumn('country', dfs[i].location.country).withColumn('latitude', dfs[i].location.latitude.cast('float')).withColumn('longitude', dfs[i].location.longitude.cast('float')).withColumn('sensor_id', dfs[i].sensor.id).drop('location', 'sensor')
        # Explode sensordatavalues using pyspark.sql.functions.explode
        df_ = dfs[i].withColumn('sensordatavalues', explode('sensordatavalues'))
        # Remove rows that aren't P1 or P2:
        df_ = df_[df_.sensordatavalues.value_type.isin(['P1', 'P2'])]
        # Remove rows that have negative values:
        df_ = df_[df_.sensordatavalues.value >= 0]
        # Regroup sensordatavalues by record id:
        df_ = df_.groupby('id').agg(collect_list('sensordatavalues').alias('sensordatavalues'))
        # Remove the old sensordatavalues column still containing values different from P1 and P2:
        dfs[i] = dfs[i].drop('sensordatavalues')
        # Link the new sensordatavalues column to the old dataframe, on id:
        dfs[i] = dfs[i].join(df_, on='id', how='inner')
    return dfs

### Task 1: Identify the top 10 countries in terms of average air quality improvement over the previous 24 hours as well as the current averaged air quality indices of each. As far as possible use the country field in the sensor data to identify the country.

In [None]:
def filter_by_country(dfs):
    for i in range(len(dfs)):
        # Explode sensordatavalues column:
        dfs[i] = dfs[i].withColumn('sensordatavalues', explode('sensordatavalues'))
        # Create a column only for P1 and P2 values:
        dfs[i] = dfs[i].withColumn('P1', when(dfs[i].sensordatavalues.value_type == 'P1', dfs[i].sensordatavalues.value).otherwise(None))
        dfs[i] = dfs[i].withColumn('P2', when(dfs[i].sensordatavalues.value_type == 'P2', dfs[i].sensordatavalues.value).otherwise(None))
        # Group by country and calculate the mean of P1 and P2 for each cluster, and keep the sensor_amount:
        dfs[i] = dfs[i].groupBy('country').agg(mean('P1').alias('avgP1'), mean('P2').alias('avgP2'))
        # Map the average P1 and P2 values to the AQI scale using when statements:
        dfs[i] = dfs[i].withColumn('avgP1_AQI', when(dfs[i].avgP1 < 17, 1).otherwise(
            when(dfs[i].avgP1 < 34, 2).otherwise(
                when(dfs[i].avgP1 < 51, 3).otherwise(
                    when(dfs[i].avgP1 < 59, 4).otherwise(
                        when(dfs[i].avgP1 < 67, 5).otherwise(
                            when(dfs[i].avgP1 < 76, 6).otherwise(
                                when(dfs[i].avgP1 < 84, 7).otherwise(
                                    when(dfs[i].avgP1 < 92, 8).otherwise(
                                        when(dfs[i].avgP1 < 101, 9).otherwise(10))))))))))
        dfs[i] = dfs[i].withColumn('avgP2_AQI', when(dfs[i].avgP2 < 12, 1).otherwise(
            when(dfs[i].avgP2 < 24, 2).otherwise(
                when(dfs[i].avgP2 < 36, 3).otherwise(
                    when(dfs[i].avgP2 < 42, 4).otherwise(
                        when(dfs[i].avgP2 < 48, 5).otherwise(
                            when(dfs[i].avgP2 < 54, 6).otherwise(
                                when(dfs[i].avgP2 < 59, 7).otherwise(
                                    when(dfs[i].avgP2 < 65, 8).otherwise(
                                        when(dfs[i].avgP2 < 71, 9).otherwise(10))))))))))
        # Calculate the max AQI value for each cluster:
        dfs[i] = dfs[i].withColumn('maxAQI', when(dfs[i].avgP1_AQI > dfs[i].avgP2_AQI, dfs[i].avgP1_AQI).otherwise(dfs[i].avgP2_AQI))
        dfs[i] = dfs[i].select('country', 'maxAQI')
    return dfs

In [None]:
def task1(dfs):
    # Choosing the last two dataframes, identify the top 10 countries in terms of average air quality improvement over the previous 24 hours as well as the current averaged air quality indices of each.
    df1 = dfs[len(dfs)-2].select('country', 'maxAQI').withColumnRenamed('maxAQI', 'maxAQI_1')
    df2 = dfs[len(dfs)-1].select('country', 'maxAQI').withColumnRenamed('maxAQI', 'maxAQI_2')
    df_diff = df1.join(df2, on='country', how='inner')
    df_diff = df_diff.withColumn('diffAQI', df_diff.maxAQI_2 - df_diff.maxAQI_1).select('country', 'diffAQI')
    df_diff = df_diff.sort('diffAQI', ascending=True)
    return df_diff

#

## Task 2: Using the geo-coordinates from the sensor data, group the data into smaller regions using an appropriate clustering algorithm. Then determine the top 50 regions in terms of air quality improvement over the previous 24 hours.

### Task 2-1: Pre-filter the pre-processed data by creating clusters and grouping data by cluster:

In [None]:
def filter_by_cluster(dfs, amount):
    # Create a KMeans model fitting on the first dataframe, using the latitude and logitude columns, combined together in a features column used to predict the future coordinates:
    model = KMeans(k=amount, seed=23, featuresCol='features', predictionCol='cluster_id').fit(VectorAssembler(inputCols=['latitude', 'longitude'], outputCol='features').transform(dfs[0]))
    # (This model will be used to predict the cluster for each sensor id in the second dataframe.)
    for i in range(len(dfs)):
        # Transform the dataframe:
        dfs[i] = VectorAssembler(inputCols=['latitude', 'longitude'], outputCol='features').transform(dfs[i])
        # Predict the cluster for each sensor id:
        dfs[i] = model.transform(dfs[i])
        # Explode sensordatavalues column:
        dfs[i] = dfs[i].withColumn('sensordatavalues', explode('sensordatavalues'))
        # Create a column only for P1 and P2 values:
        dfs[i] = dfs[i].withColumn('P1', when(dfs[i].sensordatavalues.value_type == 'P1', dfs[i].sensordatavalues.value).otherwise(None))
        dfs[i] = dfs[i].withColumn('P2', when(dfs[i].sensordatavalues.value_type == 'P2', dfs[i].sensordatavalues.value).otherwise(None))
        # Create a column for the coordinates, an array of latitude and longitude:
        dfs[i] = dfs[i].withColumn('coordinates', array('latitude', 'longitude'))
        # Group by cluster_id and calculate the mean of P1 and P2 for each cluster, and keep the sensor_amount and a list of sensor_ids as well as the coordinates:
        dfs[i] = dfs[i].groupBy('cluster_id').agg(mean('P1').alias('avgP1'), mean('P2').alias('avgP2'), count('sensor_id').alias('sensor_amount'), collect_list('sensor_id').alias('sensor_ids'), collect_list('coordinates').alias('coordinates'))

        # Store each cluster's center in the dataframe:
        cluster_centers = []
        for k in range(len(model.clusterCenters())):
            cluster_centers.append([float(np.round(model.clusterCenters()[k][0], 2)), float(np.round(model.clusterCenters()[k][1], 2))])
        cluster_ids = [i for i in range(len(cluster_centers))]
        # Create a dataframe containing the cluster_id and the cluster_center:
        df_cluster_centers = spark.createDataFrame(list(zip(cluster_ids, cluster_centers)), ['cluster_id', 'cluster_center'])
        # Join the two dataframes:
        dfs[i] = dfs[i].join(df_cluster_centers, on='cluster_id', how='inner')

        # Map the average P1 and P2 values to the AQI scale using when statements:
        dfs[i] = dfs[i].withColumn('avgP1_AQI', when(dfs[i].avgP1 < 17, 1).otherwise(
            when(dfs[i].avgP1 < 34, 2).otherwise(
                when(dfs[i].avgP1 < 51, 3).otherwise(
                    when(dfs[i].avgP1 < 59, 4).otherwise(
                        when(dfs[i].avgP1 < 67, 5).otherwise(
                            when(dfs[i].avgP1 < 76, 6).otherwise(
                                when(dfs[i].avgP1 < 84, 7).otherwise(
                                    when(dfs[i].avgP1 < 92, 8).otherwise(
                                        when(dfs[i].avgP1 < 101, 9).otherwise(10))))))))))
        dfs[i] = dfs[i].withColumn('avgP2_AQI', when(dfs[i].avgP2 < 12, 1).otherwise(
            when(dfs[i].avgP2 < 24, 2).otherwise(
                when(dfs[i].avgP2 < 36, 3).otherwise(
                    when(dfs[i].avgP2 < 42, 4).otherwise(
                        when(dfs[i].avgP2 < 48, 5).otherwise(
                            when(dfs[i].avgP2 < 54, 6).otherwise(
                                when(dfs[i].avgP2 < 59, 7).otherwise(
                                    when(dfs[i].avgP2 < 65, 8).otherwise(
                                        when(dfs[i].avgP2 < 71, 9).otherwise(10))))))))))
        # Calculate the max AQI value for each cluster:
        dfs[i] = dfs[i].withColumn('maxAQI', when(dfs[i].avgP1_AQI > dfs[i].avgP2_AQI, dfs[i].avgP1_AQI).otherwise(dfs[i].avgP2_AQI))
        dfs[i] = dfs[i].select('cluster_id', 'cluster_center', 'sensor_amount', 'sensor_ids', 'coordinates', 'maxAQI').sort('cluster_id')
    return dfs

### Task 2-2: Select last two dataframes and compare their AQI to sort clusters by the AQI difference over the last 24 hours:

In [None]:
def task2(dfs):
    # Select the last two dataframes, to compare the evolution of the air quality between the last 24 hours:
    df1 = dfs[len(dfs)-2].select('cluster_id', 'cluster_center', 'sensor_amount', 'maxAQI').withColumnRenamed('maxAQI', 'maxAQI_1')
    df2 = dfs[len(dfs)-1].select('cluster_id', 'maxAQI').withColumnRenamed('maxAQI', 'maxAQI_2')
    # Join both dataframes on cluster_id:
    df_diff = df1.join(df2, on='cluster_id', how='inner')
    # Create a column named diffAQI, whose value is the relative difference between today's maxAQI, and yesterday maxAQI:
    df_diff = df_diff.withColumn('diffAQI', df_diff.maxAQI_2 - df_diff.maxAQI_1).select('cluster_id', 'cluster_center', 'sensor_amount', 'diffAQI')
    # Sort the dataframe by diffAQI, starting with the lowest diffAQIs:
    df_diff = df_diff.sort('diffAQI', ascending=True)
    return df_diff

###

### Task 3: Calculate the longest streaks of good air quality (ie low index values) and display as a histogram.

In [None]:
def task3(dfs):
    for i in range(len(dfs)):
        dfs[i] = dfs[i].select('cluster_id', 'cluster_center', 'sensor_amount', 'maxAQI').sort('cluster_id')
    # Create a RDD with the cluster id, and a list containing 0s or 1s if the maxAQI is respectively lower or higher than 3:
    rdd = dfs[0].rdd.map(lambda x: (x[0], [0 if x[3] < 4 else 1]))
    # Create a RDD with the cluster id, the list of 0s/1s, the current streak of repetitive 0s and the max streak:
    rdd_streaks = rdd.map(lambda x: (x[0], x[1], (1 if x[1][0] == 0 else 0), (1 if x[1][0] == 0 else 0)))
    # Convert the RDD to a dataframe:
    df = rdd_streaks.toDF(['cluster_id', 'streaks', 'current_streak', 'max_streak'])
    # For all the following days (each dataframes following the first stored one)
    for i in range(1, len(dfs)):
        # Create a RDD with the cluster id, and a list containing 0s or 1s if the maxAQI is respectively lower or higher than 3:
        rdd = dfs[i].rdd.map(lambda x: (x[0], [0 if x[3] < 4 else 1]))
        # Create a RDD with the cluster id, the list of 0s/1s and the current streak:
        rdd_streaks = rdd.map(lambda x: (x[0], x[1], (1 if x[1][len(x[1])-1] == 0 else 0)))
        # Convert the RDD containing streak information to a dataframe, and join it to the previous dataframe:
        df = df.join(rdd_streaks.toDF(['cluster_id', 'streak', 'previous_streak']), on='cluster_id', how='inner')
        # Concatenate the 0s/1s values list with the current df 0s/1s value into a single list, and drop the colomn with only one value:
        df = df.withColumn('streaks', concat('streaks', 'streak'))
        df = df.drop('streak')
        # Update the current_streak column using the previous_streak value:
        df = df.withColumn('current_streak', when(df.previous_streak == 1, df.current_streak + 1).otherwise(0))
        # Update the max_streak value using the previous_streak and the max_streak:
        df = df.withColumn('max_streak', when(df.current_streak > df.max_streak, df.current_streak).otherwise(df.max_streak))
        # Drop the current streak value:
        df = df.drop('previous_streak')
    # Show the current state of streaks for each cluster id (for verification):
    df = df.sort('cluster_id')
    return df

### Task 3: Two histogram methods:
Method 1: A single histogram showing longest (maximum or average) streaks across all regions.

In [None]:
def method_one_histogram(df):
    # Group by max_streak, show clusters ids with such max streak, as well as their amount:
    df = df.groupBy('max_streak').agg(count('cluster_id').alias('cluster_amount'), collect_list('cluster_id').alias('cluster_ids'))
    # Cast the max_streak and cluster_amount columns to int:
    df = df.withColumn('max_streak', df.max_streak.cast('int')).withColumn('cluster_amount', df.cluster_amount.cast('int'))
    # Sort the dataframe by max_streak:
    df = df.sort('max_streak', ascending=False)
    pdf = df.toPandas()
    pdf.plot.bar(x='max_streak', y='cluster_amount', rot=0)
    return pdf

Method 2: A histogram for each region/cluster, showing the distribution of continuous good AQI streaks.

In [None]:
def method_two_histogram(df):
    # Calculate successive 0s for each cluster:
    pdf = df.toPandas()
    max_streak = len(dataframes)
    # Create a list of lists, where each list contains the successive 0s for each cluster:
    pdf['successive_0s'] = pdf['streaks'].apply(lambda x: [len(list(g)) for k, g in groupby(x) if k == 0])
    # Count the amount of clusters with a certain amount of successive 0s:
    pdf['successive_0s_hist'] = pdf['successive_0s'].apply(lambda x: [x.count(i) for i in range(1, max_streak+1)])
    # Deduce the amount of time the cluster didn't came back to a good air quality:
    pdf['successive_0s_hist'] = pdf.apply(lambda x: [max_streak - np.sum(x['successive_0s'])] + x['successive_0s_hist'], axis=1)
    # Remove useless columns:
    pdf = pdf.drop('streaks', axis=1).drop('current_streak', axis=1).drop('max_streak', axis=1)
    return pdf

### PROGRAM EXECUTION:

In [None]:
# Load and pre-process the data:
dataframes = preprocessing(load_data(True))

# Task 1:
df_tsk1 = task1(filter_by_country(dataframes[len(dataframes)-2:]))

# Task 2:
df_tsk2 = task2(filter_by_cluster(dataframes[len(dataframes)-2:], 200))

# Task 3:
dfs_fbcr = filter_by_cluster(dataframes[:], 50)
df_tsk3 = task3(dfs_fbcr[:])

# df1 = method_one_histogram(df_tsk3)           # Currently not using it directly, as it'll be used on every cluster on the Folium map.
df2 = method_two_histogram(df_tsk3)

In [None]:
# Gather back the last day of data, filtered by clusters, in order to get the coordinates of the clusters:
df = dfs_fbcr[-1]
# Split cluster center coordinates into two columns, and drop the old cluster_center column:
df = df.withColumn('center_latitude', df['cluster_center'][0]).withColumn('center_longitude', df['cluster_center'][1]).drop('cluster_center')
# Convert the dataframe to a pandas dataframe:
pdf = df.toPandas()
# Gather back the pdf containing the successive 0s for each cluster, used for the second method histogram:
pdf2 = df2.drop('successive_0s', axis=1)
# Inner join the two dataframes:
pdf = pdf.merge(pdf2, on='cluster_id', how='inner')

pdf.head(5)

In [None]:
# Create a Folium World Map:
map = folium.Map(location=[30,0], zoom_start=3.2, tiles='OpenStreetMap', width='100%', height='100%')

# Default markers:
markers = folium.FeatureGroup(name='markers')
for i in range(0, len(pdf)):
    chart = alt.Chart(pd.DataFrame({'x': range(0, len(dataframes)+1), 'y': pdf.iloc[i]['successive_0s_hist']})).mark_bar(size=20).encode(
        x='x',
        y='y'
    )
    chart_json = json.loads(chart.to_json())
    folium.Marker(
        location=[pdf.iloc[i]['center_latitude'], pdf.iloc[i]['center_longitude']],
        popup=folium.Popup(max_width=450).add_child(
            folium.VegaLite(chart_json, width=450, height=250))
    ).add_to(markers)
markers.add_to(map)

# AQI heatmap:
heatmapgroup = folium.FeatureGroup(name='heatmap')
heatmap = HeatMap(list(zip(pdf['center_latitude'], pdf['center_longitude'], pdf['maxAQI'])),
                  min_opacity=0.2,
                  radius=17, blur=15,
                  max_zoom=1,
                  gradient={0.2: 'blue', 0.4: 'lime', 0.6: 'orange', 1: 'red'})
heatmap.add_to(heatmapgroup)
heatmapgroup.add_to(map)

# Branca colormap:
colormap = branca.colormap.LinearColormap(
    colors=['blue', 'lime', 'orange', 'red'],
    vmin=1,
    vmax=10
).to_step(index=[i for i in range(1,11)])
colormap.caption = 'Air quality index'
colormap.add_to(map)

# Add a layer control to the map:
folium.LayerControl().add_to(map)

# Show the map:
map

In [None]:
# Stop the timer and the spark session:
print("--- %s seconds ---" % (time.time() - start_time))
# spark.stop()