# NVIDIA RAPIDS on Azure ML
## MLADS Fall'19

In this notebook we use the NYC Taxi dataset to showcase the speedup, and the ease, of converting a single-threaded `pandas` ETL workload into a GPU-accelerated one using `cudf` from RAPIDS.

**AUTHORS**
* Tom Drabas (Microsoft)
* Brad Rees (NVIDIA)
* Keith Kraus (NVIDIA)
* Paul Mahler (NVIDIA)
* John Zedlewski (NVIDIA)
* Chau Dang (NVIDIA)

**GREATER TEAM**
* Joshua Patterson (NVIDIA)
* Michael Beaumont (NVIDIA)
* Manuel Reyes Gomez (NVIDIA)

# Import modules

In [97]:
import os
import cudf
import pandas as pd
import datetime
import time
import numpy as np
from collections import OrderedDict

pd.options.mode.chained_assignment = None 

# Define global vars and methods

In [53]:
# Ordered schema of the taxi CSV: column name -> dtype string.
# The order matters: the keys are passed positionally as `names=` to both CSV
# readers and the values as `dtype=` to cudf.read_csv (the pandas reader does
# not receive these dtypes and infers its own).
columns_dtypes = OrderedDict(
    [
        ('vendor_id', 'int32'),
        ('pickup_datetime', 'date'),
        ('dropoff_datetime', 'date'),
        ('passenger_count', 'int32'),
        # Trip distance is a fractional number of miles; declaring it as an
        # integer would make the GPU loader disagree with the CPU loader,
        # which infers a floating-point column.
        ('trip_distance', 'float64'),
        ('pickup_longitude', 'float64'),
        ('pickup_latitude', 'float64'),
        ('rate_code', 'int32'),
        # NOTE(review): this column is not listed in `use_col`, so it is not
        # loaded; the source file presumably stores it as a Y/N flag rather
        # than an integer -- confirm before adding it to `use_col`.
        ('store_and_fwd_flag', 'int32'),
        ('dropoff_longitude', 'float64'),
        ('dropoff_latitude', 'float64'),
        ('payment_type', 'int32'),
        ('fare_amount', 'float64'),
        ('extra', 'float64'),
        ('mta_tax', 'float64'),
        ('tip_amount', 'float64'),
        ('tolls_amount', 'float64'),
        ('surcharge', 'float64'),
        ('total_amount', 'float64')
    ]
)

# Subset of the CSV columns that the workflows actually load; passed as
# `usecols=` to both the cudf and the pandas reader.
use_col = [
    # trip timestamps
    'pickup_datetime',
    'dropoff_datetime',
    # trip attributes
    'passenger_count',
    'trip_distance',
    # pickup location and rate code
    'pickup_longitude',
    'pickup_latitude',
    'rate_code',
    # dropoff location
    'dropoff_longitude',
    'dropoff_latitude',
    # fare
    'fare_amount',
]

# Row filters, one per column: (column, exclusive lower bound, exclusive upper bound).
# The workflows join the generated fragments with ' and ' and hand the result
# to DataFrame.query().
_query_bounds = (
    ('fare_amount', 0, 500),
    ('passenger_count', 0, 6),
    ('pickup_longitude', -75, -73),
    ('dropoff_longitude', -75, -73),
    ('pickup_latitude', 40, 42),
    ('dropoff_latitude', 40, 42),
)

query_frags = [
    '{0} > {1} and {0} < {2}'.format(column, lower, upper)
    for column, lower, upper in _query_bounds
]

In [83]:
def print_message(msg, length=80, filler='#', pre_post=''):
    """Print ``msg`` as a banner centred within ``length`` characters.

    Parameters
    ----------
    msg : object
        Text to show; it is formatted into the banner, so any object works.
    length : int
        Total width of the printed line.
    filler : str
        Single character used to pad the banner on both sides.
    pre_post : str
        Marker placed before and after ``msg``, separated from it by one
        space. With the default empty marker the message is still surrounded
        by one space on each side.
    """
    banner = '{0} {1} {0}'.format(pre_post, msg)
    print(banner.center(length, filler))
    
def print_time(t_curr, t_next, t_start, length=80):
    """Print the duration of the last step and the total elapsed time.

    The line is right-aligned to ``length`` characters and padded with ``-``.

    Parameters
    ----------
    t_curr : timestamp of the step that just finished.
    t_next : timestamp the step duration is measured from. Despite its name,
        the workflows in this notebook pass the timestamp of the *previous*
        step here.
    t_start : timestamp at which the whole workflow started.
    length : int
        Total width of the printed line.
    """
    step_duration = t_curr - t_next
    total_elapsed = t_curr - t_start
    report = '> Step time: {0}, elapsed time: {1}'.format(
        str(step_duration), str(total_elapsed)
    )
    print(report.rjust(length, '-'))
    
def add_features(df, gpu):
    """Derive time and coarse-location features, then drop the raw timestamps.

    New columns are assigned directly onto ``df``, so the frame passed in
    also gains them; the returned frame is the one without the two timestamp
    columns.

    Parameters
    ----------
    df : DataFrame (cudf or pandas)
        Must contain ``pickup_datetime``, ``dropoff_datetime`` and the four
        pickup/dropoff latitude/longitude columns.
    gpu : truthy/falsy
        Selects the ``drop`` call style: truthy calls ``drop(name)`` (used for
        the cudf frame), falsy calls ``drop(name, axis=1)`` (used for pandas).

    Returns
    -------
    DataFrame
        ``df`` with ``hour``, ``year``, ``month``, ``day`` and the four
        ``*_r`` columns added and both timestamp columns removed.
    """
    df['pickup_datetime'] = df['pickup_datetime'].astype('datetime64[ms]')

    # Calendar components of the pickup timestamp.
    for part in ('hour', 'year', 'month', 'day'):
        df[part] = getattr(df['pickup_datetime'].dt, part)

    # Snap each coordinate down to a multiple of 0.01.
    for coord in ('pickup_latitude', 'pickup_longitude',
                  'dropoff_latitude', 'dropoff_longitude'):
        df[coord + '_r'] = df[coord] // .01 * .01

    # The raw timestamps are no longer needed once the parts are extracted.
    for stamp in ('pickup_datetime', 'dropoff_datetime'):
        df = df.drop(stamp) if gpu else df.drop(stamp, axis=1)

    return df

# Define GPU workflow

In [84]:
def run_gpu_workflow(data_path):
    """Run the load -> filter -> featurize pipeline with cudf, timing each stage.

    Parameters
    ----------
    data_path : str
        Directory that contains ``2016/yellow_tripdata_2016-01.csv``.

    Returns
    -------
    datetime.timedelta
        Time between entering the function and the end of the featurizing stage.
    """
    started_at = datetime.datetime.now()

    # Stage 1: read the CSV. The first row of the file is skipped and the
    # column names and dtypes come from `columns_dtypes`.
    print_message('LOADING DATA')
    csv_file = os.path.join(data_path, '2016/yellow_tripdata_2016-01.csv')
    trips = cudf.read_csv(
        csv_file,
        names=list(columns_dtypes.keys()),
        dtype=list(columns_dtypes.values()),
        skiprows=1,
        usecols=use_col,
    )
    loaded_at = datetime.datetime.now()
    print_time(loaded_at, started_at, started_at)

    print()
    row_banner = 'NUMBER OF ROWS: {0:,}'.format(len(trips))
    print_message(row_banner, pre_post='+', filler='-')
    print()

    # Stage 2: keep only rows that satisfy every range filter in `query_frags`.
    print_message('SUBSETTING DATA')
    row_filter = ' and '.join(query_frags)
    trips = trips.query(row_filter)
    filtered_at = datetime.datetime.now()
    print_time(filtered_at, loaded_at, started_at)

    # Stage 3: derive the time and location features.
    print_message('FEATURIZING DATA')
    trips = add_features(trips, gpu=1)
    finished_at = datetime.datetime.now()
    print_time(finished_at, filtered_at, started_at)

    return finished_at - started_at

# Define CPU workflow

In [88]:
def run_cpu_workflow(data_path):
    """Run the load -> filter -> featurize pipeline with pandas, timing each stage.

    Parameters
    ----------
    data_path : str
        Directory that contains ``2016/yellow_tripdata_2016-01.csv``.

    Returns
    -------
    datetime.timedelta
        Time between entering the function and the end of the featurizing stage.
    """
    started_at = datetime.datetime.now()

    # Stage 1: read the CSV. The first row of the file is skipped and the
    # column names come from `columns_dtypes`; no dtypes are passed, so pandas
    # infers them.
    print_message('LOADING DATA')
    csv_file = os.path.join(data_path, '2016/yellow_tripdata_2016-01.csv')
    trips = pd.read_csv(
        csv_file,
        names=list(columns_dtypes.keys()),
        parse_dates=True,
        skiprows=1,
        usecols=use_col,
    )
    loaded_at = datetime.datetime.now()
    print_time(loaded_at, started_at, started_at)

    print()
    row_banner = 'NUMBER OF ROWS: {0:,}'.format(len(trips))
    print_message(row_banner, pre_post='+', filler='-')
    print()

    # Stage 2: keep only rows that satisfy every range filter in `query_frags`.
    print_message('SUBSETTING DATA')
    row_filter = ' and '.join(query_frags)
    trips = trips.query(row_filter)
    filtered_at = datetime.datetime.now()
    print_time(filtered_at, loaded_at, started_at)

    # Stage 3: derive the time and location features.
    print_message('FEATURIZING DATA')
    trips = add_features(trips, gpu=0)
    finished_at = datetime.datetime.now()
    print_time(finished_at, filtered_at, started_at)

    return finished_at - started_at

In [91]:
# Location of the data store; the taxi CSV is read from
# <data_dir>/data/nyctaxi/2016/yellow_tripdata_2016-01.csv.
data_dir = '../../../'     #### REPLACE WITH THE DATA STORE PATH
data_path = os.path.join(data_dir, "data/nyctaxi")

# Time the cudf pipeline; the returned timedelta is reused in the summary cell.
gpu_runtime = run_gpu_workflow(data_path)

################################# LOADING DATA #################################
-----------------------> Step time: 0:00:01.337533, elapsed time: 0:00:01.337533

-------------------------+ NUMBER OF ROWS: 10,906,858 +-------------------------

############################### SUBSETTING DATA ################################
-----------------------> Step time: 0:00:00.080865, elapsed time: 0:00:01.418398
############################### FEATURIZING DATA ###############################
-----------------------> Step time: 0:00:00.116331, elapsed time: 0:00:01.534729


In [98]:
# Time the pandas pipeline on the same file; requires `data_path` from the cell above.
cpu_runtime = run_cpu_workflow(data_path)

################################# LOADING DATA #################################
-----------------------> Step time: 0:00:17.586886, elapsed time: 0:00:17.586886

-------------------------+ NUMBER OF ROWS: 10,906,858 +-------------------------

############################### SUBSETTING DATA ################################
-----------------------> Step time: 0:00:00.748321, elapsed time: 0:00:18.335207
############################### FEATURIZING DATA ###############################
-----------------------> Step time: 0:00:06.446727, elapsed time: 0:00:24.781934


In [102]:
# Report both wall-clock totals, then their ratio (timedelta / timedelta is a float).
cpu_line = 'Total CPU time: {0}'.format(str(cpu_runtime))
print_message(cpu_line)

gpu_line = 'Total GPU time: {0}'.format(str(gpu_runtime))
print_message(gpu_line)

speedup = cpu_runtime / gpu_runtime
print_message('Speedup over CPU: {0:.3f}'.format(speedup))

######################## Total CPU time: 0:00:24.781934 ########################
######################## Total GPU time: 0:00:01.534729 ########################
########################### Speedup over CPU: 16.147 ###########################
