In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext

As soon as we will do most of the work on the cluster, here we will show how we deal with data on a small subsample (`data_min`)

In [2]:
# create the session
spark = SparkSession.builder.getOrCreate()

# create the context
sc = spark.sparkContext

In [3]:
from pyspark.sql.types import *

Load the data according to a schema and delete the columns we do not need:

In [4]:
schema = StructType([StructField('Country', StringType(), True),
                     StructField('Namespace', StringType(), True),
                     StructField('AirQualityNetwork', StringType(), True),
                     StructField('AirQualityStation', StringType(), True),
                     StructField('EoICode', StringType(), True),
                     StructField('SamplingPoint', StringType(), True),
                     StructField('SamplingProcess', StringType(), True),
                     StructField('Sample', StringType(), True),
                     StructField('Pollutant', StringType(), True),
                     StructField('AirPollutantCode', StringType(), True),
                     StructField('AveragingTime', StringType(), True),
                     StructField('Concentration', FloatType(), True),
                     StructField('Unit', StringType(), True),
                     StructField('DatetimeBegin', TimestampType(), True),
                     StructField('DatetimeEnd', TimestampType(), True),
                     StructField('Validity', IntegerType(), True),
                     StructField('Verification', IntegerType(), True)])

In [5]:
df = spark.read.format("com.databricks.spark.csv").option("inferSchema", "true") \
.option("timestampFormat", "yyyy-mm-dd hh:mm:ss XXX") \
.csv('./data_min/*/*', schema=schema, header=True) \
.drop("Namespace", "AirQualityStation", "AirQualityNetwork", "SamplingPoint", "SamplingProcess", "Sample", "AirPollutantCode")

Now the schema is the following:

| Field         | Type      | Description                                                    |
|---------------|-----------|----------------------------------------------------------------|
| Country       | String    | 2-letter country code                                          |
| EoICode       | String    | Unique station identifier                                      |
| Pollutant     | String    | Short name of pollutant                                        |
| AveragingTime | String    | Time for which the measurement has been taken (hour, day, etc) |
| Concentration | Float     | The measured value/concentration                               |
| Unit          | String    | Defines the unit of the concentration                          |
| DatetimeBegin | Timestamp | Start time (yyyy-mm-dd hh:mm:ss Z) of the measurement          |
| DatetimeEnd   | Timestamp | End time (yyyy-mm-dd hh:mm:ss Z) of the measurement            |
| Validity      | Integer   | The validity flag for the measurement                          |
| Verification  | Integer   | The verification flag for the measurement.                     |

In [8]:
df.show(10)

+-------+-------+---------+-------------+-------------+-----+-------------------+-------------------+--------+------------+
|Country|EoICode|Pollutant|AveragingTime|Concentration| Unit|      DatetimeBegin|        DatetimeEnd|Validity|Verification|
+-------+-------+---------+-------------+-------------+-----+-------------------+-------------------+--------+------------+
|     GB|GB0906A|       CO|         hour|     0.038925|mg/m3|2018-01-01 00:00:00|2018-01-01 01:00:00|       2|           1|
|     GB|GB0906A|       CO|         hour|     0.048175|mg/m3|2018-01-01 01:00:00|2018-01-01 02:00:00|       2|           1|
|     GB|GB0906A|       CO|         hour|     0.048175|mg/m3|2018-01-01 02:00:00|2018-01-01 03:00:00|       2|           1|
|     GB|GB0906A|       CO|         hour|     0.038911|mg/m3|2018-01-01 03:00:00|2018-01-01 04:00:00|       2|           1|
|     GB|GB0906A|       CO|         hour|     0.052807|mg/m3|2018-01-01 04:00:00|2018-01-01 05:00:00|       2|           1|
|     GB

*Validity* can take 5 different values:  
**-99** - Not valid due to station maintenance or calibration  
**-1** - Not valid  
**1** - Valid  
**2** - Valid, but below detection limit measurement value given  
**3** - Valid, but below detection limit and number replaced by 0.5 * detection_limit

*Verificaton* can take 3 different values:  
**1** - Verified  
**2** - Preliminary verified  
**3** - Not verified

Total number of lines:

In [9]:
df.count()

323784

We found one type of missing and incorrect values which is not covered by validity-verification pair: measurement value below zero.

In [14]:
df.select("*").where(df.Concentration < 0).show(5)

+-------+-------+---------+-------------+-------------+-----+-------------------+-------------------+--------+------------+
|Country|EoICode|Pollutant|AveragingTime|Concentration| Unit|      DatetimeBegin|        DatetimeEnd|Validity|Verification|
+-------+-------+---------+-------------+-------------+-----+-------------------+-------------------+--------+------------+
|     IE|IE0028A|      SO2|         hour|        -0.53|µg/m3|2018-01-24 08:00:00|2018-01-24 09:00:00|       1|           1|
|     IE|IE001CM|      SO2|         hour|       -0.266|µg/m3|2018-01-20 15:00:00|2018-01-20 16:00:00|       1|           1|
|     IE|IE001CM|      SO2|         hour|       -0.532|µg/m3|2018-01-20 16:00:00|2018-01-20 17:00:00|       1|           1|
|     IE|IE001CM|      SO2|         hour|       -0.266|µg/m3|2018-01-21 04:00:00|2018-01-21 05:00:00|       1|           1|
|     IE|IE001CM|      SO2|         hour|       -0.266|µg/m3|2018-01-16 15:00:00|2018-01-16 16:00:00|       1|           1|
+-------

Other inconsistencies, including `null` values are covered:

In [22]:
df.select("*").where(df.Concentration.isNull()).show(5)

+-------+-------+---------+-------------+-------------+-----+-------------------+-------------------+--------+------------+
|Country|EoICode|Pollutant|AveragingTime|Concentration| Unit|      DatetimeBegin|        DatetimeEnd|Validity|Verification|
+-------+-------+---------+-------------+-------------+-----+-------------------+-------------------+--------+------------+
|     GB|GB0906A|       CO|         hour|         null|mg/m3|2018-01-19 15:00:00|2018-01-19 16:00:00|      -1|           1|
|     GB|GB0906A|       CO|         hour|         null|mg/m3|2018-01-19 16:00:00|2018-01-19 17:00:00|      -1|           1|
|     GB|GB0906A|       CO|         hour|         null|mg/m3|2018-01-06 15:00:00|2018-01-06 16:00:00|      -1|           1|
|     GB|GB0906A|       CO|         hour|         null|mg/m3|2018-01-06 16:00:00|2018-01-06 17:00:00|      -1|           1|
|     GB|GB0906A|       CO|         hour|         null|mg/m3|2018-01-07 11:00:00|2018-01-07 00:00:00|      -1|           1|
+-------

So, we have to filter those values in each query.