In [1]:
#!pip install dask-ml

In [2]:
import seaborn as sns
import matplotlib.pyplot as plt

In [3]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import cross_validate, GridSearchCV
from sklearn.metrics import roc_auc_score, r2_score
import joblib
from dask_ml.model_selection import train_test_split
import pandas as pd
import warnings

warnings.filterwarnings("ignore")
from dask import delayed


In [4]:
import warnings
warnings.filterwarnings("ignore")

from dask.distributed import Client, progress

client = Client(n_workers=4, threads_per_worker=2, memory_limit='2GB')
client

0,1
Client  Scheduler: tcp://127.0.0.1:52542  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 8.00 GB


In [5]:
import dask.dataframe as dd
df = dd.read_csv("https://raw.githubusercontent.com/stedy/Machine-Learning-with-R-datasets/master/insurance.csv")
df.head()

Unnamed: 0,age,sex,bmi,children,smoker,region,charges
0,19,female,27.9,0,yes,southwest,16884.924
1,18,male,33.77,1,no,southeast,1725.5523
2,28,male,33.0,3,no,southeast,4449.462
3,33,male,22.705,0,no,northwest,21984.47061
4,32,male,28.88,0,no,northwest,3866.8552


In [6]:
df.info

<bound method DataFrame.info of Dask DataFrame Structure:
                 age     sex      bmi children  smoker  region  charges
npartitions=1                                                          
               int64  object  float64    int64  object  object  float64
                 ...     ...      ...      ...     ...     ...      ...
Dask Name: read-csv, 1 tasks>

In [7]:
df = df.drop(columns=['region'])

In [8]:
#mean and count
df.describe().compute()

Unnamed: 0,age,bmi,children,charges
count,1338.0,1338.0,1338.0,1338.0
mean,39.207025,30.663397,1.094918,13270.422265
std,14.04996,6.098187,1.205493,12110.011237
min,18.0,15.96,0.0,1121.8739
25%,27.0,26.29625,0.0,4740.28715
50%,39.0,30.4,1.0,9382.033
75%,51.0,34.69375,2.0,16639.912515
max,64.0,53.13,5.0,63770.42801


In [9]:
#checking null values
df.isna().sum(axis=0).compute()

age         0
sex         0
bmi         0
children    0
smoker      0
charges     0
dtype: int64

In [10]:
# Converting binary columns to 1s and 0s
df["smoker"] = (df["smoker"] == "yes").astype(int)
df["sex"] = (df["sex"] == "male").astype(int)

In [11]:
 df['sex']

Dask Series Structure:
npartitions=1
    int32
      ...
Name: sex, dtype: int32
Dask Name: getitem, 11 tasks

In [12]:
## This is our feature set
X = df[['smoker', 'sex', 'age', 'bmi']]

# This is our target variable
y = df['charges']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2)

# Since our data can fit into memory
# we persist them to the RAM.
X_train.persist()
X_test.persist()
y_train.persist()
y_test.persist()

Dask Series Structure:
npartitions=1
    float64
        ...
Name: charges, dtype: float64
Dask Name: split, 1 tasks

In [13]:
#doing four fold cross-validation in this code.
#training four models and evaluating them on a different hold-out group

rf_model = RandomForestRegressor()

with joblib.parallel_backend('dask'):
    scores = cross_validate(rf_model, X_train.compute(), y_train.compute(), cv=4)
    
scores

{'fit_time': array([ 4.3981483 , 11.0117619 , 11.11451197,  4.53482032]),
 'score_time': array([0.11783481, 1.52273703, 1.56340218, 0.20407391]),
 'test_score': array([0.85429773, 0.80832901, 0.87432206, 0.81126294])}

In [15]:
#creating a GridSearchCV object to tune the max_depth hyperparameter of the random forest classifier

rf_params = {"max_depth": [2, 4, 8, 16]}

rf_model = RandomForestRegressor()

grid_search_rf = GridSearchCV(rf_model,
                           param_grid=rf_params,
                           return_train_score=True,
                           iid=True,
                           cv=4, 
                           scoring='r2')

In [16]:
df.dtypes

age           int64
sex           int32
bmi         float64
children      int64
smoker        int32
charges     float64
dtype: object

In [17]:
#from dask_ml import preprocessing
#df = preprocessing.LabelEncoder()
#df.fit([1, 2, 2, 6])

In [18]:
#bmi = df['bmi']
#bmi = int(bmi)

In [19]:
#fit_transform(y)

In [20]:
y_train.compute()

0       16884.9240
1        1725.5523
4        3866.8552
6        8240.5896
8        6406.4107
           ...    
1332    11411.6850
1333    10600.5483
1334     2205.9808
1335     1629.8335
1336     2007.9450
Name: charges, Length: 1073, dtype: float64

In [21]:
X_train.compute()

Unnamed: 0,smoker,sex,age,bmi
0,1,0,19,27.90
1,0,1,18,33.77
4,0,1,32,28.88
6,0,0,46,33.44
8,0,1,37,29.83
...,...,...,...,...
1332,0,0,52,44.70
1333,0,1,50,30.97
1334,0,0,18,31.92
1335,0,0,18,36.85


In [24]:
with joblib.parallel_backend('dask'):
    grid_search_rf.fit(X_train.compute(), y_train.compute())

In [25]:
print("The best value is: ", grid_search_rf.best_params_)
print("The test AUC score is: ", grid_search_rf.score(X_test.compute(), y_test.compute()))

The best value is:  {'max_depth': 4}
The test AUC score is:  0.8026660292846033


In [26]:
from dask_ml.linear_model import LogisticRegression

In [27]:
lr = LogisticRegression()
lr.fit(X_train.values.compute(), y_train.values.compute())

LogisticRegression()

In [28]:
preds_train = lr.predict(X_train.values.compute())
preds_test = lr.predict(X_test.values.compute())

print("Training score is: ", roc_auc_score(preds_train, y_train.values.compute()))
print("Test score is: ", roc_auc_score(preds_test, y_test.values.compute()))

Training score is:  0.9271390374331552
Test score is:  0.9163498098859315


In [30]:
client.close()