
# Advanced Data Science Capstone

## Correlation of air pollution and Prevalence of Asthma bronchiale in Germany  

## ETL, Data cleansing

### The deliverables
The deliverables of the current stage:

 - current notebook as the process documentation
 - Spark data frame of the "wide" type, containing time series of pollutants concentrations for every available sensor
 - Spark data frame of the "long" type, containing time series of **selected** pollutants concentrations, **countyID** and a pollotant label
 - Spark data frame with disease prevalence column (Asthma bronchiale) and a county id
 
### ETL
 #### Data Sources
  -  The officially published data sets by **Geschäfts- und Koordinierungsstelle GovData**, the search engine is available at https://www.govdata.de/web/guest/suchen.
  - Data stream **E1a** contains measured (Link to Data stream **D**) values of gas phase pollutants (e.g. Ozone, NO2, SO2, CO), particle pollutants (e.g. dust) and dust constituants (e.g. heavy metals, PAK in PM10, PM2.5, TSP) as well es total deposition (BULK), wet deposition and meteorologic data (e.g. temperature, wind, pressure)for every measurement location.
  - The data for years 2013 - 2018 is currently available. For the project I will limit myself with 2016 data (due to limited availability of the health related data sets), however the method and the model are easily extendable for the data for other years.
  - Compressed dataset is available at https://datahub.uba.de/server/rest/directories/arcgisforinspire/INSPIRE/aqd_MapServer/Daten/AQD_DE_E1a_2016.zip .
 #### Data cleansing
  - The air quality data sets are claimed to be "validated", so most work for cleansing the data is already done.
  - The incomplete files from the datasets (not having "hour" in the name) are ignored.
  - Few missing values appearing in the time series as negative values of the pollutant concentrations will be imputed.
 
 #### Enterprise data storage
  - Saving Spark data frames in Parquet format

In [1]:
import urllib.request
import xml.etree.ElementTree as ET
from lxml import etree
import pandas as pd
import numpy as np

import re, collections
from io import StringIO
import os, fnmatch
#, fastparquet

import matplotlib.pyplot as plt

def SelectAllXMLsensorID():
    varFull = [s for s in AllTags if 'value' in s][0]
    return([re.sub(r'[^a-zA-Z0-9:]*\'{http(.*)$', r'', re.sub(r'^.*AQD\/SPO.DE_', r'', str(varr.attrib))) for varr in Eroot.iter(varFull) if 'AQD' in str(varr.attrib)]) 



Now the files with pollutant concentration time series for the given year will be loaded to the **dffAll** Pandas data frame of the **wide** format. During the load procedure **consistensy** of **files** and **column** names will be checked.

First, all the necessary files are downloaded:

In [2]:
!rm -rf ./Capstone.rawData
## Download and decompress the dataset itself:
!mkdir Capstone.rawData
#!ls -l Capstone.rawData/

##### Pollution 2016
!mkdir Capstone.rawData/AQD_DE_E1a_2016
urllib.request.urlretrieve("https://datahub.uba.de/server/rest/directories/arcgisforinspire/INSPIRE/aqd_MapServer/Daten/AQD_DE_E1a_2016.zip", "Capstone.rawData/AQD_DE_E1a_2016.zip")
!mv Capstone.rawData/AQD_DE_E1a_2016.zip Capstone.rawData/AQD_DE_E1a_2016/
!unzip Capstone.rawData/AQD_DE_E1a_2016/AQD_DE_E1a_2016.zip -d Capstone.rawData/
!rm Capstone.rawData/AQD_DE_E1a_2016/AQD_DE_E1a_2016.zip

##### Sensor locations 2016
urllib.request.urlretrieve("https://datahub.uba.de/server/rest/directories/arcgisforinspire/INSPIRE/aqd_MapServer/Daten/AQD_DE_D_2016.zip", "Capstone.rawData/AQD_DE_D_2016.zip")
!unzip Capstone.rawData/AQD_DE_D_2016.zip -d Capstone.rawData/
!rm Capstone.rawData/AQD_DE_D_2016.zip

##### Prevalence of Asthma bronchiale 2016 
!mkdir Capstone.rawData/Asthma_2016
urllib.request.urlretrieve("https://www.versorgungsatlas.de/fileadmin/excel/data_id_92_kreis11_1_j_1451606400.xlsx", "Capstone.rawData/Asthma_2016/data_id_92_kreis11_1_j_1451606400.xlsx")

##### Town-county dataset:
urllib.request.urlretrieve("https://www.destatis.de/DE/Themen/Laender-Regionen/Regionales/Gemeindeverzeichnis/Administrativ/Archiv/GV100ADQ/GV100AD3107.zip?__blob=publicationFile",
                           "Capstone.rawData/GV100AD3107.zip")
!mkdir Capstone.rawData/GV100AD3107
!unzip Capstone.rawData/GV100AD3107.zip -d Capstone.rawData/GV100AD3107/
!rm Capstone.rawData/GV100AD3107.zip

Archive:  Capstone.rawData/AQD_DE_E1a_2016/AQD_DE_E1a_2016.zip
  inflating: Capstone.rawData/AQD_DE_E1a_2016/DE_SH_2016_NO2_hour.xml  
  inflating: Capstone.rawData/AQD_DE_E1a_2016/DE_SH_2016_NOx_hour.xml  
  inflating: Capstone.rawData/AQD_DE_E1a_2016/DE_SH_2016_NO_hour.xml  
  inflating: Capstone.rawData/AQD_DE_E1a_2016/DE_SH_2016_O3_hour.xml  
  inflating: Capstone.rawData/AQD_DE_E1a_2016/DE_SH_2016_PM1_day.xml  
  inflating: Capstone.rawData/AQD_DE_E1a_2016/DE_SH_2016_PM1_hour.xml  
  inflating: Capstone.rawData/AQD_DE_E1a_2016/DE_SH_2016_PM2_day.xml  
  inflating: Capstone.rawData/AQD_DE_E1a_2016/DE_SH_2016_PM2_hour.xml  
  inflating: Capstone.rawData/AQD_DE_E1a_2016/DE_SH_2016_SO2_hour.xml  
  inflating: Capstone.rawData/AQD_DE_E1a_2016/DE_SL_2016_CO_hour.xml  
  inflating: Capstone.rawData/AQD_DE_E1a_2016/DE_SL_2016_NO2_hour.xml  
  inflating: Capstone.rawData/AQD_DE_E1a_2016/DE_SL_2016_NO_hour.xml  
  inflating: Capstone.rawData/AQD_DE_E1a_2016/DE_SL_2016_O3_hour.xml  
  inflat

In [2]:
AirE1aDir='Capstone.rawData/AQD_DE_E1a_2016/'

#!ls Capstone.rawData/AQD_DE_E1a_2016/*hour*
FilesHour=[]

for file in os.listdir(AirE1aDir):
    if fnmatch.fnmatch(file, '*hour*'):
        FilesHour.append(file)
print("Number of files in the dataset", len(FilesHour))

# shortening the process for debugging purposes
#FilesHour=FilesHour[0:3]        

dffAll=pd.DataFrame(index=range(0,8760))  # 8760 hours in the year

# add First column with Observation Times:
dff=[]  # Temporary list for DataFrames

file=FilesHour[0]
Etree = ET.parse(AirE1aDir+file)
Eroot = Etree.getroot()
Eroot.tag
Eroot.attrib
AllTags = [elem.tag for elem in Eroot.iter()]
varFull = [s for s in AllTags if 'values' in s][0]
for varr in Eroot.iter(varFull):
    dff.append(pd.read_csv(StringIO((varr.text).replace("@@","\n")), sep=",", header=None))
dffAll=pd.concat([dffAll, dff[0][[0]]], axis=1)
dffAll.columns=['observation_period']


# get all tags in xml file; Note, that the actual data is kept as a TEXT of *values* tags 
for file in FilesHour:
    Etree = ET.parse(AirE1aDir+file)
    Eroot = Etree.getroot()
    Eroot.tag
    Eroot.attrib
    AllTags = [elem.tag for elem in Eroot.iter()]
    
    ColNamesExp=SelectAllXMLsensorID()
# Compare column names with file names, they should encode same country, state and pollutant
    for ColName in ColNamesExp:
        if ((ColName[0:2]!=file[0:2]) or (ColName[2:4]!=file[3:5]) or (ColName[8:11]!=file[11:14])):
            print("Inconsistency in file and column names: ", file, ColName)
            exit()
    
    varFull = [s for s in AllTags if 'values' in s][0]
    
    dff=[] # Temporary list for DataFrames
# reading actual pollutant data fiom the text field:    
    for varr in Eroot.iter(varFull):
        dff.append(pd.read_csv(StringIO((varr.text).replace("@@","\n")), sep=",", header=None))

# checking, that measurment timestamps are identical in the files read       
    for s in range(0,len(dff)):
        if not (dffAll['observation_period']).equals(dff[s][0]):
            print("Inconsistency of observation times in the following files: ", file, FilesHour[0])
            exit()

        
# select column 4 - pollutant concentration:
    dff=pd.concat([dff[s][4] for s in range(0,len(dff))], axis=1)
    dff.columns=ColNamesExp
   
    dffAll=pd.concat([dffAll, dff], axis=1)

Number of files in the dataset 51


In [3]:
print("Memory usage: ", (dffAll.memory_usage(index=True).sum()/1048576.0), " MB")
dffAll.describe()

Memory usage:  35.25080871582031  MB


Unnamed: 0,DESL002_O3_dataGroup1,DESL003_O3_dataGroup1,DESL011_O3_dataGroup1,DESL012_O3_dataGroup1,DESL017_O3_dataGroup1,DESL018_O3_dataGroup1,DESL019_O3_dataGroup1,DESL020_O3_dataGroup1,DEST002_NOx_dataGroup1,DEST011_NOx_dataGroup1,...,DETH041_NOx_dataGroup1,DETH042_NOx_dataGroup1,DETH043_NOx_dataGroup1,DETH060_NOx_dataGroup1,DETH061_NOx_dataGroup1,DETH072_NOx_dataGroup1,DETH083_NOx_dataGroup1,DETH091_NOx_dataGroup1,DETH093_NOx_dataGroup1,DETH095_NOx_dataGroup1
count,8784.0,8784.0,8784.0,8784.0,8784.0,8784.0,8784.0,8784.0,8784.0,8784.0,...,8784.0,8784.0,8784.0,8784.0,8784.0,8784.0,8784.0,8784.0,8784.0,8784.0
mean,43.896763,40.238199,46.10014,35.718489,40.19125,37.712905,52.681432,23.858329,10.888974,14.562116,...,22.183709,1.380379,91.651819,27.095028,-12.45435,75.664406,47.300504,60.138941,12.090326,24.111151
std,30.773361,50.174665,58.471116,39.468144,29.991761,29.648842,66.185304,98.132662,85.650998,78.668791,...,64.029171,73.688854,107.31492,74.223164,135.40799,100.568808,78.42045,151.47413,142.454843,69.056018
min,0.19,-999.0,-999.0,-999.0,0.85,0.45,-999.0,-999.0,-999.0,-999.0,...,-999.0,-999.0,-999.0,-999.0,-999.0,-999.0,-999.0,-999.0,-999.0,-999.0
25%,18.4375,18.085,27.0775,9.43,12.71,10.8,36.98,9.09,5.831979,8.993116,...,7.920671,1.9125,33.973596,6.824235,1.9125,19.969051,20.649628,20.951626,8.268646,8.278996
50%,42.235,40.845,47.78,31.965,37.055,33.995,55.915,28.8,10.978419,13.728892,...,14.229478,4.943564,66.0523,12.092738,4.314944,48.292873,37.752516,48.175444,14.643343,14.772819
75%,63.2425,60.7225,66.1025,57.08,61.195,57.795,72.84,51.1,20.749995,23.07227,...,29.71635,8.187625,123.019032,28.32697,7.221885,104.310432,64.733919,99.112829,30.998374,31.944511
max,181.83,164.15,169.77,161.42,174.42,175.71,170.86,131.62,232.7519,256.3176,...,304.121277,58.210953,850.619751,616.160645,79.711472,711.213379,453.616028,746.290649,520.957947,565.63385


Now we have **wide** data frame, containing timeseries of all pollutant concentrations for all sensors. The pollutant type and the sensor ID are encoded in column names. The minimal value of pollutant concentrations *-999.0* is equivalent to *NA* and will be imputted, as well as all negative values (the concentration can not be negative). The limit for imputation will be set to 876, i.e. *NA* sequences exceeding 10% of the year will not be imputted. Since the number of heavily corrupted columns is below 3%, they will be dropped in favor to the information quality:

In [4]:
dffAll[dffAll.loc[:, dffAll.columns != 'observation_period'] < 0.0] = np.NaN
dffAll.interpolate(method='linear', inplace=True, axis=0, limit=876, limit_direction='both')
print('The number of corrupted columns is ', len(dffAll.isna().sum().nonzero()[0]), ' of ', len(dffAll.columns))
dffAll = dffAll.dropna(axis=1)
dffAll['observation_period']=pd.to_datetime(dffAll['observation_period'])
dffAll['observation_period']=dffAll['observation_period'].dt.to_period('H')
#dffAll['observation_period'][0].end_time
dffAll.tail(3)

  app.launch_new_instance()


The number of corrupted columns is  10  of  526




Unnamed: 0,observation_period,DESL002_O3_dataGroup1,DESL003_O3_dataGroup1,DESL011_O3_dataGroup1,DESL012_O3_dataGroup1,DESL017_O3_dataGroup1,DESL018_O3_dataGroup1,DESL019_O3_dataGroup1,DESL020_O3_dataGroup1,DEST002_NOx_dataGroup1,...,DETH041_NOx_dataGroup1,DETH042_NOx_dataGroup1,DETH043_NOx_dataGroup1,DETH060_NOx_dataGroup1,DETH061_NOx_dataGroup1,DETH072_NOx_dataGroup1,DETH083_NOx_dataGroup1,DETH091_NOx_dataGroup1,DETH093_NOx_dataGroup1,DETH095_NOx_dataGroup1
8781,2016-12-31 21:00,1.64,5.08,2.9,4.0,4.19,2.42,25.44,2.72,15.716239,...,77.027954,30.488693,90.341911,60.260674,16.560911,57.726231,132.491394,96.647903,142.340775,113.357315
8782,2016-12-31 22:00,2.02,10.67,3.1,3.91,4.29,2.35,28.11,2.41,14.546514,...,80.224121,31.68372,72.076004,61.261394,17.742168,67.070229,111.189499,100.438667,99.762787,88.717621
8783,2016-12-31 23:00,3.32,5.73,4.13,4.25,4.25,2.88,23.48,6.45,9.365071,...,71.562309,33.108055,79.259933,76.318123,18.049219,74.662857,101.533669,58.314323,52.971664,59.788193


### Saving Air Pollution DataFrame to COS
Now we can save the resulting dataset for further use:


In [6]:
# The code was removed by Watson Studio for sharing.

### Sensor Locations
In order to use the spatial data one should have coordinates of air pollution measurements sensors.
For the current study the county index for every individual sensor is needed. First all measurement stations IDs and the town names of the sensors locations are read to **SensorLocation** dataframe:

In [5]:
# pick all tags from the XML file
Etree = etree.parse("Capstone.rawData/DE_D_allInOne_metaMeasurements_2016.xml")
Eroot = Etree.getroot()
Eroot.tag
Eroot.attrib
AllTags = [elem.tag for elem in Eroot.iter()]

# get correct tag names for 'municipality', 'EUStationCode' and 'featureMember':
varMUN = [s for s in AllTags if 'municipality' in s][0]
varID  = [s for s in AllTags if 'EUStationCode' in s][0]
varFeatMem = [s for s in AllTags if 'featureMember' in s][0]

IDs=[]
MUNs=[]
# read 'municipality' and 'EUStationCode' to SensorLocation dataframe:
for varr in Eroot.iter(varFeatMem):
    for child in varr.iter(varMUN):
        MUNs.append(child.text)
        for child2 in varr.iter(varID):
            IDs.append(child2.text)
SensorLocation=pd.DataFrame({'SensorID': IDs, 'SensorTown': MUNs})
SensorLocation.tail(5)

Unnamed: 0,SensorID,SensorTown
762,DEUB005,Lüder
763,DEUB028,Zingst
764,DEUB029,Suhl
765,DEUB030,Stechlin
766,DEUB044,Garmisch-Partenkirchen


In order to map town names to county names, used in the health related datasets, the town-county table **dfCT** will be created. It contains 5-digit county-id (not unique, but characterizing counties in some vicinity), name of town and county: 

In [6]:
columns = [(10, 15), (22, 71), (72, 121)]
dfCT = pd.read_fwf("Capstone.rawData/GV100AD3107/GV100AD_310719.ASC", 
                     colspecs=columns, names=['CountyID','town','county'],
                     encoding="iso8859_1")
dfCT=dfCT.fillna(method='ffill')

dfCT['town'] = dfCT['town'].str.split(",").str[0]
dfCT.tail(5)

Unnamed: 0,CountyID,town,county
16116,16077,Starkenberg,Schmölln/Thür.
16117,16077,Thonhausen,Schmölln/Thür.
16118,16077,Treben,Schmölln/Thür.
16119,16077,Vollmershain,Schmölln/Thür.
16120,16077,Windischleuba,Schmölln/Thür.


### Prevalence of Asthma bronchiale
The central data frame of the model will contain list of counties, prevalence of disease(s) in this counties, and the set of air-pollution-based features. Let's load the *Prevalence of Asthma bronchiale* dataset: 

In [7]:
xlsx_file = pd.ExcelFile("Capstone.rawData/Asthma_2016/data_id_92_kreis11_1_j_1451606400.xlsx")
print("xls sheet names: ",xlsx_file.sheet_names)
dfAsthma = xlsx_file.parse('Daten', header=3, decimal=",") 
print(dfAsthma.head(3))
print("Number of duplicates in Regions-ID column: ", dfAsthma.duplicated(['Regions-ID']).sum())

xls sheet names:  ['Hintergrundinformationen', 'Daten']
      Region  Regions-ID  KV             Kreistyp  Wert  Bundeswert
0   Eisenach       16056  TH    Ländliches Umland   8.9         5.7
1  Sonneberg       16072  TH      Ländlicher Raum   8.7         5.7
2  Ammerland        3451  NI  Verdichtetes Umland   8.5         5.7
Number of duplicates in Regions-ID column:  0


In [8]:
dfAsthma = dfAsthma.drop(['Region', 'KV', 'Kreistyp', 'Bundeswert'], axis=1)
dfAsthma.columns=['CountyID','DiseaseR']
dfAsthma.head(5)

Unnamed: 0,CountyID,DiseaseR
0,16056,8.9
1,16072,8.7
2,3451,8.5
3,16073,8.3
4,3151,8.2


The mapping will start from setting the **CountyID** to every **sensorID** in the **SensorLocation** dataframe:


In [9]:
SensorLocation = (SensorLocation.join(dfCT[['CountyID','town']].set_index('town'),
                                      on='SensorTown')).drop_duplicates(subset=['SensorID'])

Checking the resulting table it was found, that 23 of 767 entries have not resolved **CountyID**:

In [10]:
print("Total number of sensors: ", SensorLocation.count())
print("Number of sensors with unresolved CountyID: ", SensorLocation[SensorLocation.isna().any(axis=1)].count())
#print("List of unresolved sensors:")
#SensorLocation[SensorLocation.isna().any(axis=1)]
#print("Number of duplicates in SensorID column: ", SensorLocation.duplicated(['SensorID']).sum())
#SensorLocation.loc[SensorLocation.duplicated(['SensorID'])==True]

Total number of sensors:  SensorID      767
SensorTown    767
CountyID      744
dtype: int64
Number of sensors with unresolved CountyID:  SensorID      23
SensorTown    23
CountyID       0
dtype: int64


At the moment it is easier to drop these 3% of sensor's data. Otherwise this table could be corrected manually, since it has reasonable size, and it's contents (sensor lables/county codes) hardly changes in time. 

In [11]:
SensorLocation=SensorLocation.dropna()
SensorLocation=SensorLocation.astype({'CountyID':int})
SensorLocation.head(5)

Unnamed: 0,SensorID,SensorTown,CountyID
0,DEBB007,Elsterwerda,12062
1,DEBB021,Potsdam,12054
2,DEBB026,Spremberg,12071
3,DEBB029,Schwedt/Oder,12073
4,DEBB032,Eisenhüttenstadt,12067


### Saving Sensor Locations and Asthma bronchiale Prevalence DataFrames to COS
Now we can save the resulting dataset for further use:

In [12]:
SensorLocationSpark = spark.createDataFrame(SensorLocation)
SensorLocationSpark.write.parquet(cos.url('SensorLocation.parquet', 'capstone-donotdelete-pr-zpykcz8f0kxuad'))

AsthmaSpark = spark.createDataFrame(dfAsthma)
AsthmaSpark.write.parquet(cos.url('Asthma.parquet', 'capstone-donotdelete-pr-zpykcz8f0kxuad'))

NameError: name 'spark' is not defined

### Constructing "Long" DataFrame

In [12]:
dffAllLong = pd.melt(dffAll, id_vars=['observation_period'], var_name='SensorPollID', value_name='PollutantConc')
dffAllLong.head()

Unnamed: 0,observation_period,SensorPollID,PollutantConc
0,2016-01-01 00:00,DESL002_O3_dataGroup1,7.01
1,2016-01-01 01:00,DESL002_O3_dataGroup1,6.53
2,2016-01-01 02:00,DESL002_O3_dataGroup1,6.94
3,2016-01-01 03:00,DESL002_O3_dataGroup1,19.91
4,2016-01-01 04:00,DESL002_O3_dataGroup1,30.75


In [13]:
import gc
del dffAll
del dff

gc.collect()

SensorCountyDict = dict(zip(SensorLocation.SensorID, SensorLocation.CountyID))

In [14]:
#dffAllLong['CountyID'] = dffAllLong.apply(lambda row: re.search('(^.{7})', row['SensorPollID']).group(1), axis=1)
ColumnCountyID = pd.DataFrame()
ColumnSensorID = pd.DataFrame()
ColumnSensorID['SensorID'] = dffAllLong.apply(lambda row: re.search('(^.{7})', row['SensorPollID']).group(1), axis=1)

In [15]:
print("Memory usage: ", (ColumnSensorID.memory_usage(index=True).sum()/1048576.0), " MB")

Memory usage:  34.51362609863281  MB


In [16]:
#ColumnCountyID.replace({'CountyID': SensorCountyDict})
ColumnCountyID['CountyID'] = ColumnSensorID['SensorID'].map(SensorCountyDict)
#.dropna().astype('int64')

In [17]:
#dffAllLong.replace({'CountyID': SensorCountyDict})
ColumnCountyID.head()

Unnamed: 0,CountyID
0,10045.0
1,10045.0
2,10045.0
3,10045.0
4,10045.0


In [18]:
ColumnCountyID.count()

CountyID    4453488
dtype: int64

In [19]:
ColumnCountyID.isna().sum()

CountyID    70272
dtype: int64

In [20]:
dffAllLong['Pollutant'] = dffAllLong.apply(lambda row: re.search('^.{8}(.*)_', row['SensorPollID']).group(1), axis=1)

In [21]:
dffAllLong['CountyID'] = ColumnCountyID['CountyID']

In [22]:
dffAllLong = dffAllLong.dropna().drop(['observation_period','SensorPollID'], axis=1)

In [23]:
dffAllLong.iloc[888555]

PollutantConc    22.141
Pollutant            NO
CountyID          14626
Name: 906123, dtype: object

In [26]:
dffAllLong['CountyID'] = dffAllLong['CountyID'].astype('int64')
dffAllLong.tail()

Unnamed: 0,PollutantConc,Pollutant,CountyID
4523755,91.521347,NOx,16064
4523756,68.096954,NOx,16064
4523757,113.357315,NOx,16064
4523758,88.717621,NOx,16064
4523759,59.788193,NOx,16064


In [28]:
print("Memory usage: ", (dffAllLong.memory_usage(index=True).sum()/1048576.0), " MB")

Memory usage:  135.90966796875  MB


In [22]:
SensorLocation.loc[SensorLocation['SensorID']=='DESL002']

Unnamed: 0,SensorID,SensorTown,CountyID
644,DESL002,Bexbach,10045


### Saving "Long" DataFrame to COS


In [None]:
dffAllLongSpark = spark.createDataFrame(dffAllLong)
dffAllLongSpark.write.parquet(cos.url('dffAllLong.parquet', 'capstone-donotdelete-pr-zpykcz8f0kxuad'))