In [None]:
import findspark
findspark.init()

import folium
import datetime
import ipywidgets as ipy
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
import pyparsing as pp

from folium.plugins import MarkerCluster
from pyspark import sql, SparkConf, SparkContext
from pyspark.sql.functions import avg, col, to_date, lit
from pyspark.sql.functions import max as sparkMax
from pyspark.sql.functions import min as sparkMin
from IPython.display import display, clear_output
from ipywidgets import Output, VBox, widgets, interact

In [None]:
# Reuse the running SparkContext when this cell is executed again; constructing
# a second SparkContext in the same kernel raises an error.
conf = SparkConf().setAppName("Read_CSV")
sc = SparkContext.getOrCreate(conf=conf)
sql_context = sql.SQLContext(sc)

# Only header=True is passed, so no schema is inferred and the columns of both
# frames are read as strings.
df_cities = sql_context.read.csv("Steden.csv", header=True)
df_pollution = sql_context.read.csv("Luchtvervuiling.csv", header=True)

# Centre point (latitude, longitude) and zoom level the map opens on.
coordinates = (20.593684, 78.96288)
pollution_map = folium.Map(location=coordinates, zoom_start=4)

In [None]:
# Month names in calendar order; used as x-axis labels of the monthly plot.
MONTHS = [
    "january",
    "february",
    "march",
    "april",
    "may",
    "june",
    "july",
    "august",
    "september",
    "october",
    "november",
    "december",
]
# Same names preceded by an "all months" entry, so the index of a month name
# in this list equals its calendar number.
MONTHS_WITH_OPTION_ALL = ["all months"] + MONTHS

# Pollutant column names; HIGH_P and LOW_P hold the upper and lower thresholds
# of each pollutant at the same position.
TYPES_P = ["so2", "no2", "rspm", "spm"]
HIGH_P = [10, 18, 200, 300]
LOW_P = [5, 10, 80, 200]

# Hex colour codes used for the map markers.
STRONG_BLUE = "#3186cc"
RED = "#ff0000"
GREEN = "#7CFC00"
LIGHT_ORANGE = "#ffd27f"

In [None]:
# Collect the distinct parsed dates of the date column so the year filter can
# list every year that occurs. distinct() keeps the collected result small.
date_rows = df_pollution.select(to_date(df_pollution.date).alias("to_date")).distinct().collect()
# Rows whose date could not be parsed come back as None; skip exactly those
# instead of hiding every possible error behind a bare except.
years = sorted(
    {row.to_date.year for row in date_rows if row.to_date is not None},
    reverse=True,
)

# Collect the distinct values of the location column for the location filters.
location_rows = df_pollution.select("location").distinct().collect()
locations = sorted(str(row["location"]) for row in location_rows)

In [None]:
df_map_cities = df_cities.select("lat", "lng", "city")
df_map_pollution = df_pollution.select("location", "so2", "no2", "rspm", "spm")
color_point = STRONG_BLUE

@interact(types=TYPES_P)
def get_pollution(types):
    """Draw one circle marker per city, coloured by its level of one pollutant.

    A value above the pollutant's HIGH_P threshold is drawn red, a value below
    its LOW_P threshold green, and anything in between light orange.

    Args:
        types: Name of the pollutant column to show; one of TYPES_P.

    Returns:
        The folium map holding the markers for the selected pollutant.
    """
    # Start from an empty map on every call. Adding to the same map object
    # again and again left the markers of earlier selections in place.
    global pollution_map
    pollution_map = folium.Map(location=coordinates, zoom_start=4)

    type_index = TYPES_P.index(types)
    high_p_value = HIGH_P[type_index]
    low_p_value = LOW_P[type_index]

    joined = df_map_pollution.join(
        df_map_cities, df_map_pollution.location == df_map_cities.city, how="right"
    )
    # NOTE(review): dropDuplicates keeps a single measurement per city and this
    # code does not control which one - confirm that is acceptable.
    city_rows = joined.filter(col(types) != "NA").dropDuplicates(["city"]).collect()

    for row in city_rows:
        # Read the columns by name instead of by position in the joined row.
        pollution = row[types]
        level = float(pollution)
        if level > high_p_value:
            marker_color = RED
        elif level < low_p_value:
            marker_color = GREEN
        else:
            marker_color = LIGHT_ORANGE

        folium.CircleMarker(
            location=[row["lat"], row["lng"]],
            radius=4,
            popup=pollution,
            color=marker_color,
            fill=True,
            fill_color=STRONG_BLUE,
        ).add_to(pollution_map)
    return pollution_map

In [None]:
# Dropdown filters of the monthly plot: location, year and pollutant type.
# Each one starts on the first entry of its option list.
city_filter = widgets.Dropdown(options=locations, value=locations[0], description="Location:", disabled=False)
year_filter = widgets.Dropdown(options=years, value=years[0], description="Year:", disabled=False)
types_p_monthly_filter = widgets.Dropdown(options=TYPES_P, value=TYPES_P[0], description="Type:", disabled=False)

# Show the three filters next to each other.
display(widgets.HBox((city_filter, year_filter, types_p_monthly_filter)))

def update_monthly_plot():
    """Plot the monthly averages of one pollutant for one city and year.

    Reads the current values of city_filter, year_filter and
    types_p_monthly_filter, averages the matching measurements of df_pollution
    for each calendar month and draws the twelve averages as a line chart.
    A month without any usable measurement is plotted as 0.

    Errors raised by Spark are no longer swallowed: a skipped month would leave
    fewer than twelve values and make the plot call fail anyway.
    """
    # Current filter values.
    selected_city = city_filter.value
    selected_year = str(year_filter.value)
    selected_p_type = types_p_monthly_filter.value

    x_axis = MONTHS
    y_axis = []
    # One average per month, so y_axis always ends up as long as x_axis.
    for month_number in range(1, len(MONTHS) + 1):
        month = str(month_number).zfill(2)
        # The bounds are compared as strings against the date column.
        # NOTE(review): this relies on dates being stored as YYYY-MM-DD - confirm.
        month_rows = df_pollution.filter(
            (col("date").between(selected_year + "-" + month + "-01",
                                 selected_year + "-" + month + "-31"))
            & (col("location") == selected_city)
            & (col(selected_p_type) != "NA")
        ).select(selected_p_type).collect()

        # Convert value by value so one malformed entry cannot discard the
        # month or leak the values of the previous month into this one.
        month_values = []
        for row in month_rows:
            try:
                month_values.append(float(row[selected_p_type]))
            except (TypeError, ValueError):
                continue

        if month_values:
            y_axis.append(sum(month_values) / len(month_values))
        else:
            y_axis.append(0)

    # Plot
    fig1 = plt.figure(figsize=(12, 7))
    fig1.suptitle("Air Pollution measurements in " + selected_city + " (" + selected_year + ")", fontsize=14)
    ax = fig1.add_subplot(111)
    ax.set_xlabel("month")
    fig1_y_label = "Concentration of " + selected_p_type + " (μg/m³)"
    ax.set_ylabel(fig1_y_label)
    ax.plot(x_axis, y_axis, "go-", linewidth=2)
    plt.show()

# Draw the plot once for the initial filter values.
update_monthly_plot()

def on_change_monthly_plot_filter(change):
    """Redraw the filter bar and the monthly plot after a filter value changed.

    Args:
        change: Change dictionary handed over by the widget's observe mechanism.
    """
    # Ignore notifications about other traits and about unchanged values.
    if change["name"] != "value" or change["new"] == change["old"]:
        return
    clear_output()
    display(widgets.HBox((city_filter, year_filter, types_p_monthly_filter)))
    update_monthly_plot()

# Register the handler on each of the three filters.
city_filter.observe(on_change_monthly_plot_filter)
year_filter.observe(on_change_monthly_plot_filter)
types_p_monthly_filter.observe(on_change_monthly_plot_filter)

In [None]:
# Dropdown filter of the yearly plot: the pollutant type, starting on the
# first entry of TYPES_P.
types_p_yearly_filter = widgets.Dropdown(options=TYPES_P, value=TYPES_P[0], description="Type:", disabled=False)

display(widgets.HBox((types_p_yearly_filter,)))

def update_yearly_plot():
    """Plot yearly averages of one pollutant for industrial and residential areas.

    Reads the current value of types_p_yearly_filter and, for every year in
    `years`, averages the matching measurements of df_pollution separately for
    the industrial and the residential area types. A year without any usable
    measurement is plotted as 0.
    """
    # Current filter value.
    selected_p_type = types_p_yearly_filter.value

    # Spellings of the area type that are counted as one category.
    industrial_types = ["Industrial", "Industrial Area", "Industrial Areas"]
    residential_types = ["Residential", "Residential and others",
                         "Residential, Rural and other Areas"]

    def yearly_average(year, area_types):
        """Return the mean of the selected pollutant for one year, or 0 if none."""
        # The upper bound used to be written as "-31-12", which is not a date
        # and only matched through string ordering.
        rows = df_pollution.filter(
            (col("date").between(str(year) + "-01-01", str(year) + "-12-31"))
            & (col(selected_p_type) != "NA")
            & (col("type").isin(area_types))
        ).select(selected_p_type).collect()

        # Convert value by value so one malformed entry cannot make this year
        # reuse the values of the previous one.
        values = []
        for row in rows:
            try:
                values.append(float(row[selected_p_type]))
            except (TypeError, ValueError):
                continue
        return sum(values) / len(values) if values else 0

    x_axis = years
    industry_y_axis = [yearly_average(year, industrial_types) for year in years]
    residential_y_axis = [yearly_average(year, residential_types) for year in years]

    # Plot
    fig2 = plt.figure(figsize=(12, 7))
    fig2.suptitle("industrial vs residential " + selected_p_type + " measurements (yearly averages)", fontsize=14)
    ax2 = fig2.add_subplot(111)
    ax2.set_xlabel("year")
    fig2_y_label = "Concentration of " + selected_p_type + " (μg/m³)"
    ax2.set_ylabel(fig2_y_label)
    ax2.plot(x_axis, industry_y_axis, "ro-", linewidth=2, label="industrial")
    ax2.plot(x_axis, residential_y_axis, "bo-", linewidth=2, label="residential")
    ax2.legend()
    plt.show()

# Draw the plot once for the initial filter value.
update_yearly_plot()

def on_change_yearly_plot_filter(change):
    """Redraw the filter and the yearly plot after the filter value changed.

    Args:
        change: Change dictionary handed over by the widget's observe mechanism.
    """
    # Ignore notifications about other traits and about unchanged values.
    if change["name"] != "value" or change["new"] == change["old"]:
        return
    clear_output()
    display(widgets.HBox((types_p_yearly_filter,)))
    update_yearly_plot()

# Register the handler on the filter.
types_p_yearly_filter.observe(on_change_yearly_plot_filter)

In [None]:
# Dropdown filters of the measurement table: city, year and month.
# Each one starts on the first entry of its option list.
city_table_filter = widgets.Dropdown(options=locations, value=locations[0], description="City:", disabled=False)
year_table_filter = widgets.Dropdown(options=years, value=years[0], description="Year:", disabled=False)
month_table_filter = widgets.Dropdown(
    options=MONTHS_WITH_OPTION_ALL,
    value=MONTHS_WITH_OPTION_ALL[0],
    description="Month:",
    disabled=False,
)

# Show the three filters next to each other.
display(widgets.HBox((city_table_filter, year_table_filter, month_table_filter)))

def update_table():
    """Print the measurements of one city for the selected year or month.

    Reads the current values of city_table_filter, year_table_filter and
    month_table_filter, selects the matching rows of df_pollution sorted by
    date and prints all of them. A message is printed instead when no row
    matches.
    """
    # Current filter values.
    selected_city = city_table_filter.value
    selected_year = str(year_table_filter.value)
    selected_month = month_table_filter.value

    # Date range covered by the selection: the whole year or a single month.
    if selected_month == "all months":
        first_day = selected_year + "-01-01"
        last_day = selected_year + "-12-31"
    else:
        # The position in MONTHS_WITH_OPTION_ALL is the calendar month number.
        month = str(MONTHS_WITH_OPTION_ALL.index(selected_month)).zfill(2)
        first_day = selected_year + "-" + month + "-01"
        last_day = selected_year + "-" + month + "-31"

    # Columns of df_pollution shown in the table, restricted to the selection.
    df_table = (
        df_pollution.select("date", "state", "location", "type", "so2", "no2", "rspm", "spm")
        .filter(col("date").between(first_day, last_day) & (col("location") == selected_city))
        .sort(col("date"))
    )

    count = df_table.count()
    if count == 0:
        no_measurements_error = "No measurements have taken place at " + selected_city + " during "
        error_ending_str = "the year " + selected_year + "."
        if selected_month == "all months":
            no_measurements_error += error_ending_str
        else:
            no_measurements_error += selected_month + " of " + error_ending_str
        print(no_measurements_error)
    else:
        # Reuse the count from above rather than running a second Spark job.
        df_table.show(count)

# Print the table once for the initial filter values.
update_table()

def on_change_table_filter(change):
    """Redraw the filter bar and the table after a filter value changed.

    Args:
        change: Change dictionary handed over by the widget's observe mechanism.
    """
    # Ignore notifications about other traits and about unchanged values.
    if change["name"] != "value" or change["new"] == change["old"]:
        return
    clear_output()
    display(widgets.HBox((city_table_filter, year_table_filter, month_table_filter)))
    update_table()

# Register the handler on each of the three filters.
city_table_filter.observe(on_change_table_filter)
year_table_filter.observe(on_change_table_filter)
month_table_filter.observe(on_change_table_filter)

In [None]:
# Dropdown filters of the summary table: city, year, month and pollutant type.
# Each one starts on the first entry of its option list.
city_short_table_filter = widgets.Dropdown(options=locations, value=locations[0], description="City:", disabled=False)
yearshort_table_filter = widgets.Dropdown(options=years, value=years[0], description="Year:", disabled=False)
monthshort_table_filter = widgets.Dropdown(
    options=MONTHS_WITH_OPTION_ALL,
    value=MONTHS_WITH_OPTION_ALL[0],
    description="Month:",
    disabled=False,
)
types_p_short_table_filter = widgets.Dropdown(options=TYPES_P, value=TYPES_P[0], description="Type:", disabled=False)

# Two rows of filters: city and type first, then year and month.
display(widgets.HBox((city_short_table_filter, types_p_short_table_filter)))
display(widgets.HBox((yearshort_table_filter, monthshort_table_filter)))

def update_short_table():
    """Print the average, maximum and minimum of one pollutant for a selection.

    Reads the current values of the four short-table filters, restricts
    df_pollution to the selected city and year or month, and prints a one-row
    table with the three aggregates. A message is printed instead when no
    measurement matches.
    """
    # Current filter values.
    selected_city = city_short_table_filter.value
    selected_year = str(yearshort_table_filter.value)
    selected_month = monthshort_table_filter.value
    selected_p_type = types_p_short_table_filter.value

    # Date range covered by the selection: the whole year or a single month.
    if selected_month == "all months":
        first_day = selected_year + "-01-01"
        last_day = selected_year + "-12-31"
    else:
        # The position in MONTHS_WITH_OPTION_ALL is the calendar month number.
        month = str(MONTHS_WITH_OPTION_ALL.index(selected_month)).zfill(2)
        first_day = selected_year + "-" + month + "-01"
        last_day = selected_year + "-" + month + "-31"

    # No sort here: row order has no effect on the count or the aggregates.
    df_short_table = df_pollution.filter(
        col("date").between(first_day, last_day)
        & (col("location") == selected_city)
        & (col(selected_p_type) != "NA")
    )

    count = df_short_table.count()
    if count == 0:
        no_measurements_error = "No measurements of type " + selected_p_type + " have taken place at " + selected_city + " during "
        error_ending_str = "the year " + selected_year + "."
        if selected_month == "all months":
            no_measurements_error += error_ending_str
        else:
            no_measurements_error += selected_month + " of " + error_ending_str
        print(no_measurements_error)
    else:
        # The measurement columns are compared with the string "NA" above, so
        # they hold text; max/min on text compare character by character
        # ("9.5" sorts above "10.2"). Cast to double to aggregate numerically.
        value = col(selected_p_type).cast("double")
        short_table = df_short_table.agg(
            avg(value).alias("Average " + selected_p_type),
            sparkMax(value).alias("Maximum " + selected_p_type),
            sparkMin(value).alias("Minimum " + selected_p_type),
        )
        short_table.show()

# Print the summary once for the initial filter values.
update_short_table()

def on_change_short_table_filter(change):
    """Redraw both filter rows and the summary after a filter value changed.

    Args:
        change: Change dictionary handed over by the widget's observe mechanism.
    """
    # Ignore notifications about other traits and about unchanged values.
    if change["name"] != "value" or change["new"] == change["old"]:
        return
    clear_output()
    display(widgets.HBox((city_short_table_filter, types_p_short_table_filter)))
    display(widgets.HBox((yearshort_table_filter, monthshort_table_filter)))
    update_short_table()

# Register the handler on each of the four filters.
city_short_table_filter.observe(on_change_short_table_filter)
yearshort_table_filter.observe(on_change_short_table_filter)
monthshort_table_filter.observe(on_change_short_table_filter)
types_p_short_table_filter.observe(on_change_short_table_filter)