# ETL of air pollution time series
## The deliverables
Data set containing all available recordings of hourly averaged pollutant concentrations measured in Hamburg in years 2013-2019

In [5]:
import urllib.request
import xml.etree.ElementTree as ET
from lxml import etree
import pandas as pd
import numpy as np

import re, collections
from io import StringIO
import os, fnmatch

import matplotlib.pyplot as plt

import geopandas as gpd
import mplleaflet

%matplotlib inline

In [None]:
## Download and decompress the dataset (2019) itself:
#!mkdir Correlaid.rawData
#!mkdir Correlaid.rawData/AQD_DE_E1a_2019
#!ls -l Correlaid.rawData/
#urllib.request.urlretrieve("https://datahub.uba.de/server/rest/directories/arcgisforinspire/INSPIRE/aqd_MapServer/Daten/AQD_DE_E1a_2019.zip", "Correlaid.rawData/AQD_DE_E1a_2019.zip")
#!mv Correlaid.rawData/AQD_DE_E1a_2019.zip Correlaid.rawData/AQD_DE_E1a_2019/
#!unzip Correlaid.rawData/AQD_DE_E1a_2019/AQD_DE_E1a_2019.zip -d Correlaid.rawData/
#!rm Correlaid.rawData/AQD_DE_E1a_2019/AQD_DE_E1a_2019.zip
#!unzip Correlaid.rawData/DISKO.zip -d Correlaid.rawData/AQD_DE_E1a_2019/
#!unzip Correlaid.rawData/KONTI.zip -d Correlaid.rawData/AQD_DE_E1a_2019/
#!rm Correlaid.rawData/DISKO.zip Correlaid.rawData/KONTI.zip

#Download the rdf
#urllib.request.urlretrieve("https://www.govdata.de/ckan/dataset/luftqualitatsdaten-datenstrom-e1a-validierte-einzelwerte-2019-datensatz.rdf", "Correlaid.rawData/AQD_DE_E1a_2019/luftqualitatsdaten-datenstrom-e1a-validierte-einzelwerte-2019-datensatz.rdf")

#Download Sensor positions
#urllib.request.urlretrieve("https://datahub.uba.de/server/rest/directories/arcgisforinspire/INSPIRE/aqd_MapServer/Daten/AQD_DE_D_2019.zip", "Correlaid.rawData/AQD_DE_D_2019.zip")
#!unzip Correlaid.rawData/AQD_DE_D_2019.zip -d Correlaid.rawData/
#!rm Correlaid.rawData/AQD_DE_D_2019.zip

# Download Town-county dataset:
#urllib.request.urlretrieve("https://www.destatis.de/DE/Themen/Laender-Regionen/Regionales/Gemeindeverzeichnis/Administrativ/Archiv/GV100ADQ/GV100AD3107.zip?__blob=publicationFile",
#                           "Correlaid.rawData/GV100AD3107.zip")
#!mkdir Correlaid.rawData/GV100AD3107
#!unzip Correlaid.rawData/GV100AD3107.zip -d Correlaid.rawData/GV100AD3107/
#!rm Correlaid.rawData/GV100AD3107.zip

#!mkdir Correlaid.rawData/Geo
#urllib.request.urlretrieve("https://biogeo.ucdavis.edu/data/diva/adm/DEU_adm.zip", "Correlaid.rawData/Geo/DEU_adm.zip" 
#!unzip Correlaid.rawData/Geo/DEU_adm.zip -d Correlaid.rawData/Geo/

#!ls -la Correlaid.rawData/
#!ls -la Correlaid.rawData/AQD_DE_E1a_2019/
#!ls -la Correlaid.rawData/GV100AD3107/

#!pwd

In [106]:
#Download and decompress the dataset (2018) itself:
#!mkdir Correlaid.rawData/AQD_DE_E1a_2018
#!ls -l Correlaid.rawData/
#urllib.request.urlretrieve("https://datahub.uba.de/server/rest/directories/arcgisforinspire/INSPIRE/aqd_MapServer/Daten/AQD_DE_E1a_2018.zip", "Correlaid.rawData/AQD_DE_E1a_2018.zip")
#!mv Correlaid.rawData/AQD_DE_E1a_2018.zip Correlaid.rawData/AQD_DE_E1a_2018/
#!unzip Correlaid.rawData/AQD_DE_E1a_2018/AQD_DE_E1a_2018.zip -d Correlaid.rawData/
#!rm Correlaid.rawData/AQD_DE_E1a_2018/AQD_DE_E1a_2018.zip
#!mv Correlaid.rawData/E1a/* Correlaid.rawData/AQD_DE_E1a_2018/
#!rm -rf Correlaid.rawData/E1a

In [11]:
def etl_concentrations_timeseries_from_file(input_file):
    # pick all tags from the XML file
    Etree = ET.parse(input_file)
    Eroot = Etree.getroot()
    Eroot.tag
    Eroot.attrib
    AllTags = [elem.tag for elem in Eroot.iter()]
  
    varFull = [s for s in AllTags if 'value' in s][0]
    ColNamesExp = [re.sub(r'[^a-zA-Z0-9:]*\'{http(.*)$', r'', re.sub(r'^.*AQD\/SPO.DE_', r'', str(varr.attrib))) for varr in Eroot.iter(varFull) if 'AQD' in str(varr.attrib)] 

    
    varFull = [s for s in AllTags if 'values' in s][0]

    dff=[]
    for varr in Eroot.iter(varFull):
        dff.append(pd.read_csv(StringIO((varr.text).replace("@@","\n")), sep=",", header=None))
    
    out_df=pd.concat([dff[s][4] for s in range(0,len(dff))], axis=1)
    out_df.columns=ColNamesExp
    out_df.insert(loc=0, column="observation_period", value=dff[0][0])
    return(out_df)    

def etl_concentrations_timeseries_from_dir_and_mask(input_dir, file_mask):
    files_hour = []
    for file in os.listdir(input_dir):
        if fnmatch.fnmatch(file, file_mask):
            files_hour.append(file)

    # pick all tags from the XML file
    Etree = ET.parse(input_dir + files_hour[0])
    Eroot = Etree.getroot()
    Eroot.tag
    Eroot.attrib
    AllTags = [elem.tag for elem in Eroot.iter()]

#    ColNamesExp=SelectAllXMLsensorID(AllTags)
    varFull = [s for s in AllTags if 'values' in s][0]

    dff=[]
    for varr in Eroot.iter(varFull):
        dff.append(pd.read_csv(StringIO((varr.text).replace("@@","\n")), sep=",", header=None))

    out_df = dff[0][[0]]
    out_df.columns=['observation_period']

# get all tags in xml file; Note, that the actual data is kept as a TEXT of *values* tags 
    for file in files_hour:
        Etree = ET.parse(input_dir + file)
        Eroot = Etree.getroot()
        Eroot.tag
        Eroot.attrib
        AllTags = [elem.tag for elem in Eroot.iter()]
           
        varFull = [s for s in AllTags if 'value' in s][0]
        ColNamesExp = [re.sub(r'[^a-zA-Z0-9:]*\'{http(.*)$', r'', re.sub(r'^.*AQD\/SPO.DE_', r'', str(varr.attrib))) for varr in Eroot.iter(varFull) if 'AQD' in str(varr.attrib)] 

        
        # Compare column names with file names, they should encode same country, state and pollutant
        for ColName in ColNamesExp:
            if ((ColName[0:2]!=file[0:2]) or (ColName[2:4]!=file[3:5]) or (ColName[8:11]!=file[11:14])):
                print("Inconsistency in file and column names: ", file, ColName)
                exit()
    
        varFull = [s for s in AllTags if 'values' in s][0]
    
        dff=[] # Temporary list for DataFrames
        # reading actual pollutant data fiom the text field:    
        for varr in Eroot.iter(varFull):
            dff.append(pd.read_csv(StringIO((varr.text).replace("@@","\n")), sep=",", header=None))

        # checking, that measurment timestamps are identical in the files read    
        bad_s = []
        for s in range(0,len(dff)):
            if not (out_df['observation_period']).equals(dff[s][0]):
                print("Inconsistency of observation times in the following files: ", file, files_hour[0])
                print(out_df['observation_period'])
                print(dff[s][0])
                print(s)
                print(ColNamesExp[s])
                bad_s.append(s) 
                
#                exit()

        for s in bad_s:
            del ColNamesExp[s]
            del dff[s]

        # select column 4 - pollutant concentration:
        dff=pd.concat([dff[s][4] for s in range(0,len(dff))], axis=1)
        dff.columns=ColNamesExp
   
        out_df=pd.concat([out_df, dff], axis=1)    
#    out_df=pd.concat([dff[s][4] for s in range(0,len(dff))], axis=1)
#    out_df.columns=ColNamesExp
#    out_df.insert(loc=0, column="observation_period", value=dff[0][0])
    return(out_df)    

In [93]:
wide_df19 = etl_concentrations_timeseries_from_file("Correlaid.rawData/AQD_DE_E1a_2019/DE_HH_2019_hour.xml")

Now we have wide data frame, containing timeseries of all pollutant concentrations for all sensors. The pollutant type and the sensor ID are encoded in column names. The minimal value of pollutant concentrations -999.0 is equivalent to NA and will be imputted, as well as all negative values (the concentration can not be negative). The limit for imputation will be set to 876, i.e. NA sequences exceeding 10% of the year will not be imputted. Since the number of heavily corrupted columns is below 2%, they will be dropped in favor to the information quality:

In [94]:
wide_df19.head(5)

Unnamed: 0,observation_period,DEHH068_CHB_dataGroup1,DEHH070_CHB_dataGroup1,DEHH008_NO2_dataGroup1,DEHH015_NO2_dataGroup1,DEHH016_NO2_dataGroup1,DEHH026_NO2_dataGroup1,DEHH033_NO2_dataGroup1,DEHH047_NO2_dataGroup1,DEHH050_NO2_dataGroup1,...,DEHH016_SO2_dataGroup1,DEHH059_SO2_dataGroup1,DEHH079_SO2_dataGroup1,DEHH081_SO2_dataGroup1,DEHH008_PM2_dataGroup1,DEHH015_PM2_dataGroup1,DEHH033_PM2_dataGroup1,DEHH059_PM2_dataGroup1,DEHH064_PM2_dataGroup1,DEHH068_PM2_dataGroup1
0,2019-01-01T00:00:00+01:00,2.182,0.977,23.896,16.787,13.292,30.217,14.883,13.441,10.037,...,2.5,2.5,2.5,2.5,98.733,116.412,51.636,88.387,216.47,602.38
1,2019-01-01T01:00:00+01:00,0.693,0.773,13.698,11.791,16.222,19.486,6.349,6.496,4.0,...,2.5,2.5,2.5,9.531,33.534,96.405,75.457,65.468,161.832,80.708
2,2019-01-01T02:00:00+01:00,0.454,0.675,7.991,6.998,15.669,12.586,6.243,4.708,2.0,...,2.5,2.5,2.5,11.27,24.592,25.195,15.651,13.072,18.958,36.882
3,2019-01-01T03:00:00+01:00,0.2,-999.0,7.322,5.273,14.999,12.025,4.714,4.13,2.0,...,2.5,2.5,2.5,2.5,22.92,16.258,11.641,12.416,13.909,36.853
4,2019-01-01T04:00:00+01:00,-999.0,-999.0,6.211,5.665,13.821,9.234,5.18,2.0,2.0,...,5.926,2.5,2.5,2.5,30.757,19.862,15.598,17.161,17.068,47.537


In [15]:
def clean_wide_df(df):
    out_df = df.copy()
    df_observation_period = df["observation_period"]
    out_df.drop(["observation_period"], axis=1, inplace=True)
    out_df[out_df.loc[:, out_df.columns != 'observation_period'] < 0.0] = np.NaN # concentration cannot be negative
    out_df.interpolate(method='linear', inplace=True, axis=0, limit=876, limit_direction='both')
    out_df.insert(loc=0, column="observation_period", value=pd.to_datetime(df_observation_period))
    return(out_df)

In [102]:
clean_df19 = clean_wide_df(wide_df19)

In [17]:
wide_df18 = etl_concentrations_timeseries_from_dir_and_mask("Correlaid.rawData/AQD_DE_E1a_2018/", "DE_HH*hour*")

Inconsistency of observation times in the following files:  DE_HH_2018_PM2_hour.xml DE_HH_2018_O3_hour.xml
0       2018-01-01T00:00:00+01:00
1       2018-01-01T01:00:00+01:00
2       2018-01-01T02:00:00+01:00
3       2018-01-01T03:00:00+01:00
4       2018-01-01T04:00:00+01:00
                  ...            
8755    2018-12-31T19:00:00+01:00
8756    2018-12-31T20:00:00+01:00
8757    2018-12-31T21:00:00+01:00
8758    2018-12-31T22:00:00+01:00
8759    2018-12-31T23:00:00+01:00
Name: observation_period, Length: 8760, dtype: object
0       2018-04-01T00:00:00+01:00
1       2018-04-01T01:00:00+01:00
2       2018-04-01T02:00:00+01:00
3       2018-04-01T03:00:00+01:00
4       2018-04-01T04:00:00+01:00
                  ...            
5851    2018-11-30T19:00:00+01:00
5852    2018-11-30T20:00:00+01:00
5853    2018-11-30T21:00:00+01:00
5854    2018-11-30T22:00:00+01:00
5855    2018-11-30T23:00:00+01:00
Name: 0, Length: 5856, dtype: object
2
DEHH033_PM2_dataGroup1


In [18]:
clean_df18 = clean_wide_df(wide_df18)

In [19]:
clean_df18

Unnamed: 0,observation_period,DEHH008_O3_dataGroup1,DEHH033_O3_dataGroup1,DEHH047_O3_dataGroup1,DEHH050_O3_dataGroup1,DEHH008_NO2_dataGroup1,DEHH015_NO2_dataGroup1,DEHH016_NO2_dataGroup1,DEHH026_NO2_dataGroup1,DEHH033_NO2_dataGroup1,...,DEHH033_CO_dataGroup1,DEHH068_CO_dataGroup1,DEHH070_CO_dataGroup1,DEHH008_PM2_dataGroup1,DEHH015_PM2_dataGroup1,DEHH059_PM2_dataGroup1,DEHH064_PM2_dataGroup1,DEHH068_PM2_dataGroup1,DEHH068_CHB_dataGroup1,DEHH070_CHB_dataGroup1
0,2018-01-01 00:00:00+01:00,5.733,48.233,10.995,50.542,80.490,34.222,11.322,77.654,27.589,...,0.21172,0.43220,0.37622,549.640,162.097,99.554,488.917,743.743,2.415,1.995
1,2018-01-01 01:00:00+01:00,53.963,57.160,37.245,75.750,28.161,22.282,11.840,31.900,19.627,...,0.20650,0.38507,0.38644,84.099,247.070,148.521,369.854,127.531,0.953,1.363
2,2018-01-01 02:00:00+01:00,73.001,73.601,70.628,79.439,10.369,7.401,5.179,18.802,5.665,...,0.10000,0.23405,0.25444,22.791,56.550,26.843,21.878,21.889,0.560,0.731
3,2018-01-01 03:00:00+01:00,67.785,72.184,71.419,76.205,13.777,6.856,4.511,19.693,5.671,...,0.10000,0.25125,0.24649,19.285,10.776,15.367,11.376,14.496,0.475,0.667
4,2018-01-01 04:00:00+01:00,61.937,69.518,68.152,74.942,18.196,5.681,2.000,23.576,5.711,...,0.10000,0.20816,0.28465,19.095,8.729,9.942,12.469,12.429,0.200,0.543
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
8755,2018-12-31 19:00:00+01:00,43.933,54.628,52.422,55.125,15.400,13.197,8.648,25.456,6.922,...,0.10000,0.30992,0.32258,10.605,11.059,15.343,19.100,12.254,0.881,0.809
8756,2018-12-31 20:00:00+01:00,47.734,50.970,50.816,56.675,11.987,12.335,11.623,21.631,9.540,...,0.10000,0.27090,0.24942,16.148,19.236,19.265,13.946,13.029,0.574,0.768
8757,2018-12-31 21:00:00+01:00,47.771,54.388,51.727,52.691,10.930,11.573,16.141,16.907,7.086,...,0.10000,0.21563,0.27449,21.532,29.982,51.946,14.733,24.327,0.536,0.733
8758,2018-12-31 22:00:00+01:00,51.835,59.942,57.694,58.781,10.401,11.521,13.631,16.352,5.245,...,0.10000,0.10000,0.25466,16.163,16.609,34.029,16.310,22.890,0.452,0.666
