# Predicting NYC Taxi Fares with RAPIDS

[RAPIDS](https://rapids.ai/) is a suite of GPU accelerated data science libraries with APIs that should be familiar to uses of Pandas, Dask, and Scikitlearn.

This notebook focuses on showing how to use cuDF with Dask & XGBoost to scale GPU DataFrame ETL-style operations & model training out to multiple GPUs on mutliple nodes as part of Google Cloud Dataproc.

Anaconda has graciously made some of the NYC Taxi dataset available in [a public Google Cloud Storage bucket](https://console.cloud.google.com/storage/browser/anaconda-public-data/nyc-taxi/csv/). We'll use our Dataproc Cluster of GPUs to process it and train a model that predicts the fare amount.

In [31]:
import numpy as np
import numba, xgboost, socket

import dask, dask_cudf
from dask_cuda import LocalCUDACluster
from dask.delayed import delayed
from dask.distributed import Client, wait

# connect to the Dask cluster that Dataproc stood up
client = Client(socket.gethostname()+':8786')
# forces workers to restart. useful to ensure GPU memory is clear
client.restart()

# attempt to limit work-stealing as much as possible
dask.config.set({'distributed.scheduler.work-stealing': False})
dask.config.get('distributed.scheduler.work-stealing')
dask.config.set({'distributed.scheduler.bandwidth': 1})
dask.config.get('distributed.scheduler.bandwidth')

client

0,1
Client  Scheduler: tcp://test-m:8786  Dashboard: http://test-m:8787/status,Cluster  Workers: 12  Cores: 12  Memory: 0 B


# Inspecting the Data

Now that we have a cluster of GPU workers, we'll use [dask-cudf](https://github.com/rapidsai/dask-cudf/) to load and parse a bunch of CSV files into a distributed DataFrame.

First we'll tell Dask to have all workers use `gsutil cp` to copy data from the GCS bucket to local disk. This is significantly faster than having Dask read from the bucket directly into a DataFrame.

In [88]:
# dask-cuda-worker creates one worker per GPU on Dataproc worker instances
# Get the list of unique IP addresses
machine_ips = []
machines = []
workers = list(client.scheduler_info()['workers'].keys())
for worker in workers:
    ip = worker.split(":")[1].replace("//", "")
    if ip not in machine_ips:
        machine_ips.append(ip)
        machines.append(worker)

In [103]:
import subprocess

def copy_files():
    proc = subprocess.Popen(
        "gsutil cp -r gs://anaconda-public-data/nyc-taxi/csv/2014 .",
        shell=True,
        stdout=subprocess.PIPE
    )
    proc.wait()
    return proc.communicate()[0]

# have each worker instance copy data for 2014 over to local disk
#client.run(copy_files, workers=machines)

# have the master copy data
copy_files()

b''

In [104]:
taxi_df = dask_cudf.read_csv('2014/yellow*')
taxi_df.head().to_pandas()

FileNotFoundError: 

# Data Cleanup

As usual, the data needs to be massaged a bit before we can start adding features that are useful to an ML model.

In this case, the taxi data for different years has different column names. We'll do a little string manipulation, column renaming and dropping to fix the problem.

In [4]:
# helper function which takes a DataFrame partition
def clean(df_part, mapper):    
    # some col-names include pre-pended space.. remove them
    tmp = {col:col.strip().lower() for col in list(df_part.columns)}
    df_part = df_part.rename(tmp)
    
    # drop any column without a supplied replacement name
    for col in mapper:
        if col in mapper and mapper[col] == None and col in df_part.columns:
            df_part = df_part.drop(col)
    
    # rename according to supplied mapping
    df_part = df_part.rename(mapper)
        
    # fill all na values for non-object/string columns
    for col in df_part.columns:
        if df_part[col].dtype != 'object':
            df_part[col] = df_part[col].fillna(-1)
    
    return df_part

# create a dict mapping existing names to intended names
# any dict entry with `None` will be dropped
col_map = dict.fromkeys([
    'vendor_id', 'vendorid', 'payment_type', 'surcharge', 'mta_tax',
    'tip_amount', 'tolls_amount', 'total_amount', 'store_and_fwd_flag', 'pulocationid',
    'dolocationid'
])
col_map['tpep_pickup_datetime'] = 'pickup_datetime'
col_map['tpep_dropoff_datetime'] = 'dropoff_datetime'
col_map['ratecodeid'] = 'rate_code'

# apply the `clean` function to every partition in the taxi DataFrame
parts = [dask.delayed(clean)(part, col_map) for part in taxi_df.to_delayed()]
taxi_df = dask_cudf.from_delayed(parts)

# apply a list of filter conditions to throw out records with missing or outlier values
query_frags = [
    'fare_amount > 0 and fare_amount < 500',
    'passenger_count > 0 and passenger_count < 6',
    'pickup_longitude > -75 and pickup_longitude < -73',
    'dropoff_longitude > -75 and dropoff_longitude < -73',
    'pickup_latitude > 40 and pickup_latitude < 42',
    'dropoff_latitude > 40 and dropoff_latitude < 42'
]
taxi_df = taxi_df.query(' and '.join(query_frags))

# inspect the results of cleaning
taxi_df.head().to_pandas()

Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,dropoff_longitude,dropoff_latitude,fare_amount
0,2012-12-13 08:17:27,2012-12-13 08:38:40,1,1.9,-73.96411,40.757821,1,-73.983536,40.754967,14.5
1,2012-12-13 08:35:37,2012-12-13 08:54:25,1,3.4,-73.9685,40.798956,1,-73.98414,40.759404,15.0
2,2012-12-13 08:38:59,2012-12-13 08:43:43,1,0.7,-74.006575,40.731896,1,-73.998296,40.736337,5.5
3,2012-12-13 08:56:37,2012-12-13 09:00:01,1,0.6,-73.975472,40.78534,1,-73.97174,40.791869,4.5
4,2012-12-13 07:43:04,2012-12-13 07:47:53,1,0.7,-73.984874,40.763159,1,-73.974123,40.759363,5.0


# Adding Interesting Features

Dask & cuDF provide standard DataFrame operations, but also let you run "user defined functions" on the underlying data.

cuDF's [apply_rows](https://rapidsai.github.io/projects/cudf/en/0.6.0/api.html#cudf.dataframe.DataFrame.apply_rows) operation is similar to Pandas's [DataFrame.apply](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.apply.html), except that for cuDF, custom Python code is [JIT compiled by numba](https://numba.pydata.org/numba-doc/dev/cuda/kernels.html) into GPU kernels.

In [5]:
import math
import cudf

#Numba Kernel to calculate Haversine distance
@numba.cuda.jit
def haversine_kernel(lat1, lon1, lat2, lon2, outputCol):
    iRow = numba.cuda.grid(1)
    p = 0.017453292519943295 # Pi/180
    if iRow < outputCol.size:
        a = 0.5 - math.cos((lat2[iRow] - lat1[iRow]) * p)/2 + math.cos(lat1[iRow] * p) * \
            math.cos(lat2[iRow] * p) * (1 - math.cos((lon2[iRow] - lon1[iRow]) * p)) / 2                                 
        outputCol[iRow] = 12734 * math.asin(math.sqrt(a))

def haversine_distance(gdf):
    nRows = gdf.shape[0]
    blockSize = 128
    blockCount = nRows // blockSize + 1
    lat1_arr = gdf['pickup_latitude'].to_gpu_array()
    lon1_arr = gdf['pickup_longitude'].to_gpu_array()
    lat2_arr = gdf['dropoff_latitude'].to_gpu_array()
    lon2_arr = gdf['dropoff_longitude'].to_gpu_array()
                        
    # allocate device memory for the result
    outputCol = cudf.rmm.device_array ( shape=(nRows), dtype=lat1_arr.dtype.name)
    
    haversine_kernel[(blockCount),(blockSize)](lat1_arr, lon1_arr, lat2_arr, lon2_arr, outputCol)
    gdf.add_column(name='h_distance', data = outputCol)
    return gdf

#Numba Kernel to calculate day of the week from Date
@numba.cuda.jit
def day_of_the_week_kernel(output ,year, month, day):
    iRow = numba.cuda.grid(1)
    if iRow < output.size:
        year[iRow] -= month[iRow] < 3
        month[iRow] = (month[iRow] + 9)%12 + 1
        output[iRow] = (year[iRow] + int(year[iRow]/4) - int(year[iRow]/100) + int(year[iRow]/400) + math.floor(2.6*month[iRow] - 0.2) + day[iRow] -1) % 7

@numba.cuda.jit
def round_kernel(lat1, outputCol):
    iRow = numba.cuda.grid(1)
    if iRow < outputCol.size:
        a = math.ceil(lat1[iRow])
        outputCol[iRow] = a
        
def lat_rounder(gdf, lat, name):
    nRows = gdf.shape[0]
    blockSize = 128
    blockCount = nRows // blockSize + 1
    lat1_arr = gdf[lat].to_gpu_array()
    
    outputCol = cudf.rmm.device_array ( shape=(nRows), dtype='int32')
    
    round_kernel[(blockCount, blockSize)](lat1_arr, outputCol)
    gdf.add_column(name=name, data=outputCol)
    return gdf
        
        
def day_of_week(gdf):
    nRows = gdf.shape[0]
    blockSize = 128
    blockCount = nRows // blockSize + 1
    year_arr = gdf['year'].to_gpu_array()
    month_arr = gdf['month'].to_gpu_array()
    day_arr = gdf['day'].to_gpu_array()
    outputCol = cudf.rmm.device_array ( shape=(nRows), dtype=day_arr.dtype.name)
    
    day_of_the_week_kernel[(blockCount),(blockSize)](outputCol, year_arr, month_arr, day_arr)
    gdf.add_column(name='day_of_week', data = outputCol)
    gdf['day_of_week'] = gdf['day_of_week'].astype('float32')
    return gdf

def add_features(df):
    df['hour'] = df['pickup_datetime'].dt.hour
    df['year'] = df['pickup_datetime'].dt.year
    df['month'] = df['pickup_datetime'].dt.month
    df['day'] = df['pickup_datetime'].dt.day
    df['diff'] = df['dropoff_datetime'].astype('int64') - df['pickup_datetime'].astype('int64')
    
    df = df.drop('pickup_datetime')
    df = df.drop('dropoff_datetime')
    
    df = day_of_week(df)
    df['is_weekend'] = (df['day_of_week']/4).floor()
    df = haversine_distance(df)
    df = lat_rounder(df, 'pickup_latitude','pick_lat_round')
    df = lat_rounder(df, 'dropoff_latitude','drop_lat_round')
    df = lat_rounder(df, 'pickup_longitude','pick_long_round')
    df = lat_rounder(df, 'dropoff_longitude','drop_long_round')
    
    df = df.drop('pickup_latitude')
    df = df.drop('dropoff_latitude')
    df = df.drop('pickup_longitude')
    df = df.drop('dropoff_longitude')

    return df

# Add features
parts = [dask.delayed(add_features)(part) for part in taxi_df.to_delayed()]
taxi_df = dask_cudf.from_delayed(parts)

taxi_df.head().to_pandas()

Unnamed: 0,passenger_count,trip_distance,rate_code,fare_amount,hour,year,month,day,diff,day_of_week,is_weekend,h_distance,pick_lat_round,drop_lat_round,pick_long_round,drop_long_round
0,1,1.9,1,14.5,8,2012,10,13,1273000,3.0,0.0,1.665683,41,41,-73,-73
1,1,3.4,1,15.0,8,2012,10,13,1128000,3.0,0.0,4.588028,41,41,-73,-73
2,1,0.7,1,5.5,8,2012,10,13,284000,3.0,0.0,0.85413,41,41,-74,-73
3,1,0.6,1,4.5,8,2012,10,13,204000,3.0,0.0,0.790566,41,41,-73,-73
4,1,0.7,1,5.0,7,2012,10,13,289000,3.0,0.0,0.998404,41,41,-73,-73


# Pick a Training Set

Let's imagine you're making a trip to New York on the 25th and want to build a model to predict what fare prices will be like the last few days of the month based on the first part of the month. We'll use a query expression to identify the `day` of the month to use to divide the data into train and test sets.

The wall-time below represents how long it takes your GPU cluster to run the ETL portion of the workflow.

In [7]:
%%time

# note you could also use taxi_df.query('day < 25') if you prefer that syntax
X_train = taxi_df[taxi_df['day'] < 25]

# create a Y_train ddf with just the target variable
Y_train = X_train[['fare_amount']]

# drop the target variable from the training ddf
X_train = X_train[X_train.columns.difference(['fare_amount'])]

# force compute these DataFrames and persist them in GPU memory
X_train = X_train.persist()
Y_train = Y_train.persist()

# this wont return until all data is in GPU memory
wait([X_train, Y_train])

DoneAndNotDoneFutures(done={<Future: status: finished, type: DataFrame, key: ('getitem-5e3e1945a4f1058deca7894afdd1fced', 15)>, <Future: status: finished, type: DataFrame, key: ('getitem-5e3e1945a4f1058deca7894afdd1fced', 12)>, <Future: status: finished, type: DataFrame, key: ('getitem-5e3e1945a4f1058deca7894afdd1fced', 5)>, <Future: status: finished, type: DataFrame, key: ('getitem-f4f254366f8a8df1f7d9fe88baab4f78', 15)>, <Future: status: finished, type: DataFrame, key: ('getitem-f4f254366f8a8df1f7d9fe88baab4f78', 8)>, <Future: status: finished, type: DataFrame, key: ('getitem-5e3e1945a4f1058deca7894afdd1fced', 18)>, <Future: status: finished, type: DataFrame, key: ('getitem-5e3e1945a4f1058deca7894afdd1fced', 14)>, <Future: status: finished, type: DataFrame, key: ('getitem-f4f254366f8a8df1f7d9fe88baab4f78', 19)>, <Future: status: finished, type: DataFrame, key: ('getitem-f4f254366f8a8df1f7d9fe88baab4f78', 7)>, <Future: status: finished, type: DataFrame, key: ('getitem-5e3e1945a4f1058d

In [8]:
# display how many records will be used in training
def pretty(val):
    print("{:,}".format(val))

pretty(len(X_train))

263,428,853


# Train the XGBoost Regression Model

The wall time output below indicates how long it took your GPU cluster to train an XGBoost model over the training set.

In [9]:
%%time

import dask_xgboost as dxgb_gpu

params = {
 'learning_rate': 0.3,
  'max_depth': 8,
  'objective': 'reg:squarederror', #gpu:reg deprecated
  'subsample': 0.6,
  'gamma': 1,
  'silent': True,
  'verbose_eval': True,
  'tree_method':'gpu_hist',
  'n_gpus': 1
}

bst = dxgb_gpu.train(client, params, X_train, Y_train, num_boost_round=100)

CPU times: user 144 ms, sys: 40 ms, total: 184 ms
Wall time: 1min 48s


# How Good is Our Model?

Now that we have a trained model, we need to test it with the 25% of records we held out.

Based on the filtering conditions applied to this dataset, many of the DataFrame partitions will wind up having 0 rows.

This is a problem for XGBoost which doesn't know what to do with 0 length arrays. We'll apply a bit of Dask logic to check for and drop partitions without any rows.

In [10]:
X_test = taxi_df.query('day >= 25')

# in this dataset
lengths = X_test.map_partitions(len).compute()
nonempty = [length > 0 for length in lengths]

X_test = X_test.partitions[nonempty].persist()

In [11]:
# Create Y_test with just the fare amount
Y_test = X_test[['fare_amount']]

# Drop the fare amount from X_test
X_test = X_test[X_test.columns.difference(['fare_amount'])]

# generate predictions on the test set
Y_test['prediction'] = dxgb_gpu.predict(client, bst, X_test)

# Compute Root Mean Squared Error

In [12]:
Y_test['squared_error'] = (Y_test['prediction'] - Y_test['fare_amount'])**2

# inspect the results to make sure our calculation looks right
Y_test.head().to_pandas()

Unnamed: 0,fare_amount,prediction,squared_error
123,15.0,14.55858,0.194851
125,7.5,7.428008,0.005183
126,8.0,8.236113,0.055749
127,8.5,8.82984,0.108794
128,7.0,7.096241,0.009262


In [13]:
# compute the actual RMSE over the full test set
math.sqrt(Y_test.squared_error.mean().compute())

1.7131995443517858

# Takeaways

We just demonstrated how to use GPU DataFrames to scale ETL style operations out to multiple GPUs on multiple nodes.

We also showed how to pass prepared data directly to XGBoost without having the data ever leave GPU memory. As a result, we can run end to end data processing _and_ model training faster, using less hardware than with a CPU based solution.

What now?

[Check out RAPIDS on GitHub](https://github.com/rapidsai) and follow the development, or pitch in by reporting issues, making pull requests or even just requesting the features your workflows need. We look forward to hearing from you!

# Appendix

In [4]:
import os
import pandas as pd

# generate list of all files
base_dir = '/data/nyc_taxi/raw/'
files = []
for year in range(2009, 2019):
    for fn in os.listdir(base_dir+str(year)):
        if 'yellow' in fn:
            files.append(base_dir+str(year)+'/'+fn)

# get list of headers
def get_columns(fn):
    df = pd.DataFrame()
    with open(fn, 'r') as fp:
        df['year'] = [fn.split('-')[-2].split('_')[-1]]
        df['month'] = [fn.split('-')[-1].split('.')[0]]
        df['line'] = [fp.readline()]
    return df

parts = [dask.delayed(get_columns)(fn) for fn in files]
res = dask.dataframe.from_delayed(parts)
res.repartition(npartitions=1).compute().line.drop_duplicates()

0    vendor_name,Trip_Pickup_DateTime,Trip_Dropoff_...
0    vendor_id,pickup_datetime,dropoff_datetime,pas...
0    vendor_id, pickup_datetime, dropoff_datetime, ...
0    VendorID,tpep_pickup_datetime,tpep_dropoff_dat...
0    VendorID,tpep_pickup_datetime,tpep_dropoff_dat...
0    VendorID,tpep_pickup_datetime,tpep_dropoff_dat...
Name: line, dtype: object