# Example data cleaning

In [1]:
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline, make_pipeline

from cleaners import data_types, drop_replace, eliminate_feats, impute
from cleaners.indicators import AddIndicators

# Load the OpenML "titanic" dataset (version 1) as a feature frame and a target.
X, y = fetch_openml(
    name="titanic",
    version=1,
    return_X_y=True,
    as_frame=True,
)

### Add some bogus columns


In [2]:
# One seeded generator drives every random draw in this cell (including the
# pandas `.sample()` calls) so that "Restart & Run All" rebuilds the same frame.
rs = np.random.RandomState(0)
sz = len(X)

# add bogus columns that the cleaning pipeline is expected to deal with

X["float_to_str"] = rs.normal(scale=0.4, size=sz).astype(str)  # should get converted to float
X["Some<Colu,mn,NAme"] = rs.random_sample(size=sz)
X["zero_variance"] = np.ones(sz)
X["all_nans"] = pd.Series(sz * [np.nan])
X["correlated_to_age_1"] = 0.5 * X["age"] + rs.normal(scale=0.1, size=sz) + 6
X["correlated_to_age_2"] = X["age"] + rs.normal(scale=0.4, size=sz) - 7

# blank out 5% of the target values, then attach the target as a column
nan_target_labels = pd.Series(y.index).sample(frac=0.05, random_state=rs)
y.loc[nan_target_labels] = np.nan
X["target"] = y

# add some nans (10% of the "fare" values)
nan_fare_labels = pd.Series(X.index.values).sample(frac=0.1, random_state=rs)
X.loc[nan_fare_labels, "fare"] = np.nan

# add dupes: resample with replacement up to 10000 rows
X = X.sample(n=10000, replace=True, random_state=rs)

# NOTE(review): the right-hand Series has a fresh 0..n-1 index and is aligned
# on X's index labels, which contain duplicates after the resampling above, so
# duplicated rows appear to receive the same "ix" value -- confirm this is
# intended.
X["ix"] = pd.Series(np.ones(len(X))).cumsum() + 1

# A bare `X.set_index("ix")` used to follow here. Its result was discarded, so
# it had no effect; it is dropped and "ix" remains a regular column.
len(X)

10000

## Split dataset

In [3]:
# Hold out 20% of the rows for testing. The fixed random_state keeps the
# train/test membership identical across kernel restarts.
X_train, X_test = train_test_split(X, test_size=0.2, random_state=0)
len(X_train)

8000

In [5]:
# Inspect the column labels of X, including the synthetic columns added above.
X.columns

Index(['pclass', 'name', 'sex', 'age', 'sibsp', 'parch', 'ticket', 'fare',
       'cabin', 'embarked', 'boat', 'body', 'home.dest', 'float_to_str',
       'Some<Colu,mn,NAme', 'zero_variance', 'all_nans', 'correlated_to_age_1',
       'correlated_to_age_2', 'target', 'ix'],
      dtype='object')

### Run like any sklearn transformer

In [None]:
# Apply a single cleaner on its own through the fit_transform interface.
# DropNa is restricted to the "target" column via `subset`
# (presumably it drops the rows whose target is NaN -- confirm in cleaners).
X_train = (
    drop_replace.DropNa(subset=["target"])
    .fit_transform(X_train)
)
len(X_train)

### Set up the data cleaning pipeline

In [None]:
# Passed as `mandatory` to the feature-elimination steps below
# (presumably columns those steps must never remove -- confirm in cleaners).
mandatory_columns = ["pclass"]

# Ordered (step name, transformer) pairs, later handed to sklearn's Pipeline.
pipeline_steps = [
    # Explicitly named columns handed to DropNamedCol.
    (
        "pre_drop",
        drop_replace.DropNamedCol(drop_cols=["name", "ticket", "home.dest"]),
    ),
    # The notebook later asserts that "float_to_str" comes out as float64.
    ("fix_types", data_types.FixDTypes(verbose=True)),
    # The frame contains the column "Some<Colu,mn,NAme" for this step.
    ("replace_bad_names", drop_replace.ReplaceBadColnameChars()),
    (
        "drop_mostly_nan",
        eliminate_feats.DropMostlyNaN(
            nan_frac_thresh=0.5,
            mandatory=mandatory_columns,
            skip_if_missing=True,
            sample_rate=0.3,
        ),
    ),
    (
        "drop_uninformative_1",
        eliminate_feats.DropUninformative(
            mandatory=mandatory_columns,
            sample_rate=0.3,
        ),
    ),
    # "fare" had NaNs injected earlier in the notebook.
    (
        "impute",
        impute.ImputeByValue(
            cols=["fare"],
            imputer_kwargs={"add_indicator": True, "strategy": "median"},
            sample_rate=0.3,
        ),
    ),
    (
        "indicators",
        AddIndicators(
            values_to_indicate={
                "boat": ["None", "13 15 B", "C D"],
                "cabin": ["None"],
            },
            sample_rate=0.3,
        ),
    ),
    (
        "feature_elim_1",
        eliminate_feats.HighCorrelationElim(
            mandatory=mandatory_columns,
            sample_rate=0.3,
        ),
    ),
]

In [None]:
# run the pipeline
your_pipeline = Pipeline(pipeline_steps)
X_train_tr = your_pipeline.fit_transform(X_train)
X_train_tr

In [None]:
# The string-encoded floats must have been converted back to a float column.
assert X_train_tr["float_to_str"].dtype == "float64"

### Apply to the test dataset

In [None]:
# Reuse the pipeline fitted on the training split: only `transform` is called
# here, not `fit`.
X_test_tr = your_pipeline.transform(X_test)

In [None]:
# Display the transformed test split.
X_test_tr

## Also works with dask dataframes

In [None]:
import dask.dataframe as dd

# Wrap the pandas test split in a 3-partition dask frame, push it through the
# already-fitted pipeline, and materialise the result with .compute().
X_test_dd = dd.from_pandas(X_test, npartitions=3)
X_test_tr_dd = your_pipeline.transform(X_test_dd).compute()

In [None]:
# Train and test outputs must expose the same columns in the same order.
# Index.equals returns False on any mismatch, including a different number of
# columns; element-wise `==` would raise ValueError on a length mismatch
# instead of failing the assertion.
assert X_test_tr.columns.equals(X_train_tr.columns)

In [None]:
# The dask-computed result must match the pandas result once both are
# ordered by index.
pd.testing.assert_frame_equal(
    left=X_test_tr_dd.sort_index(),
    right=X_test_tr.sort_index(),
)