# Dask ML

[Dask-ML](https://ml.dask.org/) provides scalable machine learning in Python using Dask alongside popular machine learning libraries like Scikit-Learn, XGBoost, and others.

<img src="../images/dimensions_of_scale.svg" width="500" height="500" style="border-style: solid;">

### Problem Definition

There are two kinds of **scaling** challenges when training Machine Learning models and predicting with them:

- **Scaling Model Size (CPU Bound)**: The data fits in memory, but the model is large or complex, so training, evaluation, or scoring takes too long. This is a CPU-bound problem, and it is solved by adding processors/cores to the environment. Typical cases are large ensembles of many models and searches over many hyperparameter combinations.

- **Scaling Data Size (Memory Bound)**: The data used for training or prediction doesn't fit into memory, so pandas or NumPy can't be used directly. This is a memory-bound problem.

### How to Solve?

**Scaling Model Size (CPU Bound)**:
- Try a simpler model
- Add parallelization: buy more CPU cores (often feasible)
- Add parallelization: **Distribute the load to multiple machines using Dask**

**Scaling Data Size (Memory Bound)**:
- Drop features. Maybe not all of the data/features are needed.
- Buy a bigger machine
- Use different data structures/algorithms. `dask-ml` implements estimators that work on `dask` arrays and dataframes, which handle larger-than-memory data through out-of-core learning. This applies to [linear models](https://ml.dask.org/glm.html), [pre-processing](https://ml.dask.org/preprocessing.html), [clustering](https://ml.dask.org/clustering.html), and [ensembles](https://ml.dask.org/modules/api.html#module-dask_ml.ensemble).
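
To make the out-of-core idea concrete, here is a minimal sketch in plain Python (not Dask code). It computes a mean and a variance by visiting one chunk at a time and merging small summaries, so the full data set never has to be in memory at once. The chunk contents and the helper names `summarize` and `merge` are illustrative assumptions; Dask's own implementation differs.

```python
from statistics import fmean, pvariance

def summarize(chunk):
    """Reduce one in-memory chunk to (count, mean, sum of squared deviations)."""
    n = len(chunk)
    mean = fmean(chunk)
    m2 = sum((x - mean) ** 2 for x in chunk)
    return n, mean, m2

def merge(a, b):
    """Combine two chunk summaries without revisiting the raw values."""
    n_a, mean_a, m2_a = a
    n_b, mean_b, m2_b = b
    n = n_a + n_b
    delta = mean_b - mean_a
    mean = mean_a + delta * n_b / n
    m2 = m2_a + m2_b + delta ** 2 * n_a * n_b / n
    return n, mean, m2

# Hypothetical data, split into chunks as if each chunk came from a separate file
chunks = [[1.0, 2.0, 3.0], [4.0, 5.0], [6.0, 7.0, 8.0, 9.0]]

total = summarize(chunks[0])
for chunk in chunks[1:]:
    total = merge(total, summarize(chunk))

count, mean, m2 = total
variance = m2 / count  # population variance of all values
```

Only one chunk and one three-number summary are alive at any time, which is the property that lets this style of algorithm scale past the available memory.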

# Address CPU Bound Problem: Add parallelization using Dask

- **Scikit-Learn** already uses multiple cores on a single machine through [Joblib](http://joblib.readthedocs.io/en/latest/). With Dask as the Joblib backend, Scikit-Learn can use multiple cores across multiple machines.

- **Other Libraries** like `XGBoost`, `LightGBM`, and `TensorFlow` natively support distributed computing. Dask-ML doesn't re-implement these systems. Instead, Dask-ML makes sure that these libraries work on top of Dask data structures like Dask DataFrame & Array. That way, data preparation can be done using Dask and the prepared data can then be handed over to the other frameworks.
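
The CPU-bound case usually boils down to many independent tasks (one per tree, per fold, or per hyperparameter combination) that can be handed to a pool of workers. The sketch below shows that fan-out pattern with the standard library only. The `score` function is a hypothetical stand-in for "train a model and return its validation error", and a thread pool is used purely to keep the example self-contained; Joblib or Dask would dispatch the same tasks to processes or cluster workers.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import product

def score(params):
    """Hypothetical stand-in for 'train a model and return its validation error'."""
    depth, rate = params
    return (depth - 4) ** 2 + (rate - 0.3) ** 2

# Every combination is an independent task, so the tasks can run in parallel
grid = list(product([2, 4, 6, 8], [0.1, 0.3, 0.5]))

with ThreadPoolExecutor(max_workers=4) as pool:
    errors = list(pool.map(score, grid))

# Pick the combination with the smallest error
best_error, best_params = min(zip(errors, grid))
```

Because no task depends on another, adding workers shortens the wall time without changing the result.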

In [35]:
from math import sqrt

import joblib

import numpy as np
import pandas as pd

import dask
import dask.dataframe as dd
from dask.distributed import Client

In [36]:
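# Utility Functions
def get_segment_id_from_path(df, path):
    """
    Convert the file path stored in the segment_id column into an integer segment_id
    """
    # Plain (non-regex) replacement, so that "." and any special characters
    # in the path are matched literally
    df["segment_id"] = df["segment_id"].str.replace(path, "", regex=False)
    df["segment_id"] = df["segment_id"].str.replace(".csv", "", regex=False)
    df["segment_id"] = df["segment_id"].astype(np.int64)

    return df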


def generate_stat_features(df):
    """
    Generate basic statistical features for each sensor
    
    df: Pandas DataFrame associated with a particular partition (segment_id)
    """
    stat_dict = {}
    sensors_name = [
        'sensor_1', 'sensor_2', 'sensor_3', 
        'sensor_4', 'sensor_5', 'sensor_6', 
        'sensor_7', 'sensor_8', 'sensor_9', 
        'sensor_10']
    # Get the segment_id
    segment_id = df.iloc[0].segment_id
    stat_dict["segment_id"] = segment_id
    # Generate statistics for each sensor
    for name in sensors_name:
        df[name] = df[name].fillna(0)
        stat_dict[f"{name}_max"] = df[name].max()
        stat_dict[f"{name}_min"] = df[name].min()
        stat_dict[f"{name}_mean"] = df[name].mean()
        stat_dict[f"{name}_median"] = df[name].median()
        stat_dict[f"{name}_std"] = df[name].std()
        stat_dict[f"{name}_var"] = df[name].var()
        stat_dict[f"{name}_skew"] = df[name].skew()
        stat_dict[f"{name}_kurtosis"] = df[name].kurtosis()
        
    return pd.DataFrame([stat_dict])
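
The path-to-ID conversion can be checked on plain strings before it is applied to a whole DataFrame. The sketch below uses only the standard library; the directory `/data/train/` and the helper name `segment_id_from_path` are made up for illustration. It also shows why the suffix is best removed as a literal string: depending on the pandas version, `str.replace` may treat its pattern as a regular expression, in which case `.` matches any character.

```python
import re

def segment_id_from_path(path, prefix):
    """Strip the directory prefix and the '.csv' suffix, then parse the ID."""
    name = path[len(prefix):] if path.startswith(prefix) else path
    if name.endswith(".csv"):
        name = name[: -len(".csv")]
    return int(name)

# Hypothetical directory; the file name follows the segment IDs used in this notebook
example_path = "/data/train/2000347986.csv"
segment_id = segment_id_from_path(example_path, "/data/train/")

# As a regular expression, '.' matches any character, so '.csv' also matches 'xcsv'
regex_result = re.sub(".csv", "", "12xcsv34")
literal_result = "12xcsv34".replace(".csv", "")
```

For file names that consist only of digits plus `.csv` the two behaviours agree, so the difference matters mainly when the directory path or file names contain other characters.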

### Start a Dask Client

Create a local cluster with 10 workers (one thread and 1 GB of memory each) and connect a client to it

In [3]:
client = Client(n_workers=10, threads_per_worker=1, memory_limit='1GB')

client

Client: Scheduler: tcp://127.0.0.1:43249, Dashboard: http://127.0.0.1:8787/status
Cluster: Workers: 10, Cores: 10, Memory: 10.00 GB


### Reading the Raw Data
- Select a number of segment files (CSV)
- Read those files into a Dask DataFrame. Include the segment identifier as one of the columns along with the 10 sensors

In [4]:
DATA_DIR = "/opt/vssexclude/personal/kaggle/volcano/data/raw/train"

# Define the datatypes for different sensor data
data_types = {"sensor_1" : np.float32, 
                 "sensor_2" : np.float32, 
                 "sensor_3" : np.float32,
                 "sensor_4" : np.float32,
                 "sensor_5" : np.float32,
                 "sensor_6" : np.float32,
                 "sensor_7" : np.float32,
                 "sensor_8" : np.float32,
                 "sensor_9" : np.float32,
                 "sensor_10" : np.float32}

dd_sample = dd.read_csv(urlpath=f"{DATA_DIR}/2*.csv", 
                              blocksize=None, 
                              dtype=data_types, 
                              include_path_column='segment_id')

# dd_sample_small = dd.read_csv(urlpath=f"{DATA_DIR}/1403*.csv", 
#                               blocksize=None, 
#                               dtype=data_types, 
#                               include_path_column='segment_id')

# Insert a new column with segment_id along with the values from 10 sensors
dd_sample = dd_sample.map_partitions(get_segment_id_from_path, f"{DATA_DIR}/")

In [5]:
dd_sample

Dask DataFrame Structure (npartitions=525):
sensor_1 ... sensor_10 (10 columns): float32
segment_id: int64


In [39]:
dd_sample.head()

Unnamed: 0,sensor_1,sensor_2,sensor_3,sensor_4,sensor_5,sensor_6,sensor_7,sensor_8,sensor_9,sensor_10,segment_id
0,76.0,-296.0,332.0,312.0,-15.0,325.0,46.0,205.0,170.0,349.0,2000347986
1,46.0,-270.0,350.0,434.0,-8.0,559.0,-57.0,345.0,185.0,251.0,2000347986
2,46.0,-194.0,361.0,516.0,0.0,676.0,-34.0,323.0,150.0,141.0,2000347986
3,95.0,-200.0,314.0,602.0,-8.0,512.0,45.0,145.0,195.0,27.0,2000347986
4,119.0,-200.0,319.0,688.0,-31.0,340.0,205.0,242.0,327.0,-91.0,2000347986


### Create a Training Data Set with Labels

- Feature Engineering
    - Extract statistical features (min, max, std etc.) from the time series for each segment.  
- Label Generation
    - Read the train.csv file. It contains "time_to_eruption" for each segment_id
- Merge the features with the labels

#### Generate basic statistical features for each segment (each segment contains a time series)

In [6]:
%%time

# Generate a Dask DataFrame consisting of basic statistics for the 
# time series associated with each segment_id/partition
dd_features = dd_sample.map_partitions(generate_stat_features)

CPU times: user 26 ms, sys: 0 ns, total: 26 ms
Wall time: 27.4 ms
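
Conceptually, `map_partitions` calls the given function once per partition and stacks the returned rows. Dask only builds a task graph at this point, which is why the cell above returns in milliseconds; the actual work runs when a result is requested, for example by `head()` or `compute()`. The following plain-Python sketch mirrors the per-partition logic with the `statistics` module. The partition contents and the helper name `stat_features` are made up, and only a subset of the statistics is shown.

```python
from statistics import mean, median, stdev, variance

def stat_features(segment_id, values):
    """Return one feature row (a dict) for a single partition of sensor readings."""
    return {
        "segment_id": segment_id,
        "max": max(values),
        "min": min(values),
        "mean": mean(values),
        "median": median(values),
        "std": stdev(values),     # sample standard deviation (n - 1 denominator)
        "var": variance(values),  # sample variance (n - 1 denominator)
    }

# Hypothetical partitions: segment_id -> readings of one sensor
partitions = {
    101: [1.0, 2.0, 3.0, 4.0],
    102: [10.0, 10.0, 40.0],
}

# One output row per partition, mirroring what map_partitions produces here
feature_rows = [stat_features(seg, vals) for seg, vals in partitions.items()]
```

The `n - 1` denominator matches the pandas defaults for `std()` and `var()`.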


#### What are the features generated?

In [9]:
dd_features.columns

Index(['segment_id', 'sensor_1_max', 'sensor_1_min', 'sensor_1_mean',
       'sensor_1_median', 'sensor_1_std', 'sensor_1_var', 'sensor_1_skew',
       'sensor_1_kurtosis', 'sensor_2_max', 'sensor_2_min', 'sensor_2_mean',
       'sensor_2_median', 'sensor_2_std', 'sensor_2_var', 'sensor_2_skew',
       'sensor_2_kurtosis', 'sensor_3_max', 'sensor_3_min', 'sensor_3_mean',
       'sensor_3_median', 'sensor_3_std', 'sensor_3_var', 'sensor_3_skew',
       'sensor_3_kurtosis', 'sensor_4_max', 'sensor_4_min', 'sensor_4_mean',
       'sensor_4_median', 'sensor_4_std', 'sensor_4_var', 'sensor_4_skew',
       'sensor_4_kurtosis', 'sensor_5_max', 'sensor_5_min', 'sensor_5_mean',
       'sensor_5_median', 'sensor_5_std', 'sensor_5_var', 'sensor_5_skew',
       'sensor_5_kurtosis', 'sensor_6_max', 'sensor_6_min', 'sensor_6_mean',
       'sensor_6_median', 'sensor_6_std', 'sensor_6_var', 'sensor_6_skew',
       'sensor_6_kurtosis', 'sensor_7_max', 'sensor_7_min', 'sensor_7_mean',
       'sensor_7_m

In [40]:
dd_features.head()

Unnamed: 0,segment_id,sensor_1_max,sensor_1_min,sensor_1_mean,sensor_1_median,sensor_1_std,sensor_1_var,sensor_1_skew,sensor_1_kurtosis,sensor_2_max,...,sensor_9_skew,sensor_9_kurtosis,sensor_10_max,sensor_10_min,sensor_10_mean,sensor_10_median,sensor_10_std,sensor_10_var,sensor_10_skew,sensor_10_kurtosis
0,2000348000.0,1181.0,-1358.0,-0.537574,0.0,239.286911,57258.222656,-0.078777,0.774571,8974.0,...,0.028973,0.753266,3127.0,-3692.0,1.551424,0.0,584.828125,342023.9375,-0.061417,1.219035


#### Generate Labels

In [41]:
# Get the distinct segment ids from the Dask DataFrame
# (Dask is used here, although it is not strictly necessary for this step)
selected_segment_ids = dd_sample.segment_id.unique().compute()

In [11]:
# Read the CSV file containing "time_to_eruption" for each segment
time_to_errupt_df = pd.read_csv(f"{DATA_DIR}/../train.csv")

# Filter out for the selected segment_ids
time_to_errupt_selected_df = time_to_errupt_df[time_to_errupt_df.segment_id.isin(selected_segment_ids)]
time_to_errupt_selected_df = time_to_errupt_selected_df.reset_index(drop=True)

#### Merge the Features with the Labels

In [12]:
%%time
data_dd = dd.merge(dd_features, time_to_errupt_selected_df, how="left", on="segment_id")

data_dd

CPU times: user 10.2 ms, sys: 8.61 ms, total: 18.8 ms
Wall time: 24 ms


Dask DataFrame Structure (npartitions=525):
segment_id, sensor_1_max ... sensor_10_kurtosis (81 columns): float64
time_to_eruption: int64
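
A left merge keeps every row of the left table (the features) and attaches the matching label where one exists. The sketch below reproduces that behaviour in plain Python on made-up rows; it uses `None` for a missing label, whereas pandas and Dask would fill in `NaN`.

```python
# Hypothetical feature rows (left side) and label rows (right side)
features = [
    {"segment_id": 1, "sensor_1_max": 7.0},
    {"segment_id": 2, "sensor_1_max": 9.0},
    {"segment_id": 3, "sensor_1_max": 4.0},
]
labels = [
    {"segment_id": 1, "time_to_eruption": 500},
    {"segment_id": 2, "time_to_eruption": 120},
]

# Index the small, in-memory label table by the join key
label_by_id = {row["segment_id"]: row["time_to_eruption"] for row in labels}

# Left join: keep every feature row, use None when no label exists
merged = [
    {**row, "time_to_eruption": label_by_id.get(row["segment_id"])}
    for row in features
]
```

In this notebook the labels were filtered to the selected segment ids beforehand, so every feature row is expected to find a match.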


In [13]:
data_dd.columns

Index(['segment_id', 'sensor_1_max', 'sensor_1_min', 'sensor_1_mean',
       'sensor_1_median', 'sensor_1_std', 'sensor_1_var', 'sensor_1_skew',
       'sensor_1_kurtosis', 'sensor_2_max', 'sensor_2_min', 'sensor_2_mean',
       'sensor_2_median', 'sensor_2_std', 'sensor_2_var', 'sensor_2_skew',
       'sensor_2_kurtosis', 'sensor_3_max', 'sensor_3_min', 'sensor_3_mean',
       'sensor_3_median', 'sensor_3_std', 'sensor_3_var', 'sensor_3_skew',
       'sensor_3_kurtosis', 'sensor_4_max', 'sensor_4_min', 'sensor_4_mean',
       'sensor_4_median', 'sensor_4_std', 'sensor_4_var', 'sensor_4_skew',
       'sensor_4_kurtosis', 'sensor_5_max', 'sensor_5_min', 'sensor_5_mean',
       'sensor_5_median', 'sensor_5_std', 'sensor_5_var', 'sensor_5_skew',
       'sensor_5_kurtosis', 'sensor_6_max', 'sensor_6_min', 'sensor_6_mean',
       'sensor_6_median', 'sensor_6_std', 'sensor_6_var', 'sensor_6_skew',
       'sensor_6_kurtosis', 'sensor_7_max', 'sensor_7_min', 'sensor_7_mean',
       'sensor_7_m

### Split the Data

Separate the features (`X`) from the target (`y`), then split both into training and test sets

In [37]:
X = data_dd.drop(columns=["segment_id", "time_to_eruption"])
X

Dask DataFrame Structure (npartitions=525):
sensor_1_max ... sensor_10_kurtosis (80 columns): float64


In [38]:
y = data_dd['time_to_eruption']

y

Dask Series Structure:
npartitions=525
    int64
      ...
    ...  
      ...
      ...
Name: time_to_eruption, dtype: int64
Dask Name: getitem, 2626 tasks

#### Split X and y into training and test sets

In [17]:
from dask_ml.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, shuffle=True, random_state=42)
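
The effect of `shuffle=True` together with a fixed `random_state` can be illustrated with a small standard-library sketch: shuffling with a seeded generator gives a random but repeatable split. This is not how `dask_ml` implements the split internally, and the `test_size` of 0.2 and the helper name `shuffled_split` are assumptions made for the example.

```python
import random

def shuffled_split(rows, test_size, seed):
    """Shuffle indices with a fixed seed, then take the first fraction as test."""
    indices = list(range(len(rows)))
    random.Random(seed).shuffle(indices)
    n_test = int(round(len(rows) * test_size))
    test_idx = indices[:n_test]
    train_idx = indices[n_test:]
    return [rows[i] for i in train_idx], [rows[i] for i in test_idx]

rows = list(range(10))  # hypothetical row identifiers

train, test = shuffled_split(rows, test_size=0.2, seed=42)

# The same seed reproduces exactly the same split
train_again, test_again = shuffled_split(rows, test_size=0.2, seed=42)
```

A repeatable split matters here because two different models are compared on the same test set later on.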

In [18]:
type(X_train), type(X_test), type(y_train), type(y_test)

(dask.dataframe.core.DataFrame,
 dask.dataframe.core.DataFrame,
 dask.dataframe.core.Series,
 dask.dataframe.core.Series)

## Train & Predict using `sklearn.ensemble.RandomForestRegressor`

Define a RandomForestRegressor from Scikit-Learn

In [19]:
from sklearn.ensemble import RandomForestRegressor

regr = RandomForestRegressor(max_depth=4, random_state=0)

In [20]:
%%time
with joblib.parallel_backend('dask'):
    regr.fit(X_train, y_train)

CPU times: user 16.2 s, sys: 1.56 s, total: 17.8 s
Wall time: 43.3 s


In [21]:
y_predicted = regr.predict(X_test)

y_predicted

array([20281642.97197612, 23324811.66644375, 19831609.99798705,
       15853756.40726043, 26464532.0912097 , 25592367.11799994,
       11140410.26562735, 29545891.83218128, 24086389.56263251,
        4306648.53704344, 23219833.58681439, 20117455.71315083,
       27539747.86985772, 15860225.98950318, 11017214.87684195,
       19309753.29028193, 23432591.47000372, 12508939.04143933,
       19757511.34625054, 24103496.4524239 , 16470557.93116967,
       28026241.90383112, 32756278.117418  , 25167034.66287899,
        9495138.78651862, 28123976.73458135, 22766790.96395903,
       33411007.72717222, 23467688.15995711, 23619789.51914347,
       25495040.49678827, 25735420.42068996, 16422897.46808155,
       22373867.66474379, 23440411.21845127, 25092001.06794098,
       24017164.67877829, 15435692.09842027, 21162586.94472861,
       24285812.85930696, 25900158.93605411,  9969371.10807803,
       22273646.28924372, 28958990.96885689, 28997313.16518413,
       23609203.70186076, 27426529.10192

In [22]:
type(y_test), type(y_predicted)

(dask.dataframe.core.Series, numpy.ndarray)

In [23]:
y_test

Dask Series Structure:
npartitions=525
    int64
      ...
    ...  
      ...
      ...
Name: time_to_eruption, dtype: int64
Dask Name: split, 3676 tasks

In [24]:
y_test_values = y_test.to_dask_array().compute()

y_test_values

array([37121208, 14620729, 12678388,   481996, 46507264, 36335121,
       13117924, 20141151, 31660902,  4023475, 31609293, 38302199,
       36738869, 39325300,  8131989, 41732667, 27953952, 19340560,
       31357602, 40806969,  3334460, 35881998, 10094521, 30527583,
        7779347, 31565859,  8954364, 47793586, 19578808, 25468953,
       26668615,  2649772,  5348522, 37917129, 22238776,  8202892,
       11962633, 13929131, 24532642,  9544168, 40823745, 10746680,
       15666051, 16438629, 23692506, 40342742, 22550812, 27369423,
       37969683, 23827486, 37518118, 19237462, 11645849, 45630646,
       31715507, 14061109,  3508365, 44540742, 37456771, 43594390,
       37983626,  5523317])

In [25]:
type(y_test_values), type(y_predicted)

(numpy.ndarray, numpy.ndarray)

In [26]:
from sklearn.metrics import mean_squared_error
from math import sqrt

sqrt(mean_squared_error(y_test_values, y_predicted))

12657079.969401622
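
The number above is the root mean squared error (RMSE): the square root of the mean of the squared differences between true and predicted values, expressed in the same units as `time_to_eruption`. A minimal sketch of the same computation on made-up values, using only the standard library:

```python
from math import sqrt

def rmse(y_true, y_pred):
    """Root mean squared error: sqrt(mean((y_true - y_pred)^2))."""
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return sqrt(sum(squared_errors) / len(squared_errors))

# Made-up values; the individual errors are 3, 4 and 0
y_true = [10.0, 20.0, 30.0]
y_pred = [13.0, 16.0, 30.0]

error = rmse(y_true, y_pred)  # sqrt((9 + 16 + 0) / 3)
```

Because the errors are squared before averaging, a few large misses weigh more heavily than many small ones.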

## Train & Predict using `dask_xgboost`

- https://github.com/dask/dask-xgboost
- https://gist.github.com/mrocklin/19c89d78e34437e061876a9872f4d2df
- http://matthewrocklin.com/blog/work/2017/03/28/dask-xgboost

In [27]:
import dask_xgboost as dxgb

In [28]:
# Define XGB Parameters
xgb_params = {
                'objective': 'reg:squarederror',
                'eval_metric': 'rmse',
                'seed': 42,
                # Type of the booster
                'booster': 'gbtree',
                # parameters for tree booster
                'learning_rate': 0.3,
                'max_depth': 4,
                }

In [29]:
type(X_train), type(y_train)

(dask.dataframe.core.DataFrame, dask.dataframe.core.Series)

In [30]:
%%time
bst = dxgb.train(client, xgb_params, X_train, y_train)

CPU times: user 12.2 s, sys: 1.62 s, total: 13.8 s
Wall time: 28 s


In [31]:
predictions = dxgb.predict(client, bst, X_test).persist()

In [32]:
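# Note: `y_predicted` is still the NumPy output of the RandomForest model above;
# the XGBoost predictions are held in `predictions`
type(y_test), type(y_predicted)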

(dask.dataframe.core.Series, numpy.ndarray)

In [33]:
y_test

Dask Series Structure:
npartitions=525
    int64
      ...
    ...  
      ...
      ...
Name: time_to_eruption, dtype: int64
Dask Name: split, 3676 tasks

In [34]:
from dask_ml.metrics import mean_squared_error

sqrt(mean_squared_error(y_test.to_dask_array(), predictions))

11431404.447203357
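
The two RMSE values reported in this notebook can be compared directly, since both models were evaluated on the same test split. The short calculation below does only that arithmetic. It reflects a single split with untuned models, so it should be read as an illustration and not as a benchmark.

```python
rmse_random_forest = 12657079.969401622  # RandomForestRegressor result reported earlier
rmse_xgboost = 11431404.447203357        # dask_xgboost result reported above

absolute_gain = rmse_random_forest - rmse_xgboost
relative_gain = absolute_gain / rmse_random_forest

print(f"RMSE reduction: {absolute_gain:,.0f} ({relative_gain:.1%})")
```

On these numbers the XGBoost model's error is roughly 9.7% lower than the random forest's.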

### Close the client

In [42]:
client.close()