## Advanced Python - Individual Assignment
### Bike Sharing Analysis with Dask Structures
Areknaz Khaligian
May 19, 2019

### Introduction
#### Assingment Instructions

The objective is to rewrite the Bike Sharing analysis done in the Python for Statistical Programming subject using Dask data structures and ecosystem instead of plain pandas.

The maximum score of the assignment is 4 points and the grading will be as follows:

Creation of a git repository with a proper README, incremental commits, and some sort of automatic or programmatic download of the data before the analysis (1 point). Notice that the data should not be checked out in the repository. Including data files in git repositories is considered a bad practice.        

Use of dask.dataframe and distributed.Client for all the data manipulation (2 points). Remember that calling .compute() in a Dask DataFrame turns it into a pandas dataframe, which resides in RAM and loses the distributed advantages. The more Dask structures are used, the higher the grade.          

Use of Dask-ML for distributed training and model selection https://ml.dask.org/ (1 point). See below for inspiration.

### Import Libraries

In [1]:
import dask.dataframe as dd
from dask_ml.preprocessing import Categorizer, DummyEncoder, StandardScaler
from dask.distributed import Client
import dask_ml.model_selection as dcv

from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.externals.joblib import parallel_backend
from sklearn.pipeline import make_pipeline
from sklearn.pipeline import Pipeline

from scipy.stats import skew, pearsonr

### Read Data

In [2]:
# read csv from url
dataset = dd.read_csv('https://s3.eu-central-1.amazonaws.com/ie-mbd-advpython-ml-bikesharing-dask/hour.csv')

# print sample of the dataset
dataset.head()

Unnamed: 0,instant,dteday,season,yr,mnth,hr,holiday,weekday,workingday,weathersit,temp,atemp,hum,windspeed,casual,registered,cnt
0,1,2011-01-01,1,0,1,0,0,6,0,1,0.24,0.2879,0.81,0.0,3,13,16
1,2,2011-01-01,1,0,1,1,0,6,0,1,0.22,0.2727,0.8,0.0,8,32,40
2,3,2011-01-01,1,0,1,2,0,6,0,1,0.22,0.2727,0.8,0.0,5,27,32
3,4,2011-01-01,1,0,1,3,0,6,0,1,0.24,0.2879,0.75,0.0,3,10,13
4,5,2011-01-01,1,0,1,4,0,6,0,1,0.24,0.2879,0.75,0.0,0,1,1


### Data Cleaning

No null values to clean

In [3]:
sum(dataset.isnull().values.any())

0

Drop "instant", it is a unique ID number

In [4]:
dataset = dataset.drop('instant', axis =1)
dataset.head()

Unnamed: 0,dteday,season,yr,mnth,hr,holiday,weekday,workingday,weathersit,temp,atemp,hum,windspeed,casual,registered,cnt
0,2011-01-01,1,0,1,0,0,6,0,1,0.24,0.2879,0.81,0.0,3,13,16
1,2011-01-01,1,0,1,1,0,6,0,1,0.22,0.2727,0.8,0.0,8,32,40
2,2011-01-01,1,0,1,2,0,6,0,1,0.22,0.2727,0.8,0.0,5,27,32
3,2011-01-01,1,0,1,3,0,6,0,1,0.24,0.2879,0.75,0.0,3,10,13
4,2011-01-01,1,0,1,4,0,6,0,1,0.24,0.2879,0.75,0.0,0,1,1


Convert dteday to datetime datatype

In [5]:
dataset['dteday']=dataset['dteday'].astype('M8')
dataset['dteday'].head()

0   2011-01-01
1   2011-01-01
2   2011-01-01
3   2011-01-01
4   2011-01-01
Name: dteday, dtype: datetime64[ns]

Check skewness and range for temp, atemp, hum, windspeed, casual, registerd, cnt

(We see that only casual/registered/cnt are very skewed, but we will plan to scale all the variables before training, so this will solve the issue)

In [6]:
numerics = ['temp', 'atemp', 'hum', 'windspeed', 'casual', 'registered', 'cnt']
for n in numerics:
    print(n)
    print(skew(dataset[n]))
    print()

temp
-0.00602036366695605

atemp
-0.09042105336080838

hum
-0.1112775438226877

windspeed
0.5748555816221624

casual
2.4990211743609105

registered
1.5577697580511438

cnt
1.2773013463494975



Check correlations between variables

In [7]:
print("casual vs cnt:",pearsonr(dataset['casual'], dataset['cnt']))

print("registered vs cnt:", pearsonr(dataset['registered'], dataset['cnt']))

casual vs cnt: (0.6945640779749493, 0.0)
registered vs cnt: (0.9721507308642992, 0.0)


In [8]:
print("temp vs atemp:", pearsonr(dataset['temp'], dataset['atemp']))

print("temp vs cnt:", pearsonr(dataset['temp'], dataset['cnt']))

print("atemp vs cnt:", pearsonr(dataset['atemp'], dataset['cnt']))

temp vs atemp: (0.9876721390396487, 0.0)
temp vs cnt: (0.4047722757786586, 0.0)
atemp vs cnt: (0.40092930412663186, 0.0)


We only want to keep one of registerd or count, but since registered is more highly correlated with the target (cnt) we will drop casual

Also, because temp and atemp are so highly correlated we will drop atemp because it is slightly less correlated with the target (cnt)

In [9]:
dataset = dataset.drop(['casual','atemp'], axis =1)
dataset.head()

Unnamed: 0,dteday,season,yr,mnth,hr,holiday,weekday,workingday,weathersit,temp,hum,windspeed,registered,cnt
0,2011-01-01,1,0,1,0,0,6,0,1,0.24,0.81,0.0,13,16
1,2011-01-01,1,0,1,1,0,6,0,1,0.22,0.8,0.0,32,40
2,2011-01-01,1,0,1,2,0,6,0,1,0.22,0.8,0.0,27,32
3,2011-01-01,1,0,1,3,0,6,0,1,0.24,0.75,0.0,10,13
4,2011-01-01,1,0,1,4,0,6,0,1,0.24,0.75,0.0,1,1


### Basic Pipelines

In these pipeline we first convert the categorical variables to type category, then we dummy encode these variables and scale the numeric variables before running an Random Forest Regressor or Linear Regression.

In [10]:
# list of categorical variables
categories = ['season', 'yr', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit']

In [11]:
# set up pipelines

pipe_lin = make_pipeline(
    Categorizer(columns=categories),
    DummyEncoder(),
    StandardScaler(),
    LinearRegression(),
)

pipe_rf = make_pipeline(
    Categorizer(columns=categories),
    DummyEncoder(),
    StandardScaler(),
    RandomForestRegressor(),
)

In [12]:
# print pipleine steps
pipe_lin.steps

[('categorizer', Categorizer(categories=None,
        columns=['season', 'yr', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit'])),
 ('dummyencoder', DummyEncoder(columns=None, drop_first=False)),
 ('standardscaler', StandardScaler(copy=True, with_mean=True, with_std=True)),
 ('linearregression',
  LinearRegression(copy_X=True, fit_intercept=True, n_jobs=None,
           normalize=False))]

In [13]:
# print pipeline steps 
pipe_rf.steps

[('categorizer', Categorizer(categories=None,
        columns=['season', 'yr', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit'])),
 ('dummyencoder', DummyEncoder(columns=None, drop_first=False)),
 ('standardscaler', StandardScaler(copy=True, with_mean=True, with_std=True)),
 ('randomforestregressor',
  RandomForestRegressor(bootstrap=True, criterion='mse', max_depth=None,
             max_features='auto', max_leaf_nodes=None,
             min_impurity_decrease=0.0, min_impurity_split=None,
             min_samples_leaf=1, min_samples_split=2,
             min_weight_fraction_leaf=0.0, n_estimators='warn', n_jobs=None,
             oob_score=False, random_state=None, verbose=0, warm_start=False))]

### Pipeline with GridSearchCV 

In [14]:
# set up pipeline with labels to prepare for grid search
pipe_rf2 = Pipeline(
        [
        ("categorizer", Categorizer(columns=categories)),
        ("dummifier", DummyEncoder()),
        ("scaler", StandardScaler()),
        ("rf", RandomForestRegressor()),
    ]
)

# set up grid search
pipe_rf_cv = dcv.GridSearchCV(
    pipe_rf2,
    param_grid={
        "rf__max_features": [10,20,30,40,50,60], # tune max_features aka mtry
    },
)

In [15]:
# print summary
pipe_rf_cv

GridSearchCV(cache_cv=True, cv=None, error_score='raise',
       estimator=Pipeline(memory=None,
     steps=[('categorizer', Categorizer(categories=None,
      columns=['season', 'yr', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit'])), ('dummifier', DummyEncoder(columns=None, drop_first=False)), ('scaler', StandardScaler(copy=True, with_mean=True, with_std=True)), ('rf', RandomForest...s='warn', n_jobs=None,
           oob_score=False, random_state=None, verbose=0, warm_start=False))]),
       iid=True, n_jobs=-1,
       param_grid={'rf__max_features': [10, 20, 30, 40, 50, 60]},
       refit=True, return_train_score='warn', scheduler=None, scoring=None)

In [16]:
# drop dteday since we already know the index at which to do the train/test split
dataset = dataset.drop('dteday', axis=1)

# set y, target variable
y = dataset['cnt']

# set the rest of the dataset to be X, the features
X = dataset.drop('cnt',axis=1)

### Train Models

#### Train Test Split

In [17]:
# train test split
y_train = y.loc[:17117]
y_test = y.loc[17118:]
X_train = X.loc[:17117,:]
X_test = X.loc[17118:,:]

#### Start Client

In [18]:
client = Client()  # Connect to a Dask Cluster
client

0,1
Client  Scheduler: tcp://127.0.0.1:57641  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 8.23 GB


#### Train models with parallel backend
Here we can see that the Random Forest Model performs better than the Linear Model, both in terms of overall score and ability to generalize (the difference between the train and test score is lower).          

When running a grid search cv for the random forest model, max_features = 60 is the optimal parameter setting and greatly improves the model accuracy.  Now the train and test score are nearly identical, and higher than the original Random Forest test-score which means the model is more accurate and is better able to generalize.

In [19]:
# connect to dask, train models, and print accuracy scores

with parallel_backend("dask"):

    pipe_lin.fit(X_train, y_train)
    print("lin_train score:", pipe_lin.score(X_train, y_train))
    print("lin_test_score:", pipe_lin.score(X_test, y_test))

    pipe_rf.fit(X_train, y_train)
    print("rf_train score:", pipe_rf.score(X_train, y_train))
    print("rf_test_score:", pipe_rf.score(X_test, y_test))
    
    pipe_rf_cv.fit(X_train.compute(), y_train.compute())
    print("rf_cv_train score:", pipe_rf_cv.score(X_train.compute(), y_train.compute()))
    print("rf_cv_test_score:", pipe_rf_cv.score(X_test.compute(), y_test.compute()))

lin_train score: 0.9741143724760584
lin_test_score: 0.8957396182956116




rf_train score: 0.9980570487042165
rf_test_score: 0.9623662633345436
rf_cv_train score: 0.9999109288173633
rf_cv_test_score: 0.9947093551312931


In [20]:
# print best parameters from grid search
pipe_rf_cv.best_params_

{'rf__max_features': 60}

In [21]:
# disconnect from client
client.close()