In [1]:
import warnings
warnings.filterwarnings("ignore")

from dask.distributed import Client, progress

client = Client(n_workers=4, threads_per_worker=2, memory_limit='2GB')
client

0,1
Client  Scheduler: tcp://127.0.0.1:62602  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 8.00 GB


In [2]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_validate, GridSearchCV
from sklearn.metrics import roc_auc_score
import joblib
from dask_ml.model_selection import train_test_split
import pandas as pd

In [3]:
import dask.dataframe as dd

df = dd.read_csv('https://tf-assets-prod.s3.amazonaws.com/tf-curric/data-science/creditcard.csv',
                dtype={'Time': 'float64'})

In [4]:
df.head()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,0.0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,...,-0.018307,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62,0
1,0.0,1.191857,0.266151,0.16648,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,...,-0.225775,-0.638672,0.101288,-0.339846,0.16717,0.125895,-0.008983,0.014724,2.69,0
2,1.0,-1.358354,-1.340163,1.773209,0.37978,-0.503198,1.800499,0.791461,0.247676,-1.514654,...,0.247998,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66,0
3,1.0,-0.966272,-0.185226,1.792993,-0.863291,-0.010309,1.247203,0.237609,0.377436,-1.387024,...,-0.1083,0.005274,-0.190321,-1.175575,0.647376,-0.221929,0.062723,0.061458,123.5,0
4,2.0,-1.158233,0.877737,1.548718,0.403034,-0.407193,0.095921,0.592941,-0.270533,0.817739,...,-0.009431,0.798278,-0.137458,0.141267,-0.20601,0.502292,0.219422,0.215153,69.99,0


In [5]:
X = df.drop(columns=['Time', 'Class'])

y = df.Class

In [6]:
X

Unnamed: 0_level_0,V1,V2,V3,V4,V5,V6,V7,V8,V9,V10,V11,V12,V13,V14,V15,V16,V17,V18,V19,V20,V21,V22,V23,V24,V25,V26,V27,V28,Amount
npartitions=3,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1
,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [7]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

In [8]:
rf_model = RandomForestClassifier()

In [9]:
with joblib.parallel_backend('dask'):
    scores = cross_validate(rf_model, X_train.compute(), y_train.compute(), cv=4)
    
scores

{'fit_time': array([228.7873919 , 226.47077489, 225.79484129, 217.21821618]),
 'score_time': array([0.29788709, 0.23246408, 0.38835096, 0.34624791]),
 'test_score': array([0.99917508, 0.99940324, 0.99945589, 0.99929793])}

In [8]:
rf_params = {"max_depth": [2, 4, 8, 16]}

rf_model = RandomForestClassifier()

grid_search_rf = GridSearchCV(rf_model,
                           param_grid=rf_params,
                           return_train_score=True,
                           cv=4,
                           n_jobs=-1, 
                           scoring='roc_auc')

In [9]:
with joblib.parallel_backend('dask'):
    grid_search_rf.fit(X_train.compute(), y_train.compute())

In [10]:
print(f"The best value is: {grid_search_rf.best_params_}")
print("The test AUC score is: ", grid_search_rf.score(X_test.compute(), y_test.compute()))

The best value is: {'max_depth': 16}
The test AUC score is:  0.9800834890691456


In [11]:
grid_search_rf.cv_results_

{'mean_fit_time': array([488.79011351, 512.16819048, 503.44695884, 525.96539205]),
 'std_fit_time': array([37.357987  , 11.51777506, 19.79309034,  3.46308919]),
 'mean_score_time': array([0.23121154, 0.24077004, 0.3013342 , 0.32125372]),
 'std_score_time': array([0.03269812, 0.03015991, 0.03321266, 0.03203861]),
 'param_max_depth': masked_array(data=[2, 4, 8, 16],
              mask=[False, False, False, False],
        fill_value='?',
             dtype=object),
 'params': [{'max_depth': 2},
  {'max_depth': 4},
  {'max_depth': 8},
  {'max_depth': 16}],
 'split0_test_score': array([0.9900059 , 0.99265429, 0.99722891, 0.99629935]),
 'split1_test_score': array([0.91846483, 0.91469679, 0.92385389, 0.94326229]),
 'split2_test_score': array([0.95747189, 0.97716522, 0.9814463 , 0.97566394]),
 'split3_test_score': array([0.94370982, 0.950823  , 0.98011473, 0.96867572]),
 'mean_test_score': array([0.95241311, 0.95883483, 0.97066096, 0.97097532]),
 'std_test_score': array([0.02582171, 0.0295470

In [12]:
from dask_ml.linear_model import LogisticRegression

In [13]:
lr = LogisticRegression()
lr.fit(X_train.values.compute(), y_train.values.compute())

LogisticRegression()

In [14]:
preds_train = lr.predict(X_train.values.compute())
preds_test = lr.predict(X_test.values.compute())

print("Training score is: ", roc_auc_score(preds_train, y_train.values.compute()))
print("Test score is: ", roc_auc_score(preds_test, y_test.values.compute()))

Training score is:  0.8888788398067726
Test score is:  0.9057810949482296


In [15]:
client.close()

