# Predicting NYC Taxi Fares with Intel Optimizations on Full Dataset

This notebook was originally written for RAPIDS and has been converted to use Modin on OmniSci.

In [7]:
%matplotlib inline
import glob
#import matplotlib.pyplot as plt
#import socket, time
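# NOTE: this cell imports plain pandas under the Modin alias. To run on Modin,
# `import modin.pandas as modin_omni_pd` would be the usual drop-in (assumes Modin is installed and configured).
import pandas as modin_omni_pd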
#import xgboost as xgb

#To install Holoviews and hvplot
#conda install -c conda-forge holoviews
#conda install -c pyviz hvplot
#import holoviews as hv
#from holoviews import opts
import numpy as np
#import hvplot.pandas
#import hvplot.dask

# Inspecting the Data

We'll use Modin on OmniSci to load and parse all CSV files into a DataFrame. That makes 30 files overall: 12 each for 2014 and 2015, plus 6 for January through June 2016.

# Data Cleanup

As usual, the data needs to be massaged a bit before we can start adding features that are useful to an ML model.

For example, in the 2014 taxi CSV files, there are `pickup_datetime` and `dropoff_datetime` columns. The 2015 CSVs have `tpep_pickup_datetime` and `tpep_dropoff_datetime`, which are the same columns. One year has `rate_code`, and another `RateCodeID`.

Also, some CSV files have column names with extraneous spaces in them.

Worst of all, starting in the July 2016 CSVs, pickup & dropoff latitude and longitude data were replaced by location IDs, making the second half of the year useless to us.

We'll do a little string manipulation, column renaming, and concatenating of DataFrames to sidestep the problems.

In [8]:
# List of columns to drop because they are not present in every year's files
drop_columns = [
    "surcharge", "extra", "improvement_surcharge"
]

In [9]:
def clean(ddf):
    # strip the extraneous spaces from column names and lowercase them
    tmp = {col:col.strip().lower() for col in list(ddf.columns)}
    ddf = ddf.rename(columns=tmp)

    ddf = ddf.rename(columns={
        'tpep_pickup_datetime': 'pickup_datetime',
        'tpep_dropoff_datetime': 'dropoff_datetime',
        'ratecodeid': 'rate_code',
        'vendorid': 'vendor_id'
    })

    print("columns = ", ddf.columns)
    for col in ddf.columns:
        if col in drop_columns:
            print("dropping column = ", col)
            ddf = ddf.drop(columns=col)
            continue
        if ddf[col].dtype == 'object':
            ddf[col] = ddf[col].fillna('-1')

    return ddf
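
To make the renaming step concrete, the sketch below applies the same strip, lowercase, and rename logic to bare lists of header names, without any DataFrame. The sample headers are abbreviated and illustrative (they are not read from the CSV files), and `normalize_columns` is a hypothetical helper, not part of the notebook.

```python
# Hypothetical helper mirroring the renaming logic in clean(), applied to plain header lists.
RENAMES = {
    "tpep_pickup_datetime": "pickup_datetime",
    "tpep_dropoff_datetime": "dropoff_datetime",
    "ratecodeid": "rate_code",
    "vendorid": "vendor_id",
}

def normalize_columns(names):
    """Strip whitespace, lowercase, then map year-specific names to the common ones."""
    cleaned = [name.strip().lower() for name in names]
    return [RENAMES.get(name, name) for name in cleaned]

# Illustrative, abbreviated headers in the two styles described above (assumed samples).
header_2014_style = ["vendor_id", " pickup_datetime", " dropoff_datetime", " rate_code"]
header_2015_style = ["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "RateCodeID"]

print(normalize_columns(header_2014_style))
print(normalize_columns(header_2015_style))
```

Both calls print the same list of names, which is what lets the per-file DataFrames be concatenated into one.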

In [10]:
base_path = '/localdisk/benchmark_datasets/yellow-taxi-dataset/'

df_2014 = modin_omni_pd.concat([
    clean(modin_omni_pd.read_csv(x, parse_dates=[' pickup_datetime', ' dropoff_datetime']))
    for x in glob.glob(base_path+'2014/yellow_*.csv')], ignore_index=True)

columns =  Index(['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger_count',
       'trip_distance', 'pickup_longitude', 'pickup_latitude', 'rate_code',
       'store_and_fwd_flag', 'dropoff_longitude', 'dropoff_latitude',
       'payment_type', 'fare_amount', 'surcharge', 'mta_tax', 'tip_amount',
       'tolls_amount', 'total_amount'],
      dtype='object')
dropping column =  surcharge
columns =  Index(['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger_count',
       'trip_distance', 'pickup_longitude', 'pickup_latitude', 'rate_code',
       'store_and_fwd_flag', 'dropoff_longitude', 'dropoff_latitude',
       'payment_type', 'fare_amount', 'surcharge', 'mta_tax', 'tip_amount',
       'tolls_amount', 'total_amount'],
      dtype='object')
dropping column =  surcharge
columns =  Index(['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger_count',
       'trip_distance', 'pickup_longitude', 'pickup_latitude', 'rate_code',
       'store_and_fwd_flag', 'd


In [11]:
df_2014.dtypes

vendor_id                     object
pickup_datetime       datetime64[ns]
dropoff_datetime      datetime64[ns]
passenger_count                int64
trip_distance                float64
pickup_longitude             float64
pickup_latitude              float64
rate_code                      int64
store_and_fwd_flag            object
dropoff_longitude            float64
dropoff_latitude             float64
payment_type                  object
fare_amount                  float64
mta_tax                      float64
tip_amount                   float64
tolls_amount                 float64
total_amount                 float64
dtype: object

<b> NOTE: </b>Some of the 2015 data names this column `RateCodeID`, while other files use `RatecodeID`. In the original Dask-based version, renaming the columns inside the clean function did not pass meta when map_partitions() was called internally, which led to a column name mismatch error in the returned data. The workaround there was to call the clean function through map_partitions and pass meta explicitly. The bug filed for this is https://github.com/rapidsai/cudf/issues/5413. In this version, `clean` lowercases every column name before renaming, so both spellings become `ratecodeid` and map to `rate_code`.
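
One way to catch this kind of mismatch early is to compare the normalized column names of every file before concatenating. The following is a minimal sketch, assuming the headers have already been collected into a dict. `find_schema_mismatches`, the file names, and the abbreviated headers are all hypothetical and only serve as illustration.

```python
def find_schema_mismatches(headers_by_file):
    """Return {file: sorted columns} for files whose normalized columns differ from the first file's."""
    normalized = {
        fname: sorted(col.strip().lower() for col in cols)
        for fname, cols in headers_by_file.items()
    }
    # Use the first file as the reference schema (None if there are no files).
    reference = next(iter(normalized.values()), None)
    return {fname: cols for fname, cols in normalized.items() if cols != reference}

# Hypothetical file names and abbreviated headers, for illustration only.
headers = {
    "2015-01.csv": ["VendorID", "RateCodeID", "fare_amount"],
    "2015-07.csv": ["VendorID", "RatecodeID", "fare_amount"],
    "2016-07.csv": ["VendorID", "RatecodeID", "PULocationID"],
}
mismatches = find_schema_mismatches(headers)
print(mismatches)
```

In this toy input the two spellings of the rate code column compare equal after lowercasing, so only the file with a genuinely different column set is reported.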

In [12]:
df_2014.shape

(165114361, 17)

We still have 2015 and the first half of 2016's data to read and clean. Let's increase our dataset.

In [13]:
df_2015 = modin_omni_pd.concat([
    clean(modin_omni_pd.read_csv(x, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime']))
    for x in glob.glob(base_path + '2015/yellow_*.csv')], ignore_index=True)

columns =  Index(['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger_count',
       'trip_distance', 'pickup_longitude', 'pickup_latitude', 'rate_code',
       'store_and_fwd_flag', 'dropoff_longitude', 'dropoff_latitude',
       'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount',
       'tolls_amount', 'improvement_surcharge', 'total_amount'],
      dtype='object')
dropping column =  extra
dropping column =  improvement_surcharge
columns =  Index(['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger_count',
       'trip_distance', 'pickup_longitude', 'pickup_latitude', 'rate_code',
       'store_and_fwd_flag', 'dropoff_longitude', 'dropoff_latitude',
       'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount',
       'tolls_amount', 'improvement_surcharge', 'total_amount'],
      dtype='object')
dropping column =  extra
dropping column =  improvement_surcharge
columns =  Index(['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger

In [14]:
df_2015.shape

(146112989, 17)

# Handling 2016's Mid-Year Schema Change

In 2016, only the January through June CSVs have the columns we need. If we read `base_path + '2016/yellow_*.csv'`, we would end up mixing two different schemas in the same DataFrame.

Instead, we'll need to create a list of the valid months and read them independently.

In [15]:
months = [str(x).rjust(2, '0') for x in range(1, 7)]
valid_files = [base_path+'2016/yellow_tripdata_2016-'+month+'.csv' for month in months]
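
For reference, the list comprehension above yields the zero-padded month strings `01` through `06`. The sketch below reproduces it next to an equivalent format-spec version, which is shown only as an alternative; `months_alt` is a name introduced here for illustration.

```python
base_path = '/localdisk/benchmark_datasets/yellow-taxi-dataset/'

# Zero-pad with rjust, as in the notebook cell.
months = [str(x).rjust(2, '0') for x in range(1, 7)]

# Equivalent zero-padding using a format spec.
months_alt = [f"{x:02d}" for x in range(1, 7)]
assert months == months_alt

valid_files = [base_path + '2016/yellow_tripdata_2016-' + month + '.csv' for month in months]
print(months)          # ['01', '02', '03', '04', '05', '06']
print(valid_files[0])  # /localdisk/benchmark_datasets/yellow-taxi-dataset/2016/yellow_tripdata_2016-01.csv
```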

In [16]:
# read and clean the valid 2016 files, then concatenate them into one DataFrame
df_2016 = modin_omni_pd.concat([
    clean(modin_omni_pd.read_csv(x, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime']))
    for x in valid_files], ignore_index=True)

columns =  Index(['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger_count',
       'trip_distance', 'pickup_longitude', 'pickup_latitude', 'rate_code',
       'store_and_fwd_flag', 'dropoff_longitude', 'dropoff_latitude',
       'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount',
       'tolls_amount', 'improvement_surcharge', 'total_amount'],
      dtype='object')
dropping column =  extra
dropping column =  improvement_surcharge
columns =  Index(['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger_count',
       'trip_distance', 'pickup_longitude', 'pickup_latitude', 'rate_code',
       'store_and_fwd_flag', 'dropoff_longitude', 'dropoff_latitude',
       'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount',
       'tolls_amount', 'improvement_surcharge', 'total_amount'],
      dtype='object')
dropping column =  extra
dropping column =  improvement_surcharge
columns =  Index(['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger

In [17]:
#concatenate multiple DataFrames into one bigger one
taxi_df = modin_omni_pd.concat([df_2014, df_2015, df_2016], ignore_index=True)

In [18]:
def print_all(df):
    def print_df(s, name):
        print(name, " = \n", s)
        print(name, "shape = ", s.shape)

    dt = df.dtypes
    print_df(dt, "dtypes")
    mi = df.min()
    print_df(mi, "minimum")
    ma = df.max()
    print_df(ma, "maximum")

    result = modin_omni_pd.DataFrame({"types": dt, "min": mi, "max": ma})
    result = result.reindex(dt.index)
    print_df(result, "result")

    for index, row in result.iterrows():
        # use column labels instead of positions; positional row[0] on a labeled Series is deprecated in recent pandas
        is_int = row["types"].name in ("int64", "int32")
        print("\"{0}\": (\"{1}\", {2}, {3}),".format(index,
            row["types"],
            np.int64(row["min"]) if is_int else row["min"],
            np.int64(row["max"]) if is_int else row["max"]))

In [19]:
print_all(taxi_df)

dtypes  = 
 vendor_id                     object
pickup_datetime       datetime64[ns]
dropoff_datetime      datetime64[ns]
passenger_count                int64
trip_distance                float64
pickup_longitude             float64
pickup_latitude              float64
rate_code                      int64
store_and_fwd_flag            object
dropoff_longitude            float64
dropoff_latitude             float64
payment_type                  object
fare_amount                  float64
mta_tax                      float64
tip_amount                   float64
tolls_amount                 float64
total_amount                 float64
dtype: object
dtypes shape =  (17,)


minimum  = 
 pickup_datetime       2014-01-01 00:00:00
dropoff_datetime      1900-01-01 00:00:00
passenger_count                         0
trip_distance                 -40840124.4
pickup_longitude              -874.002625
pickup_latitude                    -180.0
rate_code                               0
store_and_fwd_flag                     -1
dropoff_longitude             -781.833313
dropoff_latitude                   -180.0
fare_amount                        -957.6
mta_tax                              -3.0
tip_amount                         -440.0
tolls_amount                       -99.99
total_amount                       -958.4
dtype: object
minimum shape =  (15,)
maximum  = 
 pickup_datetime       2016-06-30 23:59:59
dropoff_datetime      2253-08-23 07:56:38
passenger_count                       208
trip_distance                 198623013.6
pickup_longitude               172.600006
pickup_latitude                405.016667
rate_code                             221
store_and_fwd
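
The `-1` reported as the minimum of `store_and_fwd_flag` is consistent with `clean`, which fills missing values in object columns with the string `'-1'`: that string compares lower than letter flags. Here is a minimal pure-Python sketch of the effect, assuming the flag column holds `'Y'`/`'N'` values (an assumption, since the raw values are not shown here).

```python
# Assumed sample of a string flag column with missing entries (None).
flags = ["N", "Y", None, "N", None]

# Same idea as fillna('-1') in clean(): replace missing values with a string sentinel.
filled = [flag if flag is not None else "-1" for flag in flags]

# Strings compare lexicographically, and '-' sorts before uppercase letters.
print(min(filled))  # -1
print(max(filled))  # Y
```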

