In [5]:
import errno
import os
import warnings
# NOTE: the second datetime import rebinds `datetime` to the module; the
# class imported on the line before it is shadowed. Keep this order.
from datetime import datetime
import datetime

import numpy as np
import pandas as pd
import featuretools as ft
from scipy.special import boxcox1p
from sklearn.base import BaseEstimator, TransformerMixin, RegressorMixin, clone
from sklearn.preprocessing import LabelEncoder,OneHotEncoder
def ignore_warn(*args, **kwargs):
    """Drop-in replacement for ``warnings.warn`` that discards every warning."""
    return None


# Route all ``warnings.warn`` calls to the no-op above. This also silences
# warnings that libraries (sklearn, seaborn) emit under their own filters.
warnings.warn = ignore_warn
from scipy import stats
from scipy.stats import norm, skew #for some statistics
# Show floats with three decimal places in pandas output.
pd.set_option('display.float_format', '{:.3f}'.format)

In [6]:
def read_csv_series(path, ts_column="time"):
    """
    Read a time series from a CSV file.

    The CSV file must contain a column with either a UNIX timestamp (seconds)
    or a datetime string in any format supported by Pandas.

    Parameters
    -----
    path : path to CSV file (or any buffer accepted by ``pd.read_csv``)

    ts_column : name of the column containing time data, "time" by default

    Returns
    -----
    ret : Pandas DataFrame indexed by datetime (index named `ts_column`);
          rows with a duplicated time are dropped, keeping the first one
    """
    df = pd.read_csv(path)

    # Interpret the time column from its dtype instead of running it through
    # both the date-string parser and unit="s": numbers are UNIX seconds,
    # anything else is parsed as a datetime string.
    raw_time = df[ts_column]
    if pd.api.types.is_numeric_dtype(raw_time):
        df[ts_column] = pd.to_datetime(raw_time, unit="s")
    else:
        df[ts_column] = pd.to_datetime(raw_time)

    # Move the time column into the index (removes it from the columns).
    df = df.set_index(ts_column)

    # Remove rows with duplicated time if there are any, keep first duplicate row.
    df = df.loc[~df.index.duplicated(keep="first")]
    df.index.name = ts_column

    return df


def describe_series(df):
    """
    Show basic information about a Pandas Series or DataFrame

    Prints head, summary statistics, non-null counts, the column labels
    (DataFrame) or the name (Series), and the first/last index entries.

    Parameters
    -----
    df : a Pandas Series or DataFrame object
    """

    print("Head:")
    print(df.head())
    print("Stats:")
    print(df.describe())
    print("Count:")
    print(df.count())

    # A Series has no `.columns`; report its name instead of raising.
    if isinstance(df, pd.DataFrame):
        print("Columns: {}".format(df.columns))
    else:
        print("Name: {}".format(df.name))

    # Empty input has no first/last index entry.
    if len(df.index) == 0:
        print("Start of time: n/a")
        print("End of time: n/a")
        return

    print("Start of time: {}".format(str(df.index[0])))
    print("End of time: {}".format(str(df.index[-1])))


def make_directory_tree(tree, output_dir):
    """
    Create the output directory tree structure specified by `tree` in `output_dir`

    Directories that already exist are left in place and reported, since
    files inside them may be overwritten later.

    Parameters
    -----
    tree : list of paths to create under `output_dir`

    output_dir : path to root of output directory tree

    Raises
    -----
    OSError : if a path cannot be created, e.g. because a non-directory file
              already exists at that location
    """

    for d in tree:
        path = os.path.join(output_dir, d)
        # os.makedirs(exist_ok=True) never raises for an existing directory,
        # so check explicitly to keep the overwrite warning reachable.
        if os.path.isdir(path):
            print("Path already exists: {}".format(d))
            print("Files may be overwritten")
            continue
        os.makedirs(path, exist_ok=True)


def get_datasets(data_dir):
    """
    List the CSV files located directly inside a directory.

    Parameters
    -----
    data_dir : directory to scan (not recursive)

    Returns
    -----
    ret : list of bare file names ending in ".csv"; directories whose name
          ends in ".csv" are skipped
    """

    csv_names = []
    for entry_name in os.listdir(data_dir):
        if not entry_name.endswith(".csv"):
            continue
        if os.path.isfile(os.path.join(data_dir, entry_name)):
            csv_names.append(entry_name)
    return csv_names

# Column groups used by preprocess_report; only those present in a report are used.
_REPORT_TIME_COL = "time"
_REPORT_TARGET_COL = "PM10"
# Weather text columns that are one-hot encoded (never coerced to numbers).
_WEATHER_CAT_COLS = ["precipType", "summary"]
# NOTE(review): split of numeric columns into weather vs. pollution groups is
# inferred from the outlier column list below — confirm against the data source.
_WEATHER_NUMERIC_COLS = ["cloudCover", "dewPoint", "humidity", "ozone",
                         "precipAccumulation", "precipIntensity", "precipProbability",
                         "pressure", "temperature", "uvIndex", "visibility",
                         "windBearing", "windGust", "windSpeed"]
_POLLUTION_COLS = ["CO", "CO2", "NO2", "O3", "PM25", "SO2"]
# Columns whose values outside the 2nd..98th percentile are set to NaN.
_OUTLIER_COLS = ["AQI", "CO", "CO2", "NO2", "O3", "PM25", "SO2", "cloudCover",
                 "dewPoint", "humidity", "ozone", "precipAccumulation",
                 "precipIntensity", "precipProbability", "pressure", "temperature",
                 "uvIndex", "visibility", "windBearing", "windGust", "windSpeed"]
# Columns that get an hour-over-hour "<col>_delta" feature.
_DELTA_COLS = ['O3', 'PM10', 'PM25', 'cloudCover', 'dewPoint', 'humidity',
               'precipAccumulation', 'precipIntensity', 'precipProbability',
               'pressure', 'temperature', 'uvIndex', 'visibility', 'windBearing',
               'windSpeed', 'AQI_PM25', 'AQI_PM10', 'AQI_O3', 'PM10_history']
_SKEW_THRESHOLD = 0.75
_BOXCOX_LAMBDA = 0.15


def preprocess_report(path, outpath, value_cleaner=None):
    """
    Preprocess a report: clean, impute, engineer features and unskew.

    Stages (one helper each):
      1. read the CSV, parse `time`, coerce measurement columns to float64;
      2. baseline weather cleanup, one-hot encode precipType / summary;
      3. index by time on a regular hourly grid;
      4. clean PM10 (range filter, 1-step interpolation, drop NaN rows, log1p);
      5. fill windBearing, drop apparentTemperature, mask percentile outliers;
      6. drop sparse columns/rows and impute the remaining gaps (AQI dropped);
      7. add rolling-window, lag, delta and calendar features;
      8. Box-Cox transform numeric columns with skew above 0.75.

    Parameters
    -----
    path : path to input dataset (or any buffer accepted by ``pd.read_csv``)

    outpath : path at which the processed dataset is meant to be saved; this
              function does not write anything to disk

    value_cleaner : optional callable applied to every non-null value of the
                    non-numeric measurement columns before numeric coercion;
                    None (default) skips this step

    Returns
    -----
    ret : processed DataFrame indexed by timestamps on an hourly grid (rows are
          missing where PM10 or too many features were unavailable); PM10 is
          log1p-transformed
    """

    print("Processing {}".format(path))

    df = _load_report(path, value_cleaner)
    df = _weather_baseline(df)
    df = _to_hourly_index(df)
    df = _clean_target(df)
    df = _clean_features(df)
    df = _impute_missing(df)
    df = _add_features(df)
    return _unskew(df)


def _load_report(path, value_cleaner=None):
    """Read the raw CSV, parse the time column and coerce measurements to float64."""
    df = pd.read_csv(path)

    # Numeric time values are UNIX seconds, anything else a datetime string.
    raw_time = df[_REPORT_TIME_COL]
    if pd.api.types.is_numeric_dtype(raw_time):
        df[_REPORT_TIME_COL] = pd.to_datetime(raw_time, unit="s")
    else:
        df[_REPORT_TIME_COL] = pd.to_datetime(raw_time)

    # Categorical weather text must survive until it is one-hot encoded;
    # every other column is a measurement (invalid entries become NaN).
    keep_as_text = set(_WEATHER_CAT_COLS) | {"icon", _REPORT_TIME_COL}
    for col in [c for c in df.columns if c not in keep_as_text]:
        values = df[col]
        if value_cleaner is not None and not pd.api.types.is_numeric_dtype(values):
            values = values.map(lambda v: v if pd.isnull(v) else value_cleaner(v))
        df[col] = pd.to_numeric(values, errors="coerce").astype("float64")
    return df


def _weather_baseline(df):
    """Baseline weather cleanup and one-hot encoding of categorical weather columns."""
    # NOTE(review): the first data row is discarded unconditionally (as before);
    # the reason is not visible here — confirm it is a junk row.
    df = df.iloc[1:].drop(columns=["icon"], errors="ignore")
    df = df.fillna({"precipType": "no precip",
                    "precipAccumulation": 0, "cloudCover": 0})
    if "pressure" in df.columns:
        df["pressure"] = df["pressure"].interpolate(limit=3)

    # get_dummies cannot encode NaN; such rows get all-zero indicator columns.
    cat_cols = [c for c in _WEATHER_CAT_COLS if c in df.columns]
    for col in cat_cols:
        df = df.join(pd.get_dummies(df[col], dtype=np.uint8))
    return df.drop(columns=cat_cols)


def _to_hourly_index(df):
    """Index by time, drop duplicated timestamps and reindex onto an hourly grid."""
    df = df.set_index(_REPORT_TIME_COL)
    df = df.loc[~df.index.duplicated(keep="first")]
    hourly = pd.date_range(start=df.index.min(), end=df.index.max(), freq="h")
    return df.reindex(hourly).rename_axis(_REPORT_TIME_COL)


def _clean_target(df):
    """Mask invalid PM10, fill 1-hour gaps, drop rows still missing PM10, log1p it."""
    pm10 = df[_REPORT_TARGET_COL]
    pm10 = pm10.mask((pm10 > 1000) | (pm10 < 0))
    df[_REPORT_TARGET_COL] = pm10.interpolate(method="linear", limit=1,
                                              limit_area="inside")
    df = df[df[_REPORT_TARGET_COL].notna()].copy()
    df[_REPORT_TARGET_COL] = np.log1p(df[_REPORT_TARGET_COL])
    return df


def _clean_features(df):
    """Fill windBearing, drop apparentTemperature and mask percentile outliers."""
    if "windBearing" in df.columns:
        df["windBearing"] = df["windBearing"].fillna(0)  # according to dark sky api
    df = df.drop(columns=["apparentTemperature"], errors="ignore")  # high correlation

    # Values outside [2nd, 98th] percentile become NaN and are imputed later.
    for col in _OUTLIER_COLS:
        if col not in df.columns:
            continue
        values = df[col]
        low, high = values.quantile(0.02), values.quantile(0.98)
        df[col] = values.mask((values < low) | (values > high))
    return df


def _interpolate_cols(df, cols, limit):
    """Time-interpolate `cols` in both directions, filling at most `limit` NaNs per gap."""
    for col in cols:
        df[col] = df[col].interpolate(method="time", limit=limit,
                                      limit_direction="both")
    return df


def _impute_missing(df):
    """Drop sparse columns/rows, then impute weather and pollution gaps."""
    # Drop features with more than 50% missing values.
    missing_frac = df.isnull().mean()
    df = df.drop(columns=list(missing_frac[missing_frac > 0.5].index))

    # dropna(thresh=k) keeps rows with at least k non-null values, i.e. rows
    # missing more than ~25% of their values are removed.
    thresh = int((len(df.columns) * 75) / 100)
    df = df.dropna(thresh=thresh)

    # AQI is regenerated as AQI_* features later.
    df = df.drop(columns=["AQI"], errors="ignore")

    weather_cols = [c for c in _WEATHER_NUMERIC_COLS if c in df.columns]
    pollution_cols = [c for c in _POLLUTION_COLS if c in df.columns]

    df = _interpolate_cols(df, weather_cols, limit=4)
    df = _interpolate_cols(df, pollution_cols, limit=8)

    # Fill what is left with the median of the same calendar day ("MM-DD")
    # across all years in the report.
    day_key = pd.Series(df.index.strftime("%m-%d"), index=df.index)
    for col in weather_cols + pollution_cols:
        df[col] = df[col].fillna(df[col].groupby(day_key).transform("median"))

    # Final pass: interpolate again, then fall back to each column's median.
    df = _interpolate_cols(df, weather_cols, limit=4)
    df = _interpolate_cols(df, pollution_cols, limit=8)
    return df.fillna(df.median(numeric_only=True))


def _add_features(df):
    """Add 24h means, PM10 history, hour-over-hour deltas, month and ISO week."""
    # Mean over the closed window [t - 1 day, t].
    for new_col, src in (("AQI_PM25", "PM25"), ("AQI_PM10", "PM10"), ("AQI_O3", "O3")):
        if src in df.columns:
            df[new_col] = df[src].rolling("1D", closed="both").mean()

    # Mean PM10 over [t - 3h, t): on the hourly grid these are the rows at
    # t-3h, t-2h and t-1h; NaN when none of them exist.
    df["PM10_history"] = df["PM10"].rolling("3h", closed="left").mean()

    # Change since the previous hour; NaN where the row one hour earlier is missing.
    for col in [c for c in _DELTA_COLS if c in df.columns]:
        previous = df[col].shift(freq="h")
        df[col + "_delta"] = (df[col] - previous).reindex(df.index)

    df["month"] = df.index.month
    if hasattr(df.index, "isocalendar"):
        df["week_no"] = df.index.isocalendar().week.astype("int64")
    else:  # pandas < 1.1
        df["week_no"] = df.index.week
    return df


def _unskew(df):
    """Box-Cox (boxcox1p) transform numeric columns whose skew exceeds the threshold."""
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    skewness = df[numeric_cols].apply(lambda s: skew(s.dropna()))
    # NOTE(review): boxcox1p yields NaN for values below -1 (e.g. negative deltas).
    for col in skewness[skewness > _SKEW_THRESHOLD].index:
        df[col] = boxcox1p(df[col], _BOXCOX_LAMBDA)
    return df

def run_preprocessing(input_dir, output_dir):
    """
    Iterate over the input datasets and process them, generating new datasets in output directory.

    Every ``*.csv`` in ``<input_dir>/data`` is processed in sorted filename
    order with `preprocess_report`; rows without a PM10 value are dropped and
    the result is written to ``<output_dir>/preprocessedCSV/<same filename>``.

    Parameters
    -----
    input_dir : path to directory containing raw data

    output_dir : output directory for storing processed data
    """

    # "preprocessed" is the folder passed to preprocess_report as `outpath`;
    # "preprocessedCSV" is where this function writes its results, so it must
    # exist before to_csv is called.
    make_directory_tree(["preprocessed", "preprocessedCSV"], output_dir)
    reports_dir = os.path.join(input_dir, "data")

    for report in sorted(get_datasets(reports_dir)):
        # Location name: last "_"-separated token of the file stem.
        loc = report.split('.')[0].split('_')[-1]
        print("Processing reports for {}".format(loc))
        df_report = preprocess_report(os.path.join(reports_dir, report), os.path.join(
            output_dir, "preprocessed", report))

        # Drop rows (not columns) whose PM10 value is missing.
        df_report = df_report.dropna(subset=["PM10"])

        # Save the processed dataset.
        df_report.to_csv(os.path.join(output_dir, "preprocessedCSV", report))

In [None]:
#run_preprocessing("../data/raw", "../data/processed")