# Data Pre-processing

In [1]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

from sklearn.preprocessing import FunctionTransformer, OneHotEncoder, KBinsDiscretizer
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics.pairwise import rbf_kernel
from sklearn.mixture import GaussianMixture

from feature_engine.encoding import CountFrequencyEncoder
from feature_engine.creation import CyclicalFeatures

pd.options.mode.copy_on_write = True

In [2]:
# utility function
def casting(df, features):
    """Return a copy of ``df`` in which the given columns are cast to ``int``.

    Parameters
    ----------
    df : pd.DataFrame
        Frame holding the columns to convert.
    features : label or list of labels
        Column(s) to cast. A single label is accepted as well as a list.

    Returns
    -------
    pd.DataFrame
        A new frame; ``df`` itself is never written to, so the function is safe
        to call repeatedly (e.g. when a notebook cell is re-run).

    Raises
    ------
    KeyError
        If one of ``features`` is not a column of ``df``.
    ValueError
        If a value cannot be converted to ``int``.
    """
    columns = list(features) if pd.api.types.is_list_like(features) else [features]

    # ``astype`` with a mapping converts only the listed columns and returns a new frame
    return df.astype({column: int for column in columns})

Define mappings that will be used throughout pre processing.

In [3]:
# Raw location label -> corrected label, applied when location names are fixed.
# NOTE(review): 'BLD 1' is mapped to 'BLD 10' while the other entries only add a
# leading zero — confirm against dict.xlsx that this is intended.
loc_mappings = dict([
    ('BLD 2', 'BLD 02'),
    ('BLD 8', 'BLD 08'),
    ('BLD 7', 'BLD 07'),
    ('BLD 1', 'BLD 10'),
])

# Columns that are cast to float once their string representation has been cleaned.
type_float_mapping = {
    column: float
    for column in ('Current Charges', 'Other Charges', 'Consumption (HCF)', '# days')
}

In [4]:
# Load the input CSV; the path is relative to the notebook's working directory.
dataset = pd.read_csv('./data/dataset.csv')

In [5]:
# Column names, non-null counts and dtypes of the loaded frame.
dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 34627 entries, 0 to 34626
Data columns (total 25 columns):
 #   Column               Non-Null Count  Dtype  
---  ------               --------------  -----  
 0   Development Name     34620 non-null  object 
 1   Borough              34627 non-null  object 
 2   Account Name         34627 non-null  object 
 3   Location             34365 non-null  object 
 4   Meter AMR            34575 non-null  object 
 5   Meter Scope          8585 non-null   object 
 6   TDS #                34620 non-null  float64
 7   EDP                  34627 non-null  int64  
 8   RC Code              34627 non-null  object 
 9   Funding Source       34551 non-null  object 
 10  AMP #                34618 non-null  object 
 11  Vendor Name          34627 non-null  object 
 12  UMIS BILL ID         34627 non-null  int64  
 13  Revenue Month        34627 non-null  object 
 14  Service Start Date   34627 non-null  object 
 15  Service End Date     34627 non-null 

In [6]:
# Columns to be deleted
del_cols = [
    'Development Name', 'EDP', 'Vendor Name', 'Meter Scope',
    'AMP #', 'Water&Sewer Charges', 'UMIS BILL ID', 'RC Code'
] 

bool_cols = ['Meter AMR', 'Bill Analyzed', 'Estimated', 'Funding Source', 'Rate Class',]
strnum_cols = ['Current Charges', 'Other Charges', 'Consumption (HCF)', '# days']
loc_col  = ['Location']
meter_number_col = ['Meter Number']
tds_cols = ['TDS #']

pre_proc_cols = bool_cols + strnum_cols + loc_col + meter_number_col + tds_cols

# Columns to keep. Since column transformer makes a mess with the attributes of DataFrame
# this trick will be used to retrieve the correct column's names once the transformer has finished
keep_cols = dataset.columns.difference(set.union(set(del_cols), set(pre_proc_cols)))

In the following section, special transformers are constructed to apply pre-processing operations according to dict.xlsx.

## Preliminary operations
### P04 and P05

This transformer takes as input the DataFrame restricted to the columns in `bool_cols` (['Meter AMR', 'Bill Analyzed', 'Estimated', 'Funding Source', 'Rate Class']) and turns each of them into a boolean feature by comparing it with its reference level.

In [7]:
def _equals_levels(df, levels):
    """Compare ``df`` element-wise with ``levels`` and return the boolean result.

    ``levels`` is handed straight to pandas' ``==``; with a list-like, pandas matches
    its items to the columns of ``df`` by position.
    """
    return df == levels


# Reference value of each boolean column, listed in the same order as the columns
# this transformer is applied to.
_bool_levels = ['AMR', 'Yes', 'Y', 'FEDERAL', 'Basic Water and Sewer']

bool_transformer = FunctionTransformer(
    func=_equals_levels,
    kw_args={'levels': _bool_levels},
    feature_names_out='one-to-one',
)

### P09

The following transformer converts numeric strings that use a comma as thousands separator (i.e. numbers >= 1000, such as "1,234.50") into plain decimal strings ("1234.50"). For every string-number where the char "," is present, the transformer: 
1. Splits when "," is encountered;
2. Joins the resulting list.

In [8]:
def _strip_thousands_separator(value):
    """Remove every ',' from a string cell; return any other value unchanged.

    Non-string cells (missing values, numbers that were already parsed) are passed
    through as they are: testing ``',' in value`` on them would raise ``TypeError``.
    """
    return value.replace(',', '') if isinstance(value, str) else value


# Element-wise clean-up of the string-encoded numeric columns; the result is still made
# of strings and is cast to float later on.
strnum_transformer = FunctionTransformer(
    func=lambda df: df.map(_strip_thousands_separator),
    feature_names_out='one-to-one'
)

### P06

The transformer here uses the function `fill_null_locations` to replace every null location with a label built from a progressive number, which corresponds to the index of the associated meter. 

Note: even though the function should technically operate on a Series, a DataFrame is used so that it works seamlessly with `ColumnTransformer` later on.

In [9]:
def fill_null_locations(location_df: pd.DataFrame, df_null_location: pd.DataFrame):
    """Replace null locations with a ``loc_<n>`` label derived from the meter number.

    Parameters
    ----------
    location_df : pd.DataFrame
        Frame with a 'Location' column whose null entries must be filled.
    df_null_location : pd.DataFrame
        Rows whose location is null; must expose 'Meter Number' and 'TDS #' and share
        its index labels with ``location_df`` (values are matched back by index).

    Returns
    -------
    pd.DataFrame
        A copy of ``location_df`` with the null locations filled. The input frame is
        not modified.
    """
    # One row per distinct (meter, TDS) pair, most frequent first; after reset_index the
    # row position (0..n-1) is the progressive number used to build the label
    meter_positions = df_null_location[['Meter Number', 'TDS #']].value_counts().reset_index()

    # meter number -> 'loc_<position + 1>'; if a meter appears under several TDS values
    # the last position wins
    location_map = {
        meter: f'loc_{position + 1}'
        for position, meter in enumerate(meter_positions['Meter Number'])
    }

    # Work on a copy so the caller's frame is left untouched
    filled = location_df.copy()
    filled.loc[filled['Location'].isna(), 'Location'] = df_null_location['Meter Number'].map(location_map)

    return filled

# Wraps `fill_null_locations` as a transformer. The frame of null-location rows is
# computed once, here, from the module-level `dataset` and handed over through kw_args.
location_imputer = FunctionTransformer(
    func=fill_null_locations,
    kw_args={'df_null_location': dataset[dataset['Location'].isna()]},
    feature_names_out='one-to-one'
)

### P07

To fix the location names, a very straightforward mapping is applied, using the previously defined dictionary `loc_mappings`.

In [10]:
def _canonical_location(name):
    """Return the corrected label for ``name`` if one is listed in ``loc_mappings``, else ``name``."""
    return loc_mappings.get(name, name)


# Rewrites the known misspelled location labels, leaving every other value as it is.
fix_location_names_transformer = FunctionTransformer(
    func=lambda df: df[['Location']].map(_canonical_location),
    feature_names_out='one-to-one'
)

### P08

The strategy to impute unknown meter is very similar to P06.

In [11]:
def fix_un_metered(meter_df: pd.DataFrame, df_un_metered: pd.DataFrame):
    """Replace the 'UN-METERED' placeholder with a ``meter_<n>`` label per building.

    Parameters
    ----------
    meter_df : pd.DataFrame
        Frame with a 'Meter Number' column containing 'UN-METERED' placeholders.
    df_un_metered : pd.DataFrame
        The un-metered rows; must expose 'Meter Number', 'TDS #' and 'Location' and
        share its index labels with ``meter_df`` (values are matched back by index).

    Returns
    -------
    pd.DataFrame
        A copy of ``meter_df`` with the placeholders replaced. Neither argument is
        modified (in particular no 'Building' column is added to ``df_un_metered``).
    """
    # Building identifier: '<TDS #>_<Location>', kept as a local Series
    building = df_un_metered['TDS #'].astype(str) + '_' + df_un_metered['Location']

    # One row per distinct (meter, building) pair, most frequent first; after reset_index
    # the row position (0..n-1) is the progressive number used to build the label
    un_metered_idx = (
        df_un_metered[['Meter Number']]
        .assign(Building=building)[['Meter Number', 'Building']]
        .value_counts()
        .reset_index()
    )

    # building -> 'meter_<position + 1>'
    meter_map = {
        name: f'meter_{position + 1}'
        for position, name in enumerate(un_metered_idx['Building'])
    }

    # Work on a copy so the caller's frame is left untouched
    fixed = meter_df.copy()
    fixed.loc[fixed['Meter Number'] == 'UN-METERED', 'Meter Number'] = building.map(meter_map)

    return fixed

# Wraps `fix_un_metered` as a transformer. The frame of un-metered rows is computed once,
# here, from the module-level `dataset` and handed over through kw_args.
meter_imputer = FunctionTransformer(
    func=fix_un_metered,
    kw_args={'df_un_metered': dataset[dataset['Meter Number'] =='UN-METERED']},
    feature_names_out='one-to-one'
)

### Other operations

In [12]:
# A pipeline to perform the operations on the location column
location_pipeline = Pipeline([
    ('nan_loc', location_imputer),
    ('loc_names', fix_location_names_transformer)
])

tds_imputer = SimpleImputer(strategy='constant', fill_value=999)

# Define the column transformer to wrap the defined transformations into a single object
encoder_col_transf = ColumnTransformer(
    transformers=[
        ('drop_cols', 'drop', del_cols), # P01
        ('keep_cols', 'passthrough', keep_cols), # define the features to be kept without transformations
        ('bool_features', bool_transformer, bool_cols), # P04 and P05
        ('str_to_float', strnum_transformer, strnum_cols), # P09
        ('location_transformation', location_pipeline, loc_col), # P06
        ('meter_imputation', meter_imputer, meter_number_col), # P08
        ('tds_imputation', tds_imputer, tds_cols), # P02
    ],
)

## A simple pre processing pipeline

In [13]:
# Drops the two rows with index labels 4785 and 8156 (operation P10)
row_deleter = FunctionTransformer(func=lambda x: x.drop(index=[4785, 8156]), feature_names_out='one-to-one')

# Rows are removed first, then the column-wise transformations are applied
preprocessing_pipe = Pipeline([
    ('rows_del', row_deleter), # P10
    ('col_transf', encoder_col_transf),
])

In [14]:
# Run the whole pre-processing pipeline, then rebuild a DataFrame using the feature
# names reported by the (now fitted) column transformer.
transformed_values = preprocessing_pipe.fit_transform(dataset)
prefixed_names = encoder_col_transf.get_feature_names_out()
clean_dataset = pd.DataFrame(transformed_values, columns=prefixed_names)

# Feature names look like '<transformer>__<column>': keep only the trailing column name
real_col_names = [name.split('__')[-1] for name in clean_dataset.columns]
prefixed_to_real = dict(zip(clean_dataset.columns, real_col_names))

clean_dataset = (
    clean_dataset
    .rename(columns=prefixed_to_real)
    .astype(type_float_mapping)   # cleaned string numbers -> float
    .convert_dtypes()             # let pandas infer the best dtype for each column
    .rename(columns={'Funding Source': 'is_federal', 'Rate Class': 'is_bws'})
)

clean_dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 34625 entries, 0 to 34624
Data columns (total 17 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   Account Name        34625 non-null  string 
 1   Borough             34625 non-null  string 
 2   Revenue Month       34625 non-null  string 
 3   Service End Date    34625 non-null  string 
 4   Service Start Date  34625 non-null  string 
 5   Meter AMR           34625 non-null  boolean
 6   Bill Analyzed       34625 non-null  boolean
 7   Estimated           34625 non-null  boolean
 8   is_federal          34625 non-null  boolean
 9   is_bws              34625 non-null  boolean
 10  Current Charges     34625 non-null  Float64
 11  Other Charges       34625 non-null  Float64
 12  Consumption (HCF)   34625 non-null  Int64  
 13  # days              34625 non-null  Int64  
 14  Location            34625 non-null  string 
 15  Meter Number        34625 non-null  string 
 16  TDS 

## Turning meter into unit of analysis

After the pre-processing, a meter can no longer be associated with multiple buildings.

In [15]:
# Sanity check: the number of distinct (meter, TDS, location) combinations equals the
# number of distinct meter numbers, i.e. every meter belongs to exactly one building.
clean_dataset[['Meter Number', 'TDS #', 'Location']].value_counts().shape[0] == clean_dataset['Meter Number'].unique().shape[0]

True

Define the aggregations using dictionaries. For the detailed reasons behind these operations, see dict.xlsx.

In [16]:
def _abs_rounded_total(values):
    """Absolute value of the total of ``values``, rounded to two decimals."""
    return round(abs(sum(values)), 2)


# Non-string columns for which the first value of each group is kept
first_agg_not_strings = ['TDS #', 'is_federal', 'is_bws', 'Meter AMR']
# Columns with their own treatment: counted, last value kept, grouping key
lonely_columns = ['Revenue Month', 'Service End Date', 'Meter Number']

string_cols = clean_dataset.select_dtypes(include='string').columns
numeric_or_bool_cols = clean_dataset.select_dtypes(include=['number', 'bool']).columns

first_agg_cols = string_cols.difference(lonely_columns).append(pd.Index(first_agg_not_strings))
sum_agg_cols = numeric_or_bool_cols.difference(first_agg_not_strings)

first_agg_mapping = dict.fromkeys(first_agg_cols, 'first')
sum_agg_mapping = dict.fromkeys(sum_agg_cols, _abs_rounded_total)
other_agg_mapping = {lonely_columns[1]: 'last', lonely_columns[0]: 'count'}

# Key order matters: it is the column order of the aggregated frame
aggregation_operations = {**first_agg_mapping, **sum_agg_mapping, **other_agg_mapping}

At this point the groupby can be performed safely. The resulting DataFrame will contain about 700 rows, one per meter.

In [17]:
def _aggregate_per_meter(df, agg_map):
    """Collapse ``df`` to one row per 'Meter Number' using the per-column ``agg_map``.

    The count stored under 'Revenue Month' is renamed to 'Times read' and the meter
    number is moved back from the index to a regular column.
    """
    per_meter = df.groupby(by='Meter Number').aggregate(agg_map)
    per_meter = per_meter.rename(columns={'Revenue Month': 'Times read'})
    return per_meter.reset_index(drop=False)


aggregator_transformer = FunctionTransformer(
    func=_aggregate_per_meter,
    kw_args={'agg_map': aggregation_operations}
)

After the aggregation is performed, TDS and Location can finally be merged into a single feature called **Building**.

In [18]:
def create_building(agg_df: pd.DataFrame):
    """Merge 'TDS #' and 'Location' into a single 'Building' column.

    Parameters
    ----------
    agg_df : pd.DataFrame
        Frame exposing the 'TDS #' and 'Location' columns.

    Returns
    -------
    pd.DataFrame
        A new frame in which 'Building' ('<TDS #>_<Location>') is appended as the last
        column and 'Location' / 'TDS #' are removed. ``agg_df`` is not modified.
    """
    building = agg_df['TDS #'].astype(str) + '_' + agg_df['Location']

    # assign/drop both return new frames, so the caller's frame is never written to
    return agg_df.assign(Building=building).drop(columns=['Location', 'TDS #'])

def create_score_columns(agg_df: pd.DataFrame):
    """Turn the 'Estimated' and 'Bill Analyzed' totals into per-reading scores.

    Each score is the absolute value of the total divided by 'Times read'.

    Parameters
    ----------
    agg_df : pd.DataFrame
        Frame exposing 'Estimated', 'Bill Analyzed' and 'Times read'.

    Returns
    -------
    pd.DataFrame
        A new frame with 'Estimated score' and 'Bill Analyzed score' appended and the
        three source columns removed. ``agg_df`` is not modified.
    """
    times_read = agg_df['Times read']

    # Column names contain spaces, hence the dict unpacking; insertion order is preserved
    scored = agg_df.assign(**{
        'Estimated score': np.abs(agg_df['Estimated'] / times_read),
        'Bill Analyzed score': np.abs(agg_df['Bill Analyzed'] / times_read),
    })

    return scored.drop(columns=['Estimated', 'Times read', 'Bill Analyzed'])

# Transformer wrappers so the two functions above can be used as pipeline steps
building_creator = FunctionTransformer(func=create_building)
score_creator = FunctionTransformer(func=create_score_columns)

Create a pipeline to perform these operations.

In [19]:
# Steps run in order: aggregate per meter, build the 'Building' column, then derive the
# score columns (which need the 'Times read' count produced by the aggregation step)
aggregation_pipeline = Pipeline([
    ('aggregation', aggregator_transformer),
    ('building', building_creator),
    ('score', score_creator)
])

agg_dataset = aggregation_pipeline \
                    .fit_transform(clean_dataset)

Let's save this version of the dataset for further explorations.

In [20]:
# Persist the aggregated frame without the index column
agg_dataset.to_csv('./data/agg_dataset.csv', index=False)

## After aggregation operations

### AP03

In [21]:
def _normalize_borough(value):
    """Map the borough label 'FHA' to 'QUEENS'; leave every other label unchanged."""
    if value == 'FHA':
        return 'QUEENS'
    return value


borough_transformer = FunctionTransformer(
    func=lambda df: df[['Borough']].map(_normalize_borough),
    feature_names_out='one-to-one'
)

### AP04

In [22]:
# (substring searched in the building label, resulting category); the first match wins,
# so the order of the entries is significant
_LOCATION_CATEGORY_RULES = (
    ('STREET', 'STREET'),
    ('AVENUE', 'AVENUE'),
    ('PLACE', 'PLACE'),
    ('Community Center', 'COMMUNITY CENTER'),
    ('BOULEVARD', 'BOULEVARD'),
    ('ROAD', 'ROAD'),
)


def _location_category(building):
    """Return the category of the first rule whose substring occurs in ``building``."""
    for needle, category in _LOCATION_CATEGORY_RULES:
        if needle in building:
            return category
    return 'UNSPECIFIED_TYPE'  # default


location_cat_getter = FunctionTransformer(
    func=lambda df: df[['Building']].map(_location_category),
    feature_names_out='one-to-one'
)

### AP05

In [23]:
def _development_type(account_name):
    """Return 'FHA' or 'REHAB' if that tag occurs in ``account_name`` ('FHA' is checked first)."""
    for tag in ('FHA', 'REHAB'):
        if tag in account_name:
            return tag
    return 'UNSPECIFIED_CATEGORY'  # default


development_type_getter = FunctionTransformer(
    func=lambda df: df[['Account Name']].map(_development_type),
    feature_names_out='one-to-one'
)

### AP07

In [24]:
def split_date(s: pd.Series):
    """Split a column of '/'-separated date strings into three columns.

    The first, second and third pieces of every value become the columns
    'month_<name>', 'day_<name>' and 'year_<name>', where <name> is the name of ``s``.
    The pieces are kept as strings and the original column is not part of the result.
    """
    pieces = s.str.split('/')
    source_name = s.name

    return pd.DataFrame({
        f'{part}_{source_name}': pieces.map(lambda chunks, position=position: chunks[position])
        for position, part in enumerate(('month', 'day', 'year'))
    })

# Transformer wrapper around `split_date`; it must be fed a single column (a Series)
date_divider = FunctionTransformer(func=split_date)

### Column transformer definition

In [25]:
# Applied for clustering dataset
mappings_col_transf_clustering = ColumnTransformer(
    transformers=[
        ('del_meter_numer', 'drop', 'Meter Number'),
        ('boroughs_mapping', borough_transformer, ['Borough']),
        ('location_category', location_cat_getter, ['Building']),
        ('development_type', development_type_getter, ['Account Name']),
        ('other_charges_bool', bool_transformer, ['Other Charges']),
        ('date_end_divider', date_divider, 'Service Start Date'),
        ('date_start_divider', date_divider, 'Service End Date'),
        ('del_dates', 'drop', ['Service Start Date', 'Service End Date']),
    ],
    remainder='passthrough'
).set_output(transform='pandas')

# Applied to perform association analysis
mappings_col_transf_association = ColumnTransformer(
    transformers=[
        ('del_meter_numer', 'drop', 'Meter Number'),
        ('boroughs_mapping', borough_transformer, ['Borough']),
        ('location_category', location_cat_getter, ['Building']),
        ('development_type', development_type_getter, ['Account Name']),
        ('other_charges_bool', bool_transformer.set_params(kw_args={'levels':0}), ['Other Charges']),
    ],
    remainder='passthrough'
).set_output(transform='pandas')

The following encodings are meant for clustering purposes.

In [26]:
# Names below follow the '<step name>__<column>' pattern produced by
# `mappings_col_transf_clustering`, so they must match its step names exactly.

# Categorical features replaced by the relative frequency of each category
count_enc_features = [
    'boroughs_mapping__Borough', 
    'location_category__Building', 
    'development_type__Account Name'
]

# Month / day components of the service dates, encoded as cyclical features
cyclic_enc_features = [
    'date_end_divider__month_Service Start Date', 
    'date_end_divider__day_Service Start Date',
    'date_start_divider__month_Service End Date',
    'date_start_divider__day_Service End Date'
]

# Every column not listed above is passed through unchanged
enc_col_transf_clustering = ColumnTransformer(
    transformers=[
        ('count_enc', CountFrequencyEncoder(encoding_method='frequency'), count_enc_features),
        ('cyclic_enc', CyclicalFeatures(drop_original=True), cyclic_enc_features)
    ], 
    remainder='passthrough'
).set_output(transform='pandas')

In [27]:
# mappings -> cast the month/day string columns to int -> encode
clustering_preproc_pipeline = Pipeline([
    ('mappings', mappings_col_transf_clustering),
    ('cast', FunctionTransformer(func=casting, kw_args={'features': cyclic_enc_features})),
    ('encoding', enc_col_transf_clustering)
])

# Both datasets are derived from the same aggregated frame
agg_dataset_clustering = clustering_preproc_pipeline.fit_transform(agg_dataset).convert_dtypes()
agg_dataset_association = mappings_col_transf_association.fit_transform(agg_dataset).convert_dtypes()

In [28]:
# Column names, non-null counts and dtypes of the clustering dataset
agg_dataset_clustering.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 742 entries, 0 to 741
Data columns (total 22 columns):
 #   Column                                                      Non-Null Count  Dtype  
---  ------                                                      --------------  -----  
 0   count_enc__boroughs_mapping__Borough                        742 non-null    Float64
 1   count_enc__location_category__Building                      742 non-null    Float64
 2   count_enc__development_type__Account Name                   742 non-null    Float64
 3   cyclic_enc__date_end_divider__month_Service Start Date_sin  742 non-null    Float64
 4   cyclic_enc__date_end_divider__month_Service Start Date_cos  742 non-null    Float64
 5   cyclic_enc__date_end_divider__day_Service Start Date_sin    742 non-null    Float64
 6   cyclic_enc__date_end_divider__day_Service Start Date_cos    742 non-null    Float64
 7   cyclic_enc__date_start_divider__month_Service End Date_sin  742 non-null    Float64
 8   

Prepare clustering dataset to be saved.

In [29]:
# Keep only the part after the last '__' of every feature name, in snake_case
real_col_names = [
    name.split('__')[-1].lower().replace(' ', '_')
    for name in agg_dataset_clustering.columns
]
prefixed_to_short = dict(zip(agg_dataset_clustering.columns, real_col_names))

# Final, more descriptive names for the derived features
descriptive_names = {
    'account_name': 'development_type',
    'building': 'location_category',
    'meter_amr': 'is_meter_amr',
    'other_charges': 'is_only_water_sewer_charges'
}

agg_dataset_clustering = (
    agg_dataset_clustering
    .rename(columns=prefixed_to_short)
    .rename(columns=descriptive_names)
    .drop(columns=['is_bws'])
)

In [30]:
# Persist the clustering dataset without the index column
agg_dataset_clustering.to_csv('./data/cluster_dataset.csv', index=False)

In [31]:
# Column names, non-null counts and dtypes of the association dataset
agg_dataset_association.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 742 entries, 0 to 741
Data columns (total 14 columns):
 #   Column                             Non-Null Count  Dtype  
---  ------                             --------------  -----  
 0   boroughs_mapping__Borough          742 non-null    string 
 1   location_category__Building        742 non-null    string 
 2   development_type__Account Name     742 non-null    string 
 3   other_charges_bool__Other Charges  742 non-null    boolean
 4   remainder__Service Start Date      742 non-null    string 
 5   remainder__is_federal              742 non-null    boolean
 6   remainder__is_bws                  742 non-null    boolean
 7   remainder__Meter AMR               742 non-null    boolean
 8   remainder__# days                  742 non-null    Int64  
 9   remainder__Consumption (HCF)       742 non-null    Int64  
 10  remainder__Current Charges         742 non-null    Float64
 11  remainder__Service End Date        742 non-null    string 

Prepare association rules dataset to be saved.

In [32]:
# Keep only the part after the last '__' of every feature name, in snake_case
real_col_names = [
    name.split('__')[-1].lower().replace(' ', '_')
    for name in agg_dataset_association.columns
]
prefixed_to_short = dict(zip(agg_dataset_association.columns, real_col_names))

# Final, more descriptive names for the derived features
descriptive_names = {
    'account_name': 'development_type',
    'building': 'location_category',
    'meter_amr': 'is_meter_amr',
    'other_charges': 'is_only_water_sewer_charges'
}

agg_dataset_association = (
    agg_dataset_association
    .rename(columns=prefixed_to_short)
    .rename(columns=descriptive_names)
    .drop(columns=['is_bws'])
)

# Persist the association dataset without the index column
agg_dataset_association.to_csv('./data/association_dataset.csv', index=False)