# Predicting NYC Taxi Fares with RAPIDS

[RAPIDS](https://rapids.ai/) is a suite of GPU accelerated data science libraries with APIs that should be familiar to users of Pandas, Dask, and Scikitlearn.

Anaconda has graciously made some of the NYC Taxi dataset available in [a public Google Cloud Storage bucket](https://console.cloud.google.com/storage/browser/anaconda-public-data/nyc-taxi/csv/). We have already cleaned up the column names a bit and re-saved a single month of the data. (For a larger-scale exercise, consider adapting this script to handle a full year via Dask!) We saved the converted version in an S3 bucket here: https://odsc-sample-data.s3-us-west-2.amazonaws.com/yellow_tripdata_2014-03-cleaned.orc.

This notebook builds a simple data pipeline to load the data with cuDF (or Pandas), analyze it with cuML (or scikit-learn), and then try some steps with multiple GPUs.

In [None]:
import numpy as np
import pandas as pd
import cuml
import cudf
import os

# Demo 1: Inspecting the Data

Let's start with a familiar Pandas approach then port it to RAPIDS in parallel. Note that both Pandas and cuDF can read directly from an S3 bucket. If you're going to re-run this example many times or run outside of AWS, consider saving the file locally (it's about 400MB).

In [None]:
%%time
# Pandas

df = pd.read_orc('yellow_tripdata_2014-03-cleaned.orc') # Alternative for local read
# df = pd.read_orc('https://odsc-sample-data.s3-us-west-2.amazonaws.com/yellow_tripdata_2014-03-cleaned.orc')
df.head()

In [None]:
%%time
# TODO: Read the CSV with cudf into 'gdf' and display the first few rows

# *** Answer: ***

gdf = cudf.read_orc('yellow_tripdata_2014-03-cleaned.orc')
# gdf = cudf.read_orc('https://odsc-sample-data.s3-us-west-2.amazonaws.com/yellow_tripdata_2014-03-cleaned.orc')


gdf.head()

# Look at some key stats

When passing data to functions that expect Pandas DataFrames, we just use ".to_pandas()"

In [None]:
import seaborn as sns

sns.boxplot(x="passenger_count", y="fare_amount", data=gdf.to_pandas())

In [None]:
sns.lmplot(x="pickup_longitude", y="pickup_latitude",
           data=gdf.head(100000).to_pandas(),
           fit_reg=False,
           x_jitter=0.01, y_jitter=0.01)

# Filter the data

Ok, there are some WEIRD latitudes and longitudes in that data. Let's filter to just sane stuff.

In [None]:
%%time

# apply a list of filter conditions to throw out records with missing or outlier values
query_frags = [
    'fare_amount > 0 and fare_amount < 500',
    'passenger_count > 0 and passenger_count < 6',
    'pickup_longitude > -75.0 and pickup_longitude < -73.0',
    'dropoff_longitude > -75 and dropoff_longitude < -73',
    'pickup_latitude > 40 and pickup_latitude < 42',
    'dropoff_latitude > 40 and dropoff_latitude < 42'
]
df_subset = df.query(' and '.join(query_frags)).copy()

# inspect the results of cleaning
df_subset.head()

In [None]:
%%time

# TODO: RAPIDS version with "gdf_subset" as output

# *** Answer:
gdf_subset = gdf.query(' and '.join(query_frags)).copy()
gdf_subset.head()

# Demo 2: UDFs to add rich features (more advanced topic)

cuDF provides standard DataFrame operations, but also let you run "user defined functions" on the underlying data. For simple operations on a single column, you can just pass in a standard Python function or lambda.

cuDF's [apply_rows](https://docs.rapids.ai/api/cudf/stable/guide-to-udfs.html#DataFrame-UDFs) operation is similar to Pandas's [DataFrame.apply](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.apply.html), except that for cuDF, custom Python code is [JIT compiled by numba](https://numba.pydata.org/numba-doc/dev/cuda/kernels.html) into GPU kernels. This allows you to port complex functions that use multiple columns from the DataFrame.

We'll use a Haversine Distance calculation to find total trip distance, and extract additional useful variables from the datetime fields.

In [None]:
from numpy import pi

def haversine_distance_kernel_cpu(row):
    x_1, y_1, x_2, y_2 = (row["pickup_latitude"], row["pickup_longitude"], row["dropoff_latitude"], row["dropoff_longitude"])
    x_1 = pi/180 * x_1
    y_1 = pi/180 * y_1
    x_2 = pi/180 * x_2
    y_2 = pi/180 * y_2

    dlon = y_2 - y_1
    dlat = x_2 - x_1
    a = np.sin(dlat/2)**2 + np.cos(x_1) * np.cos(x_2) * np.sin(dlon/2)**2

    c = 2 * np.arcsin(np.sqrt(a)) 
    r = 6371 # Radius of earth in kilometers
        
    return c * r
        
def add_features(df):
    df['hour'] = df['pickup_datetime'].dt.hour
    df['year'] = df['pickup_datetime'].dt.year
    df['month'] = df['pickup_datetime'].dt.month
    df['day'] = df['pickup_datetime'].dt.day
    
    df['pickup_latitude_r'] = df['pickup_latitude']//.01*.01
    df['pickup_longitude_r'] = df['pickup_longitude']//.01*.01
    df['dropoff_latitude_r'] = df['dropoff_latitude']//.01*.01
    df['dropoff_longitude_r'] = df['dropoff_longitude']//.01*.01
    df["day_of_week"] = df["pickup_datetime"].dt.dayofweek
    
    df = df.drop(columns=['pickup_datetime', 'dropoff_datetime'])    
    
    df['is_weekend'] = (df['day_of_week'] >= 5).astype(np.int32)
    return df

In [None]:
%%time

# actually add the features
taxi_df = add_features(df_subset)

In [None]:
%%time
# compute distance
taxi_df["h_distance"] = haversine_distance_kernel_cpu(taxi_df)

## cuDF version with UDF

cuDF's [apply_rows](https://rapidsai.github.io/projects/cudf/en/0.6.0/api.html#cudf.dataframe.DataFrame.apply_rows) operation is similar to Pandas's [DataFrame.apply](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.apply.html), except that for cuDF, custom Python code is [JIT compiled by numba](https://numba.pydata.org/numba-doc/dev/cuda/kernels.html) into GPU kernels.

In [None]:
import math
from math import cos, sin, asin, sqrt, pi

def haversine_distance_kernel_gpu(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude, h_distance):
    for i, (x_1, y_1, x_2, y_2) in enumerate(zip(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude)):
        x_1 = pi/180 * x_1
        y_1 = pi/180 * y_1
        x_2 = pi/180 * x_2
        y_2 = pi/180 * y_2
        
        dlon = y_2 - y_1
        dlat = x_2 - x_1
        a = sin(dlat/2)**2 + cos(x_1) * cos(x_2) * sin(dlon/2)**2
        
        c = 2 * asin(sqrt(a)) 
        r = 6371 # Radius of earth in kilometers
        
        h_distance[i] = c * r
  
def add_features_gpu(df):
    df['hour'] = df['pickup_datetime'].dt.hour
    df['year'] = df['pickup_datetime'].dt.year
    df['month'] = df['pickup_datetime'].dt.month
    df['day'] = df['pickup_datetime'].dt.day
    
    df['pickup_latitude_r'] = df['pickup_latitude']//.01*.01
    df['pickup_longitude_r'] = df['pickup_longitude']//.01*.01
    df['dropoff_latitude_r'] = df['dropoff_latitude']//.01*.01
    df['dropoff_longitude_r'] = df['dropoff_longitude']//.01*.01
    df["day_of_week"] = df["pickup_datetime"].dt.dayofweek
    
    df = df.drop(columns=['pickup_datetime', 'dropoff_datetime'])    
 
    df['is_weekend'] = (df['day_of_week'] >= 5).astype(np.int32)
    return df

def compute_distance_gpu(df):
    df = df.apply_rows(haversine_distance_kernel_gpu,
                   incols=['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude'],
                   outcols=dict(h_distance=np.float32),
                   kwargs=dict())
    return df

In [None]:
%%time

# TODO: actually add the features and create "taxi_gdf" from gdf_subset

# Answer
taxi_gdf = add_features_gpu(gdf_subset)

In [None]:
%%time

# TODO: add the distance calculation

# *** Answer
taxi_gdf = compute_distance_gpu(taxi_gdf)

For more advanced spatial calculations, check out cuSpatial (https://medium.com/rapids-ai/releasing-cuspatial-to-accelerate-geospatial-and-spatiotemporal-processing-b686d8b32a9), the newest RAPIDS library.

In [None]:
%%time

# TODO: Print summary stats from "taxi_gdf"

# *** Answer:
taxi_gdf.describe()

# Demo 4: Cluster and analyze with cuML

In [None]:
%matplotlib inline

In [None]:
import sklearn, sklearn.cluster
from matplotlib import pyplot as plt

In [None]:
%%time
# use scikit-learn on CPU
n_samples = len(taxi_df)
from sklearn.preprocessing import StandardScaler

sk_kmeans = sklearn.cluster.KMeans(n_clusters=5)
scaler = StandardScaler()
taxi_subset = taxi_df.iloc[:n_samples]
#train_clusters_cpu = sk_kmeans.fit_predict(
#    scaler.fit_transform(taxi_subset))

In [None]:
%%time
# TODO: use cuML on GPU to fit KMeans with 5 clusters (larger dataset)
# Note that preprocessing is an experimental module for cuML in 0.16

# *** Answer:
import cuml.cluster
from cuml.experimental.preprocessing import StandardScaler
cu_kmeans = cuml.cluster.KMeans(n_clusters=5)
scaler = StandardScaler()
train_clusters_gpu = cu_kmeans.fit_predict(
    scaler.fit_transform(taxi_gdf))

In [None]:
# Just take a subset to speed plotting
gdf_head = taxi_gdf.iloc[:400000]
gdf_head["cluster"] = train_clusters_gpu[:400000]
gdf_head["short_trip"] = gdf_head["trip_distance"] < 1.01 # About the 25th percentile
gdf_head["is_rush_est"] = ((gdf_head.hour >= 10) & (gdf_head.hour <= 14)) | \
                                ((gdf_head.hour >= 21) & (gdf_head.hour <= 24))

# actually do the plot
sns.lmplot("pickup_longitude", "pickup_latitude", data=gdf_head.to_pandas(),
           hue="cluster", col="is_rush_est", row="short_trip", fit_reg=False, scatter_kws={"s": 10})

# Pick a Training Set

Let's imagine we want to be able predict fare prices based on the available data. We'll start by splitting the data into train and test subsets.

In [None]:
%%time
from sklearn.model_selection import train_test_split as sk_train_test_split

X_np = taxi_df.drop(columns='fare_amount').to_numpy()
Y_np = taxi_df[["fare_amount"]].to_numpy()

X_train_np, X_test_np, Y_train_np, Y_test_np = sk_train_test_split(X_np, Y_np, test_size=0.2)

In [None]:
%%time
from cuml.preprocessing.model_selection import train_test_split as cu_train_test_split

X_gpu = taxi_gdf.drop(columns='fare_amount').values.astype("float32")
Y_gpu = taxi_gdf[["fare_amount"]].values.astype("float32")

X_col_names = taxi_gdf.columns.tolist()
X_col_names.remove('fare_amount')

X_train_gpu, X_test_gpu, Y_train_gpu, Y_test_gpu = cu_train_test_split(X_gpu,
                                                                        Y_gpu,
                                                                        test_size=0.2)

# Fit a simple supervised model with cuML

cuML supports a large range of supervised models, all emulating the scikit-learn interfaces. See the README (https://github.com/rapidsai/cuml) for a recent list. Here, we'll try a very simple model - an ElasticNet regularized regression with both L1 and L2 regularization. As a user exercise, try replacing this with a RandomForestRegressor or a simpler LinearRegression.

In [None]:
from sklearn.linear_model import ElasticNet as skElasticNet

In [None]:
%%time
# Sklearn will parallelize over all CPU cores with n_jobs=-1
sk_model = skElasticNet()
sk_model.fit(X_train_np, Y_train_np)

In [None]:
from sklearn.metrics.regression import r2_score as sk_r2_score

print("Out-of-sample (test) R2: ", sk_r2_score(Y_test_np, sk_model.predict(X_test_np)))

In [None]:
%%time

# TODO: Build a similar model on GPU with cuML

# *** Answer:
from cuml.linear_model import ElasticNet as cuElasticNet
cu_model = cuElasticNet(alpha=0.1)
cu_model.fit(X_train_gpu, Y_train_gpu)

In [None]:
%%time

# TODO: Predict on the test set and evaluate the predictions' R2 score

# *** Answer
from cuml.metrics.regression import r2_score

Y_hat_gpu = cu_model.predict(X_test_gpu)
r2_score(Y_test_gpu, Y_hat_gpu)

# Demo 5: Train an  XGBoost Regression Model

XGBoost is one of the most popular packages for gradient boosted decision trees. It comes with excellent GPU acceleration out of the box.

In [None]:
%%time
# Train on CPU (uses all CPUs by default)
import xgboost

params = {
 'learning_rate': 0.3,
  'max_depth': 6,

  'subsample': 0.6,
  'gamma': 1
}

train_dmat = xgboost.DMatrix(X_train_np, Y_train_np, feature_names=X_col_names)
print("Converted to dmatrix")
trained_model = xgboost.train(params, train_dmat, num_boost_round=5)

In [None]:
%%time

# TODO: retrain on GPU for more rounds, saving model as trained_model_gpus

# *** Answer:
params = {
 'learning_rate': 0.3,
  'max_depth': 6,
  'tree_method': 'gpu_hist',
  'subsample': 0.6,
  'gamma': 1
}

train_dmat = xgboost.DMatrix(X_train_gpu, Y_train_gpu, feature_names=X_col_names)
print("Converted to dmatrix")
trained_model_gpu = xgboost.train(params, train_dmat, num_boost_round=50)

# How Good is Our Model?

Now that we have a trained model, we need to test it with the ecords we held out.Y_train_gpu

In [None]:
%%time

# TODO: generate predictions on the test set as Y_test_prediction

# *** Answer:
# Note that we can pass in a cuDF dataframe without conversion
test_dmat = xgboost.DMatrix(X_test_np, feature_names=X_col_names)

Y_test_prediction = trained_model_gpu.predict(test_dmat)

# Compute Root Mean Squared Error

In [1]:
# TODO: compute RMSE

# Save Trained Model for Later Use

To make a model maximally useful, you need to be able to save it for later use.

In [None]:
# Save model
trained_model_gpu.save_model("output.model")