# Predicting NYC Taxi Fares with RAPIDS

[RAPIDS](https://rapids.ai/) is a suite of GPU-accelerated data science libraries with APIs that should be familiar to users of Pandas, Dask, and scikit-learn.

This notebook shows how to use cuDF with Dask and XGBoost to scale GPU DataFrame ETL-style operations and model training out to multiple GPUs on multiple nodes using Amazon SageMaker.

We will use the NYC Taxi dataset, which is available in us-west-2. If you are running this notebook in a different region, make sure you copy the relevant data to a bucket in your region.

In [1]:
import numpy as np
import numba, xgboost, socket
import dask, dask_cudf
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import xgboost as xgb


cluster = LocalCUDACluster() # by default use all GPUs in the node.
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://127.0.0.1:43215  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 257.79 GB


# Inspecting the Data

Now that we have a cluster of GPU workers, we'll use [dask-cudf](https://github.com/rapidsai/dask-cudf/) to load and parse a bunch of CSV files into a distributed DataFrame.

In [2]:
base_path = 's3://us-west-2.serverless-analytics/NYC-Pub/yellow/yellow_tripdata_'

df_2014 = dask_cudf.read_csv(base_path+'*2014*.csv')
df_2014.head().to_pandas()

Unnamed: 0,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount
0,CMT,2014-01-09 20:45:25,2014-01-09 20:52:31,1,0.7,-73.99477,40.736828,1,N,-73.982227,40.73179,CRD,6.5,0.5,0.5,1.4,0.0,8.9
1,CMT,2014-01-09 20:46:12,2014-01-09 20:55:12,1,1.4,-73.982392,40.773382,1,N,-73.960449,40.763995,CRD,8.5,0.5,0.5,1.9,0.0,11.4
2,CMT,2014-01-09 20:44:47,2014-01-09 20:59:46,2,2.3,-73.98857,40.739406,1,N,-73.986626,40.765217,CRD,11.5,0.5,0.5,1.5,0.0,14.0
3,CMT,2014-01-09 20:44:57,2014-01-09 20:51:40,1,1.7,-73.960213,40.770464,1,N,-73.979863,40.77705,CRD,7.5,0.5,0.5,1.7,0.0,10.2
4,CMT,2014-01-09 20:47:09,2014-01-09 20:53:32,1,0.9,-73.995371,40.717248,1,N,-73.984367,40.720524,CRD,6.0,0.5,0.5,1.75,0.0,8.75


We have about 165 million records in the 2014 yellow taxi data.

In [4]:
len(df_2014)

165114361

# Data Cleanup

As usual, the data needs to be massaged a bit before we can start adding features that are useful to an ML model.

For example, in the 2014 taxi CSV files, there are `pickup_datetime` and `dropoff_datetime` columns. The 2015 CSVs have `tpep_pickup_datetime` and `tpep_dropoff_datetime`, which are the same columns. One year has `rate_code`, and another `RateCodeID`.

Also, some CSV files have column names with extraneous spaces in them.

Worst of all, starting in the July 2016 CSVs, pickup & dropoff latitude and longitude data were replaced by location IDs, making the second half of the year useless to us.

We'll do a little string manipulation, column renaming, and concatenating of DataFrames to sidestep the problems.
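The renaming steps described above can be tried out without a GPU. The following is a minimal CPU-only sketch in plain Python; the helper name `normalize_columns` and the sample header are hypothetical and are not part of the notebook's code.

```python
def normalize_columns(columns, remap):
    """Return cleaned column names: strip whitespace, lowercase, then remap."""
    cleaned = []
    for col in columns:
        name = col.strip().lower()
        # fall back to the cleaned name when no remapping is defined
        cleaned.append(remap.get(name, name))
    return cleaned

# hypothetical 2015-style header with a stray leading space
sample_header = ['VendorID', ' tpep_pickup_datetime', 'tpep_dropoff_datetime', 'RateCodeID']
sample_remap = {
    'tpep_pickup_datetime': 'pickup_datetime',
    'tpep_dropoff_datetime': 'dropoff_datetime',
    'ratecodeid': 'rate_code',
}

normalized = normalize_columns(sample_header, sample_remap)
print(normalized)
```

Because the remapping runs after lowercasing, the keys of the mapping have to be lowercase too, which is why `RateCodeID` is matched through `ratecodeid`.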

In [5]:
# list of column names that need to be re-mapped
remap = {}
remap['tpep_pickup_datetime'] = 'pickup_datetime'
remap['tpep_dropoff_datetime'] = 'dropoff_datetime'
remap['ratecodeid'] = 'rate_code'

#create a list of columns & dtypes the df must have
must_haves = {
 'pickup_datetime': 'datetime64[ns]',
 'dropoff_datetime': 'datetime64[ns]',
 'passenger_count': 'int32',
 'trip_distance': 'float32',
 'pickup_longitude': 'float32',
 'pickup_latitude': 'float32',
 'rate_code': 'int32',
 'dropoff_longitude': 'float32',
 'dropoff_latitude': 'float32',
 'fare_amount': 'float32'
}

In [6]:
# helper function which takes a DataFrame partition
def clean(df_part, remap, must_haves):    
    # some column names include leading spaces; strip them and lowercase the names
    tmp = {col:col.strip().lower() for col in list(df_part.columns)}
    df_part = df_part.rename(columns=tmp)
    
    # rename using the supplied mapping
    df_part = df_part.rename(columns=remap)
    
    # iterate through columns in this df partition
    for col in df_part.columns:
        # drop anything not in our expected list
        if col not in must_haves:
            df_part = df_part.drop(col, axis=1)
            continue
            continue
        
        # fixes datetime error found by Ty Mckercher and fixed by Paul Mahler
        if df_part[col].dtype == 'object' and col in ['pickup_datetime', 'dropoff_datetime']:
            df_part[col] = df_part[col].astype('datetime64[ns]')
            continue
                
        # if column was read as a string, recast as float
        if df_part[col].dtype == 'object':
            df_part[col] = df_part[col].str.fillna('-1')
            df_part[col] = df_part[col].astype('float32')
        else:
            # downcast from 64bit to 32bit types
            # Tesla T4 are faster on 32bit ops
            if 'int' in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype('int32')
            if 'float' in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype('float32')
            df_part[col] = df_part[col].fillna(-1)
    
    return df_part

In [7]:
df_2014

Unnamed: 0_level_0,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount
npartitions=110,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
,object,object,object,int64,float64,float64,float64,int64,object,float64,float64,object,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [8]:
df_2014 = df_2014.map_partitions(clean, remap, must_haves, meta=must_haves)
df_2014.head().to_pandas()

Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,dropoff_longitude,dropoff_latitude,fare_amount
0,2014-01-09 20:45:25,2014-01-09 20:52:31,1,0.7,-73.994766,40.736828,1,-73.982224,40.731789,6.5
1,2014-01-09 20:46:12,2014-01-09 20:55:12,1,1.4,-73.982391,40.77338,1,-73.960449,40.763996,8.5
2,2014-01-09 20:44:47,2014-01-09 20:59:46,2,2.3,-73.988571,40.739407,1,-73.986626,40.765217,11.5
3,2014-01-09 20:44:57,2014-01-09 20:51:40,1,1.7,-73.960213,40.770466,1,-73.979866,40.77705,7.5
4,2014-01-09 20:47:09,2014-01-09 20:53:32,1,0.9,-73.995369,40.717247,1,-73.984367,40.720524,6.0


Note that the columns have been renamed and cast to the expected dtypes.

In [9]:
df_2014

Unnamed: 0_level_0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,dropoff_longitude,dropoff_latitude,fare_amount
npartitions=110,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
,datetime64[ns],datetime64[ns],int32,float32,float32,float32,int32,float32,float32,float32
,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...


# Increasing Our Training Data Size

We still have 2015 and the first half of 2016's data to read and clean.

In [10]:
df_2015 = dask_cudf.read_csv(base_path+'*2015*.csv').map_partitions(clean, remap, must_haves, meta=must_haves)

In [12]:
len(df_2015)

146112989

## Handling 2016's Mid-Year Schema Change

In 2016, only the January through June CSVs have the columns we need. If we try to read `base_path+'*2016*.csv'`, Dask will not appreciate having differing schemas in the same DataFrame.

Instead, we'll need to create a list of the valid months and read them independently.

In [13]:
months = [str(x).rjust(2, '0') for x in range(1, 7)]
valid_files = [base_path+'*2016-'+month+'.csv' for month in months]
valid_files


['s3://us-west-2.serverless-analytics/NYC-Pub/yellow/yellow_tripdata_*2016-01.csv',
 's3://us-west-2.serverless-analytics/NYC-Pub/yellow/yellow_tripdata_*2016-02.csv',
 's3://us-west-2.serverless-analytics/NYC-Pub/yellow/yellow_tripdata_*2016-03.csv',
 's3://us-west-2.serverless-analytics/NYC-Pub/yellow/yellow_tripdata_*2016-04.csv',
 's3://us-west-2.serverless-analytics/NYC-Pub/yellow/yellow_tripdata_*2016-05.csv',
 's3://us-west-2.serverless-analytics/NYC-Pub/yellow/yellow_tripdata_*2016-06.csv']

In [14]:
# read & clean the 2016 data (January through June only)
df_2016 = dask_cudf.read_csv(valid_files).map_partitions(clean, remap, must_haves, meta=must_haves)

len(df_2016)

69406520

In [47]:
# concatenate multiple DataFrames into one bigger one
taxi_df = dask.dataframe.concat([df_2014, df_2015, df_2016])

In [48]:
# apply a list of filter conditions to throw out records with missing or outlier values
query_frags = [
    'fare_amount > 0 and fare_amount < 500',
    'passenger_count > 0 and passenger_count < 6',
    'pickup_longitude > -75 and pickup_longitude < -73',
    'dropoff_longitude > -75 and dropoff_longitude < -73',
    'pickup_latitude > 40 and pickup_latitude < 42',
    'dropoff_latitude > 40 and dropoff_latitude < 42'
]
taxi_df = taxi_df.query(' and '.join(query_frags))

# inspect the results of cleaning
taxi_df.head().to_pandas()

Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,dropoff_longitude,dropoff_latitude,fare_amount
0,2014-01-09 20:45:25,2014-01-09 20:52:31,1,0.7,-73.994766,40.736828,1,-73.982224,40.731789,6.5
1,2014-01-09 20:46:12,2014-01-09 20:55:12,1,1.4,-73.982391,40.77338,1,-73.960449,40.763996,8.5
2,2014-01-09 20:44:47,2014-01-09 20:59:46,2,2.3,-73.988571,40.739407,1,-73.986626,40.765217,11.5
3,2014-01-09 20:44:57,2014-01-09 20:51:40,1,1.7,-73.960213,40.770466,1,-73.979866,40.77705,7.5
4,2014-01-09 20:47:09,2014-01-09 20:53:32,1,0.9,-73.995369,40.717247,1,-73.984367,40.720524,6.0


In [17]:
len(taxi_df)

359864961
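The outlier filter used earlier is a set of exclusive lower and upper bounds joined into one query string. As a rough sketch of that idea, the same expression can be generated from a table of bounds and checked against a single record on the CPU. The names `bounds`, `build_query`, and `row_passes` are hypothetical helpers introduced only for this illustration.

```python
# hypothetical bounds table: column -> (exclusive lower bound, exclusive upper bound)
bounds = {
    'fare_amount': (0, 500),
    'passenger_count': (0, 6),
    'pickup_longitude': (-75, -73),
    'dropoff_longitude': (-75, -73),
    'pickup_latitude': (40, 42),
    'dropoff_latitude': (40, 42),
}

def build_query(bounds):
    """Build a DataFrame.query-style expression from the bounds table."""
    frags = [f'{col} > {lo} and {col} < {hi}' for col, (lo, hi) in bounds.items()]
    return ' and '.join(frags)

def row_passes(row, bounds):
    """Apply the same bounds to one record held in a plain dict."""
    return all(lo < row[col] < hi for col, (lo, hi) in bounds.items())

query = build_query(bounds)

# first cleaned 2014 record shown in this notebook
good_row = {
    'fare_amount': 6.5, 'passenger_count': 1,
    'pickup_longitude': -73.994766, 'pickup_latitude': 40.736828,
    'dropoff_longitude': -73.982224, 'dropoff_latitude': 40.731789,
}
# same record with a zeroed-out longitude, a typical bad GPS reading
bad_row = dict(good_row, pickup_longitude=0.0)

print(query)
print(row_passes(good_row, bounds), row_passes(bad_row, bounds))
```

Keeping the limits in one table makes it easier to see at a glance which ranges are treated as valid, at the cost of one extra helper.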

# Adding Interesting Features

Dask & cuDF provide standard DataFrame operations, but also let you run "user defined functions" on the underlying data.

cuDF's [apply_rows](https://rapidsai.github.io/projects/cudf/en/0.6.0/api.html#cudf.dataframe.DataFrame.apply_rows) operation is similar to Pandas's [DataFrame.apply](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.apply.html), except that for cuDF, custom Python code is [JIT compiled by numba](https://numba.pydata.org/numba-doc/dev/cuda/kernels.html) into GPU kernels.

We'll use a Haversine Distance calculation to find total trip distance, and extract additional useful variables from the datetime fields.
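Since a row-wise kernel of this kind is ordinary Python that loops over its inputs and writes into a pre-allocated output, its arithmetic can be sanity-checked on the CPU first. The sketch below assumes plain lists standing in for GPU columns; `haversine_km` and `haversine_rows` are hypothetical names, and the coordinates are those of the first cleaned 2014 record shown earlier.

```python
from math import asin, cos, pi, sin, sqrt

def haversine_km(lat1, lon1, lat2, lon2, radius_km=6371.0):
    """Great-circle distance in kilometers between two points given in degrees."""
    lat1, lon1, lat2, lon2 = (pi / 180 * v for v in (lat1, lon1, lat2, lon2))
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    return 2 * asin(sqrt(a)) * radius_km

def haversine_rows(pickup_lat, pickup_lon, dropoff_lat, dropoff_lon, out):
    # row-wise loop in the same style as a GPU kernel,
    # with plain lists standing in for the columns
    rows = zip(pickup_lat, pickup_lon, dropoff_lat, dropoff_lon)
    for i, (la1, lo1, la2, lo2) in enumerate(rows):
        out[i] = haversine_km(la1, lo1, la2, lo2)

out = [0.0]
haversine_rows([40.736828], [-73.994766], [40.731789], [-73.982224], out)
print(round(out[0], 2))  # 1.2
```

The result is the straight-line distance in kilometers, so it will usually be shorter than the metered `trip_distance` column, which follows the street grid.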

In [49]:
import math
from math import cos, sin, asin, sqrt, pi

def haversine_distance_kernel(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude, h_distance):
    for i, (x_1, y_1, x_2, y_2) in enumerate(zip(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude)):
        x_1 = pi/180 * x_1
        y_1 = pi/180 * y_1
        x_2 = pi/180 * x_2
        y_2 = pi/180 * y_2
        
        dlon = y_2 - y_1
        dlat = x_2 - x_1
        a = sin(dlat/2)**2 + cos(x_1) * cos(x_2) * sin(dlon/2)**2
        
        c = 2 * asin(sqrt(a)) 
        r = 6371 # Radius of earth in kilometers
        
        h_distance[i] = c * r

def day_of_the_week_kernel(day, month, year, day_of_week):
    for i, (d_1, m_1, y_1) in enumerate(zip(day, month, year)):
        if month[i] <3:
            shift = month[i]
        else:
            shift = 0
        Y = year[i] - (month[i] < 3)
        y = Y - 2000
        c = 20
        d = day[i]
        m = month[i] + shift + 1
        day_of_week[i] = (d + math.floor(m*2.6) + y + (y//4) + (c//4) -2*c)%7
        
def add_features(df):
    df['hour'] = df['pickup_datetime'].dt.hour
    df['year'] = df['pickup_datetime'].dt.year
    df['month'] = df['pickup_datetime'].dt.month
    df['day'] = df['pickup_datetime'].dt.day
    #df['diff'] = df['dropoff_datetime'].astype('int32') - df['pickup_datetime'].astype('int32')
    #df['diff'] = (df['dropoff_datetime'] - df['pickup_datetime']).dt.minutes
    
    df['pickup_latitude_r'] = df['pickup_latitude']//.01*.01
    df['pickup_longitude_r'] = df['pickup_longitude']//.01*.01
    df['dropoff_latitude_r'] = df['dropoff_latitude']//.01*.01
    df['dropoff_longitude_r'] = df['dropoff_longitude']//.01*.01
    
    #df = df.drop('pickup_datetime', axis=1)
    #df = df.drop('dropoff_datetime', axis=1)
    
    
    df = df.apply_rows(haversine_distance_kernel,
                   incols=['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude'],
                   outcols=dict(h_distance=np.float32),
                   kwargs=dict())
    
    
    df = df.apply_rows(day_of_the_week_kernel,
                      incols=['day', 'month', 'year'],
                      outcols=dict(day_of_week=np.float32),
                      kwargs=dict())
    
    
    df['is_weekend'] = (df['day_of_week']<2)
    return df
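To get a feel for what the derived columns contain, here is a small CPU-only sketch for a single record. It assumes the standard library's `datetime` for the calendar fields; the weekend flag below uses `weekday()` as a stand-in and is not the notebook's GPU kernel. `floor_to_grid` and `calendar_features` are hypothetical helper names.

```python
from datetime import datetime

def floor_to_grid(value, step=0.01):
    """Snap a coordinate down to the lower edge of its grid cell."""
    return value // step * step

def calendar_features(ts):
    # weekday(): Monday is 0 and Sunday is 6 in the standard library
    return {
        'hour': ts.hour,
        'day': ts.day,
        'month': ts.month,
        'year': ts.year,
        'is_weekend': ts.weekday() >= 5,
    }

# pickup time and location of the first cleaned 2014 record
pickup = datetime(2014, 1, 9, 20, 45, 25)
feats = calendar_features(pickup)
lat_bin = floor_to_grid(40.736828)
lon_bin = floor_to_grid(-73.994766)

print(feats)
print(round(lat_bin, 2), round(lon_bin, 2))
```

Floor division rounds toward negative infinity, so a negative longitude such as -73.994766 lands in the -74.0 cell rather than -73.99.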

In [50]:
print(must_haves)

{'pickup_datetime': 'datetime64[ns]', 'dropoff_datetime': 'datetime64[ns]', 'passenger_count': 'int32', 'trip_distance': 'float32', 'pickup_longitude': 'float32', 'pickup_latitude': 'float32', 'rate_code': 'int32', 'dropoff_longitude': 'float32', 'dropoff_latitude': 'float32', 'fare_amount': 'float32'}


In [51]:
features = {
 'hour': 'int32',
 'year': 'int32',
 'month': 'int32',
 'day': 'float32',
# 'diff': 'int32',
 'pickup_latitude_r': 'float32',
 'pickup_longitude_r': 'float32',
 'dropoff_latitude_r': 'float32',
 'dropoff_longitude_r': 'float32',
 'h_distance': 'float32',
 'day_of_week': 'float32',
 'is_weekend': 'bool'
}
#del must_haves['dropoff_datetime']
#del must_haves['pickup_datetime']
all_features = {**must_haves, **features}

In [52]:
print(all_features)

{'pickup_datetime': 'datetime64[ns]', 'dropoff_datetime': 'datetime64[ns]', 'passenger_count': 'int32', 'trip_distance': 'float32', 'pickup_longitude': 'float32', 'pickup_latitude': 'float32', 'rate_code': 'int32', 'dropoff_longitude': 'float32', 'dropoff_latitude': 'float32', 'fare_amount': 'float32', 'hour': 'int32', 'year': 'int32', 'month': 'int32', 'day': 'float32', 'pickup_latitude_r': 'float32', 'pickup_longitude_r': 'float32', 'dropoff_latitude_r': 'float32', 'dropoff_longitude_r': 'float32', 'h_distance': 'float32', 'day_of_week': 'float32', 'is_weekend': 'bool'}


In [53]:
taxi_df

Unnamed: 0_level_0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,dropoff_longitude,dropoff_latitude,fare_amount
npartitions=244,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
,datetime64[ns],datetime64[ns],int32,float32,float32,float32,int32,float32,float32,float32
,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...


In [54]:
# actually add the features
taxi_df = taxi_df.map_partitions(add_features, meta=all_features)
# inspect the result
taxi_df.head().to_pandas()

Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,dropoff_longitude,dropoff_latitude,fare_amount,...,year,month,day,pickup_latitude_r,pickup_longitude_r,dropoff_latitude_r,dropoff_longitude_r,h_distance,day_of_week,is_weekend
0,2014-01-09 20:45:25,2014-01-09 20:52:31,1,0.7,-73.994766,40.736828,1,-73.982224,40.731789,6.5,...,2014,1,9,40.73,-74.0,40.73,-73.989998,1.196175,4.0,False
1,2014-01-09 20:46:12,2014-01-09 20:55:12,1,1.4,-73.982391,40.77338,1,-73.960449,40.763996,8.5,...,2014,1,9,40.77,-73.989998,40.759998,-73.970001,2.122098,4.0,False
2,2014-01-09 20:44:47,2014-01-09 20:59:46,2,2.3,-73.988571,40.739407,1,-73.986626,40.765217,11.5,...,2014,1,9,40.73,-73.989998,40.759998,-73.989998,2.874643,4.0,False
3,2014-01-09 20:44:57,2014-01-09 20:51:40,1,1.7,-73.960213,40.770466,1,-73.979866,40.77705,7.5,...,2014,1,9,40.77,-73.970001,40.77,-73.979996,1.809662,4.0,False
4,2014-01-09 20:47:09,2014-01-09 20:53:32,1,0.9,-73.995369,40.717247,1,-73.984367,40.720524,6.0,...,2014,1,9,40.709999,-74.0,40.719997,-73.989998,0.996204,4.0,False


# Pick a Training Set

Let's imagine you're making a trip to New York on the 25th and want to build a model to predict what fare prices will be like the last few days of the month based on the first part of the month. We'll use a query expression to identify the `day` of the month to use to divide the data into train and test sets.
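As a rough guide to how large the held-out set is, the sketch below counts the calendar days that fall on or after the 25th across the months covered here (January 2014 through June 2016). It counts days, not trips, so the actual share of records may differ; `held_out_day_fraction` is a hypothetical helper.

```python
import calendar

def held_out_day_fraction(months, cutoff_day=25):
    """Count calendar days on or after cutoff_day, and total days, over (year, month) pairs."""
    total = 0
    held_out = 0
    for year, month in months:
        ndays = calendar.monthrange(year, month)[1]
        total += ndays
        held_out += max(0, ndays - cutoff_day + 1)
    return held_out, total

# all of 2014 and 2015, plus the first half of 2016
months = [(y, m) for y in (2014, 2015) for m in range(1, 13)]
months += [(2016, m) for m in range(1, 7)]

held_out, total = held_out_day_fraction(months)
print(held_out, total, round(held_out / total, 3))  # 192 912 0.211
```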

The wall time below shows how long it takes your GPU cluster to load the data from the Amazon S3 bucket and run the ETL portion of the workflow.

In [55]:
%%time
X_train = taxi_df.query('day < 25').persist()

# create a Y_train ddf with just the target variable
Y_train = X_train[['fare_amount']].persist()
# drop the target variable from the training ddf
X_train = X_train[X_train.columns.difference(['fare_amount'])]

# this won't return until all data is in GPU memory
done = wait([X_train, Y_train])

CPU times: user 17.3 s, sys: 1.57 s, total: 18.9 s
Wall time: 7min 7s


In [62]:
X_train = X_train.drop('pickup_datetime', axis=1)
X_train = X_train.drop('dropoff_datetime', axis=1)

In [63]:
X_train

Unnamed: 0_level_0,day,day_of_week,dropoff_latitude,dropoff_latitude_r,dropoff_longitude,dropoff_longitude_r,h_distance,hour,is_weekend,month,passenger_count,pickup_latitude,pickup_latitude_r,pickup_longitude,pickup_longitude_r,rate_code,trip_distance,year
npartitions=244,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
,float32,float32,float32,float32,float32,float32,float32,int32,bool,int32,int32,float32,float32,float32,float32,int32,float32,int32
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


# Train the XGBoost Regression Model

The wall time output below indicates how long it took your GPU cluster to train an XGBoost model over the training set.

In [64]:
import xgboost as xgb

In [None]:
dtrain = xgb.dask.DaskDMatrix(client, X_train, Y_train)
# Use train method from xgboost.dask instead of xgboost.  This
# distributed version of train returns a dictionary containing the
# resulting booster and evaluation history obtained from
# evaluation metrics.
output = xgb.dask.train(client,
                        # Use GPU training algorithm
                        {'tree_method': 'gpu_hist'},
                        dtrain,
                        num_boost_round=100,
                        evals=[(dtrain, 'train')])
booster = output['booster']  # booster is the trained model
history = output['history']  # A dictionary containing evaluation results
# Save the model to file
booster.save_model('xgboost-model')
print('Training evaluation history:', history)

# How Good is Our Model?

Now that we have a trained model, we need to test it on the records we held out: trips with a pickup on day 25 or later of each month.

Based on the filtering conditions applied to this dataset, many of the DataFrame partitions will wind up having 0 rows.

This is a problem for XGBoost, which doesn't know what to do with zero-length arrays. We'll apply a bit of Dask logic to check for and drop partitions without any rows.

In [None]:
def drop_empty_partitions(df):
    lengths = df.map_partitions(len).compute()
    nonempty = [length > 0 for length in lengths]
    return df.partitions[nonempty]
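The same mask-and-select logic can be illustrated on the CPU. This is a minimal sketch in which plain lists stand in for DataFrame partitions; `drop_empty` is a hypothetical name used only here.

```python
def drop_empty(partitions):
    """Keep only the partitions that contain at least one row."""
    lengths = [len(part) for part in partitions]
    keep = [n > 0 for n in lengths]
    return [part for part, flag in zip(partitions, keep) if flag]

# three toy partitions, the middle one emptied by filtering
toy_partitions = [[6.5, 8.5], [], [11.5]]
kept = drop_empty(toy_partitions)
print(len(toy_partitions), '->', len(kept))  # 3 -> 2
```

Note that measuring every partition forces a pass over the data, so the check is cheapest when the DataFrame has already been persisted.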

In [None]:
X_test = taxi_df.query('day >= 25').persist()
X_test = drop_empty_partitions(X_test)

# Create Y_test with just the fare amount
Y_test = X_test[['fare_amount']]

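# Drop the fare amount and the raw datetime columns so that X_test
# has the same feature columns as X_train
X_test = X_test[X_test.columns.difference(['fare_amount', 'pickup_datetime', 'dropoff_datetime'])]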

# display test set size
len(X_test)

In [None]:
# predict on the test set and attach the result to Y_test as a new column
Y_test['prediction'] = xgb.dask.predict(client, booster, X_test)

# Compute Root Mean Squared Error
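RMSE is the square root of the mean of the squared differences between predicted and actual fares, so it is expressed in the same unit as the fare itself (dollars). A minimal CPU-only sketch, using a hypothetical `rmse` helper and made-up fares:

```python
import math

def rmse(predictions, targets):
    """Root mean squared error between two equal-length sequences."""
    if len(predictions) != len(targets) or len(targets) == 0:
        raise ValueError('inputs must be non-empty and of equal length')
    squared_errors = [(p - t) ** 2 for p, t in zip(predictions, targets)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# hypothetical predicted and actual fares in dollars
example = rmse([7.0, 9.0, 10.0], [6.5, 8.5, 11.5])
print(round(example, 3))  # 0.957
```

Because the errors are squared before averaging, a few large misses weigh more heavily than many small ones.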

In [None]:
Y_test['squared_error'] = (Y_test['prediction'] - Y_test['fare_amount'])**2

# inspect the results to make sure our calculation looks right
Y_test.head().to_pandas()

In [None]:
# compute the actual RMSE over the full test set
math.sqrt(Y_test.squared_error.mean().compute())

Not bad! We can predict a taxi fare to within about $1.79.

If I'm planning to head to Strata Data in NYC, I can probably fill out my ground transportation expense items ahead of time.

# Takeaways

We just demonstrated how to use GPU DataFrames to scale ETL-style operations out to multiple GPUs on multiple nodes.

We also showed how to pass prepared data directly to XGBoost without having the data ever leave GPU memory. As a result, we can run end-to-end data processing _and_ model training faster, using less hardware than with a CPU-based solution.

While other workflows will be more complex or operate on larger dataset sizes, our hope is that pre-processing and training on approximately 70GB (360 million rows) in about 4 minutes shows that GPUs can offer speedups that give Data Scientists less time to drink coffee, and more time to iterate on and tune model performance.
