In [1]:
import dask
import dask.delayed
import time
import dask.dataframe as dd
from dask_ml.compose import ColumnTransformer
from dask_ml.impute import SimpleImputer
from dask_ml.model_selection import GridSearchCV
from dask_ml.preprocessing import StandardScaler, DummyEncoder
from sklearn.preprocessing import OneHotEncoder
from distributed import wait
from dask_ml.metrics import accuracy_score
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from dask_ml.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from dask.distributed import Client, wait
from dask_saturn import SaturnCluster

# n_workers = 3
# cluster = SaturnCluster(n_workers=n_workers, scheduler_size="medium", worker_size="large")
# client = Client(cluster)

## try local cluster first
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)

In [2]:
# client.restart()
# client.close()
# ??LocalCluster

## View Cluster
- to access local cluster do the following:
  - replace lab/* with proxy/8787/status where the number if the port
- If I manually scale I can get more ram than I should

In [2]:
cluster

VBox(children=(HTML(value='<h2>LocalCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    …

In [3]:
df = dd.read_csv(
    "https://storage.googleapis.com/gcpdatascience-tribal-bird-305118/dunnhumby/training_data_psuedo.csv",
     dtype={
        'TARGET': 'int32',
        'HOUSEHOLD_KEY': 'float32',
        'COUPONS_REDEEMED': 'int32',
        'NUMBER_OF_CAMPAIGNS': 'int32',
        'COUPON_DAYS': 'int32',
        'FIRST_COUPON_DAY': 'int32',
        'LAST_COUPON_DAY': 'int32',
        'CAMPAIGNS_RECEIVED_REDEEMED_RATIO': 'float32',
        'NUMBER_TYPE_A': 'int32',
        'NUMBER_TYPE_B': 'int32',
        'NUMBER_TYPE_C': 'int32',
        'AGE_DESC': 'category',
        'MARITAL_STATUS_CODE': 'category',
        'INCOME_DESC': 'category',
        'HOMEOWNER_DESC': 'category',
        'HH_COMP_DESC': 'category',
        'HOUSEHOLD_SIZE_DESC': 'category',
        'KID_CATEGORY_DESC': 'category',
        'NUMBER_OF_PURCHASES': 'int32',
        'TOTAL_NUMBER_OF_PRODUCTS': 'int32' , 
        'AVG_PRODUCTS_PER_BASKET': 'float32',
        'TOTAL_SPEND': 'int32',
        'STD_SPEND': 'float32',
        'AVG_SPEND_PER_BASKET': 'float32',
        'AVG_SPEND_PER_PRODUCT': 'float32',
        'MOST_EXPENSIVE_PURCHASE':'int32',
        'AVG_DISCOUNT': 'float32',
        'BIGGEST_DISCOUNT': 'int32',
        'CUSTOMER_AGE': 'int32',
        'NUMBER_OF_WEEKS': 'int32',
        'NUMBER_UNIQUE_DAYS': 'int32',
        'AVG_GAP_IN_TRIPS': 'float32',
        'STD_GAPS_IN_TRIPS': 'float32',
        'MAX_GAPS_IN_TRIPS': 'float32'
    }
).sample(frac=0.3)

print(f"Num rows: {len(df)}, Size: {df.memory_usage(deep=True).sum().compute() / 1e6} MB")

print(df.dtypes)

Num rows: 2100751, Size: 258.467913 MB
TARGET                                  int32
HOUSEHOLD_KEY                         float32
COUPONS_REDEEMED                        int32
NUMBER_OF_CAMPAIGNS                     int32
COUPON_DAYS                             int32
FIRST_COUPON_DAY                        int32
LAST_COUPON_DAY                         int32
CAMPAIGNS_RECEIVED_REDEEMED_RATIO     float32
NUMBER_TYPE_A                           int32
NUMBER_TYPE_B                           int32
NUMBER_TYPE_C                           int32
AGE_DESC                             category
MARITAL_STATUS_CODE                  category
INCOME_DESC                          category
HOMEOWNER_DESC                       category
HH_COMP_DESC                         category
HOUSEHOLD_SIZE_DESC                  category
KID_CATEGORY_DESC                    category
NUMBER_OF_PURCHASES                     int32
TOTAL_NUMBER_OF_PRODUCTS                int32
AVG_PRODUCTS_PER_BASKET               flo

In [4]:
features = [col for col in df.columns if col not in ["TARGET", "HOUSEHOLD_KEY"]]
label = "TARGET"

# create column transformer
numeric_features = df[features].select_dtypes(include=["int64", "float64"]).columns
categorical_features = df[features].select_dtypes(include=["category"]).columns

df = df.categorize(columns=categorical_features) # need known categorical data for dummyencoder

## train test split
X_train, X_test, y_train, y_test = train_test_split(
    df[features], df[label], test_size=0.25, random_state=2,
    shuffle=True
)
df

Unnamed: 0_level_0,TARGET,HOUSEHOLD_KEY,COUPONS_REDEEMED,NUMBER_OF_CAMPAIGNS,COUPON_DAYS,FIRST_COUPON_DAY,LAST_COUPON_DAY,CAMPAIGNS_RECEIVED_REDEEMED_RATIO,NUMBER_TYPE_A,NUMBER_TYPE_B,NUMBER_TYPE_C,AGE_DESC,MARITAL_STATUS_CODE,INCOME_DESC,HOMEOWNER_DESC,HH_COMP_DESC,HOUSEHOLD_SIZE_DESC,KID_CATEGORY_DESC,NUMBER_OF_PURCHASES,TOTAL_NUMBER_OF_PRODUCTS,AVG_PRODUCTS_PER_BASKET,TOTAL_SPEND,STD_SPEND,AVG_SPEND_PER_BASKET,AVG_SPEND_PER_PRODUCT,MOST_EXPENSIVE_PURCHASE,AVG_DISCOUNT,BIGGEST_DISCOUNT,CUSTOMER_AGE,NUMBER_OF_WEEKS,NUMBER_UNIQUE_DAYS,AVG_GAP_IN_TRIPS,STD_GAPS_IN_TRIPS,MAX_GAPS_IN_TRIPS
npartitions=20,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1
,int32,float32,int32,int32,int32,int32,int32,float32,int32,int32,int32,category[known],category[known],category[known],category[known],category[known],category[known],category[known],int32,int32,float32,int32,float32,float32,float32,int32,float32,int32,int32,int32,int32,float32,float32,float32
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


## Persist Data to each worker

In [5]:
%%time
X_train,  X_test, y_train, y_test = dask.persist(X_train, X_test, 
                                                 y_train, y_test)

_ = wait(X_train)

CPU times: user 3.04 s, sys: 628 ms, total: 3.67 s
Wall time: 32.5 s


## Define our sklearn pipeline

In [6]:
numeric_transformer = Pipeline(
    steps=[("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())]
)

categorical_transformer =  DummyEncoder()

preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, numeric_features),
        ("cat", categorical_transformer, categorical_features),
    ]
)

pipelines = []

lr = Pipeline(
    steps=[
        ("preprocessor", preprocessor),
        ("clf", LogisticRegression(solver="liblinear", class_weight="balanced")),
    ]
)

rf = Pipeline(
       steps=[
         ("preprocessor", preprocessor),
         ("RF", RandomForestClassifier(random_state=2, class_weight="balanced")),
       ]
)


xgb = Pipeline(
        steps=[
         ("preprocessor", preprocessor),
         ("XGB", GradientBoostingClassifier(random_state=2)),
                ]
            )

pipelines = [lr, rf, xgb]

## Execute Test Model

In [8]:
# %%time

# start = time.perf_counter()

# lr_fitted = lr.fit(X_train, y_train)

# preds = lr_fitted.predict_proba(X_test)[:, 1]
# end = time.perf_counter()

# roc_auc_score(y_test, preds)
# timed_fit = end - start

## Run Multiple Models in a Pipeline

In [7]:
%%time

start = time.perf_counter()
pipelines_ = [dask.delayed(pl).fit(X_train, y_train) for pl in pipelines]
fit_pipelines = dask.compute(*pipelines_)
end = time.perf_counter()
pipeline_timed_fit = end - start

CPU times: user 48.1 s, sys: 9.74 s, total: 57.8 s
Wall time: 10min 17s


In [8]:
print(pipeline_timed_fit)

617.2008071310001
