Based on:

https://github.com/rapidsai-community/notebooks-contrib/blob/main/community_tutorials_and_guides/taxi/NYCTaxi-E2E.ipynb

https://docs.rapids.ai/deployment/stable/examples/rapids-ec2-mnmg/notebook/


Let's import our dependencies.

In [1]:
import os
import numpy as np
import dask
import dask_cudf
from dask_ml.model_selection import train_test_split
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
from cuml.dask.common import utils as dask_utils
from cuml.dask.ensemble import RandomForestRegressor
from cuml.metrics import mean_squared_error


# Machine Learning Workflow

- read and clean the data
- add features
- split into training and validation sets
- fit a Random Forest model
- predict on the validation set
- compute RMSE


Dask-CUDA configuration


In [2]:
dask.config.set({'distributed.rmm.pool-size': "30GB"})
dask.config.set({'distributed.ucx.cuda_copy': "True"})
dask.config.set({'distributed.ucx.nvlink': "True"})
dask.config.set({'distributed.ucx.infiniband': "True"})
dask.config.set({'distributed.ucx.net-devices': "ib0"})


<dask.config.set at 0x14cdbc288e30>

In [3]:
os.environ["DASK_LOGGING__DISTRIBUTED"]="info"
os.environ["DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT"]="True"
os.environ["DASK_DISTRIBUTED__COMM__UCX__CUDA_COPY"]="True"
os.environ["DASK_DISTRIBUTED__COMM__UCX__TCP"]="True"
os.environ["DASK_DISTRIBUTED__COMM__UCX__NVLINK"]="True"
os.environ["DASK_DISTRIBUTED__COMM__UCX__INFINIBAND"]="True"
os.environ["DASK_DISTRIBUTED__COMM__UCX__RDMACM"]="True"
os.environ["UCX_MEMTYPE_REG_WHOLE_ALLOC_TYPES"]="cuda"
os.environ["UCX_MEMTYPE_CACHE"]="n"
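The `DASK_...` variable names above follow Dask's convention for configuration through environment variables: the `DASK_` prefix is dropped, the remainder is lowercased, and each double underscore marks one level of nesting. The sketch below only illustrates that naming rule. `env_to_config_key` is a hypothetical helper, not part of Dask, and it does not parse values.

```python
def env_to_config_key(name: str) -> str:
    """Map a DASK_* environment variable name to a dotted config key.

    Hypothetical helper that mirrors the naming convention only.
    """
    prefix = "DASK_"
    if not name.startswith(prefix):
        raise ValueError(f"not a Dask config variable: {name!r}")
    # Drop the prefix, lowercase, and turn "__" into the "." nesting separator.
    return name[len(prefix):].lower().replace("__", ".")


for var in ("DASK_DISTRIBUTED__COMM__UCX__NVLINK", "DASK_LOGGING__DISTRIBUTED"):
    print(var, "->", env_to_config_key(var))
```

Dask normally reads these variables when it is first imported in a process, so values set after `import dask` are most likely to take effect in processes started later, such as the workers.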


Create a LocalCUDACluster and connect a client to it


In [4]:
cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES=[0, 1, 2, 3],
                           n_workers=4, 
                           threads_per_worker=8,
                           protocol="ucx", 
                           interface="ib0",
                           enable_tcp_over_ucx=True, 
                           enable_infiniband=True, 
                           enable_nvlink=True, 
                           enable_rdmacm=True,
                           rmm_pool_size="50GB",
                           rmm_managed_memory=True)

client = Client(cluster)


In [5]:
client

Client
Connection method: Cluster object
Cluster type: dask_cuda.LocalCUDACluster
Dashboard: http://10.128.6.37:8787/status

Cluster
Status: running
Using processes: True
Workers: 4
Total threads: 32
Total memory: 482.42 GiB

Scheduler
Comm: ucx://10.128.6.37:42009
Dashboard: http://10.128.6.37:8787/status
Started: Just now

Workers
Comm,Dashboard,Nanny,Total threads,Memory,Local directory
ucx://10.128.6.37:52521,http://10.128.6.37:40809/status,ucx://10.128.6.37:47549,8,120.61 GiB,/tmp/dask-scratch-space/worker-aci4yq7e
ucx://10.128.6.37:49758,http://10.128.6.37:42331/status,ucx://10.128.6.37:36654,8,120.61 GiB,/tmp/dask-scratch-space/worker-mjep7k5n
ucx://10.128.6.37:39974,http://10.128.6.37:42747/status,ucx://10.128.6.37:51465,8,120.61 GiB,/tmp/dask-scratch-space/worker-hz6usb9m
ucx://10.128.6.37:53623,http://10.128.6.37:37021/status,ucx://10.128.6.37:58717,8,120.61 GiB,/tmp/dask-scratch-space/worker-rzpuf7w0


### 1. Read and Clean Data

On Leonardo we need to download the data in advance. We assume that all the files are in the following directory:


In [6]:
base_path = 'data/nyctaxi/'


The data needs to be cleaned up before it can be used in a meaningful way.

We verify the columns have appropriate datatypes to make it ready for computation using cuML.

We create a dictionary of all columns and dtypes that the DataFrame is expected to have when reading the CSV files.


In [7]:
# dictionary of all columns & dtypes the df must have for reading
col_dtype = {
    "VendorID": "int32",
    "tpep_pickup_datetime": "datetime64[ms]",
    "tpep_dropoff_datetime": "datetime64[ms]",
    "passenger_count": "int32",
    "trip_distance": "float32",
    "pickup_longitude": "float32",
    "pickup_latitude": "float32",
    "RatecodeID": "int32",
    "store_and_fwd_flag": "int32",
    "dropoff_longitude": "float32",
    "dropoff_latitude": "float32",
    "payment_type": "int32",
    "fare_amount": "float32",
    "extra": "float32",
    "mta_tax": "float32",
    "tip_amount": "float32",
    "total_amount": "float32",
    "tolls_amount": "float32",
    "improvement_surcharge": "float32",
}


We define the following dictionary of required columns and their datatypes.


In [8]:
# Dictionary of required columns and their datatypes
must_haves = {
    "pickup_datetime": "datetime64[ms]",
    "dropoff_datetime": "datetime64[ms]",
    "passenger_count": "int32",
    "trip_distance": "float32",
    "pickup_longitude": "float32",
    "pickup_latitude": "float32",
    "rate_code": "int32",
    "dropoff_longitude": "float32",
    "dropoff_latitude": "float32",
    "fare_amount": "float32",
}


We read the 2014 CSV files into a dask_cudf DataFrame.

In [9]:
df_2014 = dask_cudf.read_csv(base_path+'2014/yellow_*.csv', dtype=col_dtype,)


In [10]:
df_2014.head()


Unnamed: 0,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount
0,CMT,2014-01-09 20:45:25,2014-01-09 20:52:31,1,0.7,-73.99477,40.736828,1,N,-73.982227,40.73179,CRD,6.5,0.5,0.5,1.4,0.0,8.9
1,CMT,2014-01-09 20:46:12,2014-01-09 20:55:12,1,1.4,-73.982392,40.773382,1,N,-73.960449,40.763995,CRD,8.5,0.5,0.5,1.9,0.0,11.4
2,CMT,2014-01-09 20:44:47,2014-01-09 20:59:46,2,2.3,-73.98857,40.739406,1,N,-73.986626,40.765217,CRD,11.5,0.5,0.5,1.5,0.0,14.0
3,CMT,2014-01-09 20:44:57,2014-01-09 20:51:40,1,1.7,-73.960213,40.770464,1,N,-73.979863,40.77705,CRD,7.5,0.5,0.5,1.7,0.0,10.2
4,CMT,2014-01-09 20:47:09,2014-01-09 20:53:32,1,0.9,-73.995371,40.717248,1,N,-73.984367,40.720524,CRD,6.0,0.5,0.5,1.75,0.0,8.75


For data cleanup, we define the following function


In [11]:
def clean(ddf, must_haves):
    # strip extraneous spaces from column names and convert them to lowercase
    tmp = {col: col.strip().lower() for col in list(ddf.columns)}
    ddf = ddf.rename(columns=tmp)

    ddf = ddf.rename(
        columns={
            "tpep_pickup_datetime": "pickup_datetime",
            "tpep_dropoff_datetime": "dropoff_datetime",
            "ratecodeid": "rate_code",
        }
    )

    ddf["pickup_datetime"]  = ddf["pickup_datetime"].astype("datetime64[ms]")
    ddf["dropoff_datetime"] = ddf["dropoff_datetime"].astype("datetime64[ms]")

    for col in ddf.columns:
        if col not in must_haves:
            ddf = ddf.drop(columns=col)
            continue
        if ddf[col].dtype == "object":
            # Fixing error: could not convert arg to str
            ddf = ddf.drop(columns=col)
        else:
            # downcast from 64bit to 32bit types
            # Tesla T4 are faster on 32bit ops
            if "int" in str(ddf[col].dtype):
                ddf[col] = ddf[col].astype("int32")
            if "float" in str(ddf[col].dtype):
                ddf[col] = ddf[col].astype("float32")
            ddf[col] = ddf[col].fillna(-1)

    return ddf
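The column-handling part of `clean` can be illustrated without a GPU. The sketch below applies the same strip, lowercase, rename and keep-only-required logic to plain lists of header names. `surviving_columns` and the sample headers are hypothetical and for illustration only. The dtype handling (dropping object columns, downcasting, `fillna`) is deliberately left out.

```python
# Required column names, taken from the must_haves dictionary above.
REQUIRED = {
    "pickup_datetime", "dropoff_datetime", "passenger_count", "trip_distance",
    "pickup_longitude", "pickup_latitude", "rate_code",
    "dropoff_longitude", "dropoff_latitude", "fare_amount",
}

# Same renames as in clean(); keys are already lowercase.
RENAMES = {
    "tpep_pickup_datetime": "pickup_datetime",
    "tpep_dropoff_datetime": "dropoff_datetime",
    "ratecodeid": "rate_code",
}


def surviving_columns(raw_columns):
    """Return the normalized names the cleaning step would keep, in order."""
    kept = []
    for raw in raw_columns:
        name = raw.strip().lower()        # " Fare_Amount " -> "fare_amount"
        name = RENAMES.get(name, name)    # "ratecodeid" -> "rate_code"
        if name in REQUIRED:
            kept.append(name)
    return kept


# Illustrative subset of a 2015-style header (not the full file header).
header_2015 = ["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime",
               "passenger_count", "trip_distance", "RatecodeID",
               "fare_amount", "extra"]
print(surviving_columns(header_2015))
```

Because every year ends up with the same ten column names, the cleaned yearly DataFrames can later be concatenated.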


We then clean the data by applying the function to each partition with map_partitions.

In [12]:
df_2014 = df_2014.map_partitions(clean, must_haves, meta=must_haves)


We follow the same procedure for the 2015 data.


In [13]:
df_2015 = dask_cudf.read_csv(base_path+'2015/yellow_*.csv')
df_2015 = df_2015.map_partitions(clean, must_haves, meta=must_haves)
df_2015.compute()


Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,dropoff_longitude,dropoff_latitude,fare_amount
0,2015-01-15 19:05:39,2015-01-15 19:23:42,1,1.59,-73.993896,40.750111,1,-73.974785,40.750618,12.0
1,2015-01-10 20:33:38,2015-01-10 20:53:28,1,3.30,-74.001648,40.724243,1,-73.994415,40.759109,14.5
2,2015-01-10 20:33:38,2015-01-10 20:43:41,1,1.80,-73.963341,40.802788,1,-73.951820,40.824413,9.5
3,2015-01-10 20:33:39,2015-01-10 20:35:31,1,0.50,-74.009087,40.713818,1,-74.004326,40.719986,3.5
4,2015-01-10 20:33:39,2015-01-10 20:52:58,1,3.00,-73.971176,40.762428,1,-74.004181,40.742653,15.0
...,...,...,...,...,...,...,...,...,...,...
1186799,2015-12-31 23:59:56,2016-01-01 00:08:18,5,1.20,-73.993813,40.720871,1,-73.986214,40.722469,7.5
1186800,2015-12-31 23:59:58,2016-01-01 00:05:19,2,2.00,-73.965271,40.760281,1,-73.939514,40.752388,7.5
1186801,2015-12-31 23:59:59,2016-01-01 00:12:55,2,3.80,-73.987297,40.739079,1,-73.988670,40.693298,13.5
1186802,2015-12-31 23:59:59,2016-01-01 00:10:26,1,1.96,-73.997559,40.725693,1,-74.017120,40.705322,8.5


In [14]:
df_2015.head()


Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,dropoff_longitude,dropoff_latitude,fare_amount
0,2015-01-15 19:05:39,2015-01-15 19:23:42,1,1.59,-73.993896,40.750111,1,-73.974785,40.750618,12.0
1,2015-01-10 20:33:38,2015-01-10 20:53:28,1,3.3,-74.001648,40.724243,1,-73.994415,40.759109,14.5
2,2015-01-10 20:33:38,2015-01-10 20:43:41,1,1.8,-73.963341,40.802788,1,-73.95182,40.824413,9.5
3,2015-01-10 20:33:39,2015-01-10 20:35:31,1,0.5,-74.009087,40.713818,1,-74.004326,40.719986,3.5
4,2015-01-10 20:33:39,2015-01-10 20:52:58,1,3.0,-73.971176,40.762428,1,-74.004181,40.742653,15.0


In 2016, only the January to June CSVs have the columns we need.

If we try to read base_path+'2016/yellow_*.csv', Dask cannot combine files with differing schemas into the same DataFrame.

Instead, we create a list of the valid months and read only those files.


In [15]:
months = [str(x).rjust(2, '0') for x in range(1, 7)]
valid_files = [base_path+'2016/yellow_tripdata_2016-'+month+'.csv' for month in months]

# Read and clean 2016 data

df_2016 = dask_cudf.read_csv(valid_files).map_partitions(clean, must_haves, meta=must_haves)
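As a small aside, the zero-padded month strings can also be produced with a format specifier instead of `rjust`. The sketch below builds the same kind of file list. `monthly_files` is a hypothetical helper, and the file naming pattern is copied from the cell above.

```python
def monthly_files(base_path, year, first_month, last_month):
    """Build yellow-taxi CSV paths for an inclusive range of months."""
    return [
        f"{base_path}{year}/yellow_tripdata_{year}-{month:02d}.csv"
        for month in range(first_month, last_month + 1)
    ]


files = monthly_files("data/nyctaxi/", 2016, 1, 6)
for path in files:
    print(path)
```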


Finally, we concatenate the three yearly DataFrames into a single one and persist it in GPU memory.

In [16]:
taxi_df = dask_cudf.concat([df_2014, df_2015, df_2016], axis=0)
taxi_df = taxi_df.persist()


Now we need to filter out nonsensical records and outliers.


In [17]:
# check for records with a zero or negative total trip time
taxi_df[taxi_df.dropoff_datetime <= taxi_df.pickup_datetime].head()


Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,dropoff_longitude,dropoff_latitude,fare_amount
3832,2014-01-09 20:27:41,2014-01-09 19:05:21,1,1.8,0.0,0.0,1,-73.969589,40.789001,9.5
16134,2014-01-09 22:15:40,2014-01-09 22:15:14,1,1.3,0.0,0.0,1,-74.002861,40.760624,7.5
16181,2014-01-09 22:27:13,2014-01-09 22:26:49,1,1.9,0.0,0.0,1,-73.976288,40.741631,8.5
16873,2014-01-10 00:14:23,2014-01-10 00:14:23,1,0.7,-74.005135,40.718693,1,-74.003876,40.728504,4.0
25248,2014-01-10 00:39:16,2014-01-10 00:38:59,1,1.5,0.0,0.0,1,-73.999283,40.733898,6.5


In [18]:
# check for abnormal records where the trip distance is short but the fare is very high
taxi_df[(taxi_df.trip_distance < 10) & (taxi_df.fare_amount > 300)].head()


Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,dropoff_longitude,dropoff_latitude,fare_amount
373953,2014-01-12 15:03:38,2014-01-12 15:05:31,1,0.0,-73.823845,40.690258,5,-73.823845,40.690281,400.0
520658,2014-01-06 23:44:07,2014-01-06 23:45:01,1,0.0,-73.927902,41.677242,5,-73.927902,41.677242,370.0
525419,2014-01-06 23:50:28,2014-01-06 23:51:40,1,0.0,-73.927902,41.677242,5,-73.927902,41.677242,370.0
562916,2014-01-07 12:09:38,2014-01-07 12:10:27,1,0.0,-73.929695,40.812031,5,-73.929695,40.812031,500.0
576409,2014-01-07 16:35:10,2014-01-07 16:35:41,1,0.0,-73.996002,40.720932,5,-73.99601,40.720928,500.0


In [19]:
# check for abnormal records where the trip distance is long but the fare is very low
taxi_df[(taxi_df.trip_distance > 50) & (taxi_df.fare_amount < 50)].head()


Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,dropoff_longitude,dropoff_latitude,fare_amount
11580,2014-01-09 22:22:14,2014-01-09 22:42:59,1,82.099998,-74.002174,40.751755,1,-73.964218,40.719555,20.0
33723,2014-01-10 05:43:48,2014-01-10 06:12:20,1,84.199997,-73.96505,40.806671,1,-73.957977,40.713139,34.5
122880,2014-01-10 18:16:53,2014-01-10 18:48:53,1,89.900002,-73.997185,40.742355,1,-73.948364,40.77927,21.5
226264,2014-01-11 13:22:07,2014-01-11 13:40:16,1,81.699997,-73.991684,40.759872,1,-73.994987,40.726055,13.5
290203,2014-01-11 20:12:50,2014-01-11 20:15:15,1,60.400002,-73.968109,40.770878,1,-73.972595,40.764606,4.0


In the following, we only keep records where trip_distance < 500 miles.

Similarly, some records have abnormal fare_amount values, so we only keep records where fare_amount < $500.

Since we are interested in NYC, we also restrict the pickup and drop-off coordinates to a bounding box around the city.

Exploratory data analysis yields the filter logic below, which we apply to throw out records with missing or outlier values.


In [20]:
# apply a list of filter conditions to throw out records with missing or outlier values
query_frags = [
    'fare_amount > 1 and fare_amount < 500',
    'passenger_count > 0 and passenger_count < 6',
    'pickup_longitude > -75 and pickup_longitude < -73',
    'dropoff_longitude > -75 and dropoff_longitude < -73',
    'pickup_latitude > 40 and pickup_latitude < 42',
    'dropoff_latitude > 40 and dropoff_latitude < 42',
    'trip_distance > 0 and trip_distance < 500',
    'not (trip_distance > 50 and fare_amount < 50)',
    'not (trip_distance < 10 and fare_amount > 300)',
    'not dropoff_datetime <= pickup_datetime'
]
taxi_df = taxi_df.query(' and '.join(query_frags))
# reset_index and drop index column
taxi_df = taxi_df.reset_index(drop=True)
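To make the combined filter easier to reason about, here is a minimal CPU-only sketch that expresses the same conditions as a plain Python predicate over one record. `is_valid_trip` is a hypothetical helper, and the three sample records are copied from tables shown in this notebook.

```python
from datetime import datetime


def is_valid_trip(rec):
    """Return True if a record passes all filter conditions listed above."""
    return (
        1 < rec["fare_amount"] < 500
        and 0 < rec["passenger_count"] < 6
        and -75 < rec["pickup_longitude"] < -73
        and -75 < rec["dropoff_longitude"] < -73
        and 40 < rec["pickup_latitude"] < 42
        and 40 < rec["dropoff_latitude"] < 42
        and 0 < rec["trip_distance"] < 500
        and not (rec["trip_distance"] > 50 and rec["fare_amount"] < 50)
        and not (rec["trip_distance"] < 10 and rec["fare_amount"] > 300)
        and rec["dropoff_datetime"] > rec["pickup_datetime"]
    )


good = {  # ordinary short trip
    "pickup_datetime": datetime(2014, 1, 9, 20, 45, 25),
    "dropoff_datetime": datetime(2014, 1, 9, 20, 52, 31),
    "passenger_count": 1, "trip_distance": 0.7,
    "pickup_longitude": -73.994766, "pickup_latitude": 40.736828,
    "dropoff_longitude": -73.982224, "dropoff_latitude": 40.731789,
    "fare_amount": 6.5,
}
# Drop-off before pickup, and pickup coordinates of (0, 0).
negative_time = dict(good, pickup_datetime=datetime(2014, 1, 9, 20, 27, 41),
                     dropoff_datetime=datetime(2014, 1, 9, 19, 5, 21),
                     pickup_longitude=0.0, pickup_latitude=0.0)
# Zero distance with a 400 dollar fare.
pricey = dict(good, trip_distance=0.0, fare_amount=400.0)

for name, rec in [("good", good), ("negative_time", negative_time), ("pricey", pricey)]:
    print(name, is_valid_trip(rec))
```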


In [21]:
taxi_df.head()


Unnamed: 0,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,dropoff_longitude,dropoff_latitude,fare_amount
0,2014-01-09 20:45:25,2014-01-09 20:52:31,1,0.7,-73.994766,40.736828,1,-73.982224,40.731789,6.5
1,2014-01-09 20:46:12,2014-01-09 20:55:12,1,1.4,-73.982391,40.77338,1,-73.960449,40.763996,8.5
2,2014-01-09 20:44:47,2014-01-09 20:59:46,2,2.3,-73.988571,40.739407,1,-73.986626,40.765217,11.5
3,2014-01-09 20:44:57,2014-01-09 20:51:40,1,1.7,-73.960213,40.770466,1,-73.979866,40.77705,7.5
4,2014-01-09 20:47:09,2014-01-09 20:53:32,1,0.9,-73.995369,40.717247,1,-73.984367,40.720524,6.0


### 2. Add features

We'll add new features to the DataFrame.

We split the pickup datetime column into hour, year, month, day and day_of_week columns, and derive an is_weekend flag from day_of_week.

We also compute the difference between pickup time and drop-off time.


In [22]:
## add features
taxi_df["hour"] = taxi_df["pickup_datetime"].dt.hour.astype("int32")
taxi_df["year"] = taxi_df["pickup_datetime"].dt.year.astype("int32")
taxi_df["month"] = taxi_df["pickup_datetime"].dt.month.astype("int32")
taxi_df["day"] = taxi_df["pickup_datetime"].dt.day.astype("int32")
taxi_df["day_of_week"] = taxi_df["pickup_datetime"].dt.weekday.astype("int32")
taxi_df["is_weekend"] = (taxi_df["day_of_week"] >= 5).astype("int32")
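The same features can be checked for a single timestamp with the standard library. This is a sketch under the assumption that weekdays are numbered from Monday = 0, which agrees with the `day_of_week` value of 3 shown later in this notebook for Thursday, 2014-01-09. `time_features` is a hypothetical helper.

```python
from datetime import datetime


def time_features(ts):
    """Derive the calendar features used above from one pickup timestamp."""
    day_of_week = ts.weekday()  # Monday = 0 ... Sunday = 6
    return {
        "hour": ts.hour,
        "year": ts.year,
        "month": ts.month,
        "day": ts.day,
        "day_of_week": day_of_week,
        "is_weekend": int(day_of_week >= 5),  # Saturday or Sunday
    }


features = time_features(datetime(2014, 1, 9, 20, 45, 25))
print(features)
```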


We also add the Haversine distance between the pick-up and drop-off coordinates, which estimates the straight-line (great-circle) length of the trip.


In [23]:
def haversine_dist(df):
    import cuspatial

    pickup = cuspatial.GeoSeries.from_points_xy(df[["pickup_longitude", "pickup_latitude"]].interleave_columns())
    dropoff = cuspatial.GeoSeries.from_points_xy(df[["dropoff_longitude", "dropoff_latitude"]].interleave_columns())
    df["h_distance"] = cuspatial.haversine_distance(pickup, dropoff)
    df["h_distance"] = df["h_distance"].astype("float32")
    return df
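For reference, the Haversine formula itself fits in a few lines of standard-library Python. The sketch below is not the cuSpatial implementation. It assumes a mean Earth radius of 6371 km and returns kilometres. For the first trip in the feature table shown later, it should come out close to the `h_distance` value of about 1.196 reported there.

```python
import math

EARTH_RADIUS_KM = 6371.0  # assumption: mean Earth radius in kilometres


def haversine_km(lon1, lat1, lon2, lat2, radius=EARTH_RADIUS_KM):
    """Great-circle distance between two (longitude, latitude) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    # Haversine of the central angle between the two points.
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * radius * math.asin(math.sqrt(a))


# Pickup and drop-off coordinates of the first trip in the cleaned data.
d = haversine_km(-73.994766, 40.736828, -73.982224, 40.731789)
print(f"{d:.3f} km")
```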


In [24]:
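# calculate the time difference between dropoff and pickup.
# datetime64[ms] casts to milliseconds since the epoch; those values do not fit in int32, so use int64
taxi_df["diff"] = taxi_df["dropoff_datetime"].astype("int64") - taxi_df["pickup_datetime"].astype("int64")

# convert milliseconds to whole seconds
taxi_df["diff"] = (taxi_df["diff"] / 1000).astype("int32")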

taxi_df["pickup_latitude_r"] = taxi_df["pickup_latitude"] // 0.01 * 0.01
taxi_df["pickup_longitude_r"] = taxi_df["pickup_longitude"] // 0.01 * 0.01
taxi_df["dropoff_latitude_r"] = taxi_df["dropoff_latitude"] // 0.01 * 0.01
taxi_df["dropoff_longitude_r"] = taxi_df["dropoff_longitude"] // 0.01 * 0.01

taxi_df = taxi_df.drop("pickup_datetime", axis=1)
taxi_df = taxi_df.drop("dropoff_datetime", axis=1)

taxi_df = taxi_df.map_partitions(haversine_dist)
taxi_df = taxi_df.persist()
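Two of the derived columns are easy to verify by hand: `diff` is the trip duration in seconds, and the `*_r` columns snap each coordinate down to a 0.01 degree grid. The sketch below reproduces both for one trip using only the standard library. `trip_seconds` and `snap_down` are hypothetical helper names.

```python
from datetime import datetime


def trip_seconds(pickup, dropoff):
    """Trip duration in whole seconds."""
    return int((dropoff - pickup).total_seconds())


def snap_down(coord, cell=0.01):
    """Snap a coordinate down to a multiple of `cell` (floor division).

    Floor division rounds towards negative infinity, so negative
    longitudes move west: -73.994766 becomes approximately -74.0.
    """
    return coord // cell * cell


seconds = trip_seconds(datetime(2014, 1, 9, 20, 45, 25),
                       datetime(2014, 1, 9, 20, 52, 31))
print(seconds, snap_down(40.736828), snap_down(-73.994766))
```

Results carry the usual floating-point rounding, which is why the float32 columns in the table display values such as 40.759998.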


In [25]:
taxi_df.head()


Unnamed: 0,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code,dropoff_longitude,dropoff_latitude,fare_amount,hour,year,month,day,day_of_week,is_weekend,diff,pickup_latitude_r,pickup_longitude_r,dropoff_latitude_r,dropoff_longitude_r,h_distance
0,1,0.7,-73.994766,40.736828,1,-73.982224,40.731789,6.5,20,2014,1,9,3,0,426,40.73,-74.0,40.73,-73.989998,1.196175
1,1,1.4,-73.982391,40.77338,1,-73.960449,40.763996,8.5,20,2014,1,9,3,0,540,40.77,-73.989998,40.759998,-73.970001,2.122098
2,2,2.3,-73.988571,40.739407,1,-73.986626,40.765217,11.5,20,2014,1,9,3,0,899,40.73,-73.989998,40.759998,-73.989998,2.874643
3,1,1.7,-73.960213,40.770466,1,-73.979866,40.77705,7.5,20,2014,1,9,3,0,403,40.77,-73.970001,40.77,-73.979996,1.809662
4,1,0.9,-73.995369,40.717247,1,-73.984367,40.720524,6.0,20,2014,1,9,3,0,383,40.709999,-74.0,40.719997,-73.989998,0.996204


### 3. Split Data

Now we split the data into training and validation sets.

In [26]:
# Split into training and validation sets
X, y = taxi_df.drop(["fare_amount"], axis=1).astype("float32"), taxi_df["fare_amount"].astype("float32")
X_train, X_test, y_train, y_test = train_test_split(X, y, shuffle=True)
workers = client.has_what().keys()
X_train, X_test, y_train, y_test = dask_utils.persist_across_workers(client, [X_train, X_test, y_train, y_test], workers=workers)




### 4. Create and fit a Random Forest Model

Now we create a cuml.dask RandomForestRegressor.

In [27]:
# create cuml.dask.ensemble RandomForestRegressor 
cu_dask_rf = RandomForestRegressor(ignore_empty_partitions=True)




Fit the RF model on the training set.

In [28]:
# Fit RF model
cu_dask_rf = cu_dask_rf.fit(X_train, y_train)


### 5. Predict on validation set

In [29]:
# predict on validation set
y_pred = cu_dask_rf.predict(X_test)




### 6. Compute RMSE

In [30]:
# compute RMSE
# pass the arguments in (y_true, y_pred) order; MSE is symmetric, so the value is unchanged
score = mean_squared_error(y_test.compute().to_numpy(), y_pred.compute().to_numpy())
print("Workflow Complete - RMSE: ", np.sqrt(score))




Workflow Complete - RMSE:  1.4880993348928098
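RMSE is the square root of the mean squared difference between the true and predicted fares, so it is expressed in the same unit as fare_amount (dollars). A minimal standard-library sketch of the computation follows. `rmse` is a hypothetical helper, and the numbers are made up for illustration.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error of two equally long, non-empty sequences."""
    if len(y_true) != len(y_pred) or len(y_true) == 0:
        raise ValueError("inputs must be non-empty and of equal length")
    mse = sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)
    return math.sqrt(mse)


# Illustrative values only, not taken from the model above.
print(rmse([6.5, 8.5, 11.5], [6.0, 9.0, 13.5]))
```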


Clean up resources


In [31]:
# Clean up resources
client.close()