# Machine Learning on Distributed Dask with SageMaker and Fargate

This notebook demonstrates how to perform machine learning on distributed Dask using SageMaker and Fargate. We show how to connect to a distributed Dask cluster running on Fargate, scale out the Dask worker nodes, and perform exploratory data analysis (EDA) on the public New York City taxi trip data sets. Then we demonstrate how to run regression algorithms and hyperparameter optimization on the distributed Dask cluster. Next, we show how to monitor the operational metrics of the Dask cluster, which is fronted by a Network Load Balancer so that the Dask cluster status UI can be accessed from the internet. Finally, we close with how to build your own Python script container and run it against the Dask Fargate cluster. This notebook was inspired by a customer use case in which Dask was run on a local computer to build regression models.

## Setup required packages

In [None]:
#!conda update -y dask

In [None]:
#!conda install -y s3fs

In [None]:
#!conda install -c anaconda lz4 -y

In [None]:
#!conda install -c conda-forge dask-ml -y

In [None]:
#!conda install -c anaconda joblib -y

In [None]:
#!conda install scikit-learn=0.21 -y

In [None]:
#!conda update -y scikit-learn

## Connect to the Dask Fargate cluster. Provision the cluster first by following the instructions at https://github.com/rvvittal/aws-dask-sm-fargate

In [None]:
from dask.distributed import Client
#client = Client('Dask-Scheduler.local-dask:8786')
client = Client('127.0.0.1:8786')


## Scale out the number of dask workers as needed for your data science work

In [None]:
#!conda install -c conda-forge awscli -y

In [None]:
#!aws ecs update-service --service Dask-Workers --desired-count 20 --cluster Fargate-Dask-Cluster

## Restart the client after scale out operation

In [None]:
client.restart()

## Load dask dataframe with the trip data


## TODO: Introduction to Dask DataFrame

In [None]:
import s3fs
import dask.dataframe as dd
import boto3
import dask.distributed
#df = dd.read_csv('s3://octank-claims-web/public-data/yellow_tripdata_2018-01.csv', storage_options={'anon': False})
# df = dd.read_csv('s3://nyc-tlc/trip data/green_tripdata_2018-02.csv', storage_options={'anon': True})

In [None]:
df = dd.read_csv(
    's3://nyc-tlc/trip data/green_tripdata_2018-02.csv', storage_options={'anon': True},
    parse_dates=['lpep_pickup_datetime', 'lpep_dropoff_datetime'],
).sample(frac=0.1, replace=True)


In [None]:
df.head()

In [None]:
# load and count number of rows
len(df)

In [None]:
df.dtypes

## Persist the Dask DataFrame in cluster memory

In [None]:
df_persisted = client.persist(df)
print(df_persisted.head())

## Compute the mean trip distance grouped by the number of passengers

In [None]:
# Aggregate on the persisted collection so the group-by reuses the data that
# client.persist() already materialised, instead of re-reading the CSV from S3
# through the lazy `df` graph.
grouped_df = df_persisted.groupby('passenger_count').trip_distance.mean().compute()
print(grouped_df)

## Compute Max trip distance

In [None]:
max_trip_dist = df_persisted.trip_distance.max().compute()
print(max_trip_dist)

## Compute the trip count and total trip distance for each vendor

In [None]:
%%time
df.groupby('VendorID').agg({'passenger_count':'count', 'trip_distance': 'sum'}).astype(int).reset_index()\
.rename(columns={'passenger_count':'Trip Count'}).compute()

## Count Missing Values for Each Feature

In [None]:
df.isna().sum().compute()

## Visual EDA  
- ref https://medium.com/datadriveninvestor/analyzing-big-data-with-dask-a05a8798da8c

In [None]:
##Selecting top 10 rides based on fare amount
most_paid_rides_dask = df[['PULocationID', 'fare_amount']].nlargest(10, "fare_amount")

In [None]:
##Visualizing most paid rides through Barplot
import matplotlib.pyplot as plt

# nlargest() returns the rows ordered by fare_amount, not by PULocationID, so
# the index is set in pandas after compute(). dask's set_index(sorted=True)
# would assert that the column is already sorted, which is not true here.
(
    most_paid_rides_dask
    .compute()
    .set_index('PULocationID')
    .plot(kind='barh', stacked=False, figsize=[10, 8], legend=True)
)
plt.title('Most Paid Rides')
plt.xlabel('Fare Amount')
plt.ylabel('PU LocationID')
plt.show()


In [None]:
##Visualizing trip distance through Barplot
import matplotlib.pyplot as plt

# Ten longest trips together with the fare charged for each.
most_paid_rides_dask2 = df[['trip_distance', 'fare_amount']].nlargest(10, "trip_distance")

# The rows come back in descending trip_distance order, so the index is set in
# pandas after compute() rather than with dask's set_index(sorted=True), which
# asserts an already (ascending) sorted column.
(
    most_paid_rides_dask2
    .compute()
    .set_index('trip_distance')
    .plot(kind='bar', colormap='PiYG', stacked=False, figsize=[10, 8], legend=True)
)
plt.title('Fares by Distance')
plt.xlabel('Trip Distance')
plt.ylabel('Fare Amount')
plt.show()

## TODO: Regression modeling with Scikit Learn

In [None]:
# Reload the trip data for modelling and keep a 10% sample.
# The sample is drawn WITHOUT replacement and with a fixed seed: sampling with
# replacement duplicates rows, and a duplicated row can land in both the
# training and the validation split, which makes validation scores optimistic.
# The seed makes the modelling cells reproducible across runs.
df = dd.read_csv(
    's3://nyc-tlc/trip data/green_tripdata_2018-02.csv',
    storage_options={'anon': True},
    parse_dates=['lpep_pickup_datetime', 'lpep_dropoff_datetime'],
).sample(frac=0.1, replace=False, random_state=0)

In [None]:
# Trip duration in seconds. The vectorised timedelta accessor replaces an
# element-wise Python lambda via .map(), which is slower and makes dask guess
# the output dtype because no `meta` is given.
df['trip_duration'] = (df['lpep_dropoff_datetime'] - df['lpep_pickup_datetime']).dt.total_seconds()

In [None]:
df.head()

In [None]:
import datetime
import numpy as np

# Target: log(1 + duration) to compress the long right tail of trip durations.
df_y = np.log1p(df['trip_duration'])

# Calendar features derived from the pickup timestamp.
df['pickup_weekday'] = df.lpep_pickup_datetime.dt.weekday
# NOTE(review): `dt.weekofyear` was removed in pandas 2.0; switch to
# `dt.isocalendar().week` when the environment is upgraded.
df['pickup_weekofyear'] = df.lpep_pickup_datetime.dt.weekofyear
df['pickup_hour'] = df.lpep_pickup_datetime.dt.hour
df['pickup_minute'] = df.lpep_pickup_datetime.dt.minute
# Seconds elapsed since the start of the pickup's own calendar year.
# The previous expression subtracted a hard-coded 2019-01-01 from 2018 data
# and read `.dt.seconds`, which is only the seconds-within-day component of a
# timedelta, not the total. Building the value from the timestamp components
# works for any year.
df['pickup_year_seconds'] = (
    (df.lpep_pickup_datetime.dt.dayofyear - 1) * 86400
    + df.lpep_pickup_datetime.dt.hour * 3600
    + df.lpep_pickup_datetime.dt.minute * 60
    + df.lpep_pickup_datetime.dt.second
)
# Hour of the week (0-167).
df['pickup_week_hour'] = (df.pickup_weekday * 24) + df.pickup_hour

In [None]:
df.head()

In [None]:

df = df.drop(['lpep_pickup_datetime', 'lpep_dropoff_datetime', 'trip_duration','store_and_fwd_flag', 'extra','mta_tax','tip_amount','tolls_amount','ehail_fee','improvement_surcharge'], axis=1)


In [None]:
df.isna().sum().compute()

In [None]:
df_y.isna().sum().compute()

In [None]:
df.dtypes

In [None]:
from sklearn.metrics import mean_squared_error
from math import sqrt

In [None]:
dfp = df.compute()

In [None]:
dfp_y = df_y.compute()

In [None]:
#Splitting the data into Train and Validation set
from sklearn.model_selection import train_test_split 
xtrain, xtest, ytrain, ytest = train_test_split(dfp,dfp_y,test_size=1/3, random_state=0)

In [None]:
mean_pred = np.repeat(ytrain.mean(),len(ytest))

sqrt(mean_squared_error(ytest, mean_pred))

In [None]:
def cv_score(ml_model, rstate=11, cols=None):
    """Return the validation RMSE of ``ml_model`` for each of 5 shuffled folds.

    The function reads the notebook-level pandas objects ``dfp`` (features)
    and ``dfp_y`` (target) and relies on ``KFold``, ``mean_squared_error`` and
    ``sqrt`` having been imported in the notebook before it is called.

    Parameters
    ----------
    ml_model
        Estimator exposing ``fit(X, y)`` and ``predict(X)``. The same object
        is re-fitted on every fold.
    rstate
        Seed passed to ``KFold`` so the shuffled folds are reproducible.
    cols
        Feature columns of ``dfp`` to use. ``None`` (the default) means all
        columns of ``dfp`` as it exists when the function is called.

    Returns
    -------
    list of float
        One validation RMSE per fold, in fold order.
    """
    # Resolve the default at call time so a re-created `dfp` is picked up.
    if cols is None:
        cols = dfp.columns
    features = dfp[cols]

    kf = KFold(n_splits=5, random_state=rstate, shuffle=True)
    cv_scores = []
    for fold, (train_index, test_index) in enumerate(kf.split(features, dfp_y), start=1):
        print('\n{} of kfold {}'.format(fold, kf.n_splits))

        # KFold yields *positional* indices. The frame comes from a sampled
        # dask DataFrame, so its index labels are neither unique nor
        # 0..n-1; label-based .loc / [] would select the wrong rows or raise.
        xtr, xvl = features.iloc[train_index], features.iloc[test_index]
        ytr, yvl = dfp_y.iloc[train_index], dfp_y.iloc[test_index]

        ml_model.fit(xtr, ytr)
        rmse_score = sqrt(mean_squared_error(yvl, ml_model.predict(xvl)))
        print("Valid RMSE: {:.5f}".format(rmse_score))

        cv_scores.append(rmse_score)
    return cv_scores

In [None]:
from sklearn.linear_model import LinearRegression 
from sklearn.model_selection import KFold
linreg_scores = cv_score(LinearRegression())

In [None]:
import datetime


# NOTE(review): `taxi` is not defined in any cell visible in this notebook;
# it presumably is a DataFrame loaded from the same trip CSV with the
# lpep_* columns parsed as datetimes. Define it before running this cell.

# Calendar features derived from the pickup timestamp.
taxi['pickup_weekday'] = taxi.lpep_pickup_datetime.dt.weekday
# NOTE(review): `dt.weekofyear` was removed in pandas 2.0; switch to
# `dt.isocalendar().week` when the environment is upgraded.
taxi['pickup_weekofyear'] = taxi.lpep_pickup_datetime.dt.weekofyear
taxi['pickup_hour'] = taxi.lpep_pickup_datetime.dt.hour
taxi['pickup_minute'] = taxi.lpep_pickup_datetime.dt.minute
# Seconds elapsed since the start of the pickup's own calendar year.
# The previous expression subtracted a hard-coded 2019-01-01 and read
# `.dt.seconds`, which is only the seconds-within-day component of a
# timedelta, not the total. Building the value from the timestamp components
# works for any year.
taxi['pickup_year_seconds'] = (
    (taxi.lpep_pickup_datetime.dt.dayofyear - 1) * 86400
    + taxi.lpep_pickup_datetime.dt.hour * 3600
    + taxi.lpep_pickup_datetime.dt.minute * 60
    + taxi.lpep_pickup_datetime.dt.second
)
# Hour of the week (0-167).
taxi['pickup_week_hour'] = (taxi.pickup_weekday * 24) + taxi.pickup_hour
# Missing passenger counts become -1; missing categorical values get their
# own 'missing' level.
taxi['passenger_count'] = taxi.passenger_count.astype(float).fillna(-1)
taxi = taxi.fillna(value={'VendorID': 'missing', 'RatecodeID': 'missing', 'store_and_fwd_flag': 'missing' })

# keep track of column names for pipeline steps
numeric_feat = ['pickup_weekday',  'pickup_weekofyear', 'pickup_hour', 'pickup_minute', 'pickup_year_seconds', 'pickup_week_hour',  'passenger_count']
categorical_feat = ['VendorID', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID']
features = numeric_feat + categorical_feat
y_col = 'total_amount'

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.linear_model import ElasticNet
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import GridSearchCV
import numpy as np

# Preprocess (scale numeric columns, one-hot encode categorical columns) and
# fit an ElasticNet regressor.
pipeline = Pipeline(steps=[
    ('preprocess', ColumnTransformer(
        transformers=[
            ('num', StandardScaler(), numeric_feat),
            ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_feat),
        ],
        # sparse_threshold=0 makes the transformer always return a dense
        # matrix. It replaces OneHotEncoder(sparse=False), a keyword that was
        # renamed and later removed in scikit-learn, and works on old and new
        # releases alike.
        sparse_threshold=0,
    )),
    # `normalize=False` was the default and the parameter no longer exists in
    # current scikit-learn, so it is simply omitted.
    ('clf', ElasticNet(max_iter=10)),
])

# this is our grid
params = {
    'clf__l1_ratio': np.arange(0, 1.01, 0.01),
    'clf__alpha': [0, 0.5, 1, 2],
}

grid_search = GridSearchCV(pipeline, params, cv=3, n_jobs=-1, scoring='neg_mean_squared_error')
grid_search.fit(taxi[features], taxi[y_col])
print(grid_search.best_score_)


In [None]:
import numpy as np
# this is our grid
params = {
    'clf__l1_ratio': np.arange(0, 1.01, 0.01),
    'clf__alpha': [0, 0.5, 1, 2],
}


In [None]:
from sklearn.pipeline import Pipeline
from sklearn.linear_model import ElasticNet
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import GridSearchCV

from dask_ml.compose import ColumnTransformer
from dask_ml.preprocessing import StandardScaler, DummyEncoder, Categorizer
from dask_ml.model_selection import GridSearchCV

# dask-ml one-hot encodes in two steps: Categorizer casts the columns to
# categorical dtype, then DummyEncoder expands them into indicator columns.
pipeline = Pipeline(steps=[
    ('categorize', Categorizer(columns=categorical_feat)),
    ('onehot', DummyEncoder(columns=categorical_feat)),
    # remainder='passthrough' keeps the indicator columns produced above.
    # With the default remainder='drop' every column not listed in
    # `transformers` is discarded, so the model would be trained on the
    # numeric features only.
    ('scale', ColumnTransformer(
        transformers=[('num', StandardScaler(), numeric_feat)],
        remainder='passthrough',
    )),
    # `normalize=False` was the default and the parameter no longer exists in
    # current scikit-learn, so it is simply omitted.
    ('clf', ElasticNet(max_iter=10)),
])

# params same as above
grid_search = GridSearchCV(pipeline, params, cv=3, scoring='neg_mean_squared_error')


In [None]:
from time import time

import numpy as np
# Import the registration helper from joblib itself (as the later cells of
# this notebook do); the re-export under sklearn.utils is deprecated.
from joblib import _dask, parallel_backend, register_parallel_backend

# Expose the Dask backend under the legacy name 'distributed' as well.
register_parallel_backend('distributed', _dask.DaskDistributedBackend)

In [None]:
#start = time()
#with parallel_backend('distributed', scheduler_host='dask-Scheduler.local-dask:8786'):
#    grid_search.fit(taxi[features], taxi[y_col])

In [None]:
#grid_search.fit(taxi[features], taxi[y_col])
#print(grid_search.best_score_)

In [None]:
#print(grid_search.best_score_)

In [None]:
from pprint import pprint
from time import time
import logging

from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline


In [None]:
# Scale Up: set categories=None to use all the categories
categories = [
    'alt.atheism',
    'talk.religion.misc',
]

print("Loading 20 newsgroups dataset for categories:")
print(categories)

data = fetch_20newsgroups(subset='train', categories=categories)
print("%d documents" % len(data.filenames))
print("%d categories" % len(data.target_names))
print()

In [None]:
pipeline = Pipeline([
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', SGDClassifier(max_iter=1000)),
])


In [None]:
parameters = {
    'tfidf__use_idf': (True, False),
    'tfidf__norm': ('l1', 'l2'),
    'clf__alpha': (0.00001, 0.000001),
    # 'clf__penalty': ('l2', 'elasticnet'),
    # 'clf__n_iter': (10, 50, 80),
}


In [None]:
# Exhaustive search over `parameters` with 3-fold CV; refit=False because only
# the CV results are inspected. The `iid` argument was dropped: it was removed
# from GridSearchCV in scikit-learn 0.24 (where the behaviour is that of
# iid=False), so passing it raises a TypeError on current releases.
grid_search = GridSearchCV(pipeline, parameters, n_jobs=-1, verbose=1, cv=3, refit=False)

In [None]:
grid_search.fit(data.data, data.target)

In [None]:
import joblib

with joblib.parallel_backend('dask'):
    grid_search.fit(data.data, data.target)

In [None]:
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier

from joblib import _dask, parallel_backend
from joblib import register_parallel_backend
from joblib import parallel_backend
register_parallel_backend('distributed',_dask.DaskDistributedBackend)
import numpy as np
from time import time

In [None]:
# Loading the Digits dataset
import joblib
digits = datasets.load_digits()
# To apply a classifier on this data, we need to flatten the images, to
# turn the data into a (samples, features) matrix:
n_samples = len(digits.images)
X = digits.images.reshape((n_samples, -1))
y = digits.target
# Hold out 30% of the samples as a test set.
# NOTE(review): the search below is fitted on the full X / y, so this split is
# currently unused; fit on X_train / y_train if X_test is meant for evaluation.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0)
clf = RandomForestClassifier(n_estimators=20)
# use a full grid over all parameters
param_grid = {"max_depth": [3, 4, 5, 6, None],
              "max_features": [1, 3, 10, None],
              "min_samples_split": [2, 3, 10],
              "bootstrap": [True, False],
              "criterion": ["gini", "entropy"]}
# run grid search
# The `iid` argument was dropped: it was removed from GridSearchCV in
# scikit-learn 0.24 and raises a TypeError on current releases.
grid_search = GridSearchCV(clf, param_grid=param_grid, cv=8)
start = time()

# Dispatch the cross-validation fits to the Dask cluster through joblib.
with joblib.parallel_backend('dask'):
    grid_search.fit(X, y)

# Alternative using the explicitly registered 'distributed' backend:
#with parallel_backend('distributed', scheduler_host='dask-Scheduler.local-dask:8786'):
#    grid_search.fit(X, y)

In [None]:
print("GridSearchCV took %.2f seconds for %d candidate parameter settings."
      % (time() - start, len(grid_search.cv_results_['params'])))
results = grid_search.cv_results_
    
# Return the index of the best validation score
idx = np.flatnonzero(results['rank_test_score'] == 1 )
print("The best score is: " + str(results['mean_test_score'][idx[0]]))
                     
#print the parameters for the best job      
print("Parameters: {0}".format(results['params'][idx[0]]))

In [None]:
print(results)

### create pipeline for regression model

In [None]:
from dask_ml.compose import ColumnTransformer
from dask_ml.preprocessing import StandardScaler, Categorizer
from dask_ml.model_selection import GridSearchCV

# Dask has slightly different way of one-hot encoding
pipeline = Pipeline(steps=[
    ('categorize', Categorizer(columns=categorical_feat)),
   # ('onehot', DummyEncoder(columns=categorical_feat)),
    ('scale', ColumnTransformer(transformers=[('num', StandardScaler(), numeric_feat)])),
    ('clf', ElasticNet(normalize=False, max_iter=100)),
])

# params same as above
grid_search = GridSearchCV(pipeline, params, cv=3, scoring='neg_mean_squared_error')



In [None]:
from time import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import QuantileTransformer
from sklearn.pipeline import make_pipeline

from sklearn.inspection import partial_dependence
from sklearn.inspection import plot_partial_dependence
from sklearn.experimental import enable_hist_gradient_boosting  # noqa
from sklearn.ensemble import HistGradientBoostingRegressor
from sklearn.neural_network import MLPRegressor
from sklearn.datasets import fetch_california_housing


In [None]:
cal_housing = fetch_california_housing()
X = pd.DataFrame(cal_housing.data, columns=cal_housing.feature_names)
y = cal_housing.target

y -= y.mean()

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1,
                                                    random_state=0)


In [None]:
X_train.head()

In [None]:
cal_housing.target

## Linear Regression

In [None]:
# NOTE(review): the feature-engineering section above drops both lpep_*
# datetime columns from `df`, so this cell needs `df` to be reloaded from the
# CSV first; otherwise the column lookups below fail.
# Trip duration in seconds, using the vectorised timedelta accessor instead of
# an element-wise Python lambda via .map().
df['trip_duration'] = (df['lpep_dropoff_datetime'] - df['lpep_pickup_datetime']).dt.total_seconds()

## Run your Python script container for your machine learning work
### Make sure to follow the steps in the GitHub repo for building and deploying this container before running this step

In [None]:
!aws ecr get-login-password --region us-west-2 | docker login --username AWS --password-stdin 716664005094.dkr.ecr.us-west-2.amazonaws.com

In [None]:
!docker run -e s3url='s3://nyc-tlc/trip data/green_tripdata_2018-02.csv' -e schurl='tcp://Dask-Scheduler.local-dask:8786' 716664005094.dkr.ecr.us-west-2.amazonaws.com/daskclientapp:latest

## Scale in the Fargate cluster worker nodes after all work is done

In [None]:
!pip3 install --upgrade --user awscli

In [None]:
conda install -c conda-forge awscli -y

In [None]:
!aws ecs update-service --service Dask-Workers --desired-count 1 --cluster Fargate-Dask-Cluster