#### Import the required libraries

In [38]:
import pandas as pd
import zipfile
import numpy as np

#### Read in the source data

In [39]:
# Raw readings, read straight out of the zip archive (pandas decompresses it).
df0 = pd.read_csv(
    '../data/archive.zip',
    compression='zip',
)
df0.head(n=5)

Unnamed: 0.1,Unnamed: 0,State Code,County Code,Site Num,Address,State,County,City,Date Local,NO2 Units,...,SO2 Units,SO2 Mean,SO2 1st Max Value,SO2 1st Max Hour,SO2 AQI,CO Units,CO Mean,CO 1st Max Value,CO 1st Max Hour,CO AQI
0,0,4,13,3002,1645 E ROOSEVELT ST-CENTRAL PHOENIX STN,Arizona,Maricopa,Phoenix,2000-01-01,Parts per billion,...,Parts per billion,3.0,9.0,21,13.0,Parts per million,1.145833,4.2,21,
1,1,4,13,3002,1645 E ROOSEVELT ST-CENTRAL PHOENIX STN,Arizona,Maricopa,Phoenix,2000-01-01,Parts per billion,...,Parts per billion,3.0,9.0,21,13.0,Parts per million,0.878947,2.2,23,25.0
2,2,4,13,3002,1645 E ROOSEVELT ST-CENTRAL PHOENIX STN,Arizona,Maricopa,Phoenix,2000-01-01,Parts per billion,...,Parts per billion,2.975,6.6,23,,Parts per million,1.145833,4.2,21,
3,3,4,13,3002,1645 E ROOSEVELT ST-CENTRAL PHOENIX STN,Arizona,Maricopa,Phoenix,2000-01-01,Parts per billion,...,Parts per billion,2.975,6.6,23,,Parts per million,0.878947,2.2,23,25.0
4,4,4,13,3002,1645 E ROOSEVELT ST-CENTRAL PHOENIX STN,Arizona,Maricopa,Phoenix,2000-01-02,Parts per billion,...,Parts per billion,1.958333,3.0,22,4.0,Parts per million,0.85,1.6,23,


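#### Drop redundant columns
Drop the `Unnamed: 0` column, the numeric site-identifier columns (`State Code`, `County Code`, `Site Num`) and the per-pollutant units columns, then parse `Date Local` as a datetime.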

In [40]:
# Remove identifier/units columns, then convert the date strings to datetimes.
df1 = (
    df0
    .drop(columns=[
        'Unnamed: 0',
        'State Code',
        'County Code',
        'Site Num',
        'NO2 Units',
        'O3 Units',
        'SO2 Units',
        'CO Units',
    ])
    .assign(**{'Date Local': lambda frame: pd.to_datetime(frame['Date Local'])})
)
df1.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1746661 entries, 0 to 1746660
Data columns (total 21 columns):
 #   Column             Dtype         
---  ------             -----         
 0   Address            object        
 1   State              object        
 2   County             object        
 3   City               object        
 4   Date Local         datetime64[ns]
 5   NO2 Mean           float64       
 6   NO2 1st Max Value  float64       
 7   NO2 1st Max Hour   int64         
 8   NO2 AQI            int64         
 9   O3 Mean            float64       
 10  O3 1st Max Value   float64       
 11  O3 1st Max Hour    int64         
 12  O3 AQI             int64         
 13  SO2 Mean           float64       
 14  SO2 1st Max Value  float64       
 15  SO2 1st Max Hour   int64         
 16  SO2 AQI            float64       
 17  CO Mean            float64       
 18  CO 1st Max Value   float64       
 19  CO 1st Max Hour    int64         
 20  CO AQI             float

In [None]:
# Running count of groups processed so far; aggregate_by_worst_reading
# increments it and prints it every 10000 groups as a progress indicator.
n = 0

class MeasureAggregator:
    """Track the worst (largest) non-missing reading of one measure column.

    Rows are fed in one at a time via ``consider``; ``update`` then writes the
    largest value seen (and, optionally, a companion column taken from the
    same row) into an output dict. Call ``reset`` before each new group.

    Args:
        measure_column: Name of the column whose maximum is tracked.
        hour_column: Optional name of a column whose value is copied from the
            row holding the maximum (used here for the "1st Max Hour" columns).
    """

    def __init__(self, measure_column, hour_column=None):
        self.measure = measure_column
        self.hour_column = hour_column
        self.worst_value = None
        self.worst_hour = None
        # Row that produced the current worst value (None until one is seen).
        self.worst_row = None

    def reset(self):
        """Forget all readings seen so far (call at the start of each group)."""
        self.worst_value = None
        self.worst_hour = None
        self.worst_row = None

    def consider(self, row):
        """Fold one row into the running maximum.

        ``row`` may be any mapping indexed by column name (a ``pd.Series`` or
        a ``dict``). Missing readings (NaN / None / pd.NA) are skipped. On ties
        the first row seen wins, because only a strictly greater value
        replaces the current one.
        """
        value = row[self.measure]
        # pd.isna also copes with None / pd.NA, where np.isnan raises TypeError.
        if pd.isna(value):
            return
        if self.worst_value is None or value > self.worst_value:
            self.worst_value = value
            self.worst_row = row
            if self.hour_column is not None:
                self.worst_hour = row[self.hour_column]

    def update(self, dict):
        """Write the aggregated result into the output mapping ``dict``.

        The measure key is set to the worst value, or ``np.nan`` when no
        non-missing reading was seen. If an hour column is configured, its key
        is set to the value from the worst row (``None`` if none was seen).
        """
        # NOTE: the parameter name shadows the builtin ``dict``; it is kept
        # unchanged so that any keyword callers keep working.
        dict[self.measure] = (
            self.worst_value if self.worst_value is not None else np.nan
        )
        if self.hour_column is not None:
            dict[self.hour_column] = self.worst_hour


# One aggregator per pollutant statistic, in the original column order.
# A second name, where present, is the hour column reported alongside the max.
measures = [
    MeasureAggregator(*columns)
    for columns in (
        ('NO2 Mean',),
        ('NO2 1st Max Value', 'NO2 1st Max Hour'),
        ('NO2 AQI',),
        ('O3 Mean',),
        ('O3 1st Max Value', 'O3 1st Max Hour'),
        ('O3 AQI',),
        ('SO2 Mean',),
        ('SO2 1st Max Value', 'SO2 1st Max Hour'),
        ('SO2 AQI',),
        ('CO Mean',),
        ('CO 1st Max Value', 'CO 1st Max Hour'),
        ('CO AQI',),
    )
]


def aggregate_by_worst_reading(group):
    """Collapse all readings of one (Address, Date Local) group into one row.

    For every aggregator in the module-level ``measures`` list, the largest
    non-missing value in the group is kept (plus its companion hour column
    where configured). State, County and City are copied from the group's
    first row.

    The grouping keys are read from the ``Address`` / ``Date Local`` columns
    when pandas passes them in; otherwise (e.g. ``include_groups=False``) they
    are unpacked from ``group.name``, which assumes the frame was grouped by
    exactly ``['Address', 'Date Local']`` in that order.

    Side effect: increments the module-level counter ``n`` and prints it every
    10000 groups as a progress indicator.

    Returns:
        pd.Series keyed by output column name.
    """
    global n  # Show progress as this process can be slow
    n += 1
    if n % 10000 == 0:
        print(n)

    for measure in measures:
        measure.reset()

    first = group.iloc[0]
    if {'Address', 'Date Local'}.issubset(group.columns):
        address, date_local = first['Address'], first['Date Local']
    else:
        # Grouping columns were excluded from the group frame; use the keys.
        address, date_local = group.name

    # Copy the non-aggregated fields; dict order fixes the output column order.
    out = {
        'Address': address,
        'State': first['State'],
        'County': first['County'],
        'City': first['City'],
        'Date Local': date_local,
    }
    # to_dict('records') yields plain dicts per row, far cheaper than iterrows(),
    # which builds a full Series for every row.
    for row in group.to_dict(orient='records'):
        for measure in measures:
            measure.consider(row)
    for measure in measures:
        measure.update(out)
    return pd.Series(out)


In [None]:
# Collapse the readings to one row per (Address, Date Local), keeping the worst
# reading of each measure. aggregate_by_worst_reading reads 'Address' and
# 'Date Local' from each group, so those columns are selected explicitly after
# groupby. This replaces include_groups=True, which is deprecated since
# pandas 2.2 (and forwarded to the UDF as a kwarg on older pandas).
df2 = (
    df1
    .groupby(['Address', 'Date Local'])[df1.columns.tolist()]
    .apply(aggregate_by_worst_reading)
)

In [None]:
# The (Address, Date Local) group index duplicates the 'Address' and
# 'Date Local' columns already in df2, so it is not written.
df2.to_csv(
    '../data/cleaned_pollution_data.zip',
    compression='zip',
    index=False,
)