# Using Dask and Xgboost with Google Cloud Dataproc

In this notebook, you will learn the use of Dask to process dataset from big query leveraging [Dask-BigQuery connector](https://github.com/coiled/dask-bigquery) on Dataptoc.

__Dask__ is an open source library for parallel computing written in Python. Dask framework enables us to have a scheduler and a bunch of workers. You submit tasks to the scheduler and it automatically distributes the work among the workers. It works exceptionally well on a single machine, and can scale out to large clusters when needed. 

__RAPIDS XGBoost__ is a suite of fast, GPU-accelerated Gradient Boost Tree algorithms. 

__Dataproc__ is a fully managed and highly scalable service for running Apache Spark, Apache Flink, Presto, and 30+ open source tools and frameworks. It is a managed Spark and Hadoop service that lets you take advantage of open source data tools for batch processing, querying, streaming, and machine learning. Dataproc automation helps you create clusters quickly, manage them easily, and save money by turning clusters off when you don't need them. With less time and money spent on administration, you can focus on your jobs and your data. 

We can create GPU dataproc cluster with following commands:
```export REGION=europe-west4
export GCS_BUCKET=dongm-tlt
export CLUSTER_NAME=dongm-bq
export NUM_GPUS=2
export NUM_WORKERS=2

gcloud dataproc clusters create $CLUSTER_NAME  \
    --region $REGION \
    --zone europe-west4-c \
    --image-version=2.0-ubuntu18 \
    --master-machine-type n1-standard-8 \
    --num-workers $NUM_WORKERS \
    --worker-accelerator type=nvidia-tesla-t4,count=$NUM_GPUS \
    --worker-machine-type n1-highmem-8 \
    --num-worker-local-ssds 1 \
    --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/gpu/install_gpu_driver.sh,gs://goog-dataproc-initialization-actions-${REGION}/rapids/rapids.sh \
    --optional-components=JUPYTER,ZEPPELIN \
    --metadata rapids-runtime=SPARK \
    --bucket $GCS_BUCKET \
    --enable-component-gateway \
    --subnet default
```


## Introduction

GPUs can greatly accelerate all stages of an ML pipeline: pre-processing, training, and inference. In this workshop, we will be focusing on the pre-processing and training stages, using Python in a Jupyter Notebook environment. First, we will use Dask/RAPIDS to read a dataset into NVIDIA GPU memory and execute some basic functions. Then, we’ll use Dask to scale beyond our GPU memory capacity.

This notebook has following sections:

* Cluster set up with Dataproc
* Introduction to Dask
* Data loading from Big Query
* ETL with Dataframe operations
* Machine Learning with XGBoost

Before we begin the notebook, we need to preload the data into a big query table, we can follow Big Query UI to upload NYC Taxi data with wild card: `gs://anaconda-public-data/nyc-taxi/nyc.parquet/part*0.parquet`, this will read 16 parquet files to a big query table, total ~ 1.6GB parquet data.

## Introduction to Dask

Dask is the most commonly used parallelism framework within the PyData and SciPy communities. Dask is designed to scale from parallelizing workloads on the CPUs in your laptop to thousands of nodes in a cloud cluster. In conjunction with the open-source RAPIDS framework developed by NVIDIA, you can utilize the parallel processing power of both CPUs and NVIDIA GPUs. 

In Dask programming, we create computational graphs that define code we **would like** to execute, and then, give these computational graphs to a Dask scheduler which evaluates them lazily, and efficiently, in parallel.

In addition to using multiple CPU cores or threads to execute computational graphs in parallel, Dask schedulers can also be configured to execute computational graphs on multiple CPUs, or, as we will do in this workshop, multiple GPUs. As a result, Dask programming facilitates operating on datasets that are larger than the memory of a single compute resource.

In [1]:

import cupy as cp
import numpy as np
import os
import time

# Dask imports
import dask
import dask_cudf
import dask_ml
import dask_bigquery

from dask.distributed import WorkerPlugin, wait, progress


## Nvidia GPUs
Let us start by checking hardware available, we can run following command on worker node.
```
watch -n 0.5 nvidia-smi
```

### Starting a `YarnCluster`

In [4]:
from dask.distributed import Client
from dask_yarn import YarnCluster

cluster = YarnCluster(worker_class="dask_cuda.CUDAWorker", 
    worker_gpus=1, worker_vcores=4, worker_memory='24GB', 
    worker_env={"CONDA_PREFIX":"/opt/conda/default/"})
cluster.scale(4)

21/09/24 21:27:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/09/24 21:27:15 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
21/09/24 21:27:15 INFO client.RMProxy: Connecting to ResourceManager at cluster-c6ce-telco-dask2-m/10.128.0.8:8032
21/09/24 21:27:15 INFO client.AHSProxy: Connecting to Application History server at cluster-c6ce-telco-dask2-m/10.128.0.8:10200
21/09/24 21:27:16 INFO skein.Driver: Driver started, listening on 42335
21/09/24 21:27:16 INFO conf.Configuration: found resource resource-types.xml at file:/etc/hadoop/conf.empty/resource-types.xml
21/09/24 21:27:16 INFO resource.ResourceUtils: Adding resource type - name = yarn.io/gpu, units = , type = COUNTABLE
21/09/24 21:27:16 INFO skein.Driver: Uploading application resources to hdfs://cluster-c6ce-telco-dask2-m/user/root/.skein/application_163250478143

## Instantiating a Client Connection
The `dask.distributed` library gives us distributed functionality, including the ability to connect to the CUDA Cluster we just created. The `progress` import will give us a handy progress bar we can utilize below.

In [6]:
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://10.128.0.9:39345  Dashboard: http://10.128.0.9:35653/status,Cluster  Workers: 2  Cores: 8  Memory: 52.15 GiB


In a distrubuted cluster environment, you can choose number of workers and cores based on the cluster resources. Dask ships with a very helpful dashboard that in our case runs on port `8787`.

In [7]:
print('GPU Cluster status: ', client.status)

GPU Cluster status:  running


In [8]:
# Query the client for all connected workers
# Number of workers in out cluster
workers = client.has_what().keys()
n_workers = len(workers)
print('Number of GPU workers: ', n_workers)

ddf = dask_bigquery.read_gbq(
    project_id="k80-exploration",
    dataset_id="spark_rapids",
    table_id="nyc_taxi_0",
)

ddf.head()

Number of GPU workers:  2


### Dataset

We are using [NYC Taxi Trip Duration Dataset from Kaggle](https://www.kaggle.com/c/nyc-taxi-trip-duration).

### Data fields

| Colonne            | Description                                                                                                                                                                                                           |
|:-------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| __id__                 | a unique identifier for each trip                                                                                                                                                                                     |
| __vendor_id__         | a code indicating the provider associated with the trip record                                                                                                                                                        |
| __pickup_datetime__    | date and time when the meter was engaged                                                                                                                                                                              |
| __dropoff_datetime__   | date and time when the meter was disengaged                                                                                                                                                                           |
| __passenger_count__    | the number of passengers in the vehicle (driver entered value)                                                                                                                                                        |
| __pickup_longitude__   | the longitude where the meter was engaged                                                                                                                                                                             |
| __pickup_latitude__    | the latitude where the meter was engaged                                                                                                                                                                              |
| __dropoff_longitude__  | the longitude where the meter was disengaged                                                                                                                                                                          |
| __dropoff_latitude__   | the latitude where the meter was disengaged                                                                                                                                                                           |
| __store_and_fwd_flag__ | This flag indicates whether the trip record was held in vehicle memory before sending to the vendor because the vehicle did not have a connection to the server (Y=store and forward; N=not a store and forward trip) |
| __trip_duration__      | duration of the trip in seconds                                                                                                                                                                                       |

## ETL Exploration

### Taxi Data Configuration (Medium)
We can use the parquet data from the anaconda public repo here. Which will illustrate how much faster it is to read parquet, and gives us around 150 million rows of data to work with.

In [9]:
def clean(df_part, remap, must_haves):
    """
    This function performs the various clean up tasks for the data
    and returns the cleaned dataframe.
    """
    tmp = {col:col.strip().lower() for col in list(df_part.columns)}
    df_part = df_part.rename(columns=tmp)
    
    # rename using the supplied mapping
    df_part = df_part.rename(columns=remap)
    
    # iterate through columns in this df partition
    for col in df_part.columns:
        # drop anything not in our expected list
        if col not in must_haves:
            df_part = df_part.drop(col, axis=1)
            continue

        # fixes datetime error found by Ty Mckercher and fixed by Paul Mahler
        if df_part[col].dtype == 'object' and col in ['pickup_datetime', 'dropoff_datetime']:
            df_part[col] = df_part[col].astype('datetime64[ms]')
            continue

        # if column was read as a string, recast as float
        if df_part[col].dtype == 'object':
            df_part[col] = df_part[col].astype('float32')
        else:
            # downcast from 64bit to 32bit types
            # Tesla T4 are faster on 32bit ops
            if 'int' in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype('int32')
            if 'float' in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype('float32')
            df_part[col] = df_part[col].fillna(-1)
            
    return df_part


In [10]:
# list of column names that need to be re-mapped
remap = {}
remap['tpep_pickup_datetime'] = 'pickup_datetime'
remap['tpep_dropoff_datetime'] = 'dropoff_datetime'
remap['ratecodeid'] = 'rate_code'

#create a list of columns & dtypes the df must have
must_haves = {
 'pickup_datetime': 'datetime64[ms]',
 'dropoff_datetime': 'datetime64[ms]',
 'passenger_count': 'int32',
 'trip_distance': 'float32',
 'pickup_longitude': 'float32',
 'pickup_latitude': 'float32',
 'rate_code': 'int32',
 'dropoff_longitude': 'float32',
 'dropoff_latitude': 'float32',
 'fare_amount': 'float32'
}

# apply a list of filter conditions to throw out records with missing or outlier values
query_frags = [
    'fare_amount > 0 and fare_amount < 500',
    'passenger_count > 0 and passenger_count < 6',
    'pickup_longitude > -75 and pickup_longitude < -73',
    'dropoff_longitude > -75 and dropoff_longitude < -73',
    'pickup_latitude > 40 and pickup_latitude < 42',
    'dropoff_latitude > 40 and dropoff_latitude < 42'
]


In [11]:
# This is path to our dataset and it is a parquet file. Can we load a CSV file too? Oh yes! 
# taxi_parquet_path = "gs://anaconda-public-data/nyc-taxi/nyc.parquet"

# We are considering a limited columns for our ML pipeline.
response_id = 'fare_amount'
fields = ['passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'rate_code',
             'dropoff_longitude', 'dropoff_latitude', 'fare_amount']


### Load Dataset
By default, dask uses default block size of 64 MB each. So, `size of dataset / 64` will be number of tasks spread across partitions. If you desire to create a DataFrame with a specific block size, and use `npartitions` argument like below. We used `npartitions=len(workers)` to split data equally into all workers.

In [12]:
%%time

taxi_df = dask_cudf.from_dask_dataframe(ddf)
taxi_df = taxi_df.repartition(npartitions=len(workers))

CPU times: user 1.18 s, sys: 677 ms, total: 1.86 s
Wall time: 2.13 s


In [13]:
taxi_df = clean(taxi_df, remap, must_haves)
taxi_df = taxi_df.query(' and '.join(query_frags))
taxi_df = taxi_df[fields]


__Woah!__ I thought we will see data. What do you think has happened here?

Because Dask is lazy, the computation has not yet occurred. We can see that there are 160 tasks in the task graph. We can force computation by using `persist`. By forcing execution, the result is now explicitly in memory and our task graph only contains one task per partition (the baseline).

In [14]:
response_dtype=np.float32

with dask.annotate(workers=set(workers)):
        taxi_df = client.persist(collections=taxi_df)
    
wait(taxi_df)

X = taxi_df[taxi_df.columns.difference([response_id])].astype(np.float32)
y = taxi_df[response_id].astype(response_dtype)


####  Split data to training and test set
We will use the `train_test_split()` method from `Dask_ML`.


In [16]:
from dask_ml.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, \
                                                    train_size=0.80, \
                                                    random_state=42, \
                                                    shuffle=True)


In [17]:
X_train = X.persist()
X_test = X.persist()
y_train = y.persist()
y_test = y.persist()

### XGBoost

XGBoost falls under the category of Boosting techniques in Ensemble Learning. The algorithm was developed to efficiently reduce computing time and allocate an optimal usage of memory resources. Important features of implementation include handling of missing values (Sparse Aware), Block Structure to support parallelization in tree construction and the ability to fit and boost on new data added to a trained model. ([reference](https://www.kdnuggets.com/2017/10/xgboost-top-machine-learning-method-kaggle-explained.html)) 

Here is the original paper,
[XGBoost: A Scalable Tree Boosting System](https://arxiv.org/abs/1603.02754)


In [20]:
import xgboost as xgb

dmatrix_train = xgb.dask.DaskDMatrix(client, X_train, y_train)
dmatrix_test = xgb.dask.DaskDMatrix(client, X_test, y_test)

params = {"max_depth": 8,
        "max_leaves": 2 ** 8,
        "alpha": 0.9,
        "eta": 0.1,
        "gamma": 0.1,
        "learning_rate": 0.1,
        "subsample": 1,
        "reg_lambda": 1,
        "scale_pos_weight": 2,
        "min_child_weight": 30,
        "tree_method": "gpu_hist", #use GPU for tree building
        "objective": "reg:squaredlogerror",
        "grow_policy": "lossguide",
    }



In [21]:
%%time

# Use xgboost.dask to distribute across workers-- returns dict 
xgb_clasf = xgb.dask.train(client, 
                           params,
                           dmatrix_train, 
                           num_boost_round=2000,
                           evals=[(dmatrix_train, 'train'), (dmatrix_test,'test')]
                          ) 

# booster is the trained model 
model_xgb = xgb_clasf['booster'] 


CPU times: user 107 ms, sys: 7.04 ms, total: 114 ms
Wall time: 13.2 s


### Inferencing
#### Measuring model performance


In [22]:
# Set to use GPU for inference.
model_xgb.set_param({'predictor': 'gpu_predictor'})
# dtrain is the DaskDMatrix defined above.
y_predict = xgb.dask.predict(client, xgb_clasf['booster'], dmatrix_test)
y_predict


Unnamed: 0,Array,Chunk
Bytes,6.50 MiB,770.58 kiB
Shape,"(1703313,)","(197268,)"
Count,30 Tasks,10 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 6.50 MiB 770.58 kiB Shape (1703313,) (197268,) Count 30 Tasks 10 Chunks Type float32 numpy.ndarray",1703313  1,

Unnamed: 0,Array,Chunk
Bytes,6.50 MiB,770.58 kiB
Shape,"(1703313,)","(197268,)"
Count,30 Tasks,10 Chunks
Type,float32,numpy.ndarray


In [23]:
print("Preditction Labels are of type : " + str(type(y_predict)))
print("Test Labels are of type : " + str(type(y_test)))

Preditction Labels are of type : <class 'dask.array.core.Array'>
Test Labels are of type : <class 'dask_cudf.core.DataFrame'>


In [24]:
y = cp.asarray(y_predict)
y

array([58.520275 ,  8.61709  ,  6.8902235, ..., 15.131949 , 21.359398 ,
       20.86175  ], dtype=float32)

In [25]:
t = y_test.to_dask_array()
t = t.compute()
t

array([[69.5],
       [ 8. ],
       [ 5.7],
       ...,
       [14.5],
       [24. ],
       [20.5]], dtype=float32)

In [None]:
import dask

y_t, y_p = dask.compute(t, y)
#y_test_c, y_predict = dask.compute(y_test, y_predict)
print("XGB model AUC: {:.2%}".format(cuml.metrics.regression.mean_absolute_error(y_t, y_p, multioutput='uniform_average')))

#print("XGB model AUC: {:.2%}".format(cuml.metrics.roc_auc_score(y_t, y_p)))


In [31]:
print("XGBoost model accuracy:   {:.2%}".format(cuml.metrics.accuracy_score(y_t, y_p)))

XGBoost model accuracy:   36.46%


#### Resources

##### Dask

##### XGBoost
* [Ensemble Learning to Improve Machine Learning Results](https://blog.statsbot.co/ensemble-learning-d1dcd548e936)
* [Complete Guide to Parameter Tuning in XGBoost with codes in Python](https://www.analyticsvidhya.com/blog/2016/03/complete-guide-parameter-tuning-xgboost-with-codes-python/)
* [Understanding XGBoost Algorithm In Detail](https://analyticsindiamag.com/xgboost-internal-working-to-make-decision-trees-and-deduce-predictions/)

##### Random Forest Regressor
* [Random Forest](https://williamkoehrsen.medium.com/random-forest-simple-explanation-377895a60d2d) \
* [Random Forest Regression](https://towardsdatascience.com/machine-learning-basics-random-forest-regression-be3e1e3bb91a)
* [Classification and Regression by randomForest](https://www.researchgate.net/profile/Andy-Liaw/publication/228451484_Classification_and_Regression_by_RandomForest/links/53fb24cc0cf20a45497047ab/Classification-and-Regression-by-RandomForest.pdf)
