<a href="https://colab.research.google.com/github/AlexBB999/Thinkful/blob/master/31_5_Dask.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**ASSIGNMENT ANSWERS AT BOTTOM**

##**Assignments**

In the following task, you'll continue working with the **Credit Card Fraud Detection dataset from Kaggle**. Before moving on to the tasks, you should load the dataset using Dask.

To complete this assignment, create a Jupyter notebook containing your solutions to the following tasks and submit as a link on Github.

**In this task, you'll train several machine learning models from scikit-learn using Dask as the backend of joblib**. 

This time, you need to use all the variables except Class as your feature set.

Class variable will be your target variable.

Compare the results of your models.

In [3]:
!pip install --upgrade "dask[complete]"

Requirement already up-to-date: dask[complete] in /usr/local/lib/python3.6/dist-packages (2.16.0)


In [4]:
pip install -U ipykernel

Requirement already up-to-date: ipykernel in /usr/local/lib/python3.6/dist-packages (5.2.1)


In [3]:
!pip install dask-ml



In [1]:
import warnings
warnings.filterwarnings("ignore")

from dask.distributed import Client, progress

client = Client(n_workers=4, threads_per_worker=2, memory_limit='2GB')
client

0,1
Client  Scheduler: tcp://127.0.0.1:43845  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 8.00 GB


###**Parallelism in scikit-learn**
Scikit-learn library already offers a parallelization capability for some of its models. In particular, if you see n_jobs parameter in the documentation of a model, then it implies that you can set that parameter and scikit-learn dispatches the job to the cores of your computer.

 Scikit-learn does this using a library called **joblib**.

**However, this parallelization in Scikit-learn is restricted to a single machine**. 

Dask extends this and brings parallelization over a cluster.

As we'll see in this checkpoint, **if we use Dask as the backend to the joblib library, we can run Scikit-learn models over many machines in parallel**.

However as we said before, some models in Scikit-learn can't be trained on very large training data that doesn't fit into memory. For those situations, we'll talk about Dask's own machine learning module later in this checkpoint.



In [0]:
# Dataframes implement the Pandas API
import dask.dataframe as dd

# This loads the data into Dask dataframe
df = dd.read_csv('https://tf-assets-prod.s3.amazonaws.com/tf-curric/data-science/creditcard.csv', dtype={'Time': 'float64'})

In [3]:
df.head()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,V10,V11,V12,V13,V14,V15,V16,V17,V18,V19,V20,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,0.0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,0.090794,-0.5516,-0.617801,-0.99139,-0.311169,1.468177,-0.470401,0.207971,0.025791,0.403993,0.251412,-0.018307,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62,0
1,0.0,1.191857,0.266151,0.16648,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,-0.166974,1.612727,1.065235,0.489095,-0.143772,0.635558,0.463917,-0.114805,-0.183361,-0.145783,-0.069083,-0.225775,-0.638672,0.101288,-0.339846,0.16717,0.125895,-0.008983,0.014724,2.69,0
2,1.0,-1.358354,-1.340163,1.773209,0.37978,-0.503198,1.800499,0.791461,0.247676,-1.514654,0.207643,0.624501,0.066084,0.717293,-0.165946,2.345865,-2.890083,1.109969,-0.121359,-2.261857,0.52498,0.247998,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66,0
3,1.0,-0.966272,-0.185226,1.792993,-0.863291,-0.010309,1.247203,0.237609,0.377436,-1.387024,-0.054952,-0.226487,0.178228,0.507757,-0.287924,-0.631418,-1.059647,-0.684093,1.965775,-1.232622,-0.208038,-0.1083,0.005274,-0.190321,-1.175575,0.647376,-0.221929,0.062723,0.061458,123.5,0
4,2.0,-1.158233,0.877737,1.548718,0.403034,-0.407193,0.095921,0.592941,-0.270533,0.817739,0.753074,-0.822843,0.538196,1.345852,-1.11967,0.175121,-0.451449,-0.237033,-0.038195,0.803487,0.408542,-0.009431,0.798278,-0.137458,0.141267,-0.20601,0.502292,0.219422,0.215153,69.99,0


###**Distributing the training**

**NOTE: DASK TRAIN_TEST_SPLIT**

In [0]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_validate, GridSearchCV
from sklearn.metrics import roc_auc_score
import joblib
from dask_ml.model_selection import train_test_split
import pandas as pd
import warnings

warnings.filterwarnings("ignore")

In [5]:
# This is our feature set
X = df[["V1", "V2", "V3", "Amount"]]

# This is our target variable
Y = df["Class"]

X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2)

# Since our data can fit into memory
# we persist them to the RAM.
X_train.persist()
X_test.persist()
y_train.persist()
y_test.persist()

Dask Series Structure:
npartitions=3
    int64
      ...
      ...
      ...
Name: Class, dtype: int64
Dask Name: split, 3 tasks

When we use Dask as the backend of the joblib library, we need to put our machine learning code inside the context manager of joblib.

That is, we need to put the code we want to parallelize inside the following with statement:

**with joblib.parallel_backend('dask'):**

    Scikit-learn code here

**Note that, we do four fold cross-validation in this code**. 

**So, we actually train four models and evaluate them on a different hold-out group**.

This alone is something that lays itself parallelization quite well.

That being said, Dask can also parallelize the training of a single random forest classifier.

In [6]:
rf_model = RandomForestClassifier()

with joblib.parallel_backend('dask'):
    scores = cross_validate(rf_model, X_train.compute(), y_train.compute(), cv=4)
    
scores

{'fit_time': array([197.90318394, 196.47006369, 196.38466454, 197.82622719]),
 'score_time': array([0.        , 1.54131079, 1.39354539, 1.9516499 ]),
 'test_score': array([       nan, 0.998472  , 0.99868275, 0.99898133])}

**Next, we move one step further and create a GridSearchCV object to tune the max_depth hyperparameter of the random forest classifier**:

In [0]:
# Random forest classifier
rf_params = {"max_depth": [2, 4, 8, 16]}

rf_model = RandomForestClassifier()

grid_search_rf = GridSearchCV(rf_model,
                           param_grid=rf_params,
                           return_train_score=True,
                           iid=True,
                           cv=4,
                           n_jobs=-1, 
                           scoring='roc_auc')

As you can recall, when we do **grid search with scikit-learn**, we fit the model on the data as follows:

**grid_search.fit(X, y)**

**However, this time we want to distribute the training and cross validation over a cluster with Dask**.

In order to do this, we need to use the **context manager** provided by joblib library using the **with statement of Python**.

Now, let's run the grid search over the hyperparameters of the random forest model:

In [0]:
with joblib.parallel_backend('dask'):
    grid_search_rf.fit(X_train.compute(), y_train.compute())

**We trained 16 different combinations of the model with four different values of the max_depth parameter and with four fold cross validation.**

Now, let's find out the best value for the max_depth parameter and get the AUC score on the test set:

In [9]:
print("The best value is: ", grid_search_rf.best_params_)
print("The test AUC score is: ", grid_search_rf.score(X_test.compute(), y_test.compute()))

The best value is:  {'max_depth': 8}
The test AUC score is:  0.930058326716623


###**Training on Large Datasets**



**The model we'll use is the logistic regression from Dask-ml's linear_model module.**

**We start by importing the estimator**:

In [0]:
from dask_ml.linear_model import LogisticRegression

Next, we create the logistic regression model and fit it to the training data.

Note that when calling the .fit() function, 

**we converted Dask dataframes X_train and y_train to Dask arrays by calling their .values attributes**.

This is because, right now

**Dask-ml's glm based estimators just work with Dask arrays**.

In [11]:
lr = LogisticRegression()
lr.fit(X_train.values.compute(), y_train.values.compute())

LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
                   intercept_scaling=1.0, max_iter=100, multi_class='ovr',
                   n_jobs=1, penalty='l2', random_state=None, solver='admm',
                   solver_kwargs=None, tol=0.0001, verbose=0, warm_start=False)

**Now, let's get the training and test scores:**

In [12]:
preds_train = lr.predict(X_train.values.compute())
preds_test = lr.predict(X_test.values.compute())

print("Training score is: ", roc_auc_score(preds_train, y_train.values.compute()))
print("Test score is: ", roc_auc_score(preds_test, y_test.values.compute()))

Training score is:  0.83967717034536
Test score is:  0.7908162967119029


In [0]:
df = dd.read_csv('https://tf-assets-prod.s3.amazonaws.com/tf-curric/data-science/creditcard.csv', dtype={'Time': 'float64'})

In [14]:
df.head()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,V10,V11,V12,V13,V14,V15,V16,V17,V18,V19,V20,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,0.0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,0.090794,-0.5516,-0.617801,-0.99139,-0.311169,1.468177,-0.470401,0.207971,0.025791,0.403993,0.251412,-0.018307,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62,0
1,0.0,1.191857,0.266151,0.16648,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,-0.166974,1.612727,1.065235,0.489095,-0.143772,0.635558,0.463917,-0.114805,-0.183361,-0.145783,-0.069083,-0.225775,-0.638672,0.101288,-0.339846,0.16717,0.125895,-0.008983,0.014724,2.69,0
2,1.0,-1.358354,-1.340163,1.773209,0.37978,-0.503198,1.800499,0.791461,0.247676,-1.514654,0.207643,0.624501,0.066084,0.717293,-0.165946,2.345865,-2.890083,1.109969,-0.121359,-2.261857,0.52498,0.247998,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66,0
3,1.0,-0.966272,-0.185226,1.792993,-0.863291,-0.010309,1.247203,0.237609,0.377436,-1.387024,-0.054952,-0.226487,0.178228,0.507757,-0.287924,-0.631418,-1.059647,-0.684093,1.965775,-1.232622,-0.208038,-0.1083,0.005274,-0.190321,-1.175575,0.647376,-0.221929,0.062723,0.061458,123.5,0
4,2.0,-1.158233,0.877737,1.548718,0.403034,-0.407193,0.095921,0.592941,-0.270533,0.817739,0.753074,-0.822843,0.538196,1.345852,-1.11967,0.175121,-0.451449,-0.237033,-0.038195,0.803487,0.408542,-0.009431,0.798278,-0.137458,0.141267,-0.20601,0.502292,0.219422,0.215153,69.99,0


**RANDOM FOREST**

In [0]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_validate, GridSearchCV
from sklearn.metrics import roc_auc_score
import joblib
from dask_ml.model_selection import train_test_split
import pandas as pd
import warnings

warnings.filterwarnings("ignore")

In [28]:
# This is our feature set
X = df.drop('Class',axis=1)

# This is our target variable
Y = df["Class"]

X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2)

# Since our data can fit into memory
# we persist them to the RAM.
X_train.persist()
X_test.persist()
y_train.persist()
y_test.persist()

Dask Series Structure:
npartitions=3
    int64
      ...
      ...
      ...
Name: Class, dtype: int64
Dask Name: split, 3 tasks

In [29]:
rf_model = RandomForestClassifier()

with joblib.parallel_backend('dask'):
    scores = cross_validate(rf_model, X_train.compute(), y_train.compute(), cv=4)
    
scores

{'fit_time': array([593.12752938, 547.49490547, 598.46912408, 576.99013519]),
 'score_time': array([0.        , 1.3215363 , 0.63125014, 0.95480514]),
 'test_score': array([       nan, 0.99945701, 0.99943949, 0.99929936])}

In [0]:
# Random forest classifier
rf_params = {"max_depth": [2, 4, 8, 16]}

rf_model = RandomForestClassifier()

grid_search_rf = GridSearchCV(rf_model,
                           param_grid=rf_params,
                           return_train_score=True,
                           iid=True,
                           cv=4,
                           n_jobs=-1, 
                           scoring='roc_auc')

In [0]:
with joblib.parallel_backend('dask'):
    grid_search_rf.fit(X_train.compute(), y_train.compute())

In [32]:
print("The best value is: ", grid_search_rf.best_params_)
print("The test AUC score is: ", grid_search_rf.score(X_test.compute(), y_test.compute()))

The best value is:  {'max_depth': 4}
The test AUC score is:  0.9489093476524033


**LOGISTIC REGRESSION**

In [0]:
from dask_ml.linear_model import LogisticRegression

In [35]:
lr = LogisticRegression()
lr.fit(X_train.values.compute(), y_train.values.compute())

LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
                   intercept_scaling=1.0, max_iter=100, multi_class='ovr',
                   n_jobs=1, penalty='l2', random_state=None, solver='admm',
                   solver_kwargs=None, tol=0.0001, verbose=0, warm_start=False)

In [0]:
preds_train = lr.predict(X_train.values.compute())
preds_test = lr.predict(X_test.values.compute())


In [37]:

print("Training score is: ", roc_auc_score(preds_train, y_train.values.compute()))
print("Test score is: ", roc_auc_score(preds_test, y_test.values.compute()))

Training score is:  0.926896844690688
Test score is:  0.983719038046814
