# Madrid's Air Quality Analysis (2018-2019)

In this Jupyter Notebook you will analyze the **air quality in Madrid** from **January 2018** to **November 2019**, using the **Spark skills** you've learned during the **Spark Course**.

The data set comes from the [Madrid City Council's Open Data Portal](https://datos.madrid.es/), specifically from the [Real-time Air Quality](https://datos.madrid.es/portal/site/egob/menuitem.c05c1f754a33a9fbe4b2e4b284f1a5a0/?vgnextoid=41e01e007c9db410VgnVCM2000000c205a0aRCRD&vgnextchannel=374512b9ace9f310VgnVCM100000171f5a0aRCRD&vgnextfmt=default) section. **Documentation on the data set is available** at this [link](https://datos.madrid.es/FWProjects/egob/Catalogo/MedioAmbiente/Aire/Ficheros/Interprete_ficheros_%20calidad_%20del_%20aire_global.pdf), although *it is in Spanish*. The document will be used as a reference and *translated as needed*.

## 1. PySpark environment setup

In [22]:
import findspark
findspark.init()

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType
from pyspark.sql import Row
from pyspark.sql.functions import upper, col

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

## 2. Data source and Spark data abstraction (DataFrame) setup
To-do:
- Download the `airqualitydata_madrid1819.zip` file from "Additional Documentation" to your laptop.
- Unzip the downloaded file. This creates a folder called `airqualitydata` on your laptop containing *23 CSV files* (December 2019 is not included).
- Create a folder (e.g. `airqualitydata`) in Jupyter.
- Go into that folder and upload the 23 files.
- Create a **custom schema** manually to translate the CSV file headers:
  - PROVINCE => `IntegerType`
  - TOWN => `IntegerType`
  - STATION => `IntegerType`
  - MAGNITUDE => `IntegerType`
  - SAMPLING_LOCATION => `StringType`
  - YEAR => `IntegerType`
  - MONTH => `IntegerType`
  - DAY => `IntegerType`
  - H01 => `DoubleType`
  - V01 => `StringType`
  - H02 => `DoubleType`
  - V02 => `StringType`
  - H03 => `DoubleType`
  - V03 => `StringType`
  - H04 => `DoubleType`
  - V04 => `StringType`
  - H05 => `DoubleType`
  - V05 => `StringType`
  - H06 => `DoubleType`
  - V06 => `StringType`
  - H07 => `DoubleType`
  - V07 => `StringType`
  - H08 => `DoubleType`
  - V08 => `StringType`
  - H09 => `DoubleType`
  - V09 => `StringType`
  - H10 => `DoubleType`
  - V10 => `StringType`
  - H11 => `DoubleType`
  - V11 => `StringType`
  - H12 => `DoubleType`
  - V12 => `StringType`
  - H13 => `DoubleType`
  - V13 => `StringType`
  - H14 => `DoubleType`
  - V14 => `StringType`
  - H15 => `DoubleType`
  - V15 => `StringType`
  - H16 => `DoubleType`
  - V16 => `StringType`
  - H17 => `DoubleType`
  - V17 => `StringType`
  - H18 => `DoubleType`
  - V18 => `StringType`
  - H19 => `DoubleType`
  - V19 => `StringType`
  - H20 => `DoubleType`
  - V20 => `StringType`
  - H21 => `DoubleType`
  - V21 => `StringType`
  - H22 => `DoubleType`
  - V22 => `StringType`
  - H23 => `DoubleType`
  - V23 => `StringType`
  - H24 => `DoubleType`
  - V24 => `StringType`
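
The column layout above is regular: eight identifying columns followed by 24 value/flag pairs. As a minimal sketch, the same layout can be generated in plain Python as a DDL-formatted schema string. The names `id_columns`, `hourly_columns` and `aq_schema_ddl` are illustrative, and the sketch assumes a Spark version whose `DataFrameReader.schema` accepts a DDL string as well as a `StructType`.

```python
# Identifying columns and their Spark SQL types, in file order
id_columns = [
    ("PROVINCE", "INT"),
    ("TOWN", "INT"),
    ("STATION", "INT"),
    ("MAGNITUDE", "INT"),
    ("SAMPLING_LOCATION", "STRING"),
    ("YEAR", "INT"),
    ("MONTH", "INT"),
    ("DAY", "INT"),
]

# 24 pairs of hourly value (Hxx) and validation flag (Vxx)
hourly_columns = []
for hour in range(1, 25):
    hourly_columns.append((f"H{hour:02d}", "DOUBLE"))
    hourly_columns.append((f"V{hour:02d}", "STRING"))

all_columns = id_columns + hourly_columns

# DDL-formatted schema string: "PROVINCE INT, TOWN INT, ..., H24 DOUBLE, V24 STRING"
aq_schema_ddl = ", ".join(f"{name} {sql_type}" for name, sql_type in all_columns)

print(len(all_columns))  # 56
```

Under that assumption, the string could be passed as `spark.read.schema(aq_schema_ddl)` in place of a hand-written `StructType`; the explicit `StructType` remains the more self-documenting option for an exercise like this one.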


In [9]:
aqSchema = StructType(\
                     [StructField("PROVINCE", IntegerType(),True),\
                     StructField("TOWN", IntegerType(),True),\
                     StructField("STATION", IntegerType(),True),\
                     StructField("MAGNITUDE", IntegerType(),True),\
                     StructField("SAMPLING_LOCATION", StringType(),True),\
                     StructField("YEAR", IntegerType(),True),\
                     StructField("MONTH", IntegerType(),True),\
                     StructField("DAY", IntegerType(),True),\
                     StructField("H01", DoubleType(),True),\
                     StructField("V01", StringType(),True),\
                     StructField("H02", DoubleType(),True),\
                     StructField("V02", StringType(),True),\
                     StructField("H03", DoubleType(),True),\
                     StructField("V03", StringType(),True),\
                     StructField("H04", DoubleType(),True),\
                     StructField("V04", StringType(),True),\
                     StructField("H05", DoubleType(),True),\
                     StructField("V05", StringType(),True),\
                     StructField("H06", DoubleType(),True),\
                     StructField("V06", StringType(),True),\
                     StructField("H07", DoubleType(),True),\
                     StructField("V07", StringType(),True),\
                     StructField("H08", DoubleType(),True),\
                     StructField("V08", StringType(),True),\
                     StructField("H09", DoubleType(),True),\
                     StructField("V09", StringType(),True),\
                     StructField("H10", DoubleType(),True),\
                     StructField("V10", StringType(),True),\
                     StructField("H11", DoubleType(),True),\
                     StructField("V11", StringType(),True),\
                     StructField("H12", DoubleType(),True),\
                     StructField("V12", StringType(),True),\
                     StructField("H13", DoubleType(),True),\
                     StructField("V13", StringType(),True),\
                     StructField("H14", DoubleType(),True),\
                     StructField("V14", StringType(),True),\
                     StructField("H15", DoubleType(),True),\
                     StructField("V15", StringType(),True),\
                     StructField("H16", DoubleType(),True),\
                     StructField("V16", StringType(),True),\
                     StructField("H17", DoubleType(),True),\
                     StructField("V17", StringType(),True),\
                     StructField("H18", DoubleType(),True),\
                     StructField("V18", StringType(),True),\
                     StructField("H19", DoubleType(),True),\
                     StructField("V19", StringType(),True),\
                     StructField("H20", DoubleType(),True),\
                     StructField("V20", StringType(),True),\
                     StructField("H21", DoubleType(),True),\
                     StructField("V21", StringType(),True),\
                     StructField("H22", DoubleType(),True),\
                     StructField("V22", StringType(),True),\
                     StructField("H23", DoubleType(),True),\
                     StructField("V23", StringType(),True),\
                     StructField("H24", DoubleType(),True),\
                     StructField("V24", StringType(),True),\
                     ])

In [47]:
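# Read every CSV file in the folder at once; the files have a header row and use ";" as separator
aqDF = (
    spark.read
    .schema(aqSchema)
    .option("header", "true")
    .option("sep", ";")
    .csv("../Session 8/Data/*.csv")
)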

- Use the `read` method to create the DataFrame (called `aqDF`).
- Display the first Row of the DataFrame and the number of rows in the DataFrame.


Hints:
- The `read` method can load multiple CSV files from a folder at once (see [this](https://stackoverflow.com/questions/37639956/how-to-import-multiple-csv-files-in-a-single-load))
- Pay attention to the column delimiter. Fields are not always separated by commas, and that has to be handled via reader options (see [this](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=csv#pyspark.sql.DataFrameReader.csv))
- In the end you should get **56 fields/columns** and **105,050 rows**.
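
One way to find the delimiter before configuring the reader is to look at a single line of a file and count candidate separators. The following is a rough sketch in plain Python; `guess_delimiter` is a hypothetical helper, and `sample_line` is an illustrative record shaped like this data set rather than a line copied from one of the files.

```python
def guess_delimiter(line, candidates=(";", ",", "\t", "|")):
    """Return the candidate character that occurs most often in the line."""
    return max(candidates, key=line.count)


# Illustrative line only: eight identifying fields followed by value/flag pairs
sample_line = "28;79;4;1;28079004_1_38;2019;10;1;10.0;V;10.0;V"

print(repr(guess_delimiter(sample_line)))  # ';'
```

In practice the line would come from one of the uploaded files, and the result would be the value given to the reader's `sep` option.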

In [12]:
#aqDF.columns

In [48]:
aqDF.first()

Row(PROVINCE=28, TOWN=79, STATION=4, MAGNITUDE=1, SAMPLING_LOCATION='28079004_1_38', YEAR=2019, MONTH=10, DAY=1, H01=10.0, V01='V', H02=10.0, V02='V', H03=9.0, V03='V', H04=9.0, V04='V', H05=9.0, V05='V', H06=9.0, V06='V', H07=9.0, V07='V', H08=12.0, V08='V', H09=15.0, V09='V', H10=19.0, V10='V', H11=16.0, V11='V', H12=13.0, V12='V', H13=10.0, V13='V', H14=8.0, V14='V', H15=6.0, V15='V', H16=7.0, V16='V', H17=7.0, V17='V', H18=7.0, V18='V', H19=7.0, V19='V', H20=7.0, V20='V', H21=7.0, V21='V', H22=8.0, V22='V', H23=8.0, V23='V', H24=9.0, V24='V')
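
In the row above, `SAMPLING_LOCATION` appears to pack several other fields into one code: `28079004_1_38` lines up with `PROVINCE=28`, `TOWN=79`, `STATION=4` and `MAGNITUDE=1`, followed by a trailing `38`. The sketch below splits such a code under the assumption of a fixed-width station prefix (2 + 3 + 3 digits) and treats the trailing number as a measurement technique id. That reading is inferred from this single row, not confirmed here against the documentation, and `parse_sampling_location` is a hypothetical helper.

```python
from typing import NamedTuple


class SamplingLocation(NamedTuple):
    province: int
    town: int
    station: int
    magnitude: int
    technique: int


def parse_sampling_location(value: str) -> SamplingLocation:
    """Split a code such as '28079004_1_38' into its assumed components."""
    station_code, magnitude, technique = value.split("_")
    # Assumed fixed-width layout: 2 digits province, 3 digits town, 3 digits station
    return SamplingLocation(
        province=int(station_code[:2]),
        town=int(station_code[2:5]),
        station=int(station_code[5:8]),
        magnitude=int(magnitude),
        technique=int(technique),
    )


print(parse_sampling_location("28079004_1_38"))
# SamplingLocation(province=28, town=79, station=4, magnitude=1, technique=38)
```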

In [18]:
len(aqDF.columns)

56

In [14]:
aqDF.count()

105050

- Create a DataFrame manually (call it `magnitudesDF`) with the contents of the table **"Magnitudes, units and measurement techniques"**, found on **page 5/Appendix II** of the [aforementioned documentation](https://datos.madrid.es/FWProjects/egob/Catalogo/MedioAmbiente/Aire/Ficheros/Interprete_ficheros_%20calidad_%20del_%20aire_global.pdf).


In [21]:
magnitudes = [
    Row(1, "Sulphur dioxide", "SO2", "ug/m^3", 38, "Ultraviolet fluorescence"),
    Row(6, "Carbon monoxide", "CO", "ug/m^3", 48, "Infrared absorption"),
    Row(7, "Nitrogen monoxide", "NO", "ug/m^3", 8, "Chemiluminescence"),
    Row(8, "Nitrogen dioxide", "NO2", "ug/m^3", 8, "Id."),
    Row(9, "Particles", "PM2.5", "ug/m^3", 47, "Ultraviolet fluorescence"),
    Row(10, "Particles", "PM10", "ug/m^3", 47, "Id."),
    Row(12, "Nitrogen oxides", "NOx", "ug/m^3", 8, "Ultraviolet fluorescence"),
    Row(14, "Ozone", "O3", "ug/m^3", 6, "Ultraviolet fluorescence"),
    Row(20, "Toluene", "TOL", "ug/m^3", 59, "Ultraviolet fluorescence"),
    Row(30, "Benzene", "BEN", "ug/m^3", 59, "Id."),
    Row(35, "Ethylbenzene", "EBE", "ug/m^3", 59, "Id."),
    Row(37, "Metaxylene", "MXY", "ug/m^3", 59, "Id."),
    Row(38, "Paraxylene", "PXY", "ug/m^3", 59, "Id."),
    Row(39, "Orthoxylene", "OXY", "ug/m^3", 59, "Id."),
    Row(42, "Total hydrocarbons (hexane)", "TCH", "ug/m^3", 2, "Ultraviolet fluorescence"),
    Row(43, "Methane", "CH4", "ug/m^3", 2, "Id."),
    Row(44, "Non-methane hydrocarbons (hexane)", "NMHC", "ug/m^3", 2, "Id."),
]
magnitudesSchema = StructType([
    StructField("magnitudeId", IntegerType(), False),
    StructField("magnitudeName", StringType(), True),
    StructField("formula", StringType(), True),
    StructField("unit", StringType(), True),
    StructField("techniqueId", IntegerType(), True),
    StructField("techniqueName", StringType(), True)
])
magnitudesDf = spark.createDataFrame(magnitudes, magnitudesSchema)
magnitudesDf.show()

+-----------+--------------------+-------+------+-----------+--------------------+
|magnitudeId|       magnitudeName|formula|  unit|techniqueId|       techniqueName|
+-----------+--------------------+-------+------+-----------+--------------------+
|          1|     Sulphur dioxide|    SO2|ug/m^3|         38|Ultraviolet fluor...|
|          6|     Carbon monoxide|     CO|ug/m^3|         48| Infrared absorption|
|          7|   Nitrogen monoxide|     NO|ug/m^3|          8|   Chemiluminescence|
|          8|    Nitrogen dioxide|    NO2|ug/m^3|          8|                 Id.|
|          9|           Particles|  PM2.5|ug/m^3|         47|Ultraviolet fluor...|
|         10|           Particles|   PM10|ug/m^3|         47|                 Id.|
|         12|     Nitrogen oxides|    NOx|ug/m^3|          8|Ultraviolet fluor...|
|         14|               Ozone|     O3|ug/m^3|          6|Ultraviolet fluor...|
|         20|             Toluene|    TOL|ug/m^3|         59|Ultraviolet fluor...|
|   

- Create a DataFrame called `enrichedAqDF` combining `aqDF` and `magnitudesDF`. It should keep all the columns in `aqDF`, except that column `MAGNITUDE` is replaced by column `magnitudeName` from `magnitudesDF` and all column pairs from `H01`, `V01` to `H24`, `V24` are removed.

In [49]:
magnitudesDF = magnitudesDf.select("magnitudeId", "magnitudeName")
magnitudesDF.show()

+-----------+--------------------+
|magnitudeId|       magnitudeName|
+-----------+--------------------+
|          1|     Sulphur dioxide|
|          6|     Carbon monoxide|
|          7|   Nitrogen monoxide|
|          8|    Nitrogen dioxide|
|          9|           Particles|
|         10|           Particles|
|         12|     Nitrogen oxides|
|         14|               Ozone|
|         20|             Toluene|
|         30|             Benzene|
|         35|        Ethylbenzene|
|         37|          Metaxylene|
|         38|          Paraxylene|
|         39|         Orthoxylene|
|         42|Total hydrocarbon...|
|         43|             Methane|
|         44|Non-methane hydro...|
+-----------+--------------------+



In [50]:
aqDF = aqDF.join(magnitudesDF,aqDF.MAGNITUDE == magnitudesDF.magnitudeId,how='left')

In [51]:
aqDF = aqDF.withColumnRenamed("magnitudeName","MAGNITUDENAME")

In [52]:
# Hourly value/flag columns: H01, V01, ..., H24, V24
hourlyCols = [f"{prefix}{hour:02d}" for hour in range(1, 25) for prefix in ("H", "V")]
# show() returns None, so keep the DataFrame in its own variable and display it separately
enrichedAqDF = aqDF.drop("MAGNITUDE", "magnitudeId", *hourlyCols)
enrichedAqDF.show()

+--------+----+-------+-----------------+----+-----+---+--------------------+
|PROVINCE|TOWN|STATION|SAMPLING_LOCATION|YEAR|MONTH|DAY|       MAGNITUDENAME|
+--------+----+-------+-----------------+----+-----+---+--------------------+
|      28|  79|      8|    28079008_44_2|2019|   10|  1|Non-methane hydro...|
|      28|  79|      8|    28079008_44_2|2019|   10|  2|Non-methane hydro...|
|      28|  79|      8|    28079008_44_2|2019|   10|  3|Non-methane hydro...|
|      28|  79|      8|    28079008_44_2|2019|   10|  4|Non-methane hydro...|
|      28|  79|      8|    28079008_44_2|2019|   10|  5|Non-methane hydro...|
|      28|  79|      8|    28079008_44_2|2019|   10|  6|Non-methane hydro...|
|      28|  79|      8|    28079008_44_2|2019|   10|  7|Non-methane hydro...|
|      28|  79|      8|    28079008_44_2|2019|   10|  8|Non-methane hydro...|
|      28|  79|      8|    28079008_44_2|2019|   10|  9|Non-methane hydro...|
|      28|  79|      8|    28079008_44_2|2019|   10| 10|Non-meth

In [54]:
aqDF.select(["PROVINCE","TOWN","STATION","MAGNITUDENAME","SAMPLING_LOCATION","YEAR","MONTH","DAY"]).show()

+--------+----+-------+--------------------+-----------------+----+-----+---+
|PROVINCE|TOWN|STATION|       MAGNITUDENAME|SAMPLING_LOCATION|YEAR|MONTH|DAY|
+--------+----+-------+--------------------+-----------------+----+-----+---+
|      28|  79|      8|Non-methane hydro...|    28079008_44_2|2019|   10|  1|
|      28|  79|      8|Non-methane hydro...|    28079008_44_2|2019|   10|  2|
|      28|  79|      8|Non-methane hydro...|    28079008_44_2|2019|   10|  3|
|      28|  79|      8|Non-methane hydro...|    28079008_44_2|2019|   10|  4|
|      28|  79|      8|Non-methane hydro...|    28079008_44_2|2019|   10|  5|
|      28|  79|      8|Non-methane hydro...|    28079008_44_2|2019|   10|  6|
|      28|  79|      8|Non-methane hydro...|    28079008_44_2|2019|   10|  7|
|      28|  79|      8|Non-methane hydro...|    28079008_44_2|2019|   10|  8|
|      28|  79|      8|Non-methane hydro...|    28079008_44_2|2019|   10|  9|
|      28|  79|      8|Non-methane hydro...|    28079008_44_2|20