# Using Dask and CuML with Google Cloud Dataproc

In this workshop, you will learn how to use Dask and CuML on Dataproc.

__Dask__ is an open source library for parallel computing written in Python. Dask provides a scheduler and a set of workers: you submit tasks to the scheduler, and it automatically distributes the work among the workers. It works well on a single machine and can scale out to large clusters when needed. 
<img src="./images/dask-logo.png" width="200" height="200">

__RAPIDS CuML__ is a suite of fast, GPU-accelerated machine learning algorithms designed for data science and analytical tasks. Its API mirrors scikit-learn's, giving practitioners the familiar fit-predict-transform paradigm without ever having to program a GPU directly.
<img src="./images/rapids_cuml.png" width="300" height="200">

__Dataproc__ is a fully managed and highly scalable service for running Apache Spark, Apache Flink, Presto, and 30+ open source tools and frameworks. It is a managed Spark and Hadoop service that lets you take advantage of open source data tools for batch processing, querying, streaming, and machine learning. Dataproc automation helps you create clusters quickly, manage them easily, and save money by turning clusters off when you don't need them. With less time and money spent on administration, you can focus on your jobs and your data. 
<img src="./images/gcp_dataproc_logo.png" width="400" height="300">


## Introduction

GPUs can greatly accelerate all stages of an ML pipeline: pre-processing, training, and inference. In this workshop, we will be focusing on the pre-processing and training stages, using Python in a Jupyter Notebook environment. First, we will use Dask/RAPIDS to read a dataset into NVIDIA GPU memory and execute some basic functions. Then, we’ll use Dask to scale beyond our GPU memory capacity.

This notebook has the following sections:

* Introduction to Dataproc
* Introduction to Dask
* Data Loading
* ETL
* Introduction to CuML
* Introduction to XGBoost
* Machine Learning pipeline

## Introduction to Dask

Dask is the most commonly used parallelism framework within the PyData and SciPy communities. Dask is designed to scale from parallelizing workloads on the CPUs in your laptop to thousands of nodes in a cloud cluster. In conjunction with the open-source RAPIDS framework developed by NVIDIA, you can utilize the parallel processing power of both CPUs and NVIDIA GPUs. 

In Dask programming, we create computational graphs that define the code we **would like** to execute, and then hand these graphs to a Dask scheduler, which evaluates them lazily and efficiently in parallel.

In addition to using multiple CPU cores or threads to execute computational graphs in parallel, Dask schedulers can also be configured to execute computational graphs on multiple CPUs, or, as we will do in this workshop, multiple GPUs. As a result, Dask programming facilitates operating on datasets that are larger than the memory of a single compute resource.
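
To make the idea of a lazily evaluated graph concrete, here is a minimal, standard-library-only sketch. It is loosely modeled on a dictionary-of-tasks representation and is a toy illustration rather than Dask's actual scheduler; the `graph` contents and the `get` helper are hypothetical names chosen for this example.

```python
# Toy illustration of lazy task-graph evaluation (not Dask's real scheduler).
from operator import add, mul

# A graph maps a key to either a literal value or a task tuple:
# (callable, arg_1, arg_2, ...). Nothing runs while the graph is built.
graph = {
    "x": 1,
    "y": 2,
    "sum": (add, "x", "y"),       # x + y
    "scaled": (mul, "sum", 10),   # (x + y) * 10
}


def get(graph, key, cache=None):
    """Evaluate `key` on demand, computing only the tasks it depends on."""
    cache = {} if cache is None else cache
    if key in cache:
        return cache[key]
    node = graph[key]
    if isinstance(node, tuple) and callable(node[0]):
        func, *args = node
        # Arguments that name another key are resolved recursively;
        # anything else is passed through as a literal.
        resolved = [
            get(graph, a, cache) if isinstance(a, str) and a in graph else a
            for a in args
        ]
        result = func(*resolved)
    else:
        result = node
    cache[key] = result
    return result


result = get(graph, "scaled")
print(result)  # prints 30
```

Work happens only when `get` is called, which is the same reason a Dask collection does nothing until it is computed or persisted. A real scheduler additionally runs independent tasks in parallel across workers.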

### Starting a `LocalCUDACluster`

`dask_cuda` provides utilities for Dask and CUDA (the "cu" in cuDF) interactions.

In [None]:
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster()

### Instantiating a Client Connection
The `dask.distributed` library gives us distributed functionality, including the ability to connect to the CUDA Cluster we just created. The `progress` import will give us a handy progress bar we can utilize below.

In [None]:
client = Client(cluster)
client

Dask ships with a very helpful dashboard that, in our case, runs on port `8787`.

### Dataset

We are using [NYC Taxi Trip Duration Dataset from Kaggle](https://www.kaggle.com/c/nyc-taxi-trip-duration).

### Data fields

| Column             | Description                                                                                                                                                                                                           |
|:-------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| __id__                 | a unique identifier for each trip                                                                                                                                                                                     |
| __vendor_id__         | a code indicating the provider associated with the trip record                                                                                                                                                        |
| __pickup_datetime__    | date and time when the meter was engaged                                                                                                                                                                              |
| __dropoff_datetime__   | date and time when the meter was disengaged                                                                                                                                                                           |
| __passenger_count__    | the number of passengers in the vehicle (driver entered value)                                                                                                                                                        |
| __pickup_longitude__   | the longitude where the meter was engaged                                                                                                                                                                             |
| __pickup_latitude__    | the latitude where the meter was engaged                                                                                                                                                                              |
| __dropoff_longitude__  | the longitude where the meter was disengaged                                                                                                                                                                          |
| __dropoff_latitude__   | the latitude where the meter was disengaged                                                                                                                                                                           |
| __store_and_fwd_flag__ | This flag indicates whether the trip record was held in vehicle memory before sending to the vendor because the vehicle did not have a connection to the server (Y=store and forward; N=not a store and forward trip) |
| __trip_duration__      | duration of the trip in seconds                                                                                                                                                                                       |

### Taxi Data Configuration (Medium)
We can use the Parquet data from the Anaconda public repository. This illustrates how much faster it is to read Parquet than CSV, and it gives us around 150 million rows of data to work with.

In [None]:
# Uncomment to test with Taxi Dataset

def data_loader():
    
    return

preload_data = False
append_to_existing = True
samples = 5
load_samples = 1
worker_counts = [8]
scaling_denom = 8
hardware_type = 'V100'
max_data_frac = 1.0
scale_type = 'weak' # weak | strong
out_prefix = 'taxi_medium'

if (not preload_data):
    data_loader = taxi_parquet_data_loader
else:
    data = taxi_parquet_data_loader(client, fraction=max_data_frac)
    data_loader = lambda client, response_dtype, fraction, random_state: data

print(data_loader)
if (not hardware_type):
    raise RuntimeError("Please specify the hardware type for this run! ex. (T4, V100, A100)")

sweep_kwargs = {
    'append_to_existing': append_to_existing,
    'samples': samples,
    'load_samples': load_samples,
    'worker_counts': worker_counts,
    'scaling_denom': scaling_denom,
    'hardware_type': hardware_type,
    'data_loader': data_loader,
    'max_data_frac': max_data_frac,
    'scaling_type': scale_type
}

In [None]:
import certifi
import cudf
import cuml
import cupy as cp
import numpy as np
import os
import pandas as pd
import random
import seaborn as sns
import time
import yaml

from functools import partial
from math import cos, sin, asin, sqrt, pi
from tqdm import tqdm
from typing import Optional

import dask
import dask.array as da
import dask_cudf

from dask.distributed import Client, WorkerPlugin, wait, progress

class SimpleTimer:
    def __init__(self):
        self.start = None
        self.end = None
        self.elapsed = None

    def __enter__(self):
        self.start = time.perf_counter_ns()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end = time.perf_counter_ns()
        self.elapsed = self.end - self.start
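
As a quick usage sketch of the timer defined above: `elapsed` is only populated once the `with` block exits, and it is expressed in nanoseconds, so it is divided by `1e9` to report seconds. The class is repeated here so the snippet runs on its own; the summation is only a stand-in workload.

```python
import time


class SimpleTimer:
    """Context manager that records elapsed wall-clock time in nanoseconds."""

    def __init__(self):
        self.start = None
        self.end = None
        self.elapsed = None

    def __enter__(self):
        self.start = time.perf_counter_ns()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end = time.perf_counter_ns()
        self.elapsed = self.end - self.start


with SimpleTimer() as timer:
    total = sum(range(100_000))  # stand-in workload

elapsed_sec = timer.elapsed / 1e9  # nanoseconds -> seconds
print(f"Summation took {elapsed_sec:.6f} sec")
```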

### Taxi Data Setup

In [None]:
def clean(df_part, remap, must_haves):
    """
    This function performs the various clean up tasks for the data
    and returns the cleaned dataframe.
    """
    tmp = {col:col.strip().lower() for col in list(df_part.columns)}
    df_part = df_part.rename(columns=tmp)
    
    # rename using the supplied mapping
    df_part = df_part.rename(columns=remap)
    
    # iterate through columns in this df partition
    for col in df_part.columns:
        # drop anything not in our expected list
        if col not in must_haves:
            df_part = df_part.drop(col, axis=1)
            continue

        # fixes datetime error found by Ty Mckercher and fixed by Paul Mahler
        if df_part[col].dtype == 'object' and col in ['pickup_datetime', 'dropoff_datetime']:
            df_part[col] = df_part[col].astype('datetime64[ms]')
            continue

        # if column was read as a string, recast as float
        if df_part[col].dtype == 'object':
            df_part[col] = df_part[col].astype('float32')
        else:
            # downcast from 64bit to 32bit types
            # Tesla T4 are faster on 32bit ops
            if 'int' in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype('int32')
            if 'float' in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype('float32')
            df_part[col] = df_part[col].fillna(-1)
            
    return df_part

def coalesce_taxi_data(fraction, random_state):
    """
    This function loads and processes data to form a dataframe that will be used as input for CuML algorithms.
    
    Parameters:
        fraction: Fraction of axis items to return. Cannot be used with n
        random_state: Seed for the random number generator (if int), or None. If None, a random seed will be chosen.
                if RandomState, seed will be extracted from current state.

    
    """
    base_path = 'gcs://anaconda-public-data/nyc-taxi/csv'

    # list of column names that need to be re-mapped
    remap = {}
    remap['tpep_pickup_datetime'] = 'pickup_datetime'
    remap['tpep_dropoff_datetime'] = 'dropoff_datetime'
    remap['ratecodeid'] = 'rate_code'

    #create a list of columns & dtypes the df must have
    must_haves = {
     'pickup_datetime': 'datetime64[ms]',
     'dropoff_datetime': 'datetime64[ms]',
     'passenger_count': 'int32',
     'trip_distance': 'float32',
     'pickup_longitude': 'float32',
     'pickup_latitude': 'float32',
     'rate_code': 'int32',
     'dropoff_longitude': 'float32',
     'dropoff_latitude': 'float32',
     'fare_amount': 'float32'
    }
    
    # apply a list of filter conditions to throw out records with missing or outlier values
    query_frags = [
        'fare_amount > 0 and fare_amount < 500',
        'passenger_count > 0 and passenger_count < 6',
        'pickup_longitude > -75 and pickup_longitude < -73',
        'dropoff_longitude > -75 and dropoff_longitude < -73',
        'pickup_latitude > 40 and pickup_latitude < 42',
        'dropoff_latitude > 40 and dropoff_latitude < 42'
    ]
    
    
    valid_months_2016 = [str(x).rjust(2, '0') for x in range(1, 7)]
    valid_files_2016 = [f'{base_path}/2016/yellow_tripdata_2016-{month}.csv' for month in valid_months_2016]
    
    df_2014_fractional = dask_cudf.read_csv(f'{base_path}/2014/yellow_*.csv', chunksize=25e6).sample(
        frac=fraction, random_state=random_state)
    df_2014_fractional = clean(df_2014_fractional, remap, must_haves)
    
    df_2015_fractional = dask_cudf.read_csv(f'{base_path}/2015/yellow_*.csv', chunksize=25e6).sample(
        frac=fraction, random_state=random_state)
    df_2015_fractional = clean(df_2015_fractional, remap, must_haves)
    
    df_2016_fractional = dask_cudf.read_csv(valid_files_2016, chunksize=25e6).sample(
        frac=fraction, random_state=random_state)
    df_2016_fractional = clean(df_2016_fractional, remap, must_haves)
    
    df_taxi = dask.dataframe.multi.concat([df_2014_fractional, df_2015_fractional, df_2016_fractional])
    df_taxi = df_taxi.query(' and '.join(query_frags))
    
    return df_taxi
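
The filter step above joins the individual conditions into one query string. The sketch below shows what that string looks like for two of the fragments and applies the equivalent predicate to a few plain Python rows. The rows are made-up values for illustration, and `keep` is a hypothetical helper, not part of the notebook.

```python
query_frags = [
    'fare_amount > 0 and fare_amount < 500',
    'passenger_count > 0 and passenger_count < 6',
]

# A single expression that a DataFrame.query-style API can evaluate.
query = ' and '.join(query_frags)
print(query)

# Hypothetical rows standing in for dataframe records.
rows = [
    {"fare_amount": 12.5, "passenger_count": 2},   # valid
    {"fare_amount": -3.0, "passenger_count": 1},   # negative fare
    {"fare_amount": 40.0, "passenger_count": 7},   # too many passengers
]


def keep(row):
    """Pure-Python equivalent of the joined query for one row."""
    return 0 < row["fare_amount"] < 500 and 0 < row["passenger_count"] < 6


kept = [row for row in rows if keep(row)]
print(kept)
```

Because every fragment is combined with `and`, a record is dropped as soon as any single condition fails.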

## ETL Exploration: CSV vs Parquet

In [None]:
remap = {}
remap['tpep_pickup_datetime'] = 'pickup_datetime'
remap['tpep_dropoff_datetime'] = 'dropoff_datetime'
remap['ratecodeid'] = 'rate_code'

#create a list of columns & dtypes the df must have
must_haves = {
 'pickup_datetime': 'datetime64[ms]',
 'dropoff_datetime': 'datetime64[ms]',
 'passenger_count': 'int32',
 'trip_distance': 'float32',
 'pickup_longitude': 'float32',
 'pickup_latitude': 'float32',
 'rate_code': 'int32',
 'dropoff_longitude': 'float32',
 'dropoff_latitude': 'float32',
 'fare_amount': 'float32'
}

# apply a list of filter conditions to throw out records with missing or outlier values
query_frags = [
    'fare_amount > 0 and fare_amount < 500',
    'passenger_count > 0 and passenger_count < 6',
    'pickup_longitude > -75 and pickup_longitude < -73',
    'dropoff_longitude > -75 and dropoff_longitude < -73',
    'pickup_latitude > 40 and pickup_latitude < 42',
    'dropoff_latitude > 40 and dropoff_latitude < 42'
]

workers = client.has_what().keys()

In [None]:
%%time
base_path = 'gcs://anaconda-public-data/nyc-taxi/csv'

with SimpleTimer() as timer_csv:
    # Load data into dask dataframe
    df_csv_2014 = dask_cudf.read_csv(f'{base_path}/2014/yellow_*.csv', chunksize=25e6)
    df_csv_2014 = clean(df_csv_2014, remap, must_haves)
    df_csv_2014 = df_csv_2014.query(' and '.join(query_frags))
    
    with dask.annotate(workers=set(workers)):
        df_csv_2014 = client.persist(collections=df_csv_2014)
        
    wait(df_csv_2014)

print(df_csv_2014.columns)
rows_csv = df_csv_2014.iloc[:,0].shape[0].compute()
print(f"CSV load took {timer_csv.elapsed/1e9} sec. For {rows_csv} rows of data => {rows_csv/(timer_csv.elapsed/1e9)} rows/sec")

In [None]:
client.cancel(df_csv_2014)

In [None]:
%%time
with SimpleTimer() as timer_parquet:
    df_parquet = dask_cudf.read_parquet(f'gs://anaconda-public-data/nyc-taxi/nyc.parquet', chunksize=25e6)
    df_parquet = clean(df_parquet, remap, must_haves)
    df_parquet = df_parquet.query(' and '.join(query_frags))
    
    with dask.annotate(workers=set(workers)):
        df_parquet = client.persist(collections=df_parquet)
    
    wait(df_parquet)

print(df_parquet.columns)
rows_parquet = df_parquet.iloc[:,0].shape[0].compute()
print(f"Parquet load took {timer_parquet.elapsed/1e9} sec. For {rows_parquet} rows of data => {rows_parquet/(timer_parquet.elapsed/1e9)} rows/sec")

In [None]:
client.cancel(df_parquet)

In [None]:
speedup = (rows_parquet/(timer_parquet.elapsed/1e9))/(rows_csv/(timer_csv.elapsed/1e9))
print(speedup)
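
The speedup above is a ratio of throughputs (rows per second), not of wall-clock times, so it stays meaningful even when the two loads contain different numbers of rows. A small worked example with made-up timings, where `rows_per_sec` is a hypothetical helper:

```python
def rows_per_sec(rows, elapsed_ns):
    """Throughput given a row count and an elapsed time in nanoseconds."""
    return rows / (elapsed_ns / 1e9)


# Hypothetical measurements, chosen only to make the arithmetic easy to follow.
csv_rate = rows_per_sec(rows=1_000_000, elapsed_ns=20_000_000_000)     # 20 s
parquet_rate = rows_per_sec(rows=1_000_000, elapsed_ns=4_000_000_000)  # 4 s

speedup = parquet_rate / csv_rate
print(f"CSV: {csv_rate:.0f} rows/sec, Parquet: {parquet_rate:.0f} rows/sec, speedup: {speedup:.1f}x")
```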

## Performance Validation Code

In [None]:
def record_elapsed_timings_to_df(df, timings, record_template, type, columns, write_to=None):
    records = [dict(record_template, **{"sample_index": i,
                                        "elapsed": elapsed,
                                        "type": type})
                  for i, elapsed in enumerate(timings)]

    df = df.append(other=records, ignore_index=True)
    
    if (write_to):
        df.to_csv(write_to, columns=columns) 
            
    return df


def collect_load_time_samples(load_func, count, return_final_sample=True, verbose=False):
    timings = []
    for m in tqdm(range(count)):
        with SimpleTimer() as timer:
            df, X, y = load_func()
        timings.append(timer.elapsed)
    
    if (return_final_sample):
        return df, X, y, timings
    
    return None, None, None, timings


def collect_func_time_samples(func, count, verbose=False):
    timings = []
    for k in tqdm(range(count)):
        with SimpleTimer() as timer:
            func()
        timings.append(timer.elapsed)
        
    return timings


def sweep_fit_func(model, func_id, require_compute, X, y, xy_fit, count):
    _fit_func_attr = getattr(model, func_id)
    if (require_compute):
        if (xy_fit):
            fit_func = partial(lambda X, y: _fit_func_attr(X, y).compute(), X, y)
        else:
            fit_func = partial(lambda X: _fit_func_attr(X).compute(), X)
    else:
        if (xy_fit):
            fit_func = partial(_fit_func_attr, X, y)
        else:
            fit_func = partial(_fit_func_attr, X)                

    return collect_func_time_samples(func=fit_func, count=count)


def sweep_predict_func(model, func_id, require_compute, X, count):
    _predict_func_attr = getattr(model, func_id)
    predict_func = partial(lambda X: _predict_func_attr(X).compute(), X)
    
    return collect_func_time_samples(func=predict_func, count=count)
    

def performance_sweep(client, model, data_loader, hardware_type, worker_counts=[1], samples=1, load_samples=1, max_data_frac=1.0,
                    predict_frac=0.05, scaling_type='weak', xy_fit=True, fit_requires_compute=False, update_workers_in_kwargs=True,
                    response_dtype=np.float32, out_path='./perf_sweep.csv', append_to_existing=False, model_name=None,
                    fit_func_id="fit", predict_func_id="predict", scaling_denom=None, model_args={}, model_kwargs={}):
    """
    Primary performance sweep entrypoint.
    
    
    Parameters
    ------------
    client: Dask client associated with the cluster we're interested in collecting performance data for.
    
    model: Model object on which to gather performance data. This will be created and destroyed,
        once for each element of 'worker_counts'
    
    data_loader: arbitrary data loading function that will be called to load the appropriate testing data.
        Function that is responsible for loading and returning the data to be used for a given performance run. Function
        signature must accept (client, fraction, and random_state). Client should be used to distribute data, and loaders
        should utilize fraction and random_state with dask's dataframe.sample method to allow for control of how much data
        is loaded.
        
        When called, its return value should be of the form: df, X, y, where df is the full dask_cudf dataframe, X is a
        dask_cudf dataframe which contains all explanatory variables that will be passed to the 'fit' function, and y is a
        dask_cudf series or dataframe that contains response variables which should be passed to fit/predict as fit(X, y)
    
    hardware_type: indicates the core hardware the current sweep is running on. ex. 'T4', 'V100', 'A100'
    
    worker_counts: List indicating the number of workers that should be swept. Ex [1, 2, 4]
        worker counts must fit within the cluster associated with 'client', if the current DASK worker count is different
        from what is requested on a given sweep, attempt to automatically scale the worker count. NOTE: this does not 
        mean we will scale the available cluster nodes, just the number of deployed worker pods.
    
    samples: number of fit/predict samples to record per worker count
    
    load_samples: number of times to sample data loads. This effectively times how long 'data_loader' runs.
    
    max_data_frac: maximum fraction of data to return.
        Strong scaling: each run will utilize max_data_frac data.
        Weak scaling: each run will utilize (current worker count) / (max worker count) * max_data_frac data.
        
    predict_frac: fraction of training data used to test inference
    
    scaling_type: values can be 'weak' or 'strong' indicating the type of scaling sweep to perform.
    
    xy_fit: indicates whether or not the model's 'fit' function is of the form (X, y), when xy_fit is False, we assume that
        fit is of the form (X), as is the case with various unsupervised methods ex. KNN.
    
    fit_requires_compute: False generally, set this to True if the model's 'fit' function requires a corresponding '.compute()'
        call to execute the required work.
    
    update_workers_in_kwargs: Some algorithms accept a 'workers' list, much like DASK, and will require their kwargs to have
        workers populated. Setting this flag handles this automatically.
        
    response_dtype: defaults to np.float32, some algorithms require another dtype, such as int32
    
    out_path: path where performance data csv should be saved
    
    append_to_existing: When true, append results to an existing csv, otherwise overwrite.
    
    model_name: Override what we output as the model name
    
    fit_func_id: Defaults to 'fit', only set this if the model has a non-standard naming.
    
    predict_func_id: Defaults to 'predict', only set this if the model has a non-standard predict naming.
    
    scaling_denom: (weak scaling) defaults to max(workers) if unset. Specifies the maximum worker count that weak scaling
        should scale against. For example, when using 1 worker in a weak scaling sweep, the worker will attempt to
        process a fraction of the total data equal to 1/scaling_denom
    
    model_args: args that will be passed to the model's constructor
    
    model_kwargs: keyword args that will be passed to the model's constructor

    Returns
    --------
    
    """
    
    cols = ['n_workers', 'sample_index', 'elapsed', 'type', 'algorithm', 'scaling_type', 'data_fraction', 'hardware']
    perf_df = cudf.DataFrame(columns=cols)
    if (append_to_existing):
        try:
            perf_df = cudf.read_csv(out_path)
        except:
            pass
    
    model_name = model_name if model_name else str(model)
    scaling_denom = scaling_denom if (scaling_denom is not None) else max(worker_counts)
    max_data_frac = min(1.0, max_data_frac)

    start_msg = f"Starting {scaling_type}-scaling performance sweep for:\n"
    start_msg += f" model      : {model_name}\n"
    start_msg += f" data loader: {data_loader}.\n"
    start_msg += f"Configuration\n"
    start_msg += "==========================\n"
    start_msg += f"{'Worker counts':<25} : {worker_counts}\n"
    start_msg += f"{'Fit/Predict samples':<25} : {samples}\n"
    start_msg += f"{'Data load samples':<25} : {load_samples}\n"
    start_msg += f"- {'Max data fraction':<23} : {max_data_frac}\n"
    start_msg += f"{'Model fit':<25} : {'X ~ y' if xy_fit else 'X'}\n"
    start_msg += f"- {'Response DType':<23} : {response_dtype}\n"
    start_msg += f"{'Writing results to':<25} : {out_path}\n"
    start_msg += f"- {'Method':<23} : {'overwrite' if not append_to_existing else 'append'}\n"
    print(start_msg, flush=True)
    
    for n in worker_counts:        
        fraction = (n / scaling_denom) * max_data_frac if scaling_type == 'weak' else max_data_frac
        record_template = {"n_workers": n, "type": "predict", "algorithm": model_name,
               "scaling_type": scaling_type, "data_fraction": fraction, "hardware": hardware_type}
        scale_workers(client, n)

        print(f"Sampling <{load_samples}> load times with {n} workers.", flush=True)
        
        load_func = partial(data_loader, client=client, response_dtype=response_dtype, fraction=fraction, random_state=0)
        df, X, y, load_timings = collect_load_time_samples(load_func=load_func, count=load_samples)
        
        perf_df = record_elapsed_timings_to_df(df=perf_df, timings=load_timings, type='load',
                                                    record_template=record_template, columns=cols, write_to=out_path)

        print(f"Finished loading <{load_samples}>, samples, to <{n}> workers with a mean time of {np.mean(load_timings)/1e9:0.4f} sec.", flush=True)
        print(f"Sweeping {model_name} '{fit_func_id}' with <{n}> workers. Sampling <{samples}> times.", flush=True)

        if (update_workers_in_kwargs and 'workers' in model_kwargs):
            model_kwargs['workers'] = workers = client.has_what().keys()
    
        m = model(*model_args, **model_kwargs)
        if (fit_func_id):
            fit_timings = sweep_fit_func(model=m, func_id=fit_func_id,
                                             require_compute=fit_requires_compute,
                                             X=X, y=y, xy_fit=xy_fit, count=samples)

            perf_df = record_elapsed_timings_to_df(df=perf_df, timings=fit_timings, type='fit',
                                                        record_template=record_template, columns=cols, write_to=out_path)

            print(f"Finished gathering <{samples}>, 'fit' samples using <{n}> workers, with a mean time of {np.mean(fit_timings)/1e9:0.4f} sec.",
                  flush=True)
        else:
            print(f"Skipping fit sweep, fit_func_id is None")

        if (predict_func_id):
            print(f"Sweeping {model_name} '{predict_func_id}' with <{n}> workers. Sampling <{samples}> times.", flush=True)
            predict_timings = sweep_predict_func(model=m, func_id=predict_func_id,
                                                     require_compute=True, X=X, count=samples)

            perf_df = record_elapsed_timings_to_df(df=perf_df, timings=predict_timings, type='predict',
                                                        record_template=record_template, columns=cols, write_to=out_path)
            
            print(f"Finished gathering <{samples}>, 'predict' samples using <{n}> workers, with a mean time of {np.mean(predict_timings)/1e9:0.4f} sec.",
                  flush=True)
        else:
            print(f"Skipping inference sweep. predict_func_id is None")
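
The difference between weak and strong scaling in the sweep comes down to how much data each run receives. The sketch below reproduces that fraction calculation on its own. `data_fraction` is a hypothetical helper written for this example, and the worker counts are illustrative.

```python
def data_fraction(n_workers, scaling_denom, max_data_frac=1.0, scaling_type="weak"):
    """Fraction of the dataset a run should load.

    Weak scaling grows the data with the worker count; strong scaling
    keeps the data fixed while the worker count changes.
    """
    if scaling_type == "weak":
        return (n_workers / scaling_denom) * max_data_frac
    return max_data_frac


worker_counts = [1, 2, 4, 8]
weak = [data_fraction(n, scaling_denom=8) for n in worker_counts]
strong = [data_fraction(n, scaling_denom=8, scaling_type="strong") for n in worker_counts]

print("weak  :", weak)
print("strong:", strong)
```

With weak scaling, roughly flat timings as workers are added indicate good scaling; with strong scaling, timings should shrink as workers are added.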

### Visualization and Analysis

In [None]:
def simple_ci(df, fields, groupby):
    gbdf = df[fields].groupby(groupby).agg(['mean', 'std', 'count'])   
    
    # 95% confidence half-width: 1.96 * standard error of the mean
    ci = 1.96 * gbdf['elapsed']['std'] / np.sqrt(gbdf['elapsed']['count'])
    
    ci_df = ci.reset_index()
    ci_df['ci.low'] = gbdf['elapsed'].reset_index()['mean'] - ci_df[0]
    ci_df['ci.high'] = gbdf['elapsed'].reset_index()['mean'] + ci_df[0]
    
    return ci_df

def visualize_csv_data(csv_path):
    df = cudf.read_csv(csv_path)
    
    fields = ['elapsed', 'elapsed_sec', 'type', 'n_workers', 'hardware', 'scaling_type']
    groupby = ['n_workers', 'type', 'hardware', 'scaling_type']
    df['elapsed_sec'] = df['elapsed']/1e9

    ci_df = simple_ci(df, fields, groupby=groupby)

    # Rescale to seconds
    ci_df[['ci.low', 'ci.high']] = ci_df[['ci.low', 'ci.high']]/1e9

    # Print confidence intervals
    print(ci_df[['hardware', 'n_workers', 'type', 'ci.low', 'ci.high']][ci_df['type'] != 'load'])

    sns.set_theme(style="whitegrid")
    sns.set(rc={'figure.figsize':(20, 10)}, font_scale=2)

    # Boxplots for elapsed time at each worker count.
    plot_df = df[fields][df[fields].type != 'load'].to_pandas()
    ax = sns.catplot(data=plot_df, x="n_workers", y="elapsed_sec",
                     col="type", row="scaling_type", hue="hardware", kind="box",
                     height=8, order=None)
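
For reference, a normal-approximation 95% confidence interval for a mean is the mean plus or minus 1.96 times the standard error (the standard deviation divided by the square root of the sample count). The sketch below computes it with the standard library only. The timings are made-up values and `mean_ci95` is a hypothetical helper.

```python
import math
import statistics


def mean_ci95(samples):
    """Return (low, mean, high) for a normal-approximation 95% interval."""
    n = len(samples)
    mean = statistics.mean(samples)
    std = statistics.stdev(samples)  # sample standard deviation (n - 1)
    half_width = 1.96 * std / math.sqrt(n)
    return mean - half_width, mean, mean + half_width


elapsed_sec = [2.0, 2.2, 1.9, 2.1, 2.3]  # hypothetical fit timings
low, mean, high = mean_ci95(elapsed_sec)
print(f"mean={mean:.3f} sec, 95% CI=({low:.3f}, {high:.3f})")
```

With only a handful of samples per worker count, this approximation understates the uncertainty somewhat; a t-distribution multiplier would give a wider interval.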

### Taxi Data Loader

In [None]:
def taxi_csv_data_loader(client, response_dtype=np.float32, fraction=1.0, random_state=0):
    """
    A CSV data_loader. Reads CSV files, then cleans and processes them to return a dataframe.
    
    Parameters:
    
    client: Dask client associated with the cluster we're interested in collecting performance data for.
    fraction: Fraction of axis items to return. Cannot be used with n
    random_state: Seed for the random number generator (if int), or None. If None, a random seed will be chosen.
                if RandomState, seed will be extracted from current state.

    """
    response_id = 'fare_amount'
    workers = client.has_what().keys()
    km_fields = ['passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'rate_code',
                 'dropoff_longitude', 'dropoff_latitude', 'fare_amount']
    
    taxi_df = coalesce_taxi_data(fraction=fraction, random_state=random_state)
    
    taxi_df = taxi_df[km_fields]
    with dask.annotate(workers=set(workers)):
        taxi_df = client.persist(collections=taxi_df)
    
    X = taxi_df[taxi_df.columns.difference([response_id])].astype(np.float32)
    y = taxi_df[response_id].astype(response_dtype)
    
    wait(taxi_df)
    
    return taxi_df, X, y

def taxi_parquet_data_loader(client, response_dtype=np.float32, fraction=1.0, random_state=0):
    """
    A Parquet data_loader. Reads Parquet files, then cleans and processes them to return a dataframe.
    
    Parameters:
    
    client: Dask client associated with the cluster we're interested in collecting performance data for.
    fraction: Fraction of axis items to return. Cannot be used with n
    random_state: Seed for the random number generator (if int), or None. If None, a random seed will be chosen.
                if RandomState, seed will be extracted from current state.

    """
    # list of column names that need to be re-mapped
    remap = {}
    remap['tpep_pickup_datetime'] = 'pickup_datetime'
    remap['tpep_dropoff_datetime'] = 'dropoff_datetime'
    remap['ratecodeid'] = 'rate_code'

    #create a list of columns & dtypes the df must have
    must_haves = {
     'pickup_datetime': 'datetime64[ms]',
     'dropoff_datetime': 'datetime64[ms]',
     'passenger_count': 'int32',
     'trip_distance': 'float32',
     'pickup_longitude': 'float32',
     'pickup_latitude': 'float32',
     'rate_code': 'int32',
     'dropoff_longitude': 'float32',
     'dropoff_latitude': 'float32',
     'fare_amount': 'float32'
    }

    # apply a list of filter conditions to throw out records with missing or outlier values
    query_frags = [
        'fare_amount > 0 and fare_amount < 500',
        'passenger_count > 0 and passenger_count < 6',
        'pickup_longitude > -75 and pickup_longitude < -73',
        'dropoff_longitude > -75 and dropoff_longitude < -73',
        'pickup_latitude > 40 and pickup_latitude < 42',
        'dropoff_latitude > 40 and dropoff_latitude < 42'
    ]

    workers = client.has_what().keys()
    taxi_parquet_path = "gs://anaconda-public-data/nyc-taxi/nyc.parquet"
    response_id = 'fare_amount'
    fields = ['passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'rate_code',
                 'dropoff_longitude', 'dropoff_latitude', 'fare_amount']
    
    taxi_df = dask_cudf.read_parquet(taxi_parquet_path, npartitions=len(workers))
    taxi_df = clean(taxi_df, remap, must_haves)
    taxi_df = taxi_df.query(' and '.join(query_frags))
    taxi_df = taxi_df[fields]
    
    with dask.annotate(workers=set(workers)):
        taxi_df = client.persist(collections=taxi_df)
    
    wait(taxi_df)

    X = taxi_df[taxi_df.columns.difference([response_id])].astype(np.float32)
    y = taxi_df[response_id].astype(response_dtype)
        
    return taxi_df, X, y

## Ensemble Methods

Ensemble methods are learning algorithms that construct a set of classifiers and then classify new data points by taking a weighted vote of their predictions. There are three main families of ensemble learning methods:

1. **BAGGing**, or **B**ootstrap **AGG**regating
2. __Boosting__
3. __Stacking__

For more details, see [Simple Guide to Ensemble Methods](https://towardsdatascience.com/simple-guide-for-ensemble-learning-methods-d87cc68705a2).
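
To illustrate the bagging idea in its simplest form, the sketch below draws bootstrap samples (sampling with replacement), fits a trivial "base learner" on each (here just the sample mean), and averages the results. For regression the aggregation is an average rather than a vote. This is a toy under stated assumptions: the fares are made-up values and `bagged_mean` is a hypothetical helper, unrelated to the CuML implementation.

```python
import random
import statistics


def bagged_mean(values, n_estimators=25, seed=0):
    """Average the estimates of many learners trained on bootstrap samples."""
    rng = random.Random(seed)
    estimates = []
    for _ in range(n_estimators):
        # Bootstrap sample: draw len(values) items with replacement.
        sample = [rng.choice(values) for _ in values]
        # Each base learner here is simply the mean of its sample.
        estimates.append(statistics.mean(sample))
    # Aggregate by averaging the base learners' outputs.
    return statistics.mean(estimates), estimates


fares = [5.0, 7.5, 12.0, 9.0, 30.0, 6.5]  # hypothetical fares
prediction, estimates = bagged_mean(fares)
print(f"bagged estimate: {prediction:.2f} from {len(estimates)} bootstrap learners")
```

A random forest follows the same pattern with decision trees as the base learners, adding random feature selection at each split.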


### Random Forest Regressor

A single decision tree has a major disadvantage: it tends to overfit the training data. Random Forest addresses this weakness. It is a supervised learning algorithm that uses an ensemble of decision trees for classification and regression, and it is generally more robust than a single tree while remaining fast to train. For anyone interested, the Resources section at the end of this notebook lists the original paper and a couple of blog posts about Random Forest regression.

We are going to use the [RandomForestRegressor API](https://docs.rapids.ai/api/cuml/stable/api.html#random-forest) from the CuML library.

In [None]:
from cuml.dask.ensemble import RandomForestRegressor

rf_kwargs = {
    "workers": client.has_what().keys(),
    "n_estimators": 10,
    "max_depth": 12
}
rf_csv_path = f"./{out_prefix}_random_forest_regression.csv"

performance_sweep(client=client, model=RandomForestRegressor,
                **sweep_kwargs,
                out_path=rf_csv_path,
                response_dtype=np.int32,
                model_kwargs=rf_kwargs)

In [None]:
rf_csv_path = f"./{out_prefix}_random_forest_regression.csv"
visualize_csv_data(rf_csv_path)

### XGBoost

XGBoost falls under the category of boosting techniques in ensemble learning. The algorithm was designed to reduce computing time and make efficient use of memory. Important features of the implementation include handling of missing values (sparsity awareness), a block structure that supports parallelization in tree construction, and the ability to continue training an already fitted model on new data. ([reference](https://www.kdnuggets.com/2017/10/xgboost-top-machine-learning-method-kaggle-explained.html)) 

Here is the original paper:
[XGBoost: A Scalable Tree Boosting System](https://arxiv.org/abs/1603.02754)
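
As a rough illustration of the boosting idea, the pure-Python sketch below fits one-split regression stumps sequentially, each one to the residuals left by the stumps before it, and sums their scaled predictions. The helper names and toy data are hypothetical. This is plain gradient boosting for squared error, not XGBoost's actual algorithm, which additionally uses second-order gradient information, regularization, and the systems optimizations described in the paper.

```python
from statistics import mean


def fit_stump(xs, ys):
    """Fit a one-split regression stump by minimizing squared error."""
    cuts = sorted(set(xs))
    best = None
    for lo, hi in zip(cuts, cuts[1:]):
        threshold = (lo + hi) / 2
        left = [y for x, y in zip(xs, ys) if x <= threshold]
        right = [y for x, y in zip(xs, ys) if x > threshold]
        left_mean, right_mean = mean(left), mean(right)
        sse = (sum((y - left_mean) ** 2 for y in left)
               + sum((y - right_mean) ** 2 for y in right))
        if best is None or sse < best[0]:
            best = (sse, threshold, left_mean, right_mean)
    if best is None:
        raise ValueError("need at least two distinct x values")
    return best[1:]


def predict_stump(stump, x):
    threshold, left_mean, right_mean = stump
    return left_mean if x <= threshold else right_mean


def fit_boosted(xs, ys, n_rounds=20, learning_rate=0.5):
    """Start from the mean, then repeatedly fit a stump to the current residuals."""
    base = mean(ys)
    preds = [base] * len(xs)
    stumps = []
    for _ in range(n_rounds):
        residuals = [y - p for y, p in zip(ys, preds)]
        stump = fit_stump(xs, residuals)
        stumps.append(stump)
        preds = [p + learning_rate * predict_stump(stump, x)
                 for p, x in zip(preds, xs)]
    return base, learning_rate, stumps


def predict_boosted(model, x):
    base, learning_rate, stumps = model
    return base + learning_rate * sum(predict_stump(s, x) for s in stumps)


def mse(model, xs, ys):
    return mean((y - predict_boosted(model, x)) ** 2 for x, y in zip(xs, ys))


# Toy data: y = x^2 on ten points.
xs = list(range(10))
ys = [float(x * x) for x in xs]

baseline = (mean(ys), 0.5, [])   # no stumps: always predicts the mean
model = fit_boosted(xs, ys)
print(mse(baseline, xs, ys), mse(model, xs, ys))
```

In this sketch, `n_rounds` plays the role that `num_boost_round` plays in the XGBoost call in the next cell: each round adds one more weak learner to the ensemble.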

In [None]:
import xgboost as xgb

xg_args = [client]
xg_kwargs = {
    'params': {
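        # Note: 'gpu_hist' is deprecated as of XGBoost 2.0; there, use 'tree_method': 'hist' with 'device': 'cuda'.
        'tree_method': 'gpu_hist',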
    },
    'num_boost_round': 100
}

xgb_csv_path = f'./{out_prefix}_xgb.csv'

class XGBProxy():
    """
    Create a simple API wrapper around XGBoost so that it supports the fit/predict workflow.
    
    Parameters
    -------------
    data_loader: data loader object intended to be used by the performance sweep.
    """
    def __init__(self, data_loader):
        self.args = []
        self.kwargs = {}
        self.data_loader = data_loader
        self.trained_model = None
        
    def loader(self, client, response_dtype, fraction, random_state):
        """
        Wrap the data loader method so that it creates a DMatrix from the returned data.
        """
        df, X, y = self.data_loader(client, response_dtype, fraction, random_state)
        dmatrix = xgb.dask.DaskDMatrix(client, X, y)
        
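        # The wrapped loader returns a (df, X, y) triple; the DMatrix already bundles
        # features and labels, so it fills all three slots.
        return dmatrix, dmatrix, dmatrix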

    def __call__(self, *args, **kwargs):
        """
        Acts as a pseudo init function which initializes our model args.
        """
        self.args = args
        self.kwargs = kwargs
        
        return self
        
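    def fit(self, X):
        """
        Wrap xgb.dask.train, and store the result on our proxy object.
        """
        # Drop any previous result first; resetting to None (rather than `del`)
        # keeps the attribute defined even if training fails.
        self.trained_model = None

        # xgb.dask.train returns a dict holding the trained booster and the evaluation history.
        self.trained_model = xgb.dask.train(*self.args,
                              dtrain=X,
                              evals=[(X, 'train')],
                              **self.kwargs)
        return self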
    
    def predict(self, X):
        """
        Wrap xgb.dask.predict, using the model stored by fit.
        """
        assert self.trained_model, "call fit() before predict()"
        
        return xgb.dask.predict(*self.args, self.trained_model, X)
    

xgb_proxy = XGBProxy(data_loader)
performance_sweep(client=client, model=xgb_proxy, data_loader=xgb_proxy.loader, hardware_type=hardware_type,
                worker_counts=worker_counts, 
                samples=samples,
                load_samples=load_samples,
                max_data_frac=max_data_frac, 
                scaling_type=scale_type,
                out_path=xgb_csv_path,
                append_to_existing=append_to_existing,
                update_workers_in_kwargs=False,
                xy_fit=False,
                scaling_denom=scaling_denom,
                model_args=xg_args,
                model_kwargs=xg_kwargs)
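
The `__call__` method is what lets an `XGBProxy` instance be passed where the sweep expects an estimator class. The sketch below illustrates that pattern in plain Python, under the assumption that the sweep builds its estimator by calling `model(...)` with the model arguments and then uses `fit` and `predict`. `performance_sweep` is defined elsewhere in this notebook, so this is an assumption about its behavior, and every name in the sketch (`MeanRegressor`, `TrainFunctionProxy`, `train_mean`, `run_once`) is hypothetical.

```python
from statistics import mean


class MeanRegressor:
    """Stand-in estimator with a conventional constructor."""

    def __init__(self, shrink=1.0):
        self.shrink = shrink
        self.value = None

    def fit(self, ys):
        self.value = self.shrink * mean(ys)
        return self

    def predict(self, n):
        return [self.value] * n


def train_mean(ys, shrink=1.0):
    """Function-style trainer, analogous in shape to a train(...) call."""
    return shrink * mean(ys)


class TrainFunctionProxy:
    """Wrap a function-style trainer so it can stand in for an estimator class."""

    def __init__(self, train_fn):
        self.train_fn = train_fn
        self.args = ()
        self.kwargs = {}
        self.trained_model = None

    def __call__(self, *args, **kwargs):
        # Pseudo-constructor: remember the model arguments and hand back the same object.
        self.args, self.kwargs = args, kwargs
        return self

    def fit(self, ys):
        self.trained_model = self.train_fn(ys, *self.args, **self.kwargs)
        return self

    def predict(self, n):
        if self.trained_model is None:
            raise RuntimeError("call fit() before predict()")
        return [self.trained_model] * n


def run_once(model, model_kwargs, ys):
    """Hypothetical driver: works for a class or for a callable proxy instance."""
    estimator = model(**model_kwargs)
    return estimator.fit(ys).predict(2)


ys = [2.0, 4.0, 6.0]
from_class = run_once(MeanRegressor, {'shrink': 0.5}, ys)
from_proxy = run_once(TrainFunctionProxy(train_mean), {'shrink': 0.5}, ys)
print(from_class, from_proxy)
```

The trade-off of this design is that the proxy is a single shared object: calling it again overwrites the stored arguments rather than creating a fresh, independent estimator.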

#### Resources

##### Dask

##### XGBoost
* [Ensemble Learning to Improve Machine Learning Results](https://blog.statsbot.co/ensemble-learning-d1dcd548e936)
* [Complete Guide to Parameter Tuning in XGBoost with codes in Python](https://www.analyticsvidhya.com/blog/2016/03/complete-guide-parameter-tuning-xgboost-with-codes-python/)
* [Understanding XGBoost Algorithm In Detail](https://analyticsindiamag.com/xgboost-internal-working-to-make-decision-trees-and-deduce-predictions/)

##### Random Forest Regressor
* [Random Forest](https://williamkoehrsen.medium.com/random-forest-simple-explanation-377895a60d2d)
* [Random Forest Regression](https://towardsdatascience.com/machine-learning-basics-random-forest-regression-be3e1e3bb91a)
* [Classification and Regression by randomForest](https://www.researchgate.net/profile/Andy-Liaw/publication/228451484_Classification_and_Regression_by_RandomForest/links/53fb24cc0cf20a45497047ab/Classification-and-Regression-by-RandomForest.pdf)