# Predicting NYC Taxi Fares with RAPIDS

RAPIDS is a suite of GPU accelerated data science libraries with APIs that should be familiar to users of Pandas, Dask, and Scikitlearn.

This notebook focuses on showing how to use cuDF with Dask & XGBoost to scale GPU DataFrame ETL-style operations & model training out to multiple GPUs on mutliple nodes as part of Google Cloud Dataproc.

Anaconda has graciously made some of the NYC Taxi dataset available in a public Google Cloud Storage bucket. We'll use our Dataproc Cluster of GPUs to process it and train a model that predicts the fare amount.

For EDA we show the examples using [Holoviews](http://holoviews.org/) and [hvplot](https://hvplot.holoviz.org/). Best way to install Holoviews is to from `conda-forge` channel `conda install -c conda-forge holoviews`
and for hvplot `pyviz` channel. `conda install -c pyviz hvplot`

In [None]:
%matplotlib inline
import glob
import matplotlib.pyplot as plt
import socket, time
import modin.pandas as modin_omni_pd
# import xgboost as xgb

#To install Holoviews and hvplot
#conda install -c conda-forge holoviews
#conda install -c pyviz hvplot
import holoviews as hv
from holoviews import opts
import numpy as np
import hvplot.pandas
import hvplot.dask
hv.extension('bokeh')
import ray
ray.init(object_store_memory=42874035712)

# Inspecting the Data

Now that we have a cluster of GPU workers, we'll use [dask-cudf](https://github.com/rapidsai/dask-cudf/) to load and parse a bunch of CSV files into a distributed DataFrame. 

# Data Cleanup

As usual, the data needs to be massaged a bit before we can start adding features that are useful to an ML model.

For example, in the 2014 taxi CSV files, there are `pickup_datetime` and `dropoff_datetime` columns. The 2015 CSVs have `tpep_pickup_datetime` and `tpep_dropoff_datetime`, which are the same columns. One year has `rate_code`, and another `RateCodeID`.

Also, some CSV files have column names with extraneous spaces in them.

Worst of all, starting in the July 2016 CSVs, pickup & dropoff latitude and longitude data were replaced by location IDs, making the second half of the year useless to us.

We'll do a little string manipulation, column renaming, and concatenating of DataFrames to sidestep the problems.

In [5]:
#Dictionary of required columns and their datatypes
must_haves = {
     'pickup_datetime': 'datetime64[s]',
     'dropoff_datetime': 'datetime64[s]',
     'passenger_count': 'int32',
     'trip_distance': 'float32',
     'pickup_longitude': 'float32',
     'pickup_latitude': 'float32',
     'rate_code': 'int32',
     'dropoff_longitude': 'float32',
     'dropoff_latitude': 'float32',
     'fare_amount': 'float32'
    }

In [6]:
def clean(ddf, must_haves):
    # replace the extraneous spaces in column names and lower the font type
    tmp = {col:col.strip().lower() for col in list(ddf.columns)}
    ddf = ddf.rename(columns=tmp)

    ddf = ddf.rename(columns={
        'tpep_pickup_datetime': 'pickup_datetime',
        'tpep_dropoff_datetime': 'dropoff_datetime',
        'ratecodeid': 'rate_code'
    })
    
#    ddf['pickup_datetime'] = ddf['pickup_datetime'].astype('datetime64[ms]')
#    ddf['dropoff_datetime'] = ddf['dropoff_datetime'].astype('datetime64[ms]')

    for col in ddf.columns:
        if col not in must_haves:
            ddf = ddf.drop(columns=col)
            continue
        # if column was read as a string, recast as float
        if ddf[col].dtype == 'object':
            ddf[col] = ddf[col].fillna('-1')
#            ddf[col] = ddf[col].astype('float32')
#        else:
            # downcast from 64bit to 32bit types
            # Tesla T4 are faster on 32bit ops
#            if 'int' in str(ddf[col].dtype):
#                ddf[col] = ddf[col].astype('int32')
#            if 'float' in str(ddf[col].dtype):
#                ddf[col] = ddf[col].astype('float32')
#            ddf[col] = ddf[col].fillna(-1)
    
    return ddf

In [7]:
base_path = './nyctaxi/'

df_2014 = modin_omni_pd.concat([
    clean(modin_omni_pd.read_csv(x, parse_dates=[' pickup_datetime', ' dropoff_datetime']), must_haves)
    for x in glob.glob(base_path+'2014/yellow_*.csv')], ignore_index=True)
#df_2014 = modin_omni_pd.read_csv(base_path+'2014/yellow_tripdata_2014-01.csv', parse_dates=[' pickup_datetime', ' dropoff_datetime'])
#df_2014 = clean(df_2014, must_haves)
#df_2014.head()

In [8]:
df_2014.dtypes

pickup_datetime      datetime64[ns]
dropoff_datetime     datetime64[ns]
passenger_count               int64
trip_distance               float64
pickup_longitude            float64
pickup_latitude             float64
rate_code                     int64
dropoff_longitude           float64
dropoff_latitude            float64
fare_amount                 float64
dtype: object

<b> NOTE: </b>We will realize that some of 2015 data has column name as `RateCodeID` and others have `RatecodeID`. When we rename the columns in the clean function, it internally doesn't pass meta while calling map_partitions(). This leads to the error of column name mismatch in the returned data. For this reason, we will call the clean function with map_partition and pass the meta to it. Here is the link to the bug created for that: https://github.com/rapidsai/cudf/issues/5413 

In [9]:
df_2014.shape

(165114361, 10)

We still have 2015 and the first half of 2016's data to read and clean. Let's increase our dataset.

In [10]:
df_2015 = modin_omni_pd.concat([
    clean(modin_omni_pd.read_csv(x, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime']), must_haves)
    for x in glob.glob(base_path + '2015/yellow_*.csv')], ignore_index=True)
#df_2015 = modin_omni_pd.read_csv(base_path+'2015/yellow_tripdata_2015-01.csv', parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
#df_2015 = clean(df_2015, must_haves)

In [11]:
df_2015.shape

(146112989, 10)

# Handling 2016's Mid-Year Schema Change

In 2016, only January - June CSVs have the columns we need. If we try to read base_path+2016/yellow_*.csv, Dask will not appreciate having differing schemas in the same DataFrame.

Instead, we'll need to create a list of the valid months and read them independently.

In [12]:
months = [str(x).rjust(2, '0') for x in range(1, 7)]
valid_files = [base_path+'2016/yellow_tripdata_2016-'+month+'.csv' for month in months]

In [13]:
#read & clean 2016 data and concat all DFs
df_2016 = modin_omni_pd.concat([
    clean(modin_omni_pd.read_csv(x, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime']), must_haves)
    for x in valid_files], ignore_index=True)
#df_2016 = clean(modin_omni_pd.read_csv(valid_files[0], parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime']), must_haves)

In [14]:
#concatenate multiple DataFrames into one bigger one
taxi_df = modin_omni_pd.concat([df_2014, df_2015, df_2016], ignore_index=True)

In [15]:
# taxi_df = taxi_df.persist()
taxi_df.dtypes

pickup_datetime      datetime64[ns]
dropoff_datetime     datetime64[ns]
passenger_count               int64
trip_distance               float64
pickup_longitude            float64
pickup_latitude             float64
rate_code                     int64
dropoff_longitude           float64
dropoff_latitude            float64
fare_amount                 float64
dtype: object

In [16]:
taxi_df.shape

(380633870, 10)

## Exploratory Data Analysis (EDA)

Here, we are checking out if there are any non-sensical records and outliers, and in such case, we need to remove them from the dataset.

In [17]:
# check out if there is any negative total trip time
taxi_df[taxi_df.dropoff_datetime <= taxi_df.pickup_datetime].head()

Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,dropoff_longitude,dropoff_latitude,fare_amount
3832,2014-01-09 20:27:41,2014-01-09 19:05:21,1,1.8,0.0,0.0,1,-73.969591,40.789003,9.5
16134,2014-01-09 22:15:40,2014-01-09 22:15:14,1,1.3,0.0,0.0,1,-74.002861,40.760625,7.5
16181,2014-01-09 22:27:13,2014-01-09 22:26:49,1,1.9,0.0,0.0,1,-73.976287,40.741631,8.5
16873,2014-01-10 00:14:23,2014-01-10 00:14:23,1,0.7,-74.005136,40.718694,1,-74.003879,40.728503,4.0
25248,2014-01-10 00:39:16,2014-01-10 00:38:59,1,1.5,0.0,0.0,1,-73.999284,40.733897,6.5


In [18]:
# check out if there is any abnormal data where trip distance is short, but the fare is very high.
taxi_df[(taxi_df.trip_distance < 10) & (taxi_df.fare_amount > 300)].head()

Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,dropoff_longitude,dropoff_latitude,fare_amount
373953,2014-01-12 15:03:38,2014-01-12 15:05:31,1,0.0,-73.823842,40.690258,5,-73.823847,40.69028,400.0
520658,2014-01-06 23:44:07,2014-01-06 23:45:01,1,0.0,-73.9279,41.677242,5,-73.9279,41.677242,370.0
525419,2014-01-06 23:50:28,2014-01-06 23:51:40,1,0.0,-73.9279,41.677242,5,-73.9279,41.677242,370.0
562916,2014-01-07 12:09:38,2014-01-07 12:10:27,1,0.0,-73.929695,40.812032,5,-73.929695,40.812032,500.0
576409,2014-01-07 16:35:10,2014-01-07 16:35:41,1,0.0,-73.996006,40.720933,5,-73.996007,40.720928,500.0


In [19]:
# check out if there is any abnormal data where trip distance is long, but the fare is very low.
taxi_df[(taxi_df.trip_distance > 50) & (taxi_df.fare_amount < 50)].head()

Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,dropoff_longitude,dropoff_latitude,fare_amount
11580,2014-01-09 22:22:14,2014-01-09 22:42:59,1,82.1,-74.002175,40.751753,1,-73.964216,40.719554,20.0
33723,2014-01-10 05:43:48,2014-01-10 06:12:20,1,84.2,-73.965053,40.806673,1,-73.957981,40.713139,34.5
122880,2014-01-10 18:16:53,2014-01-10 18:48:53,1,89.9,-73.997185,40.742357,1,-73.948361,40.779271,21.5
226264,2014-01-11 13:22:07,2014-01-11 13:40:16,1,81.7,-73.991685,40.759871,1,-73.994985,40.726054,13.5
290203,2014-01-11 20:12:50,2014-01-11 20:15:15,1,60.4,-73.968109,40.770878,1,-73.972598,40.764607,4.0


In [20]:
#Using only 2016-01 data for visuals.
#taxi_df_cdf = clean(cudf.read_csv(valid_files[0]),must_haves)

#Using entire 2016 data for visualization
#taxi_df_cdf = taxi_df

The plot below visualizes the histogram of trip_distance and we can see some abnormal trip_distance values for some records. Taking this and also the NYC map coordinates into consideration, we will only select records where tripdistance < 500 miles.

In [21]:
%%time
#Histogram using cupy and Holoviews
# frequencies, edges = cupy.histogram(x=cupy.array(taxi_df_cdf["trip_distance"]) , bins=20)
# hist = hv.Histogram((np.array(edges.tolist()), np.array(frequencies.tolist())))

#Histogram using hvplot
#hist = taxi_df_cdf._to_pandas().hvplot.hist("trip_distance", bins=20, bin_range=(0, 10))

#Customizing the plot
#hist.opts(xlabel="trip distance (miles)",ylabel="count",color="green",width=900, height=400)

CPU times: user 2 µs, sys: 1e+03 ns, total: 3 µs
Wall time: 5.48 µs


Similarly, the plot below visualizes the histogram of fare_amount and we can see some abnormal fare_amount values for some records. Taking this and also the NYC map coordinates into consideration, we will only select records where fare_amount < 500$.

In [22]:
%%time
#Histogram using cupy and Holoviews
# frequencies, edges = cupy.histogram(x=cupy.array(taxi_df_cdf["fare_amount"]) , bins=20)
# hist = hv.Histogram((np.array(edges.tolist()), np.array(frequencies.tolist())))

#Histogram using hvplot
#hist = taxi_df_cdf._to_pandas().hvplot.hist("fare_amount", bins=20, bin_range=(0, 50))

#Customizing the plot
#hist.opts(xlabel="fare amount ($)",ylabel="count",color="green",width=900, height=400)

CPU times: user 1e+03 ns, sys: 0 ns, total: 1e+03 ns
Wall time: 4.77 µs


In [23]:
%%time
# Plot the number of passengers per trip. We'll remove the records where passenger_count > 5.
# Plotting using Holoviews
#bar = hv.Bars(taxi_df_cdf.groupby("passenger_count").size().to_frame().rename(columns={0:"count"}))

# Plotting using hvplot
#df_bar = taxi_df_cdf.groupby("passenger_count").size().to_frame().rename(columns={0:"count"}).reset_index()
#bar = df_bar._to_pandas().hvplot.bar(x="passenger_count",y="count")

#Customizing the plot
#bar.opts(color="green",width=900, height=400)

CPU times: user 2 µs, sys: 1e+03 ns, total: 3 µs
Wall time: 4.53 µs


EDA visuals and additional analysis yield the filter logic below.

In [24]:
taxi_df.shape

(380633870, 10)

In [25]:
# apply a list of filter conditions to throw out records with missing or outlier values
taxi_df = taxi_df[
    (taxi_df.fare_amount > 1) &
    (taxi_df.fare_amount < 500) &
    (taxi_df.passenger_count > 0) &
    (taxi_df.passenger_count < 6) &
    (taxi_df.pickup_longitude > -75) &
    (taxi_df.pickup_longitude < -73) &
    (taxi_df.dropoff_longitude > -75) &
    (taxi_df.dropoff_longitude < -73) &
    (taxi_df.pickup_latitude > 40) &
    (taxi_df.pickup_latitude < 42) &
    (taxi_df.dropoff_latitude > 40) &
    (taxi_df.dropoff_latitude < 42) &
    (taxi_df.trip_distance > 0) &
    (taxi_df.trip_distance < 500) &
    ((taxi_df.trip_distance <= 50) | (taxi_df.fare_amount >= 50)) &
    ((taxi_df.trip_distance >= 10) | (taxi_df.fare_amount <= 300)) &
    (taxi_df.dropoff_datetime > taxi_df.pickup_datetime)]

In [26]:
# reset_index and drop index column
taxi_df = taxi_df.reset_index(drop=True)

# Adding Interesting Features

Dask & cuDF provide standard DataFrame operations, but also let you run "user defined functions" on the underlying data. Here we use [dask.dataframe's map_partitions](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_partitions) to apply user defined python function on each DataFrame partition.

We'll use a Haversine Distance calculation to find total trip distance, and extract additional useful variables from the datetime fields.

In [None]:
## add features

#taxi_df['hour'] = taxi_df['pickup_datetime'].dt.hour
#taxi_df['year'] = taxi_df['pickup_datetime'].dt.year
#taxi_df['month'] = taxi_df['pickup_datetime'].dt.month
taxi_df['day'] = taxi_df['pickup_datetime'].dt.day
#taxi_df['day_of_week'] = taxi_df['pickup_datetime'].dt.weekday
#taxi_df['is_weekend'] = (taxi_df['day_of_week']>=5).astype('int32')

#calculate the time difference between dropoff and pickup.
taxi_df['diff'] = taxi_df['dropoff_datetime'].astype('int64') - taxi_df['pickup_datetime'].astype('int64')
taxi_df['diff']=(taxi_df['diff']/1000).astype('int64')

taxi_df['pickup_latitude_r'] = taxi_df['pickup_latitude']//.01*.01
taxi_df['pickup_longitude_r'] = taxi_df['pickup_longitude']//.01*.01
taxi_df['dropoff_latitude_r'] = taxi_df['dropoff_latitude']//.01*.01
taxi_df['dropoff_longitude_r'] = taxi_df['dropoff_longitude']//.01*.01

taxi_df = taxi_df.drop('pickup_datetime', axis=1)
taxi_df = taxi_df.drop('dropoff_datetime', axis=1)

In [28]:
taxi_df.shape

(358360330, 14)

In [29]:
#geo_columns = ['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude']
#radians = {x: np.radians(taxi_df[x]) for x in geo_columns}
#dlon = radians['pickup_longitude'] - radians['dropoff_longitude']
#dlat = radians['pickup_latitude'] - radians['dropoff_latitude']

#taxi_df['h_distance'] = 6367 * 2 * np.arcsin(
#    np.sqrt(
#        np.sin(dlat / 2)**2
#        + np.cos(radians['pickup_latitude']) * np.cos(radians['dropoff_latitude']) * np.sin(dlon / 2)**2))

dlon = taxi_df['dropoff_longitude'] - taxi_df['pickup_longitude']
dlat = taxi_df['dropoff_latitude'] - taxi_df['pickup_latitude']
taxi_df['m_distance'] = dlon * dlon + dlat * dlat

In [30]:
taxi_df.dtypes

passenger_count          int64
trip_distance          float64
pickup_longitude       float64
pickup_latitude        float64
rate_code                int64
dropoff_longitude      float64
dropoff_latitude       float64
fare_amount            float64
day                      int64
diff                     int64
pickup_latitude_r      float64
pickup_longitude_r     float64
dropoff_latitude_r     float64
dropoff_longitude_r    float64
m_distance             float64
dtype: object

In [31]:
taxi_df.shape

(358360330, 15)

# Pick a Training Set

Let's imagine you're making a trip to New York on the 25th and want to build a model to predict what fare prices will be like the last few days of the month based on the first part of the month. We'll use a query expression to identify the `day` of the month to use to divide the data into train and test sets.

The wall-time below represents how long it takes your GPU cluster to load data from the Google Cloud Storage bucket and the ETL portion of the workflow.

In [32]:
#since we calculated the h_distance let's drop the trip_distance column, and then do model training with XGB.
taxi_df = taxi_df.drop('trip_distance', axis=1)

In [None]:
# this is the original data partition for train and test sets.
X_train = taxi_df[taxi_df.day < 25]

# create a Y_train ddf with just the target variable
Y_train = X_train[['fare_amount']]
# drop the target variable from the training ddf
X_train = X_train[X_train.columns.difference(['fare_amount'])]

# Train the XGBoost Regression Model

The wall time output below indicates how long it took your GPU cluster to train an XGBoost model over the training set.

In [None]:
X_train.shape

In [None]:
X_train.dtypes

In [None]:
Y_train.shape

In [None]:
dtrain = xgb.DMatrix(X_train, Y_train)

In [None]:
%%time

trained_model = xgb.train({
    'learning_rate': 0.3,
    'max_depth': 8,
    'objective': 'reg:squarederror',
    'subsample': 0.6,
    'gamma': 1,
    'silent': True,
    'verbose_eval': True,
    'tree_method':'hist'
    },
    dtrain,
    num_boost_round=100, evals=[(dtrain, 'train')])

In [None]:
ax = xgb.plot_importance(trained_model, height=0.8, max_num_features=10, importance_type="gain")
ax.grid(False, axis="y")
ax.set_title('Estimated feature importance')
ax.set_xlabel('importance')
plt.show()

# How Good is Our Model?

Now that we have a trained model, we need to test it with the 25% of records we held out.

Based on the filtering conditions applied to this dataset, many of the DataFrame partitions will wind up having 0 rows. This is a problem for XGBoost which doesn't know what to do with 0 length arrays. We'll repartition the data. 

In [None]:
X_test = taxi_df[taxi_df.day >= 25]

# Create Y_test with just the fare amount
Y_test = X_test[['fare_amount']]

# Drop the fare amount from X_test
X_test = X_test[X_test.columns.difference(['fare_amount'])]

# display test set size
X_test.shape

## Calculate Prediction

In [None]:
# generate predictions on the test set
'''feed X_test as a dask.dataframe'''

booster = trained_model
# history = trained_model['history'] # "History" is a dictionary containing evaluation results 

# booster.set_param({'predictor': 'gpu_predictor'})

prediction = modin_omni_pd.Series(booster.predict(xgb.DMatrix(X_test)))

prediction.shape

In [None]:
# prediction = prediction.map_partitions(lambda part: cudf.Series(part)).reset_index(drop=True)
actual = Y_test['fare_amount'].reset_index(drop=True)

NOTE: We mapped each partition of the result from `xgb.dask.predict` into `cudf.Series` to be able to substract it from `actual` data. Here is the issue asking XGBoost to solve that before returning data from `xgb.dask.predict` https://github.com/dmlc/xgboost/issues/5823#issuecomment-648526888

In [None]:
prediction.head()

In [None]:
actual.head()

In [None]:
# Calculate RMSE
squared_error = ((prediction-actual)**2)

# compute the actual RMSE over the full test set
np.sqrt(squared_error.mean())

## Normalize data

In [None]:
from sklearn.preprocessing import MinMaxScaler, StandardScaler
scaler_x = MinMaxScaler()
scaler_y = StandardScaler()

In [None]:
scaler_x.fit(X_train)
X_train = scaler_x.transform(X_train)
X_test = scaler_x.transform(X_test)

In [None]:
scaler_y.fit(Y_train.to_numpy().reshape(-1, 1))
Y_train = scaler_y.transform(Y_train.to_numpy().reshape(-1, 1)).ravel()
Y_test = scaler_y.transform(Y_test.to_numpy().reshape(-1, 1)).ravel()

# Train Ridge regression model with  Intel® Extension for Scikit-learn*

In [None]:
from sklearnex import patch_sklearn
patch_sklearn()

In [None]:
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error

In [None]:
%%time
model_r_p = Ridge(random_state=123)
model_r_p.fit(X_train, Y_train)

# Train Ridge regression model without  Intel® Extension for Scikit-learn*

In [None]:
from sklearnex import unpatch_sklearn
unpatch_sklearn()

In [None]:
from sklearn.linear_model import Ridge

In [None]:
%%time
model_r_s = Ridge(random_state=123)
model_r_s.fit(X_train, Y_train)

# Let's look MSE metric of Our Models

In [None]:
y_pred_p = model_r_p.predict(X_test)
y_pred_s = model_r_s.predict(X_test)

mse_r_p = mean_squared_error(Y_test, y_pred_p)
mse_r_s = mean_squared_error(Y_test, y_pred_s)

print(f'MSE of pached Ridge: {mse_r_p}')
print(f'MSE of unpached Ridge: {mse_r_s}')

As we can see the model with Intel® Extension for Scikit-learn * learns much faster and has the same mse.