# Predicting NYC Taxi Fares with RAPIDS

[RAPIDS](https://rapids.ai/) is a suite of GPU accelerated data science libraries with APIs that should be familiar to users of Pandas, Dask, and Scikitlearn.

Anaconda has graciously made some of the NYC Taxi dataset available in [a public Google Cloud Storage bucket](https://console.cloud.google.com/storage/browser/anaconda-public-data/nyc-taxi/csv/).

This notebook builds a simple data pipeline to load the data with cuDF (or Pandas), analyze it with cuML (or scikit-learn), and then try some steps with multiple GPUs.

In [None]:
import numpy as np
import pandas as pd
import cuml
import cudf
import os

# Inspecting the Data

Let's start with a familiar Pandas approach then port it to RAPIDS in parallel

In [None]:
!ls ../../data/nyc-taxi/2014

In [None]:
base_path = '../../data/nyc-taxi/'

In [None]:
%%time
# Pandas

df_2014 = pd.read_csv(base_path+'2014/yellow_tripdata_2014-03.csv')
df_2014.head()

In [None]:
%%time
# TODO: Read the CSV with cudf into gdf_2014


# Data Cleanup

As usual, the data needs to be massaged a bit before we can start adding features that are useful to an ML model.

For example, in the 2014 taxi CSV files, there are `pickup_datetime` and `dropoff_datetime` columns.

Also, some CSV files have column names with extraneous spaces in them.

We'll do a little string manipulation, column renaming, and concatenating of DataFrames to sidestep the problems.

In [None]:
# list of column names that need to be re-mapped
remap = {}
remap['tpep_pickup_datetime'] = 'pickup_datetime'
remap['tpep_dropoff_datetime'] = 'dropoff_datetime'
remap['ratecodeid'] = 'rate_code'

# create a list of columns & dtypes the df must have
# note that float64 will be significantly slower on some GPUs (most GeForce, also Tesla T4)
must_haves = {
 'pickup_datetime': 'datetime64[ms]',
 'dropoff_datetime': 'datetime64[ms]',
 'passenger_count': 'int32',
 'trip_distance': 'float',
 'pickup_longitude': 'float',
 'pickup_latitude': 'float',
 'rate_code': 'int32',
 'dropoff_longitude': 'float',
 'dropoff_latitude': 'float',
 'fare_amount': 'float'
}

In [None]:
# helper function which takes a DataFrame and fixes column types
def clean_columns(df_part, remap, must_haves, float_type):    
    # iterate through columns in this df
    for col in df_part.columns:
        # drop anything not in our expected list
        if col not in must_haves:
            print(f"Dropping ({col})")
            df_part = df_part.drop(columns=col)
            continue
        
        if df_part[col].dtype == 'object' and col in ['pickup_datetime', 'dropoff_datetime']:
            df_part[col] = df_part[col].astype('datetime64[ms]')
            continue
                
        # if column was read as a string, recast as float
        if df_part[col].dtype == 'object':
            df_part[col] = df_part[col].str.fillna('-1')
            df_part[col] = df_part[col].astype('float32')
        else:
            # downcast from 64bit to 32bit types
            # Tesla T4 are faster on 32bit ops
            if 'int' in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype('int32')
            if 'float' in str(df_part[col].dtype):
                # CPU-based pandas has a bug preventing query use with fp32 columns
                df_part[col] = df_part[col].astype(float_type)
            df_part[col] = df_part[col].fillna(-1)
    
    return df_part

In [None]:
%%time
# Pandas approach

# some col-names include pre-pended spaces remove & lowercase column names
col_cleanup = {col: col.strip().lower() for col in list(df_2014.columns)}
df = df_2014.rename(columns=col_cleanup)
# rename columns using the supplied mapping
df = df.rename(remap)

df = clean_columns(df, remap, must_haves, np.float64)
print(df.__class__)
print(df.head(1))

In [None]:
%%time
# TODO: RAPIDS approach - same as Pandas, but generate 'gdf' as output


# Look at some key stats

In [None]:
import seaborn as sns

In [None]:
sns.boxplot(x="passenger_count", y="fare_amount", data=gdf.to_pandas())

In [None]:
sns.lmplot(x="pickup_longitude", y="pickup_latitude",
           data=gdf.head(100000).to_pandas(),
           fit_reg=False,
           x_jitter=0.01, y_jitter=0.01)

# Filter the data

In [None]:
%%time

# apply a list of filter conditions to throw out records with missing or outlier values
query_frags = [
    'fare_amount > 0 and fare_amount < 500',
    'passenger_count > 0 and passenger_count < 6',
    'pickup_longitude > -75.0 and pickup_longitude < -73.0',
    'dropoff_longitude > -75 and dropoff_longitude < -73',
    'pickup_latitude > 40 and pickup_latitude < 42',
    'dropoff_latitude > 40 and dropoff_latitude < 42'
]
df_subset = df.query(' and '.join(query_frags)).copy()

# inspect the results of cleaning
df_subset.head()

In [None]:
%%time

# TODO: RAPIDS version with "gdf_subset" as output


# Demo 3: UDFs to add rich features 

cuDF provides standard DataFrame operations, but also let you run "user defined functions" on the underlying data.

cuDF's [apply_rows](https://rapidsai.github.io/projects/cudf/en/0.6.0/api.html#cudf.dataframe.DataFrame.apply_rows) operation is similar to Pandas's [DataFrame.apply](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.apply.html), except that for cuDF, custom Python code is [JIT compiled by numba](https://numba.pydata.org/numba-doc/dev/cuda/kernels.html) into GPU kernels.

We'll use a Haversine Distance calculation to find total trip distance, and extract additional useful variables from the datetime fields.

In [None]:
from numpy import pi

def haversine_distance_kernel_cpu(row):
    x_1, y_1, x_2, y_2 = (row["pickup_latitude"], row["pickup_longitude"], row["dropoff_latitude"], row["dropoff_longitude"])
    x_1 = pi/180 * x_1
    y_1 = pi/180 * y_1
    x_2 = pi/180 * x_2
    y_2 = pi/180 * y_2

    dlon = y_2 - y_1
    dlat = x_2 - x_1
    a = np.sin(dlat/2)**2 + np.cos(x_1) * np.cos(x_2) * np.sin(dlon/2)**2

    c = 2 * np.arcsin(np.sqrt(a)) 
    r = 6371 # Radius of earth in kilometers
        
    return c * r
        
def add_features(df):
    df['hour'] = df['pickup_datetime'].dt.hour
    df['year'] = df['pickup_datetime'].dt.year
    df['month'] = df['pickup_datetime'].dt.month
    df['day'] = df['pickup_datetime'].dt.day
    
    df['pickup_latitude_r'] = df['pickup_latitude']//.01*.01
    df['pickup_longitude_r'] = df['pickup_longitude']//.01*.01
    df['dropoff_latitude_r'] = df['dropoff_latitude']//.01*.01
    df['dropoff_longitude_r'] = df['dropoff_longitude']//.01*.01
    df["day_of_week"] = df["pickup_datetime"].dt.dayofweek
    
    df = df.drop(columns=['pickup_datetime', 'dropoff_datetime'])    
    
    df['is_weekend'] = (df['day_of_week'] >= 5).astype(np.int32)
    return df

In [None]:
%%time

# actually add the features
taxi_df = add_features(df_subset)

In [None]:
%%time
# compute distance
taxi_df["h_distance"] = haversine_distance_kernel_cpu(taxi_df)

## cuDF version with UDF

cuDF's [apply_rows](https://rapidsai.github.io/projects/cudf/en/0.6.0/api.html#cudf.dataframe.DataFrame.apply_rows) operation is similar to Pandas's [DataFrame.apply](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.apply.html), except that for cuDF, custom Python code is [JIT compiled by numba](https://numba.pydata.org/numba-doc/dev/cuda/kernels.html) into GPU kernels.

In [None]:
import math
from math import cos, sin, asin, sqrt, pi

def haversine_distance_kernel_gpu(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude, h_distance):
    for i, (x_1, y_1, x_2, y_2) in enumerate(zip(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude)):
        x_1 = pi/180 * x_1
        y_1 = pi/180 * y_1
        x_2 = pi/180 * x_2
        y_2 = pi/180 * y_2
        
        dlon = y_2 - y_1
        dlat = x_2 - x_1
        a = sin(dlat/2)**2 + cos(x_1) * cos(x_2) * sin(dlon/2)**2
        
        c = 2 * asin(sqrt(a)) 
        r = 6371 # Radius of earth in kilometers
        
        h_distance[i] = c * r

def day_of_the_week_kernel(day, month, year, day_of_week):
    for i, (d_1, m_1, y_1) in enumerate(zip(day, month, year)):
        if month[i] <3:
            shift = month[i]
        else:
            shift = 0
        Y = year[i] - (month[i] < 3)
        y = Y - 2000
        c = 20
        d = day[i]
        m = month[i] + shift + 1
        day_of_week[i] = (d + math.floor(m*2.6) + y + (y//4) + (c//4) -2*c)%7
        
def add_features_gpu(df):
    df['hour'] = df['pickup_datetime'].dt.hour
    df['year'] = df['pickup_datetime'].dt.year
    df['month'] = df['pickup_datetime'].dt.month
    df['day'] = df['pickup_datetime'].dt.day
    
    df['pickup_latitude_r'] = df['pickup_latitude']//.01*.01
    df['pickup_longitude_r'] = df['pickup_longitude']//.01*.01
    df['dropoff_latitude_r'] = df['dropoff_latitude']//.01*.01
    df['dropoff_longitude_r'] = df['dropoff_longitude']//.01*.01
    # df["day_of_week"] = df["pickup_datetime"].dt.dayofweek
    # cuDF does not support dayofweek yet, coming soon though: https://github.com/rapidsai/cudf/pull/2814
    
    #
    # Auto-JIT kernels
    #
    df = df.apply_rows(day_of_the_week_kernel,
                      incols=['day', 'month', 'year'],
                      outcols=dict(day_of_week=np.float32),
                      kwargs=dict())
    df = df.drop(columns=['pickup_datetime', 'dropoff_datetime'])    

 
    df['is_weekend'] = (df['day_of_week'] >= 5).astype(np.int32)
    return df

def compute_distance_gpu(df):
    df = df.apply_rows(haversine_distance_kernel_gpu,
                   incols=['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude'],
                   outcols=dict(h_distance=np.float32),
                   kwargs=dict())
    return df

In [None]:
%%time

# TODO: actually add the features and create "taxi_gdf" from gdf_subset

In [None]:
%%time

# TODO: add the distance calculation

For more advanced spatial calculations, check out cuSpatial (https://medium.com/rapids-ai/releasing-cuspatial-to-accelerate-geospatial-and-spatiotemporal-processing-b686d8b32a9), the newest RAPIDS library.

In [None]:
# TODO: Print summary stats from "taxi_gdf"

# Pick a Training Set

Let's imagine you're making a trip to New York on the 24th and want to build a model to predict what fare prices will be like the last few days of the month based on the first part of the month. We'll use a query expression to identify the `day` of the month to use to divide the data into train and test sets.

The wall-time below represents how long it takes your GPU cluster to load data from the Google Cloud Storage bucket and the ETL portion of the workflow.

In [None]:
%%time
X_train_df = taxi_df.query('day < 24')
Y_train_df = X_train_df[['fare_amount']]
X_train_df = X_train_df.drop(columns='fare_amount')

# numpy versions for sklearn
X_train_np = X_train_df.to_numpy()
Y_train_np = Y_train_df.to_numpy()
len(X_train_df)

In [None]:
from cuml.utils import input_utils # Helpful functions for managing gpu arrays

In [None]:
%%time
X_train_gdf = taxi_gdf.query('day < 24')
Y_train_gdf = X_train_gdf[['fare_amount']]
X_train_gdf = X_train_gdf.drop(columns='fare_amount')

# gpu matrix versions for cuml
X_train_gpu = input_utils.input_to_dev_array(X_train_gdf, convert_to_dtype=np.float32).array
Y_train_gpu = input_utils.input_to_dev_array(Y_train_gdf, convert_to_dtype=np.float32).array

# test versions
X_test_gdf = taxi_gdf.query('day >= 24')
Y_test_gdf = X_test_gdf[['fare_amount']]
X_test_gdf = X_test_gdf.drop(columns='fare_amount')

X_test_gpu = input_utils.input_to_dev_array(X_test_gdf, convert_to_dtype=np.float32).array
Y_test_gpu = input_utils.input_to_dev_array(Y_test_gdf, convert_to_dtype=np.float32).array


len(X_train_gdf)

# Demo 4: Cluster and analyze with cuML

In [None]:
%matplotlib inline

In [None]:
import sklearn, sklearn.cluster
from matplotlib import pyplot as plt

In [None]:
%%time
# use scikit-learn on CPU

sk_kmeans = sklearn.cluster.KMeans(n_clusters=5, n_jobs=-1)
train_clusters_cpu = sk_kmeans.fit_predict(X_train_np[:200000,:])

In [None]:
%%time
n_samples = 400000

# TODO: use cuML on GPU to fit KMeans with 5 clusters (larger dataset)

In [None]:
# Just take a subset to speed plotting
gdf_train_head = X_train_gdf.iloc[:400000]
gdf_train_head["cluster"] = train_clusters_gpu[:400000]
gdf_train_head["short_trip"] = gdf_train_head["trip_distance"] < 1.01 # About the 25th percentile
gdf_train_head["is_rush_est"] = ((gdf_train_head.hour >= 10) & (gdf_train_head.hour <= 14)) | \
                                ((gdf_train_head.hour >= 21) & (gdf_train_head.hour <= 24))

# actually do the plot
sns.lmplot("pickup_longitude", "pickup_latitude", data=gdf_train_head.to_pandas(),
           hue="cluster", col="is_rush_est", row="short_trip", fit_reg=False, scatter_kws={"s": 10})

### Fit a simple supervised model with cuML

cuML supports a large range of supervised models, all emulating the scikit-learn interfaces. See the README (https://github.com/rapidsai/cuml) for a recent list. Here, we'll try a very simple model - a regularized linear regression with the ElasticNet approach that blends L1 and L2 penalties.

In [None]:
from sklearn.linear_model import ElasticNet as skElastic
from cuml.linear_model import ElasticNet as cuElastic

In [None]:
%%time
# Sklearn will parallelize over all CPU cores with n_jobs=-1
sk_model = skElastic(alpha=0.1)
sk_model.fit(X_train_np, Y_train_np)

In [None]:
%%time

# TODO: Build a similar model on GPU with cuML

In [None]:
%%time

# Predict on a test set (storing as "enet_predictions" in the Y_test_gdf df)
# and evaluate the predictions' R2 score

# Demo 5: Train an  XGBoost Regression Model

XGBoost is one of the most popular packages for gradient boosted decision trees. It comes with excellent GPU acceleration out of the box.

In [None]:
%%time
# Train on CPU (uses all CPUs by default)
import xgboost

params = {
 'learning_rate': 0.3,
  'max_depth': 6,

  'subsample': 0.6,
  'gamma': 1
}

train_dmat = xgboost.DMatrix(X_train_np, Y_train_np, feature_names=X_train_df.columns)
print("Converted to dmatrix")
trained_model = xgboost.train(params, train_dmat, num_boost_round=5)

In [None]:
%%time
# TODO: retrain on GPU for more rounds, saving model as trained_model_gpus

# How Good is Our Model?

Now that we have a trained model, we need to test it with the ecords we held out.

In [None]:
%%time

# TODO: generate predictions on the test set as Y_test_gdf['prediction']

# Compute Root Mean Squared Error

In [None]:
Y_test_gdf["squared_error"] = (Y_test_gdf['prediction'] - Y_test_gdf['fare_amount'])**2

# inspect the results to make sure our calculation looks right
Y_test_gdf.head()

In [None]:
# compute the actual RMSE over the full test set
np.sqrt(Y_test_gdf.squared_error.mean())

# Save Trained Model for Later Use

To make a model maximally useful, you need to be able to save it for later use.

In [None]:
# Save model
trained_model_gpu.save_model("output.model")

# Demo 6: A quick intro to Dask + RAPIDS

Dask is a sophisticated package for parallel computation with a number of different datatypes. For much more detail, see: https://tutorial.dask.org/

In these examples, we'll focus on the basics of `dask_cudf` and `dask_cuda`

In [None]:
import dask_cudf

In [None]:
import dask, dask_cudf
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait

In [None]:
# Setup a cluster and connect a client to it

cluster = LocalCUDACluster()
client = Client(cluster)

In [None]:
client

In [None]:
%%time
ddf_all2014 = dask_cudf.read_csv(base_path+'2014/yellow_*.csv')

In [None]:
len(ddf_all2014)

In [None]:
# Redo the same cleanup we used above but on Dask this time

col_cleanup = {col: col.strip().lower() for col in list(df_2014.columns)}
ddf = ddf_all2014.rename(columns=col_cleanup)
# rename columns using the supplied mapping
ddf = ddf.rename(columns=remap)

ddf = clean_columns(ddf, remap, must_haves, np.float64)

In [None]:
# Compute a simple histogram of passengers

value_counts = ddf.passenger_count.value_counts()
print(value_counts)
print(value_counts.compute())

## Machine learning with Dask

See also XGBoost's Dask interface docs: https://github.com/dmlc/xgboost/tree/master/demo/dask

In [None]:
# Use map_partitions to apply the same distance function we used before
ddf_with_dist = ddf.map_partitions(compute_distance_gpu)
ddf_with_dist.head()

In [None]:
kmeans_cols = ["passenger_count", "trip_distance", "rate_code", "fare_amount", "h_distance"]

In [None]:
%%time
X_ddf = ddf_with_dist[kmeans_cols]
for c in X_ddf.columns:
    X_ddf[c] = X_ddf[c].astype(np.float32)
Y_ddf = X_ddf["fare_amount"]
X_ddf = X_ddf.drop(columns="fare_amount")

X_ddf, y_ddf = client.persist([X_ddf, Y_ddf]) # Trigger the computation and cache in RAM
_ = wait([X_ddf, y_ddf]) # Actually wait for persistence to finish

In [None]:
import xgboost
from xgboost.dask import DaskDMatrix

In [None]:
%%time

params = {'verbosity': 2, 'nthread': 1, 'tree_method': 'gpu_hist', 'objective': 'reg:squarederror',}

# TODO: create a dmatrix, train a model, and store as xgb_model

In [None]:
# TODO: explore within-train-sample predictions