# 6 - Pipelines

In [101]:
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from pipeline_functions import *

### DC Dataset

In [102]:
dc_hour = pd.read_csv('dc-data/hour.csv')
dc_day = pd.read_csv('dc-data/day.csv')

Starting with renaming the columns so that functions based on column name will be reusable.

In [103]:
class RenameColDC(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.name_map = {'dteday':'date','yr':'year','mnth':'month','hr':'hour',
        'weathersit':'weather','hum':'humidity','cnt':'count'}
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X = X.rename(columns=self.name_map)
        
        return X

Fill in hours. Because the `date` columns have different contents (the DC set has no times), each dataset needs its own version of this operation. 

In [104]:
class FillHoursDC(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        return fill_hours_dc(X)

Filling values that are identical within each day.

In [105]:
class FillWMeans(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        if 'season' in X.columns and 'date' in X.columns:
            X.loc[:,'season'] = X.groupby('date')['season'].transform(
                lambda x: x.fillna(x.mean()))
            
        if 'year' in X.columns and 'date' in X.columns:
            X.loc[:,'year'] = X.groupby('date')['year'].transform(
                lambda x: x.fillna(x.mean()))
            
        if 'month' in X.columns and 'date' in X.columns:
            X.loc[:,'month'] = X.groupby('date')['month'].transform(
                lambda x: x.fillna(x.mean()))
            
        if 'holiday' in X.columns and 'date' in X.columns:
            X.loc[:,'holiday'] = X.groupby('date')['holiday'].transform(
                lambda x: x.fillna(x.mean()))
            
        if 'weekday' in X.columns and 'date' in X.columns:
            X.loc[:,'weekday'] = X.groupby('date')['weekday'].transform(
                lambda x: x.fillna(x.mean()))
            
        if 'workingday' in X.columns and 'date' in X.columns:
            X.loc[:,'workingday'] = X.groupby('date')['workingday'].transform(
                lambda x: x.fillna(x.mean()))
            
        return X

Filling values that move throughout the day.

In [106]:
class FillWInterpolate(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X.loc[:,'temp'] = X['temp'].interpolate(method='linear', limit_direction='both').round(2)
        X.loc[:,'atemp'] = X['atemp'].interpolate(method='linear', limit_direction='both').round(4)
        X.loc[:,'humidity'] = X['humidity'].interpolate(method='linear', limit_direction='both').round(2)
        X.loc[:,'windspeed'] = X['windspeed'].interpolate(method='linear', limit_direction='both').round(4)

        return X

Convert `date` column to `datetime` for compatibility with later functions.

In [107]:
class DateTimeConverter(BaseEstimator, TransformerMixin):
    def __init__(self, column_name):
        self.column_name = column_name

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Convert the specified column to datetime
        X[self.column_name] = pd.to_datetime(X[self.column_name], errors='coerce')
        return X

Merge in `weather` column from `dc_day` table. (Info about the table being merged must be specified within the pipeline.)

In [108]:
class MergeDataDC(BaseEstimator, TransformerMixin):
    def __init__(self, merge_df, on_column='date', how='left'):
        self.merge_df = merge_df
        self.merge_df.rename(columns={'dteday': 'date', 'weathersit': 'weather'}, inplace=True)
        self.on_column = on_column
        self.how = how

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Convert the 'dteday' column in both DataFrames to datetime if not already
        X[self.on_column] = pd.to_datetime(X[self.on_column], errors='coerce')
        self.merge_df[self.on_column] = pd.to_datetime(self.merge_df[self.on_column], errors='coerce')
        
        # Perform the merge
        X = X.merge(self.merge_df[[self.on_column, 'weather']], on=self.on_column, how=self.how)
        return X

Fill in missing `weather` values.

In [109]:
class FillWeatherValueDC(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X['weather_x'] = X['weather_x'].fillna(X['weather_y'])
        return X

Create `precip` column. Because the merge renames the `weather` column, this function is not reusable. (Also renames `weather_x` back to 'weather'.)

In [110]:
class PrecipMappingDC(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.precip_map = {1: 0, 2: 0, 3: 1, 4: 1}
        self.name_map = {'weather_x': 'weather'}

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X['precip'] = X['weather_x'].map(self.precip_map)
        X = X.rename(columns=self.name_map)
        return X

Get `day` out of `date`.

In [111]:
class GetDay(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X['day'] = X['date'].apply(lambda x: get_day(x))
        return X

Set non-float columns to `int`, reorder columns the way I like them.

In [112]:
class SetIntReorder(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.col_list = ['season','year','month','holiday','weekday','workingday','weather','count']
        self.col_order = ['date','season','year','month','day','hour','weekday','holiday',
            'workingday','weather','precip','temp','atemp','humidity','windspeed','count']
    
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X[self.col_list] = X[self.col_list].astype(int)
        X = X[self.col_order]

        return X

Set `date` as the index.

In [113]:
class SetIndexDate(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X = X.set_index('date')
        return X

Assemble the pipeline:

In [114]:
dc_pipe = Pipeline([
('rename_DC', RenameColDC()),
('fill_hours', FillHoursDC()),
('fill_w_means', FillWMeans()),
('fill_interpolate', FillWInterpolate()),
('datetime_convert', DateTimeConverter('date')),
('merge_data', MergeDataDC(dc_day, on_column='date', how='left')),
('fill_weather', FillWeatherValueDC()),
('precip_mapping', PrecipMappingDC()),
('get_day', GetDay()),
('set_int_reorder', SetIntReorder()),
('set_index_date', SetIndexDate()),])

In [115]:
dc_data = dc_pipe.fit_transform(dc_hour)

In [116]:
display(dc_data.shape)
dc_data.head()

(17544, 15)

Unnamed: 0_level_0,season,year,month,day,hour,weekday,holiday,workingday,weather,precip,temp,atemp,humidity,windspeed,count
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
2011-01-01,1,0,1,1,0,6,0,0,1,0,0.24,0.2879,0.81,0.0,16
2011-01-01,1,0,1,1,1,6,0,0,1,0,0.22,0.2727,0.8,0.0,40
2011-01-01,1,0,1,1,2,6,0,0,1,0,0.22,0.2727,0.8,0.0,32
2011-01-01,1,0,1,1,3,6,0,0,1,0,0.24,0.2879,0.75,0.0,13
2011-01-01,1,0,1,1,4,6,0,0,1,0,0.24,0.2879,0.75,0.0,1


### London Dataset

In [117]:
london = pd.read_csv('london-data/london_merged.csv')

Start with renaming the columns this time as well.

In [118]:
class RenameColLond(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.name_map = {'cnt':'count','t1':'temp','t2':'atemp','hum':'humidity',
        'wind_speed':'windspeed','weather_code':'weather','is_holiday':'holiday',
        'timestamp':'date'}

    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X = X.rename(columns=self.name_map)
        
        return X

Create ` workingday` column. It will end up with missing values when missing hours are filled, but it is already part of the `FillWMeans` function.

In [119]:
class MakeWorkingDayLond(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X['workingday'] = 1
        
        # Assign 0 in appropriate rows based on holiday or weekend conditions
        X.loc[((X['holiday'] == 1.0) | (X['is_weekend'] == 1.0)), 'workingday'] = 0
        
        return X

To fill any missing hours, there first needs to be an `hour` column.

In [120]:
class GetHour(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X['hour'] = X['date'].apply(lambda x: get_hour(x))
        return X

Now, fill in the missing hours. This function is different than the one for the DC table. 

In [121]:
class FillHoursLond(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        return fill_hours_london(X)

With all rows filled in, get all remaining time-related values out of `date` column. First use the previous function:
>Getday

Then get `year`, `month`, and `weekday`.

In [122]:
class GetYr_Mn_Wkdy(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X['year'] = X['date'].apply(lambda x: get_year(x))
        X['month'] = X['date'].apply(lambda x: get_month(x))
        X['weekday'] = X['date'].apply(lambda x: get_weekday(x))

        return X

Filling values that are identical within each day. Can reuse the previous function:<br>
>FillWMeans

Any columns who's values could not be filled by above function have no data at all for that day, and need to be dropped. 

In [123]:
class DropNaSubsetLond(BaseEstimator, TransformerMixin):
    def __init__(self, subset):
        self.subset = subset

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X_transformed = X.dropna(subset=self.subset)
        return X_transformed

Filling values that move throughout the day. Can reuse the previous function:<br>
>FillWInterpolate

Use `ForwardFill` for missing weather values, as there is no secondary table to draw from. 

In [124]:
class ForwardFillWeatherLond(BaseEstimator, TransformerMixin):
        def fit(self, X, y=None):
            return self
        
        def transform(self, X):
            X.loc[:, 'weather'] = X['weather'].ffill()
            return X

Perform value mappings; convert `season` and `weekday` to match DC set's values. Reduce number of `weather` values, matching based on descriptions, then create `precip` column. Assign numbers 0-2 for values in `year`.

In [125]:
class ValueMapTransformerLond(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.seas_map= {0:1,1:2,2:3,3:4}
        self.weather_map = {1:1,2:1,3:2,4:2,7:3,10:4,26:4,94:4}
        self.day_map = {6:0,0:1,1:2,2:3,3:4,4:5,5:6}
        self.year_map = {2015:0,2016:1,2017:2}
        self.precip_map = {1:0,2:0,3:1,4:1}
    
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X['season'] = X['season'].map(self.seas_map)
        X['weather'] = X['weather'].map(self.weather_map)
        X.loc[:,'weekday'] = X['weekday'].map(self.day_map)
        X.loc[:,'year'] = X['year'].map(self.year_map)
        X['precip'] = X['weather'].map(self.precip_map)

        return X

Set non-float columns to `int`, reorder columns the way I like them. Can reuse the previous function:<br>
>SetIntReorder

Set `date` as the index. Can reuse the previous function:<br>
>SetIndexDate

Assemble the pipeline:

In [126]:
l_pipe = Pipeline([
('rename_lond', RenameColLond()),
('workingday_col', MakeWorkingDayLond()),
('datetime_converter', DateTimeConverter('date')),
('get_hour', GetHour()),
('fill_hours', FillHoursLond()),
('get_day', GetDay()),
('get_time_values', GetYr_Mn_Wkdy()),
('fill_w_means', FillWMeans()),
('dropna', DropNaSubsetLond(subset=['season', 'workingday', 'holiday'])),
('fill_interpolate', FillWInterpolate()),
('fill_weather', ForwardFillWeatherLond()),
('value_mapping', ValueMapTransformerLond()),
('set_int_reorder', SetIntReorder()),
('set_index_date', SetIndexDate())
])

In [127]:
london_data = l_pipe.fit_transform(london)

In [128]:
display(london_data.shape)
london_data.head()

(17414, 15)

Unnamed: 0_level_0,season,year,month,day,hour,weekday,holiday,workingday,weather,precip,temp,atemp,humidity,windspeed,count
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
2015-01-04 00:00:00,4,0,1,4,0,0,0,0,2,0,3.0,2.0,93.0,6.0,182
2015-01-04 01:00:00,4,0,1,4,1,0,0,0,1,0,3.0,2.5,93.0,5.0,138
2015-01-04 02:00:00,4,0,1,4,2,0,0,0,1,0,2.5,2.5,96.5,0.0,134
2015-01-04 03:00:00,4,0,1,4,3,0,0,0,1,0,2.0,2.0,100.0,0.0,72
2015-01-04 04:00:00,4,0,1,4,4,0,0,0,1,0,2.0,0.0,93.0,6.5,47
