# Big Data

### Machine Learning with Dask

* David Booker-Earley
* First attempt: 6/3/2020
* Most recent attempt: 7/2/2020

<!-- * Checkpoint 5, Module 37 -->

---

In [1]:
!pip install --upgrade "dask[complete]"

Collecting dask[complete]
  Downloading dask-2.20.0-py3-none-any.whl (826 kB)
[K     |████████████████████████████████| 826 kB 10.9 MB/s eta 0:00:01
Installing collected packages: dask
  Attempting uninstall: dask
    Found existing installation: dask 2.19.0
    Uninstalling dask-2.19.0:
      Successfully uninstalled dask-2.19.0
Successfully installed dask-2.20.0


In [2]:
!pip install dask-ml



In [3]:
import warnings
warnings.filterwarnings("ignore")


## imports to load and (pre-)process data
from dask.distributed import Client, progress
from dask_ml.model_selection import train_test_split
import dask.dataframe as dd
import joblib
import pandas as pd
import time


## imports for ML models and evaluation
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_validate, GridSearchCV
# from dask_ml.model_selection import GridSearchCV
from sklearn.metrics import roc_auc_score

In [4]:
## Instantiate Dask dashboard to monitor code execution
client = Client(n_workers=4, threads_per_worker=2, memory_limit='2GB')
# client = Client()
client

0,1
Client  Scheduler: tcp://127.0.0.1:32933  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 8.00 GB


# Load Data

Source: [Kaggle](https://www.kaggle.com/mlg-ulb/creditcardfraud)

---

From the data source:

>Features V1, V2, … V28 are the principal components obtained with PCA, the only features which have not been transformed with PCA are 'Time' and 'Amount'. Feature 'Time' contains the seconds elapsed between each transaction and the first transaction in the dataset. The feature 'Amount' is the transaction Amount, this feature can be used for example-dependant cost-senstive learning. Feature 'Class' is the response variable and it takes value 1 in case of fraud and 0 otherwise.

In [5]:
# Dataframes implement the Pandas API
import dask.dataframe as dd

# Load data into Dask dataframe
link = 'creditcard.csv'
df = dd.read_csv(link, dtype={'Time': 'float64'})
df.head()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,0.0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,...,-0.018307,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62,0
1,0.0,1.191857,0.266151,0.16648,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,...,-0.225775,-0.638672,0.101288,-0.339846,0.16717,0.125895,-0.008983,0.014724,2.69,0
2,1.0,-1.358354,-1.340163,1.773209,0.37978,-0.503198,1.800499,0.791461,0.247676,-1.514654,...,0.247998,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66,0
3,1.0,-0.966272,-0.185226,1.792993,-0.863291,-0.010309,1.247203,0.237609,0.377436,-1.387024,...,-0.1083,0.005274,-0.190321,-1.175575,0.647376,-0.221929,0.062723,0.061458,123.5,0
4,2.0,-1.158233,0.877737,1.548718,0.403034,-0.407193,0.095921,0.592941,-0.270533,0.817739,...,-0.009431,0.798278,-0.137458,0.141267,-0.20601,0.502292,0.219422,0.215153,69.99,0


## 1. Train machine learning models from scikit-learn using Dask as the backend of joblib. 

>Use all of the variables except `Class` as the feature set.

>Let `Class`be the target variable.

In [6]:
# Target variable
Y = df["Class"]

# Feature set
X = df.drop(columns=["Class"])

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2)

# If the data can fit into memory, persist to RAM for faster runtimes
X_train.persist()
X_test.persist()
y_train.persist()
y_test.persist()

Dask Series Structure:
npartitions=3
    int64
      ...
      ...
      ...
Name: Class, dtype: int64
Dask Name: split, 3 tasks

### Random Forest Classifier

In [7]:
t_start = time.clock()


## Input Parameters
param_grid = {
    'max_depth': [2, 6],
    'n_jobs': [-1],
}


## Instantiate model. Prform Cross Validation to tune params
model = GridSearchCV(
    estimator=RandomForestClassifier(),
    param_grid=param_grid,
    n_jobs=-1,
    verbose=10,
    cv=3,
    scoring='roc_auc',
)
# Parallelize model fitting by using Dask as the backend of joblib
with joblib.parallel_backend('dask'):
    model.fit(X_train.compute(), y_train.compute())


## Make predictions
pred_test = model.predict(X_test.values.compute())


## Get Elapsed Time
t = time.clock() - t_start

Fitting 3 folds for each of 2 candidates, totalling 6 fits


[Parallel(n_jobs=-1)]: Using backend DaskDistributedBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done   2 out of   6 | elapsed:  2.8min remaining:  5.6min
[Parallel(n_jobs=-1)]: Done   3 out of   6 | elapsed:  2.9min remaining:  2.9min
[Parallel(n_jobs=-1)]: Done   4 out of   6 | elapsed:  2.9min remaining:  1.5min
[Parallel(n_jobs=-1)]: Done   6 out of   6 | elapsed:  3.1min remaining:    0.0s
[Parallel(n_jobs=-1)]: Done   6 out of   6 | elapsed:  3.1min finished


In [8]:
print("** Random Forest Classifier **")
if t >= 60**2:
    print(f"Runtime: {t/60**2 :0.1f} hours")
elif t >= 60.0:
    print(f"Runtime: {t/60 :0.1f} minutes")
else:
  print(f"Runtime: {t :0.4f} seconds")
print('--', ' --'*10)
print(f"Best parameters: {model.best_params_}")
print(f"ROC AUC Test score: {roc_auc_score(pred_test, y_test.values.compute())}")

** Random Forest Classifier **
Runtime: 19.9182 seconds
--  -- -- -- -- -- -- -- -- -- --
Best parameters: {'max_depth': 2, 'n_jobs': -1}
ROC AUC Test score: 0.9137112592420225


### Logistic Regression

In [9]:
t_start = time.clock()


## Input Parameters
param_grid = {
    'max_iter': [100, 150],
    'n_jobs': [-1],
}


## Instantiate model. Prform Cross Validation to tune params
model = GridSearchCV(
    estimator=LogisticRegression(),
    param_grid=param_grid,
    n_jobs=-1,
    verbose=10,
    cv=3,
    scoring='roc_auc',
)
# Parallelize model fitting by using Dask as the backend of joblib
with joblib.parallel_backend('dask'):
    model.fit(X_train.compute(), y_train.compute())


## Make predictions
pred_test = model.predict(X_test.values.compute())


## Get Elapsed Time
t = time.clock() - t_start

Fitting 3 folds for each of 2 candidates, totalling 6 fits


[Parallel(n_jobs=-1)]: Using backend DaskDistributedBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done   2 out of   6 | elapsed:   23.9s remaining:   47.9s
[Parallel(n_jobs=-1)]: Done   3 out of   6 | elapsed:   26.1s remaining:   26.1s
[Parallel(n_jobs=-1)]: Done   4 out of   6 | elapsed:   28.9s remaining:   14.5s
[Parallel(n_jobs=-1)]: Done   6 out of   6 | elapsed:   32.9s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done   6 out of   6 | elapsed:   32.9s finished


In [10]:
print("** Logistic Regression **")
if t >= 60**2:
    print(f"Runtime: {t/60**2 :0.1f} hours")
elif t >= 60.0:
    print(f"Runtime: {t/60 :0.1f} minutes")
else:
  print(f"Runtime: {t :0.4f} seconds")
print('--', ' --'*10)
print(f"Best parameters: {model.best_params_}")
print(f"Test score: {roc_auc_score(pred_test, y_test.values.compute())}")

** Logistic Regression **
Runtime: 5.2078 seconds
--  -- -- -- -- -- -- -- -- -- --
Best parameters: {'max_iter': 150, 'n_jobs': -1}
Test score: 0.8731190882055628


## 2. Compare the results of the models.

Although the Random Forest Classifier had a longer runtime, it performed better when making predictions based on its ROC-AUC score.

In [11]:
client.close()

