### MLBD

In [1]:
# !pip install --upgrade pyspark
# !pip install --upgrade matplotlib==3.6.1

In [2]:
# !pip install sklearn numpy pandas datasist
# !pip install pyspark spark spark-nlp
# !pip install geopandas shapely
# !pip install matplotlib mpl_toolkits
# !pip install plotly kaleido
# !pip install -U kaleido
# !pip install folium
# !pip install geopy
# !pip install branca
# !pip install altair
# !pip install pycountry

In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles, SparkContext, SparkConf
from functools import reduce  # For Python 3.x
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row
import numpy as np
import geopandas as gpd
from shapely.geometry import Point
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
import glob
import folium
import warnings
import geopy
import branca
import altair as alt
import pycountry
import datetime

In [4]:
# Silence FutureWarning noise from the pandas/pyspark stack.
warnings.filterwarnings("ignore", category=FutureWarning)

# Bind the driver to loopback (presumably to avoid local hostname-resolution problems — confirm).
conf_spark = SparkConf()
conf_spark.set("spark.driver.host", "127.0.0.1")
sc = SparkContext(conf=conf_spark)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/29 13:37:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# getOrCreate() attaches to the SparkContext `sc` created in the previous cell.
spark = (
    SparkSession.builder
    .appName('airscholar')
    .getOrCreate()
)

In [6]:
# spark._sc.stop()

In [105]:
def extract_computation_dates():
    """
    Return the date labels of the daily dumps in ``datasets/``.

    Each ``datasets/data.24h-<date>.json`` file contributes ``<date>``. Labels are returned
    in filename order, which is the same order ``read_all_jsons()`` uses, so label i belongs
    to DataFrame i.
    """
    dump_paths = sorted(glob.glob("datasets/*.json"))
    return [path.replace('datasets/data.24h-', '').replace('.json', '') for path in dump_paths]

def read_all_jsons():
    """
    Load every ``datasets/*.json`` dump into its own Spark DataFrame.

    Returns
    -------
    list of DataFrames, one per file in filename order, each sorted by ``timestamp``.
    """
    # register the whole directory with Spark first, then each file individually
    spark.sparkContext.addFile('datasets', recursive=True)
    daily_frames = []
    for json_path in sorted(glob.glob("datasets/*.json")):
        spark.sparkContext.addFile(json_path)
        day_df = spark.read.json("file://" + SparkFiles.get(json_path))
        daily_frames.append(day_df.sort(day_df.timestamp))
    return daily_frames

def clean_data(base_data_df):
    """
    Flatten one raw sensor dump into one row per reading with numeric P1/P2 and location.

    Parameters
    ----------
    base_data_df : Spark DataFrame as read by read_all_jsons(). The code reads its
        'timestamp', 'sensor' (struct with 'id'), 'sensordatavalues' (array of structs
        with 'id', 'value', 'value_type') and 'location' (struct with 'country',
        'latitude', 'longitude') columns.

    Returns
    -------
    DataFrame keeping the dump's other top-level columns (with 'timestamp' cast to
    TimestampType), plus:
    - 'sensorId' (string);
    - 'P1' and 'P2' (float);
    - 'country' ('UK' rewritten to 'GB');
    - 'latitude' and 'longitude' (float).
    Only readings that report both a P1 and a P2 value are kept.
    """
    # timestamps arrive as strings
    sensor_df = base_data_df.withColumn('timestamp', col('timestamp').cast(TimestampType()))
    # pull the sensor id and the parallel id/value/value_type arrays out of the nested fields
    sensor_df = sensor_df\
        .withColumn('sensorId', col('sensor').getField('id').cast(StringType()))\
        .withColumn('sdvID', col('sensordatavalues').getField('id').cast(StringType()))\
        .withColumn('sdvValue', col('sensordatavalues').getField('value'))\
        .withColumn('sdvValue_type', col('sensordatavalues').getField('value_type'))
    sensor_df = sensor_df.drop('sampling_rate', 'sensor', 'sensordatavalues')
    # keep only readings that carry both particulate values
    sensor_df = sensor_df.filter(
        array_contains(col('sdvValue_type'), 'P1') & array_contains(col('sdvValue_type'), 'P2'))
    # array_position() is 1-based while Column[...] indexing is 0-based, hence the -1
    sensor_df = sensor_df\
        .withColumn('P1_Pos', array_position(col('sdvValue_type'), 'P1') - 1)\
        .withColumn('P2_Pos', array_position(col('sdvValue_type'), 'P2') - 1)
    # look up the value at that position in the parallel value array
    sensor_df = sensor_df\
        .withColumn('P1', col('sdvValue')[col('P1_Pos').cast(IntegerType())])\
        .withColumn('P2', col('sdvValue')[col('P2_Pos').cast(IntegerType())])
    # values are strings in the raw dump; assign the cast so downstream code gets floats
    sensor_df = sensor_df\
        .withColumn('P1', col('P1').cast(FloatType()))\
        .withColumn('P2', col('P2').cast(FloatType()))
    # country and coordinates come from the 'location' struct
    sensor_df = sensor_df\
        .withColumn('country', col('location').getField('country'))\
        .withColumn('latitude', col('location').getField('latitude').cast(FloatType()))\
        .withColumn('longitude', col('location').getField('longitude').cast(FloatType()))\
        .drop('location')
    # 'UK' is not an ISO 3166-1 alpha-2 code; the UK's code is 'GB'
    sensor_df = sensor_df.withColumn('country', regexp_replace('country', 'UK', 'GB'))
    # drop the helper columns used for the extraction
    # NOTE(review): readings at (0, 0) coordinates are kept — filter them if they are placeholders
    return sensor_df.drop('sdvID', 'sdvValue_type', 'sdvValue', 'P1_Pos', 'P2_Pos')

def compute_country_names(df):
    """
    Build a (country, country_name) lookup for the distinct codes in ``df['country']``.

    Parameters
    ----------
    df : Spark DataFrame with a 'country' column of ISO 3166-1 alpha-2 codes.

    Returns
    -------
    Spark DataFrame with string columns 'country' and 'country_name'. It has one row per
    distinct non-null code. Codes pycountry does not recognise keep the code itself as the
    name instead of raising.
    """
    def _lookup_name(code):
        # 'UK' is not an ISO code; pycountry knows the United Kingdom as 'GB'
        try:
            match = pycountry.countries.get(alpha_2=code.replace('UK', 'GB'))
        except KeyError:  # older pycountry releases raise instead of returning None
            match = None
        return match.name if match is not None else code

    codes = sorted({row['country'] for row in df.select('country').distinct().collect()
                    if row['country'] is not None})
    records = [(code, _lookup_name(code)) for code in codes]
    # explicit schema so an empty lookup still produces a valid (empty) DataFrame
    return spark.createDataFrame(records, schema='country string, country_name string')

def generate_model(df, k=100, max_iter=10, seed=1):
    """
    Fit a K-means model on the (latitude, longitude) coordinates of ``df``.

    The resulting model is used to assign every day's readings to the same clusters.

    Parameters
    ----------
    df : Spark DataFrame with numeric 'latitude' and 'longitude' columns.
    k : number of clusters (default 100).
    max_iter : maximum K-means iterations (default 10).
    seed : random seed, fixed for reproducible clusters (default 1).

    Returns
    -------
    Fitted pyspark.ml KMeansModel. Its feature vector is [latitude, longitude], so its
    cluster centres are in that order.
    """
    # rows with null/NaN coordinates cannot be clustered; skip them instead of failing the fit
    vec_assembler = VectorAssembler(inputCols=['latitude', 'longitude'], outputCol="features",
                                    handleInvalid="skip")
    features_df = vec_assembler.transform(df.select('latitude', 'longitude'))
    kmeans = KMeans(k=k, maxIter=max_iter, seed=seed)
    return kmeans.fit(features_df)

def clean_and_format_dataframe1(df):
    """
    Clean one raw day, assign each reading to a cluster and add its AQI.

    Unlike clean_and_format_dataframe(), readings are not averaged per location first.

    NOTE(review): uses the notebook-level ``model`` (not a parameter), so it only works
    after the model cell has run.
    """
    cleaned = clean_data(df)
    clustered = compute_predictions(model, cleaned)
    return generate_AQI(clustered)

def clean_and_format_dataframe(df):
    """
    Clean one raw day, average P1/P2 per (latitude, longitude, country), then cluster and add AQI.

    NOTE(review): uses the notebook-level ``model`` (not a parameter), so it only works
    after the model cell has run.
    """
    readings = clean_data(df).select('latitude', 'longitude', 'country', 'P1', 'P2')
    # round to 4 decimals (pyspark round via the star import) before averaging
    readings = readings\
        .withColumn('P1', round(col('P1'), 4).cast(FloatType()))\
        .withColumn('P2', round(col('P2'), 4).cast(FloatType()))
    per_location = readings\
        .groupby(col('latitude'), col('longitude'), col('country'))\
        .avg('P1', 'P2')\
        .withColumnRenamed('avg(P1)', 'P1')\
        .withColumnRenamed('avg(P2)', 'P2')
    clustered = compute_predictions(model, per_location)
    return generate_AQI(clustered)

def compute_predictions(model, df):
    """
    Attach the K-means cluster index of each row's (latitude, longitude) as 'prediction'.

    Parameters
    ----------
    model : fitted KMeansModel trained on a [latitude, longitude] 'features' vector.
    df : Spark DataFrame with 'latitude' and 'longitude' columns.

    Returns
    -------
    ``df`` with an extra 'prediction' column. Rows whose coordinates are null/NaN cannot be
    clustered and are dropped by the inner join.
    """
    vec_assembler = VectorAssembler(inputCols=['latitude', 'longitude'], outputCol="features",
                                    handleInvalid="skip")
    # Predict once per distinct coordinate pair. Joining per-row predictions back on
    # (latitude, longitude) would multiply rows whenever a location appears more than once.
    coords = df.select('latitude', 'longitude').distinct()
    prediction = model.transform(vec_assembler.transform(coords)).drop('features')
    return df.join(prediction, ['latitude', 'longitude'], 'inner')

def generate_cluster(model, df):
    """
    Replace each row's location with the centre of its K-means cluster.

    Parameters
    ----------
    model : fitted KMeansModel (as from generate_model()).
    df : Spark DataFrame with an integer 'prediction' column (from compute_predictions()).

    Returns
    -------
    ``df`` with 'cluster_center' ([latitude, longitude] array) added. The 'latitude',
    'longitude' and 'country' columns are dropped.
    """
    # clusterCenters() follows the VectorAssembler input order used for training,
    # i.e. [latitude, longitude]; that is also the [lat, lon] order folium expects.
    centers = pd.DataFrame(model.clusterCenters(), columns=['latitude', 'longitude'])
    # keep the index numeric so it joins against the integer 'prediction' column
    centers['cluster_index'] = centers.index
    centers = spark.createDataFrame(centers)
    centers = centers\
        .withColumn('cluster_center', array(col('latitude'), col('longitude')))\
        .drop('latitude', 'longitude')

    df = df.join(centers, df.prediction == centers.cluster_index, 'inner')
    return df.drop('cluster_index', 'latitude', 'longitude', 'country')

def mergedTwoDates(dfs):
    """
    Inner-join the first two per-day AQI frames of ``dfs``.

    The join is on (latitude, longitude, country, prediction). Each frame's 'AQI' column
    is renamed to 'AQI_1' / 'AQI_2'.

    NOTE(review): the integer cast happens before round(), so round() does nothing here.
    """
    first_day, second_day = dfs[0], dfs[1]
    join_keys = ['latitude', 'longitude', 'country', 'prediction']
    merged = first_day.withColumnRenamed('AQI', 'AQI_1')\
        .join(second_day.withColumnRenamed('AQI', 'AQI_2'), join_keys, 'inner')
    for aqi_name in ('AQI_1', 'AQI_2'):
        merged = merged.withColumn(aqi_name, round(merged[aqi_name].cast(IntegerType())))
    return merged

def mergeDates(model, dfs):
    """
    Build one row per cluster holding that cluster's AQI for every day in ``dfs``.

    Parameters
    ----------
    model : fitted KMeansModel used to look up cluster centres.
    dfs : list of per-day Spark DataFrames that already carry 'prediction' and 'AQI'
        (e.g. process_multiple_days() passed through compute_predictions()).

    Returns
    -------
    Spark DataFrame with 'prediction', 'cluster_center' and 'AQI_1' ... 'AQI_<len(dfs)>'.
    Each day's value is the cluster's mean AQI, rounded up with ceil(). Day 1 anchors a
    left outer join, so a cluster missing on a later day gets null for that day. Returns
    ``[]`` when ``dfs`` is empty.
    """
    merged = None
    for day_no, day_df in enumerate(dfs, start=1):
        aqi_col = f'AQI_{day_no}'
        day_df = generate_cluster(model, day_df)
        day_df = day_df.withColumnRenamed('AQI', aqi_col)\
            .withColumn(aqi_col, col(aqi_col).cast(IntegerType()))
        # avg() also averages the numeric grouping column 'prediction'; that copy is dropped
        day_df = day_df.groupBy('prediction', 'cluster_center').avg()\
            .drop('avg(prediction)')\
            .withColumnRenamed(f'avg({aqi_col})', aqi_col)
        day_df = day_df.withColumn(aqi_col, ceil(aqi_col))
        # explicit sentinel instead of relying on the truthiness of [] vs. a DataFrame
        if merged is None:
            merged = day_df
        else:
            merged = merged.join(day_df, ['prediction', 'cluster_center'], 'leftouter')

    return merged if merged is not None else []
    

def generate_AQI(df):
    """
    Add an integer 'AQI' column (1-10) computed from the 'P1' and 'P2' readings.

    Each reading is banded into a 1-10 sub-index using the breakpoints below. The
    breakpoints appear to follow the UK Daily Air Quality Index bands for PM10 (P1) and
    PM2.5 (P2) — confirm. 'AQI' is the worse (larger) of the two sub-indices.

    Parameters
    ----------
    df : Spark DataFrame with numeric 'P1' and 'P2' columns.

    Returns
    -------
    ``df`` with 'AQI' added and 'P1'/'P2' dropped. A negative or null reading produces no
    sub-index; 'AQI' then comes from the other reading, and is null only when both are
    missing.
    """
    p1 = col('P1')
    p1_aqi = when((p1 >= 0) & (p1 < 17), lit(1))\
        .when((p1 >= 17) & (p1 < 34), lit(2))\
        .when((p1 >= 34) & (p1 < 51), lit(3))\
        .when((p1 >= 51) & (p1 < 59), lit(4))\
        .when((p1 >= 59) & (p1 < 67), lit(5))\
        .when((p1 >= 67) & (p1 < 76), lit(6))\
        .when((p1 >= 76) & (p1 < 84), lit(7))\
        .when((p1 >= 84) & (p1 < 92), lit(8))\
        .when((p1 >= 92) & (p1 <= 100), lit(9))\
        .when((p1 > 100), lit(10))

    p2 = col('P2')
    p2_aqi = when((p2 >= 0) & (p2 < 12), lit(1))\
        .when((p2 >= 12) & (p2 < 24), lit(2))\
        .when((p2 >= 24) & (p2 < 36), lit(3))\
        .when((p2 >= 36) & (p2 < 42), lit(4))\
        .when((p2 >= 42) & (p2 < 48), lit(5))\
        .when((p2 >= 48) & (p2 < 54), lit(6))\
        .when((p2 >= 54) & (p2 < 59), lit(7))\
        .when((p2 >= 59) & (p2 < 65), lit(8))\
        .when((p2 >= 65) & (p2 <= 70), lit(9))\
        .when((p2 > 70), lit(10))

    df = df.withColumn('P1_AQI', p1_aqi).withColumn('P2_AQI', p2_aqi)
    # greatest() skips nulls, so a missing sub-index on either side falls back to the other.
    # A `<` comparison against null would make the result depend on which side is null.
    df = df.withColumn('AQI', greatest(col('P1_AQI'), col('P2_AQI')))
    return df.drop('P1', 'P2', 'P1_AQI', 'P2_AQI')

def formatData(df):
    """
    Average AQI_1, AQI_2 and Improvement per (prediction, cluster_center).

    AQI_1/AQI_2 are rounded (pyspark round via the star import) and 'Improvement' is
    recomputed as AQI_1 - AQI_2 from the rounded values.
    """
    renames = {
        'avg(AQI_1)': 'AQI_1',
        'avg(AQI_2)': 'AQI_2',
        'avg(Improvement)': 'Improvement',
    }
    per_cluster = df.groupBy('prediction', 'cluster_center').avg().drop('avg(prediction)')
    for old_name, new_name in renames.items():
        per_cluster = per_cluster.withColumnRenamed(old_name, new_name)
    per_cluster = per_cluster\
        .withColumn('AQI_1', round(col('AQI_1')))\
        .withColumn('AQI_2', round(col('AQI_2')))
    return per_cluster.withColumn('Improvement', col('AQI_1') - col('AQI_2'))

def compute_streak(dataFrame, max_good_aqi=4, n_key_cols=2):
    """
    Turn each row's daily AQI values into running "good-air" streak lengths.

    Parameters
    ----------
    dataFrame : anything whose ``collect()`` returns sequence-like rows (e.g. a Spark
        DataFrame). The first ``n_key_cols`` fields of each row are copied through
        unchanged. The remaining fields are treated as consecutive daily AQI values.
    max_good_aqi : a day extends the streak when its AQI is <= this value (default 4).
        Any higher value resets the streak to 0.
    n_key_cols : number of leading identifier fields to pass through (default 2, i.e.
        'prediction' and 'cluster_center' in task3()).

    Returns
    -------
    list of lists: for each row, the key fields followed by the streak length at each day.
    """
    streak_rows = []
    for row in dataFrame.collect():
        out_row = list(row[:n_key_cols])
        streak = 0
        for aqi in row[n_key_cols:]:
            streak = 0 if aqi > max_good_aqi else streak + 1
            out_row.append(streak)
        streak_rows.append(out_row)
    return streak_rows

def task1(dataframe, dates=None):
    """
    Rank countries by how much their average AQI improved between the first two days.

    Parameters
    ----------
    dataframe : list of raw per-day Spark DataFrames (read_all_jsons() output). Only the
        first two are used.
    dates : day labels used as column names. Defaults to the notebook-level
        ``json_dates``. Only dates[0] and dates[1] are used.

    Returns
    -------
    Spark DataFrame with 'country', 'country_name', <dates[0]>, <dates[1]> and
    'Improvement' (= day-1 AQI - day-2 AQI, so positive means the AQI went down), sorted by
    Improvement descending.

    Raises
    ------
    ValueError if fewer than two days (or date labels) are supplied.
    """
    if dates is None:
        dates = json_dates
    if len(dataframe) < 2 or len(dates) < 2:
        raise ValueError('task1 needs at least two days of data and two date labels')
    day1, day2 = dates[0], dates[1]

    daily_aqi = []
    for idx, df in enumerate(dataframe[0:2]):
        print(f'cleaning data for day {idx+1}...')
        df = clean_data(df)
        # per-country mean of the P1/P2 readings
        df = df.select('country', 'P1', 'P2')\
            .withColumn('P1', col('P1').cast(FloatType()))\
            .withColumn('P2', col('P2').cast(FloatType()))\
            .groupby(col('country')).avg('P1', 'P2')\
            .withColumnRenamed('avg(P1)', 'P1').withColumnRenamed('avg(P2)', 'P2')
        print(f'generating Air Quality Index for day {idx+1}...')
        daily_aqi.append(generate_AQI(df))

    curr_df, next_df = daily_aqi
    # only countries present on both days can be compared
    df = curr_df.select('country', 'AQI').withColumnRenamed('AQI', day1)\
        .join(next_df.select('country', 'AQI').withColumnRenamed('AQI', day2), ['country'], 'inner')
    print('computing Air Quality Index improvement information...')
    df = df.withColumn('Improvement', col(day1) - col(day2))
    print('computing country names')
    country_names = compute_country_names(df)
    df = df.join(country_names, ['country'], 'right')
    df = df.select('country', 'country_name', col(day1), col(day2), 'Improvement')
    print('ranking top 10 AQI improvement in the last 24 hours')
    return df.sort(col('Improvement').desc())

def process_multiple_days(base_df, no_of_days):
    """
    Clean the first ``no_of_days`` raw days and compute a per-location AQI for each.

    Parameters
    ----------
    base_df : list of raw per-day Spark DataFrames.
    no_of_days : how many leading days to process. A falsy value (0/None) means all days.

    Returns
    -------
    list of Spark DataFrames with 'longitude', 'latitude' and 'AQI'. P1/P2 are averaged per
    location before the AQI is computed.
    """
    days_to_process = no_of_days or len(base_df)
    print(f'cleaning and generating Air Quality Index data for {days_to_process} days')
    processed = []
    for raw_day in base_df[0:days_to_process]:
        readings = clean_data(raw_day).select('longitude', 'latitude', 'P1', 'P2')\
            .withColumn('P1', col('P1').cast(FloatType()))\
            .withColumn('P2', col('P2').cast(FloatType()))
        per_location = readings.groupby(col('longitude'), col('latitude')).avg('P1', 'P2')\
            .withColumnRenamed('avg(P1)', 'P1')\
            .withColumnRenamed('avg(P2)', 'P2')
        processed.append(generate_AQI(per_location))
    return processed

def task2(base_df, no_of_days, kmeans_model=None, dates=None):
    """
    Rank K-means clusters by AQI improvement between the first two processed days.

    Parameters
    ----------
    base_df : list of raw per-day Spark DataFrames (read_all_jsons() output).
    no_of_days : how many leading days process_multiple_days() prepares (0/None = all).
        Only the first two are compared.
    kmeans_model : fitted KMeansModel. Defaults to the notebook-level ``model``.
    dates : day labels used as column names. Defaults to the notebook-level ``json_dates``.

    Returns
    -------
    Spark DataFrame with 'cluster_center', 'prediction', one ceil-averaged AQI column per
    day (named dates[0] and dates[1]) and 'Improvement' (= day 1 - day 2), sorted by
    Improvement descending.

    Raises
    ------
    ValueError if fewer than two days (or date labels) are available.
    """
    if kmeans_model is None:
        kmeans_model = model
    if dates is None:
        dates = json_dates
    daily_aqi = process_multiple_days(base_df, no_of_days)
    if len(daily_aqi) < 2 or len(dates) < 2:
        raise ValueError('task2 needs at least two days of data and two date labels')
    day1, day2 = dates[0], dates[1]

    # per-location AQI for both days; only locations reporting on both days survive the join
    df = daily_aqi[0].select('latitude', 'longitude', 'AQI').withColumnRenamed('AQI', day1)\
        .join(daily_aqi[1].select('latitude', 'longitude', 'AQI').withColumnRenamed('AQI', day2),
              ['latitude', 'longitude'], 'inner')
    df = compute_predictions(kmeans_model, df)
    df = generate_cluster(kmeans_model, df)
    # average each day's AQI over the cluster, rounding up to a whole index
    df = df.groupby(col('cluster_center'), col('prediction')).avg(day1, day2)\
        .withColumnRenamed(f'avg({day1})', day1).withColumnRenamed(f'avg({day2})', day2)\
        .withColumn(day1, ceil(col(day1)))\
        .withColumn(day2, ceil(col(day2)))
    df = df.withColumn('Improvement', col(day1) - col(day2))
    return df.sort(col('Improvement').desc())

def task3(base_df, model, dates=None):
    """
    Compute, for every K-means cluster, its running streak of good-AQI days across all days.

    Parameters
    ----------
    base_df : list of raw per-day Spark DataFrames (read_all_jsons() output).
    model : fitted KMeansModel used to assign locations to clusters.
    dates : labels used to rename the day columns AQI_1..AQI_n. Defaults to the
        notebook-level ``json_dates``.

    Returns
    -------
    Spark DataFrame with 'prediction', 'cluster_center', then one column per day (named by
    ``dates``) holding the streak length from compute_streak().

    Raises
    ------
    ValueError if there are more day columns than date labels.
    """
    if dates is None:
        dates = json_dates
    daily_aqi = process_multiple_days(base_df, len(base_df))
    daily_aqi = [compute_predictions(model, day_df) for day_df in daily_aqi]
    cluster_aqi = mergeDates(model, daily_aqi)
    # NOTE(review): a cluster with no readings on a day gets AQI 0 here, which
    # compute_streak() counts as a good day — confirm that is intended.
    cluster_aqi = cluster_aqi.fillna(0)

    streak_rows = compute_streak(cluster_aqi)
    streak_pdf = pd.DataFrame(streak_rows, columns=cluster_aqi.columns)
    streakdf = spark.createDataFrame(streak_pdf)

    # the first two columns are prediction and cluster_center; the rest are days
    n_days = len(streakdf.columns) - 2
    if n_days > len(dates):
        raise ValueError(f'task3 needs {n_days} date labels, got {len(dates)}')
    for i in range(n_days):
        streakdf = streakdf.withColumnRenamed(f'AQI_{i+1}', dates[i])

    return streakdf
    
def get_marker_color(arr):
    """
    Map the largest value in ``arr`` to a folium marker colour.

    >= 11 -> 'green'; [8, 11) -> 'purple'; [5, 8) -> 'orange'; [0, 5) -> 'red'.
    Anything else (negative or NaN) -> ''.
    """
    peak = np.max(arr)
    for floor, colour in ((11, 'green'), (8, 'purple'), (5, 'orange'), (0, 'red')):
        if peak >= floor:
            return colour
    return ''
    
def plotMap(df):
    """
    Draw one folium marker per cluster centre, coloured by its AQI improvement.

    Parameters
    ----------
    df : Spark DataFrame with 'cluster_center' ([lat, lon]) and 'Improvement' columns
        (e.g. task2() output).

    Returns
    -------
    folium.Map centred on the first row's cluster centre.

    Marker colours: worse (< 0) 'red', unchanged 'orange', +1 'lightblue',
    +2 or more 'green', missing 'gray'.
    """
    marker_colors = ["orange", "lightblue", "green", "red"]
    # NOTE(review): this legend spans AQI 1-10, but the markers encode Improvement;
    # consider a legend that matches the marker colours.
    colormap = branca.colormap.LinearColormap(
        vmin=1,
        vmax=10,
        colors=marker_colors,
        caption="Air Quality Index")

    def _marker_color(improvement):
        # Clamp instead of indexing marker_colors with the value itself: negative values
        # would wrap around the list and values >= 4 would raise IndexError.
        if improvement is None:
            return 'gray'
        if improvement < 0:
            return 'red'
        if improvement < 1:
            return 'orange'
        if improvement < 2:
            return 'lightblue'
        return 'green'

    # collect once: every collect() is a separate Spark job
    rows = df[['cluster_center', 'Improvement']].collect()
    m = folium.Map(zoom_start=4, location=rows[0][0])

    for location, improvement in rows:
        folium.Marker(location, icon=folium.Icon(color=_marker_color(improvement))).add_to(m)

    colormap.add_to(m)

    return m

def plotMapStreak(streakdf, plot_type):
    """
    Draw one marker per cluster, each with a popup bar chart of its per-day values.

    Parameters
    ----------
    streakdf : Spark DataFrame whose first column is a cluster id, whose second column is
        the [lat, lon] cluster centre, and whose remaining columns hold one value per day
        (e.g. task3() output).
    plot_type : 'streak' plots the value per day (x='Days', y='AQI'). Any other value
        plots a histogram of the day values (x='AQI', y=count).

    Returns
    -------
    folium.Map centred on the first cluster's centre.
    """
    colormap = branca.colormap.LinearColormap(
        vmin=1,
        vmax=10,
        colors=["darkgreen", "green", "lightblue", "orange", "red"],
        caption="Air Quality Index")

    # collect once: each collect() re-runs the whole Spark pipeline behind streakdf
    rows = streakdf.collect()
    day_columns = streakdf.columns[2:]

    # centre the map on the first cluster
    m = folium.Map(zoom_start=4, location=rows[0][1])

    # one marker (with an embedded Vega-Lite chart) per cluster
    for row in rows:
        title, location = row[0:2]
        source = pd.DataFrame({'Days': day_columns, 'AQI': row[2:]})
        if plot_type == 'streak':
            chart = alt.Chart(source, title=f'Cluster_{title}').mark_bar().encode(x='Days', y='AQI')
        else:
            chart = alt.Chart(source, title=f'Cluster_{title}').mark_bar().encode(alt.X('AQI'), y='count()')

        folium.Marker(location, icon=folium.Icon(icon='spinner', prefix='fa', color='green'),
                      popup=folium.Popup(max_width=400).add_child(folium.VegaLite(chart.to_json())),
        ).add_to(m)

    colormap.add_to(m)

    return m

In [8]:
# Date labels (e.g. '15-11-2022') and per-day DataFrames both come from datasets/*.json
# in filename order, so json_dates[i] labels base_df[i].
json_dates, base_df = extract_computation_dates(), read_all_jsons()

## MODEL FOR THE DATASET (K-means clustering of sensor locations over all days)

In [None]:
# Fit one K-means model on all days combined, so every day is clustered against the same centres.
# `DataFrame` is never imported explicitly in this notebook (it only exists if the
# pyspark.sql.functions star import happens to re-export it), so use the bound method.
# NOTE(review): unionAll matches columns by position; unionByName is safer if daily schemas differ.
main_base_df = reduce(lambda left, right: left.unionAll(right), base_df)
main_base_df = clean_data(main_base_df)
model = generate_model(main_base_df)

# TASK 1

### Top 10 countries by improvement in average AQI over the previous 24 hours

In [None]:
start_time = datetime.datetime.now()
df = task1(base_df)
# keep only countries whose AQI dropped (i.e. improved) from day 1 to day 2
df.where(col('Improvement') > 0).show(10, truncate=False)
elapsed = datetime.datetime.now() - start_time
print('Average computation time (HH:MM:SS)', elapsed)

cleaning data for day 1...
generating Air Quality Index for day 1...
cleaning data for day 2...
generating Air Quality Index for day 2...
computing Air Quality Index improvement information...
computing country names
ranking top 10 AQI improvement in the last 24 hours


                                                                                

+-------+------------------+----------+----------+-----------+
|country|country_name      |15-11-2022|16-11-2022|Improvement|
+-------+------------------+----------+----------+-----------+
|PK     |Pakistan          |7         |4         |3          |
|AZ     |Azerbaijan        |3         |1         |2          |
|LT     |Lithuania         |2         |1         |1          |
|EE     |Estonia           |2         |1         |1          |
|CH     |Switzerland       |2         |1         |1          |
|LV     |Latvia            |3         |2         |1          |
|ZA     |South Africa      |2         |1         |1          |
|SE     |Sweden            |3         |2         |1          |
|DO     |Dominican Republic|2         |1         |1          |
|TH     |Thailand          |2         |1         |1          |
+-------+------------------+----------+----------+-----------+
only showing top 10 rows

Average computation time (HH:MM:SS) 0:00:03.643846


# TASK 2

In [12]:
start_time = datetime.datetime.now()
# compare only the first 2 days
df = task2(base_df, 2)
# Materialise the top 50 clusters once. The local copy keeps show() and plotMap()
# from re-running the whole task2 pipeline.
top_rows = df.head(50)
df = spark.createDataFrame(top_rows)
df.show(50, truncate=False)
m = plotMap(df)
elapsed = datetime.datetime.now() - start_time
print('Average computation time (HH:MM:SS)', elapsed, '\n')
m

cleaning and generating Air Quality Index data for 2 days


                                                                                

+------------------------------------------+----------+----------+----------+-----------+
|cluster_center                            |prediction|15-11-2022|16-11-2022|Improvement|
+------------------------------------------+----------+----------+----------+-----------+
|[51.269410080875815, 12.027613207803867]  |46        |4         |2         |2          |
|[51.95735015951243, 10.077312606491322]   |91        |4         |2         |2          |
|[75.78012084960938, -74.86083984375]      |77        |4         |3         |1          |
|[54.35791050068154, -1.9672783966335754]  |29        |4         |3         |1          |
|[-26.525959902796252, -49.81258773803711] |3         |2         |1         |1          |
|[29.463634258363307, 74.35604151283823]   |90        |8         |7         |1          |
|[48.75616433973613, 2.197483108768429]    |84        |3         |2         |1          |
|[58.363695055046335, 15.140260049960757]  |98        |2         |1         |1          |
|[12.75696

# TASK 3

In [16]:
# Per-cluster streaks of good-AQI days over every loaded day, using the K-means model fitted above
streakdf = task3(base_df=base_df, model=model)

cleaning and generating Air Quality Index data for 14 days


                                                                                

22/11/29 13:40:25 WARN DAGScheduler: Broadcasting large task binary with size 1166.8 KiB


                                                                                

In [106]:
# Histogram of each cluster's daily AQI values. task3() keeps its per-day AQI frame local,
# so rebuild it here instead of relying on a `newdf` left over in the kernel.
daily_aqi = [compute_predictions(model, day_df) for day_df in process_multiple_days(base_df, len(base_df))]
cluster_aqi_df = mergeDates(model, daily_aqi).fillna(0)
# named so it does not shadow the built-in map()
aqi_histogram_map = plotMapStreak(cluster_aqi_df, 'histogram')
aqi_histogram_map

22/11/29 15:56:59 WARN DAGScheduler: Broadcasting large task binary with size 1166.8 KiB
22/11/29 15:57:00 WARN DAGScheduler: Broadcasting large task binary with size 1166.8 KiB


In [103]:
# Bar chart of each cluster's good-AQI streak per day; named so it does not shadow the built-in map()
streak_map = plotMapStreak(streakdf, 'streak')
streak_map

In [104]:
# Commit/upload this notebook to jovian.ai (network side effect; see the cell output below).
# NOTE(review): import kept here rather than in the top import cell; jovian is only needed for publishing.
import jovian
jovian.commit(filename='MLBD.ipynb')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

[jovian] Updating notebook "airscholar/mlbd-f2034" on https://jovian.ai/[0m
[jovian] Committed successfully! https://jovian.ai/airscholar/mlbd-f2034[0m


'https://jovian.ai/airscholar/mlbd-f2034'