# Machine Learning on Distributed Dask with SageMaker and Fargate

This notebook demonstrates how to perform machine learning on distributed Dask using a SageMaker notebook and a Fargate cluster. We show how to connect to a distributed Dask cluster on Fargate, scale out the Dask worker nodes, and perform exploratory data analysis on the public New York taxi trip data sets, which are stored in a public S3 bucket and contain hundreds of millions of trips. We demonstrate some key Dask DataFrame APIs that help with big data processing, show how to monitor the Dask cluster while running operations against it, and finally run a regression algorithm on the distributed Dask cluster.

Before executing any steps in the notebook, verify the following:
Navigate to Amazon ECS > Clusters and confirm that Fargate-Dask-Cluster is running with one task each for Dask-Scheduler and Dask-Workers.

# 1. Set up conda package dependencies
We need additional conda packages, and newer versions of a few existing packages, to run distributed Dask on a SageMaker notebook and a Fargate cluster. A lifecycle configuration is set up to create a new conda environment named conda_daskpy36. Select that conda environment to run this notebook.

scikit-learn version 0.23 is required to integrate its joblib with Dask for processing at the distributed Dask cluster level.

dask-ml provides scalable machine learning in Python using Dask alongside popular machine learning libraries like Scikit-Learn, XGBoost, and others.

cloudpickle 1.6.0 is required to serialize Python constructs that are not supported by the default pickle module from the Python standard library.

In [None]:
%%time
!conda install scikit-learn=0.23.2 -c conda-forge -n python3 -y

In [None]:
%%time
!conda install -n python3 dask-ml=1.6.0 -c conda-forge -y

In [None]:
%%time
!conda install cloudpickle=1.6.0 -c conda-forge  -y

# 2. Set up the Dask client

In [None]:
from dask.distributed import Client

#enable this client for local device testing
#client = Client()

#enable this client for local distributed cluster testing 
#client = Client('localhost:8786')

#enable this client for fargate distributed cluster testing
client = Client('Dask-Scheduler.local-dask:8786')


## Scale out the number of dask workers as needed for your data science work

In [None]:
#enable this  when cluster is running on Fargate to scale out your cluster. 
!sudo aws ecs update-service --service Dask-Workers --desired-count 20 --cluster Fargate-Dask-Cluster

## Restart the client after scale out operation

Wait for ECS to scale out the cluster before running this step. This typically takes about 5 minutes.

In [None]:
client.restart()

# 3. Exploratory Data Analysis (EDA)

We use a Dask DataFrame and perform various operations on it for data analysis.

A Dask DataFrame is a large parallel DataFrame composed of many smaller Pandas DataFrames, split along the index. These Pandas DataFrames may live on disk for larger-than-memory computing on a single machine, or on many different machines in a cluster. One Dask DataFrame operation triggers many operations on the constituent Pandas DataFrames. For more details, review this page: https://docs.dask.org/en/latest/dataframe.html 
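
To make the "one operation triggers many operations" idea concrete, here is a minimal conceptual sketch in plain Python. It does not use Dask itself: each inner list is a hypothetical stand-in for one pandas partition, and the values are made up. It only illustrates the pattern of running a step on every partition and then combining the small partial results.

```python
# Each inner list stands in for one pandas partition of trip durations (seconds).
# The values are made up for illustration.
partitions = [
    [300, 1250, 90],
    [4800, 60],
    [720, 2100, 15, 600],
]

# Step 1: run the operation on each partition independently.
# On a cluster, these per-partition steps can run on different workers.
partial_maxes = [max(p) for p in partitions]
partial_counts = [len(p) for p in partitions]

# Step 2: combine the small per-partition results into the final answer.
overall_max = max(partial_maxes)
total_rows = sum(partial_counts)

print(overall_max, total_rows)
```

Only the small partial results travel between steps, which is why reductions such as `max` or `len` stay cheap even when the full data set does not fit in the memory of one machine.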



In [None]:
import s3fs
import dask.dataframe as dd


In [None]:
df = dd.read_csv(
    's3://nyc-tlc/trip data/yellow_tripdata_2018-*.csv',
    storage_options={'anon': True},
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
)

##  Calculate the trip duration in seconds 
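
A caveat worth knowing when deriving durations from timedeltas: the `seconds` attribute holds only the sub-day seconds component, while `total_seconds()` covers the whole interval. The pandas `.dt` accessor follows the same convention as Python's `datetime.timedelta`, so the sketch below uses only the standard library. The timestamps are made up.

```python
from datetime import datetime

# A hypothetical trip that spans more than one day.
pickup = datetime(2018, 1, 1, 23, 0, 0)
dropoff = datetime(2018, 1, 3, 0, 30, 0)  # 1 day, 1 hour, 30 minutes later
duration = dropoff - pickup

# `.seconds` drops the whole days and keeps only the remainder.
seconds_component = duration.seconds

# `.total_seconds()` includes the days as well.
total = duration.total_seconds()

print(seconds_component, total)
```

For trips shorter than a day the two values agree, so the difference only shows up in outliers such as the maximum trip duration.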

In [None]:
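# Use total_seconds() so that trips longer than a day are not truncated to their sub-day remainder
df['trip_dur_secs'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.total_seconds()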

In [None]:
%%time
df.head()

## Calculate max trip duration across all trips

In [None]:
%%time
max_trip_duration = df.trip_dur_secs.max().compute()
print(max_trip_duration)

## Calculate the mean passenger count by pickup date

In [None]:
# Derive the pickup date from the pickup timestamp (not the dropoff timestamp)
df['pickup_date'] = df['tpep_pickup_datetime'].dt.date

In [None]:
%%time
df.head()

In [None]:
%%time
df_mean_psngr_pickup_date = df.groupby('pickup_date').passenger_count.mean().compute()
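
A grouped mean cannot be obtained by averaging per-partition means, because partitions may hold different numbers of rows per group. The plain-Python sketch below illustrates the usual remedy under stated assumptions: the partitions and their rows are hypothetical, and the code is a conceptual model rather than Dask's actual implementation. Each partition reports a sum and a count per key, and the division happens once at the end.

```python
from collections import defaultdict

# Hypothetical partitions of (pickup_date, passenger_count) rows.
partitions = [
    [("2018-01-01", 1), ("2018-01-01", 3), ("2018-01-02", 2)],
    [("2018-01-01", 5), ("2018-01-02", 4)],
]

def partial_sums(rows):
    """Return {key: [sum, count]} for a single partition."""
    acc = defaultdict(lambda: [0, 0])
    for key, value in rows:
        acc[key][0] += value
        acc[key][1] += 1
    return acc

# Combine the per-partition sums and counts.
combined = defaultdict(lambda: [0, 0])
for part in partitions:
    for key, (s, n) in partial_sums(part).items():
        combined[key][0] += s
        combined[key][1] += n

# Divide once at the end. Averaging the two partition means for
# "2018-01-01" would give (2 + 5) / 2 = 3.5 instead of the correct 3.0.
means = {key: s / n for key, (s, n) in combined.items()}
print(means)
```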

## Calculate total trips by pickup date

In [None]:
%%time
df_trips_by_pickup_date = df.groupby('pickup_date').store_and_fwd_flag.count().compute()

In [None]:
len(df_trips_by_pickup_date)

In [None]:
df_trips_by_pickup_date.head()

In [None]:
# load and count number of rows
len(df)

In [None]:
df.dtypes

## Persist collections into memory
Calls to Client.compute or Client.persist submit task graphs to the cluster and return Future objects that point to particular output tasks. compute returns a single future per input, whereas persist returns a copy of the collection with each block or partition replaced by a single future. In short, use persist to keep a full collection in memory on the cluster, and use compute when you want a small result as a single future.


In [None]:
from dask.distributed import Client, progress


In [None]:
%%time
df_persisted = client.persist(df)
print(df_persisted.head())

## Compute the mean trip distance grouped by the number of passengers

In [None]:
%%time
# Group the persisted DataFrame so the computation reuses the data already held in cluster memory
grouped_df = df_persisted.groupby('passenger_count').trip_distance.mean().compute()
print(grouped_df)

## Compute Max trip distance

In [None]:
%%time
max_trip_dist = df_persisted.trip_distance.max().compute()
print(max_trip_dist)

## Compute the trip count and total trip distance for each vendor

In [None]:
%%time
df.groupby('VendorID').agg({'passenger_count':'count', 'trip_distance': 'sum'}).astype(int).reset_index()\
.rename(columns={'passenger_count':'Trip Count'}).compute()

## Count Missing Values for Each Feature

In [None]:
df.isna().sum().compute()

## Visualize your Exploratory Data Analysis

This section demonstrates how to perform visual exploratory data analysis.

In [None]:
##Selecting top 10 rides based on fare amount
most_paid_rides_dask = df[['PULocationID', 'fare_amount']].nlargest(10, "fare_amount")

In [None]:
##Visualizing most paid rides through Barplot
import matplotlib.pyplot as plt
# Materialize the 10 rows in pandas first; they are not sorted by PULocationID, so sorted=True would be incorrect
most_paid_rides_dask.compute().set_index('PULocationID').plot(kind='barh', stacked=False, figsize=[10,8], legend=True)
#######
plt.title('Most Paid Rides')
plt.xlabel('Fare Amount')
plt.ylabel('PU LocationID')
plt.show()


In [None]:
##Visualizing trip distance through Barplot
import matplotlib.pyplot as plt
most_paid_rides_dask2 = df[['trip_distance', 'fare_amount']].nlargest(10, "trip_distance")
# Materialize the 10 rows in pandas first; nlargest does not return them in ascending order, so sorted=True would be incorrect
most_paid_rides_dask2.compute().set_index('trip_distance').plot(kind='bar', colormap='PiYG', stacked=False, figsize=[10,8], legend=True)
#######
plt.title('Fares by Distance')
plt.xlabel('Trip Distance')
plt.ylabel('Fare Amount')
plt.show()

# 4. Regression modeling with Scikit-Learn and Distributed Dask

This section demonstrates how to perform regression modeling using Scikit-Learn on a distributed Dask back end. We continue with the New York taxi trips dataset, but now predict the duration of a trip using linear regression.

Many Scikit-Learn algorithms are written for parallel execution using Joblib, which natively provides thread-based and process-based parallelism. Joblib is what backs the n_jobs= parameter in normal use of Scikit-Learn. Dask can scale these Joblib-backed algorithms out to a cluster of machines by providing an alternative Joblib backend. 
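
The backend switch can be sketched with Joblib alone. The example below is a minimal illustration, assuming only that `joblib` is installed: it uses the built-in `threading` backend so that it runs without a cluster, and the `squared_error` helper and the input pairs are hypothetical. On the cluster used in this notebook, the same pattern would name the `dask` backend instead, with a `Client` already connected.

```python
from joblib import Parallel, delayed, parallel_backend

def squared_error(pred, actual):
    """Hypothetical unit of work; any picklable function would do."""
    return (pred - actual) ** 2

# Made-up (prediction, actual) pairs.
pairs = [(1.0, 1.5), (2.0, 2.0), (3.0, 2.0)]

# The context manager selects which backend executes the Joblib tasks.
# 'threading' is used here so the sketch runs on a single machine.
with parallel_backend("threading", n_jobs=2):
    errors = Parallel()(delayed(squared_error)(p, a) for p, a in pairs)

print(errors)
```

The calling code does not change when the backend does, which is what lets Joblib-backed Scikit-Learn estimators run on a Dask cluster without modification.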


In [None]:
dfl = dd.read_csv(
    's3://nyc-tlc/trip data/green_tripdata_2018-02.csv', storage_options={'anon': True},
    parse_dates=['lpep_pickup_datetime', 'lpep_dropoff_datetime'],
).sample(frac=0.8, replace=True)

### Preprocess data
1. Derive trip duration
2. Fill NaN values with zeros
3. One-hot encode categorical variables
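
The three steps above can be sketched on a tiny data set in plain Python. This is an illustration only: the two records are made up, the shortened field names `pickup` and `dropoff` are hypothetical, and the notebook cells that follow do the same work with Dask on the real data.

```python
from datetime import datetime

# Hypothetical miniature records loosely modeled on the green taxi data.
trips = [
    {"pickup": datetime(2018, 2, 1, 8, 0), "dropoff": datetime(2018, 2, 1, 8, 30),
     "passenger_count": 1.0, "store_and_fwd_flag": "N"},
    {"pickup": datetime(2018, 2, 1, 9, 0), "dropoff": datetime(2018, 2, 1, 10, 30),
     "passenger_count": float("nan"), "store_and_fwd_flag": "Y"},
]

# Collect the distinct categories once so every row gets the same columns.
categories = sorted({t["store_and_fwd_flag"] for t in trips})

processed = []
for t in trips:
    row = {}
    # 1. Derive the trip duration, expressed in hours.
    row["trip_duration"] = (t["dropoff"] - t["pickup"]).total_seconds() / 3600
    # 2. Replace NaN with zero (NaN is the only value not equal to itself).
    count = t["passenger_count"]
    row["passenger_count"] = 0.0 if count != count else count
    # 3. One-hot encode the categorical flag: one 0/1 column per category.
    for c in categories:
        row[f"store_and_fwd_flag_{c}"] = int(t["store_and_fwd_flag"] == c)
    processed.append(row)

print(processed)
```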

In [None]:
dfl['trip_duration'] = dfl['lpep_dropoff_datetime'] - dfl['lpep_pickup_datetime']

In [None]:
import numpy as np
# Convert the timedelta to fractional days, then to hours
dfl['trip_duration'] = dfl['trip_duration']/np.timedelta64(1,'D')
dfl['trip_duration'] = dfl['trip_duration'] * 24
dfl['trip_duration']

In [None]:
len(dfl)

In [None]:
dfl.head()

In [None]:
dfl = dfl.fillna(value=0)

In [None]:
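# One-hot encode the categorical columns; compute() returns the result as an in-memory pandas DataFrame
dfl = dd.get_dummies(dfl.categorize()).compute()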

In [None]:
dfl.head()

In [None]:
x = dfl[['VendorID','RatecodeID','PULocationID','DOLocationID','passenger_count','trip_distance','fare_amount','total_amount']]

In [None]:
y = dfl['trip_duration']

### Split data for training and testing

In [None]:
from dask_ml.model_selection import train_test_split

In [None]:
X_train, X_test, y_train, y_test = train_test_split(x, y, random_state=1)

In [None]:
len(X_train), len(X_test), len(y_train), len(y_test)

In [None]:
training_x = X_train.values
training_y = y_train.values

In [None]:
testing_x = X_test.values
testing_y = y_test.values

In [None]:
import numpy as np
from sklearn.metrics import mean_squared_error

In [None]:
def rmse(preds, actuals):
    """Print and return the root mean squared error of the predictions."""
    error = mean_squared_error(actuals, preds)
    value = np.sqrt(error)
    print(value)
    return value

### Run Linear Regression model training on distributed dask cluster

In [None]:
import joblib
from dask_ml.linear_model import LinearRegression

with joblib.parallel_backend('dask'):
    lr = LinearRegression(random_state=1, fit_intercept=True)
    lr.fit(training_x,training_y)

### Run prediction on the trained model

In [None]:
preds = lr.predict(testing_x)
preds
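
The notebook defines an `rmse` helper earlier but does not call it. In the notebook itself the call would be `rmse(preds, testing_y)`. As a minimal sketch of what that metric computes, the plain-Python version below uses made-up durations in hours; the name `rmse_plain` is hypothetical and chosen to avoid shadowing the helper above.

```python
import math

def rmse_plain(preds, actuals):
    """Root mean squared error: sqrt of the mean of squared differences."""
    squared = [(p - a) ** 2 for p, a in zip(preds, actuals)]
    return math.sqrt(sum(squared) / len(squared))

# Made-up predicted and actual trip durations, in hours.
example_preds = [2.0, 4.0, 6.0, 8.0]
example_actuals = [2.0, 4.0, 6.0, 6.0]

# Squared errors are 0, 0, 0 and 4, so the mean is 1 and the RMSE is 1.0.
score = rmse_plain(example_preds, example_actuals)
print(score)
```

Because the errors are squared before averaging, a single large miss dominates the score, which matters for taxi data with occasional extreme trip durations.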

# 5. Scale in the Fargate cluster worker nodes after all work is done

In [None]:
!sudo aws ecs update-service --service Dask-Workers --desired-count 1 --cluster Fargate-Dask-Cluster