# Preprocessing

We will process our data set and prepare it for modelling based on our EDA findings.

## Load data 

In [0]:
# -*- coding: utf-8 -*-
from typing import Optional

import dataiku
import pandas as pd, numpy as np
from dataiku import pandasutils as pdu

def load_data_by_name(name: str) -> pd.DataFrame:
    """
    Read the Dataiku dataset called ``name`` and return its dataframe.
    """
    return dataiku.Dataset(name).get_dataframe()

# Load the learn and test splits (in that order) through the same helper.
train_df, test_df = (
    load_data_by_name(dataset_name)
    for dataset_name in ("census_income_learn", "census_income_test")
)

In [0]:
# Name of the label column after renaming (see col_mapping["col_41"]).
TARGET = "income"

In [0]:
# Maps the positional column names (col_0 ... col_41) to descriptive names.
# The trailing comment on each entry records how the assignment was verified;
# "UVDD" is shorthand for "unique values checked with data dict".
col_mapping = {
    "col_0": "age", # matches type and range
    "col_1": "class of worker", # unique values checked with data dict (UVDD)
    "col_2": "detailed industry recode", # UVDD
    "col_3": "detailed occupation recode", # UVDD
    "col_4": "education", # UVDD
    "col_5": "wage per hour", # looks to be at right position, type checks, in cents?
    "col_6": "enroll in edu inst last wk", # UVDD
    "col_7": "marital stat", # UVDD
    "col_8": "major industry code", # UVDD
    "col_9": "major occupation code", # UVDD
    "col_10": "race", # UVDD
    "col_11": "hispanic origin", # UVDD - 10 unique in data dict? values match though
    "col_12": "sex", # UVDD
    "col_13": "member of a labor union", # UVDD
    "col_14": "reason for unemployment", # UVDD
    "col_15": "full or part time employment stat", # UVDD
    "col_16": "capital gains", # data dict check, range ok, dollars?
    "col_17": "capital losses", # data dict check, range ok, dollars?
    "col_18": "dividends from stocks", # data dict check
    "col_19": "tax filer stat", # UVDD
    "col_20": "region of previous residence", # UVDD
    "col_21": "state of previous residence", # UVDD
    "col_22": "detailed household and family stat", # data dict check
    "col_23": "detailed household summary in household", # data dict check
    "col_24": "instance weight", # SPECIAL
    "col_25": "migration code-change in msa", # UVDD
    "col_26": "migration code-change in reg", # UVDD
    "col_27": "migration code-move within reg", # UVDD
    "col_28": "live in this house 1 year ago",# UVDD
    "col_29": "migration prev res in sunbelt", # UVDD
    "col_30": "num persons worked for employer", # value check
    "col_31": "family members under 18", # UVDD
    "col_32": "country of birth mother",  # UVDD
    "col_33": "country of birth self",  # UVDD
    "col_34": "country of birth father",  # UVDD
    "col_35": "citizenship", # UVDD
    "col_36": "own business or self employed", # UVDD
    "col_37": "fill inc questionnaire for veteran's admin", # UVDD
    "col_38": "veterans benefits", # UVDD
    "col_39": "weeks worked in year", # data dict order
    "col_40": "year", # UVDD
    "col_41": "income"  # label column; same name as TARGET
}

In [0]:
# Columns that get cast to the "object" dtype after renaming.
num_to_object_cols = [
    "detailed industry recode",
    "detailed occupation recode",
    "own business or self employed",
    "veterans benefits",
    "year",
    "num persons worked for employer",
]

# column name -> target dtype, in the same order as the list above
col_type_map = dict.fromkeys(num_to_object_cols, "object")

### Column mapping and dtypes 

We will organize the workflow into a single pipeline of transformers. This helps ensure that we apply the same transformations to train and test, avoids data leakage, and makes the code easier to maintain.

In [0]:
from sklearn import set_config
# Ask scikit-learn transformers to return pandas DataFrames from
# transform / fit_transform, so column names survive every step.
set_config(transform_output="pandas")

In [0]:
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin, OneToOneFeatureMixin
from typing import Dict

class MapColumns(TransformerMixin, BaseEstimator, OneToOneFeatureMixin):
    """
    Rename columns and cast selected columns to new dtypes.

    Parameters
    ----------
    col_name_map : dict
        Old column name -> new column name. Columns that are not listed keep
        their name.
    col_type_map : dict
        Column name (after renaming) -> dtype to cast that column to. A name
        that is not a column of the renamed frame raises ``KeyError``.
    """
    def __init__(self, *, col_name_map: Dict[str, str], col_type_map: Dict[str, str]):
        # Only hyper-parameters are stored here. Fitted attributes (trailing
        # underscore) are created in fit, so an unfitted instance is never
        # mistaken for a fitted one.
        self.col_name_map = col_name_map
        self.col_type_map = col_type_map

    def fit(self, X, y=None):
        """Record the input width and the column names transform will produce."""
        self.n_features_in_ = X.shape[1]
        # Same rule DataFrame.rename applies: unmapped labels are kept as-is.
        self.feature_names_out_ = np.asarray(
            [self.col_name_map.get(col, col) for col in X.columns],
            dtype=object,
        )
        return self

    def transform(self, X):
        """Return a renamed, re-typed copy of ``X``; ``X`` itself is not modified."""
        # rename and astype each return a new frame, so no explicit copy is
        # needed, and no estimator state is touched here.
        return X.rename(columns=self.col_name_map).astype(self.col_type_map)

    def get_feature_names_out(self, input_features=None):
        """Return the output column names learned in fit (``input_features`` is ignored)."""
        from sklearn.utils.validation import check_is_fitted

        check_is_fitted(self, "feature_names_out_")
        return self.feature_names_out_

    
mapper_transformation = MapColumns(
    col_name_map=col_mapping,
    col_type_map=col_type_map
)

# Smoke test on the first 100 training rows; `sample` is reused by the cells below.
sample = mapper_transformation.fit_transform(train_df.head(100))
sample

In [0]:
# Column names produced by the mapper on the sample.
mapper_transformation.get_feature_names_out()

## Data Cleaning

We deal with missing values and duplicates as proposed in the EDA notebook.

In [0]:
from typing import List

from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer

class DropDuplicates(TransformerMixin, BaseEstimator, OneToOneFeatureMixin):
    """
    Drop rows that are duplicated on every column except ``ignore_cols``.

    Parameters
    ----------
    ignore_cols : list of str
        Columns left out of the duplicate comparison. Each must exist in the
        frame passed to ``transform``, otherwise ``KeyError`` is raised.

    Notes
    -----
    NOTE(review): ``transform`` removes rows, so inside a pipeline it also
    de-duplicates whatever frame is passed at transform time. Confirm that is
    intended for held-out data.
    """
    def __init__(self, *, ignore_cols: List[str]):
        self.ignore_cols = ignore_cols

    def fit(self, X, y=None):
        """Record the input width and column names; nothing is learned from the data."""
        self.n_features_in_ = X.shape[1]
        self.feature_names_out_ = list(X.columns)

        return self

    def transform(self, X):
        """Return ``X`` without rows duplicated on the compared columns (first occurrence kept)."""
        # Work on the column labels only: Index.drop yields the same labels as
        # X.drop(columns=...).columns (including the KeyError for unknown
        # names) without materialising a copy of the whole frame.
        subset = X.columns.drop(self.ignore_cols)

        return X.drop_duplicates(subset=subset)

    def get_feature_names_out(self, input_features=None):
        """Return the column names seen in fit (``input_features`` is ignored)."""
        return self.feature_names_out_

### Imputation ###

# Replace missing 'hispanic origin' values with the constant "Do not know".
# remainder='passthrough' keeps every other column, and
# verbose_feature_names_out=False keeps the column names unprefixed.
imputer_transformation = ColumnTransformer(
    [
        ('impute missing', SimpleImputer(strategy='constant', fill_value="Do not know"), ['hispanic origin'])
    ],
    remainder='passthrough',
    verbose_feature_names_out=False,
)

In [0]:
# Rows are compared on every column except the instance weight and the target.
dropper_transformation = DropDuplicates(ignore_cols=['instance weight', 'income'])

# Continue the smoke test on `sample`: de-duplicate first, then impute.
sample = dropper_transformation.fit_transform(sample)
sample = imputer_transformation.fit_transform(sample)

In [0]:
# Rows left from the 100-row sample after de-duplication, and the column count.
sample.shape

In [0]:
# Preview the cleaned sample.
sample.head()

## Extract categorical features

First, let's take the features that showed the most significant relationship with our target in the EDA.

In [0]:
# Categorical columns that will be one-hot encoded.
columns_to_dummy = [
    "class of worker",
    "sex",
    "education",
    "marital stat",
    "full or part time employment stat",
]

# Numeric columns that are kept unchanged.
num_columns = [
    "age",
    "wage per hour",
    "capital gains",
    "capital losses",
    "dividends from stocks",
    "weeks worked in year",
]


We also take this step to encode the target:

In [0]:
# Inspect the raw target labels present in the sample before encoding them.
sample[TARGET].unique()

In [0]:
class TargetEncode(TransformerMixin, BaseEstimator, OneToOneFeatureMixin):
    """
    Binarise the TARGET column: rows equal to ``col_keep`` become 1, all
    other rows become 0. Any other columns are returned unchanged.
    """
    def __init__(self, col_keep: str):
        self.col_keep = col_keep

    def fit(self, X, y=None):
        # Nothing is learned from the data; only the input layout is recorded.
        n_cols = X.shape[1]
        self.n_features_in_ = n_cols
        self.feature_names_out_ = [*X.columns]
        return self

    def transform(self, X):
        is_kept = X[TARGET] == self.col_keep
        # assign works on a copy, so the caller's frame is left untouched
        return X.assign(**{TARGET: np.where(is_kept, 1, 0)})

    def get_feature_names_out(self, input_features=None):
        return self.feature_names_out_
    
class PassthroughTransformer(TransformerMixin, BaseEstimator, OneToOneFeatureMixin):
    """
    Identity transformer: hands the selected columns through untouched.
    """
    def __init__(self):
        """No parameters to configure."""

    def fit(self, X, y=None):
        # Only the input layout is recorded.
        n_cols = X.shape[1]
        self.n_features_in_ = n_cols
        self.feature_names_out_ = [*X.columns]
        return self

    def transform(self, X):
        # The very same object is returned, not a copy.
        return X

    def get_feature_names_out(self, input_features=None):
        return self.feature_names_out_
    

In [0]:
from sklearn.preprocessing import OneHotEncoder, LabelEncoder

# Final encoding step. Three groups of columns are kept; everything else
# (e.g. 'instance weight') is dropped by remainder='drop'.
dummy_encoder = ColumnTransformer(
    [
        # One-hot encode the categoricals: the first category of each column is
        # dropped and categories seen in fewer than 5% of rows are pooled.
        # NOTE(review): handle_unknown is left at its default, so transform
        # raises on a category that was not seen during fit - confirm intended.
        ('dummy_encode', OneHotEncoder(drop="first", min_frequency=0.05, sparse_output=False), columns_to_dummy),
        # Target becomes 1 for "50000+." and 0 otherwise.
        ('target_encode', TargetEncode(col_keep="50000+."), [TARGET]),
        # Numeric columns are kept as they are.
        ('passthrough', PassthroughTransformer(), num_columns)
    ],
    verbose_feature_names_out=False,
    remainder='drop',
)

# Smoke test of the encoder on the cleaned sample.
dummy_encoder.fit_transform(sample)

### Define and Run the Preprocessing Pipeline 

Finally, we are ready to put together our pipeline:

In [0]:
### DEFINE ###
# Fresh, unfitted instances are created here so that this cell does not depend
# on the state left behind by the smoke tests on `sample`.

# column mapping
mapper_transformation = MapColumns(
    col_name_map=col_mapping,
    col_type_map=col_type_map
)

# duplicates - compared on every column except the weight and the target
dropper_transformation = DropDuplicates(ignore_cols=['instance weight', 'income'])

# nulls
imputer_transformation = ColumnTransformer(
    [
        ('impute missing', SimpleImputer(strategy='constant', fill_value="Do not know"), ['hispanic origin'])
    ],
    remainder='passthrough',
    verbose_feature_names_out=False,
)


# final encoding - keeping numeric columns as is
dummy_encoder = ColumnTransformer(
    [
        ('dummy_encode', OneHotEncoder(drop="first", min_frequency=0.05, sparse_output=False), columns_to_dummy),
        ('target_encode', TargetEncode(col_keep="50000+."), [TARGET]),
        ('passthrough', PassthroughTransformer(), num_columns)
    ],
    verbose_feature_names_out=False,
    remainder='drop',
)


### COMPOSE ###

preprocessing_pipeline = Pipeline(
    [
        ('map_columns', mapper_transformation),
        ('drop_duplication', dropper_transformation),
        ('impute_null', imputer_transformation),
        ('dummy_encoder', dummy_encoder)
    ]
)

### FIT + TRANSFORM (train) ###

# fit_transform fits every step and returns the processed training data in a
# single pass, instead of pushing train_df through all steps a second time.
train_processed = preprocessing_pipeline.fit_transform(train_df)

# Pipeline.fit returns the pipeline itself, so this is the same object the
# original `fit` call produced; the name is kept for compatibility.
fitted_pipeline = preprocessing_pipeline

### TRANSFORM (test) ###

# NOTE(review): the 'drop_duplication' step also removes duplicated rows from
# test_df here - confirm this is intended for the held-out set.
test_processed = fitted_pipeline.transform(test_df)

In [0]:
# Preview the processed training data.
train_processed.head()

In [0]:
# Both frames come from the same fitted pipeline; compare their shapes.
train_processed.shape, test_processed.shape

## Sanity Checks

These need to pass before we can save the data.

In [0]:
def validate_processed(df: pd.DataFrame, target: Optional[str] = None) -> None:
    """
    Ensure key attributes before saving data.

    Parameters
    ----------
    df : pd.DataFrame
        Processed frame to check.
    target : str, optional
        Name of the label column. Defaults to the notebook-level ``TARGET``.

    Raises
    ------
    AssertionError
        If ``df`` contains nulls, lacks the target column, or has any
        non-numeric column. The message names the offending columns.
    """
    # Resolved at call time so the notebook constant stays the default.
    target_col = TARGET if target is None else target

    # no nulls
    null_counts = df.isna().sum()
    cols_with_nulls = null_counts[null_counts > 0]
    assert cols_with_nulls.empty, f"null values found: {cols_with_nulls.to_dict()}"

    # target present
    assert target_col in df.columns, f"target column {target_col!r} is missing"

    # all numeric
    numeric_cols = set(df.select_dtypes("number").columns)
    non_numeric = [col for col in df.columns if col not in numeric_cols]
    assert not non_numeric, f"non-numeric columns found: {non_numeric}"
    
# Both outputs must pass; a failed assertion stops the notebook before the save cell.
validate_processed(train_processed)
validate_processed(test_processed)

## Save Data 

In [0]:
# Write recipe outputs
# Each processed frame goes to its own Dataiku dataset via write_with_schema.
processed_learn = dataiku.Dataset("processed_learn")
processed_learn.write_with_schema(train_processed)

processed_test = dataiku.Dataset("processed_test")
processed_test.write_with_schema(test_processed)