# Dask

In [1]:
import kaggle
import zipfile
import dask.dataframe as dd
from dask.distributed import Client
from sklearn.metrics import r2_score
from sklearn.pipeline import Pipeline
from dask_ml.compose import ColumnTransformer
from dask_ml.preprocessing import OneHotEncoder
from sklearn.preprocessing import PowerTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import TimeSeriesSplit
from dask_ml.model_selection import RandomizedSearchCV



In [2]:
kaggle.api.authenticate()
kaggle.api.dataset_download_files('marklvl/bike-sharing-dataset', path='data', unzip=False)



In [3]:
zipper = zipfile.ZipFile('data/bike-sharing-dataset.zip', 'r')
zipper.extractall('data/')
zipper.close()

In [4]:
client = Client()
client

0,1
Client  Scheduler: tcp://127.0.0.1:64265  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 17.18 GB


In [48]:
types = {
    'season':'category',
    'yr':'category', 
    'mnth': 'category',
    'holiday' :'bool',
    'weekday': 'category',
    'workingday':'bool',
    'weathersit':'category',
}

df = dd.read_csv('data/hour.csv', parse_dates=[1], dtype=types)

In [49]:
precipitation = dd.read_csv("https://gist.githubusercontent.com/akoury/6fb1897e44aec81cced8843b920bad78/raw/b1161d2c8989d013d6812b224f028587a327c86d/precipitation.csv", parse_dates=[1])
df = dd.merge(df, precipitation, how="left", on=["dteday", "hr"])
df['precipitation'] = df['precipitation'].mask(df['precipitation'].isnull(), 0).mask(df['precipitation'] > 0, 1).astype(bool)
df = df.drop(['dteday', 'casual', 'registered'], axis=1)
df['hr'] = df['hr'].astype('category')
df = df.categorize()
df.head()

Unnamed: 0,instant,season,yr,mnth,hr,holiday,weekday,workingday,weathersit,temp,atemp,hum,windspeed,cnt,precipitation
0,1,1,0,1,0,False,6,False,1,0.24,0.2879,0.81,0.0,16,True
1,2,1,0,1,1,False,6,False,1,0.22,0.2727,0.8,0.0,40,True
2,3,1,0,1,2,False,6,False,1,0.22,0.2727,0.8,0.0,32,True
3,4,1,0,1,3,False,6,False,1,0.24,0.2879,0.75,0.0,13,True
4,5,1,0,1,4,False,6,False,1,0.24,0.2879,0.75,0.0,1,True


In [50]:
df.dtypes

instant             int64
season           category
yr               category
mnth             category
hr               category
holiday              bool
weekday          category
workingday           bool
weathersit       category
temp              float64
atemp             float64
hum               float64
windspeed         float64
cnt                 int64
precipitation        bool
dtype: object

In [51]:
train_df = df.loc[:15211]
holdout = df.loc[15211:]

In [52]:
num_pipeline = Pipeline(
    [("power_transformer", PowerTransformer(method="yeo-johnson", standardize=True))]
)

categorical_pipeline = Pipeline(
    [("one_hot", OneHotEncoder())]
)

pipe = Pipeline([
    ("column_transformer", ColumnTransformer([
#         ("numerical_pipeline", num_pipeline, ["hum", "temp", "atemp", "windspeed"]),
        ("categorical_pipeline", categorical_pipeline, ["season", "yr", "mnth", "hr", "weekday", "weathersit"]),
    ], remainder="passthrough")),
    ("random_forest", RandomForestRegressor(n_estimators=100, random_state=1))   
])

In [53]:
grid = {
    "random_forest__max_depth": [100, 150],
    "random_forest__min_samples_leaf": [5, 10],
    "random_forest__min_samples_split": [10, 20],
    "random_forest__max_leaf_nodes": [None, 80],
}

gridpipe = RandomizedSearchCV(pipe, grid, n_iter=100, cv=TimeSeriesSplit(n_splits=4), scoring="r2", random_state = 1)

In [54]:
X = train_df.drop(['cnt'], axis=1)
y = train_df['cnt']
gridpipe.fit(X,y)



RandomizedSearchCV(cache_cv=True,
          cv=TimeSeriesSplit(max_train_size=None, n_splits=4),
          error_score='raise',
          estimator=Pipeline(memory=None,
     steps=[('column_transformer', ColumnTransformer(n_jobs=1, preserve_dataframe=True, remainder='passthrough',
         sparse_threshold=0.3, transformer_weights=None,
         transformers=[('categorical_pipeline', Pipeline(memory=None,
     steps=[('one_hot', OneHotEncoder(categorical_features=None, ...imators=100, n_jobs=None,
           oob_score=False, random_state=1, verbose=0, warm_start=False))]),
          iid=True, n_iter=100, n_jobs=-1,
          param_distributions={'random_forest__max_depth': [100, 150], 'random_forest__min_samples_leaf': [5, 10], 'random_forest__min_samples_split': [10, 20], 'random_forest__max_leaf_nodes': [None, 80]},
          random_state=1, refit=True, return_train_score='warn',
          scheduler=None, scoring=None)

In [55]:
print(gridpipe.best_params_)

{'random_forest__max_depth': 100, 'random_forest__max_leaf_nodes': None, 'random_forest__min_samples_leaf': 5, 'random_forest__min_samples_split': 10}


In [56]:
predicted = gridpipe.predict(holdout.drop(['cnt'], axis=1))
score = r2_score(holdout['cnt'].values.compute(), predicted)
score

0.8174114959306136

In [57]:
client.close()