# Licencing Information

(C) Cloudera, Inc. 2020-2021
All rights reserved.

Applicable Open Source License: GNU Affero General Public License v3.0

NOTE: Cloudera open source products are modular software products
made up of hundreds of individual components, each of which was
individually copyrighted.  Each Cloudera open source product is a
collective work under U.S. Copyright Law. Your license to use the
collective work is as provided in your written agreement with
Cloudera.  Used apart from the collective work, this file is
licensed for your use pursuant to the open source license
identified above.

This code is provided to you pursuant to a written agreement with
(i) Cloudera, Inc. or (ii) a third-party authorized to distribute
this code. If you do not have a written agreement with Cloudera nor
with an authorized and properly licensed third party, you do not
have any rights to access nor to use this code.

Absent a written agreement with Cloudera, Inc. (“Cloudera”) to the
contrary, (A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
ARISING FROM OR RELATED TO THE CODE; AND (D) WITH RESPECT TO YOUR EXERCISE
OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
DATA.

Source File Name: gpu_fare_prediction.ipynb

Description: Explore how you can leverage NVIDIA's RAPIDS framework
             using Cloudera Machine Learning (CML), on the Cloudera Data Platform (CDP).

Author(s): Jacob (Jake) Bengtson


# Program Initialization

In [None]:
# Compute backend used by every later cell:
#   "cpu" -> pandas / NumPy / scikit-learn
#   "gpu" -> RAPIDS cuDF / CuPy / cuML
mode = "cpu"

In [None]:

import pickle
import os
from glob import glob
from datetime import datetime
from math import cos, sin, asin, sqrt, pi

# Import the dataframe / array / ML libraries for the selected backend under
# the same aliases (`pd`, `np`, ...) so later cells run unchanged on either.
# Compare strings with `==`: `is` tests object identity and only works by
# accident of string interning (CPython emits a SyntaxWarning for it).
if mode == 'cpu':
    import pandas as pd
    import numpy as np
    from sklearn.model_selection import train_test_split
    from sklearn.linear_model import LinearRegression
    from sklearn.metrics import r2_score
    # show every column and print floats with two decimals
    pd.set_option('display.max_columns', None)
    pd.set_option('display.float_format', lambda x: '%.2f' % x)
elif mode == 'gpu':
    import cudf as pd
    import cupy as np
    from cuml import train_test_split
    from cuml.linear_model import LinearRegression
    from cuml.metrics.regression import r2_score
else:
    # Fail fast here instead of letting later cells die with NameError on `pd`.
    raise ValueError("mode must be 'cpu' or 'gpu', got %r" % (mode,))

In [None]:
# Year of trip data to download/load, and the seed that makes the
# train/test split reproducible.
year = 2015
random_state = 24

# Import Data
Import data only once. After you import the data, this step can be skipped or commented out.

In [None]:
%%time
from urllib import request

# Download the yellow-cab trip CSVs for months 1-2 of `year` into the current
# directory.  Files already present are skipped, so re-running this cell is
# cheap and safe.
# NOTE(review): the nyc-tlc S3 location may no longer serve these CSVs —
# confirm the URL if downloads fail.
for month in range(1, 3):
    file_name = "yellow_tripdata_%s-%02d.csv" % (year, month)
    local_path = "./" + file_name
    if os.path.exists(local_path):
        continue
    # Download under a temporary name and rename only on success, so an
    # interrupted transfer never leaves a truncated '*.csv' that the loader's
    # glob would pick up.
    tmp_path = local_path + ".part"
    request.urlretrieve("https://s3.amazonaws.com/nyc-tlc/trip+data/" + file_name,
                        tmp_path)
    os.replace(tmp_path, local_path)

# Data Preparation

In [None]:
%%time
start_time = datetime.now()
# pandas does not handle null values in int columns, so I had to use floats
dtypes = {
    'tpep_pickup_datetime': 'str',
    'tpep_dropoff_datetime': 'str',
    'passenger_count': 'float32',
    'trip_distance': 'float32',
    'RateCodeID': 'str',
    'pickup_longitude': 'float32',
    'pickup_latitude': 'float32',
    'dropoff_longitude': 'float32',
    'dropoff_latitude': 'float32',
    'payment_type': 'str',
    'fare_amount': 'float32'
}

# the one difference that I found between cudf and pandas: on the GPU path
# the two timestamp columns are declared with cudf's 'date' dtype
if mode == 'gpu':
    dtypes.update({
        'tpep_pickup_datetime': 'date',
        'tpep_dropoff_datetime': 'date',
    })

# Sort the matches: glob order is filesystem-dependent, and the concatenated
# row order feeds the seeded train_test_split later on, so an unsorted list
# would make the split non-reproducible across machines.
taxi_glob = sorted(glob('./*%s*.csv' % year))
if not taxi_glob:
    raise FileNotFoundError(
        "no CSV files matching './*%s*.csv' - run the Import Data cell first" % year)

# Only the columns listed in `dtypes` are read; both timestamps are parsed
# as datetimes.
taxi_df = pd.concat(
    [pd.read_csv(path,
                 usecols=list(dtypes.keys()),
                 dtype=dtypes,
                 parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
     for path in taxi_glob],
    ignore_index=True)

print('Dataframe row count: ' + str(taxi_df.shape[0]))
taxi_df.head()

In [None]:
%%time
# Shorten the TLC column names and make them consistently snake_case.
column_renames = {
    'tpep_pickup_datetime': 'pickup_datetime',
    'tpep_dropoff_datetime': 'dropoff_datetime',
    'RateCodeID': 'ratecode_id',
}
taxi_df.rename(columns=column_renames, inplace=True)
del column_renames

In [None]:
%%time
# Filter out outliers based on previously performed EDA, and drop pickups
# whose lat/long fall outside a bounding box around NYC.
filters = [
    'fare_amount > 1 and fare_amount < 500',
    'passenger_count > 0 and passenger_count < 6',
    'trip_distance > 0 and trip_distance < 500',
    'not (trip_distance > 50 and fare_amount < 50)',
    'not (trip_distance < 10 and fare_amount > 300)',
    'not dropoff_datetime <= pickup_datetime',
    # NYC longitudes are negative (west of Greenwich).  The upper bound used to
    # be +73.4, which accepted any longitude in [-74.4, 73.4] and made this
    # check almost a no-op.
    'pickup_longitude <= -73.4 and pickup_longitude >= -74.4',
    'pickup_latitude <= 41.2 and pickup_latitude >= 40.2',
]
# query() takes a single expression, so AND all conditions together.
taxi_df = taxi_df.query(' and '.join(filters))

In [None]:
%%time
# One-hot encode the categorical columns, replacing each one with its
# prefixed indicator columns (appended in the order listed here).
categorical_cols = ['ratecode_id', 'payment_type']
encoded = [pd.get_dummies(taxi_df[col], prefix=col) for col in categorical_cols]
taxi_df = pd.concat([taxi_df.drop(categorical_cols, axis=1)] + encoded, axis=1)
del categorical_cols, encoded

In [None]:
%%time
# add time features
# Calendar features derived from the pickup timestamp.  is_weekend flags
# day_of_week >= 5 (Saturday/Sunday under the Monday=0 weekday convention).
taxi_df['hour'] = taxi_df['pickup_datetime'].dt.hour
taxi_df['month'] = taxi_df['pickup_datetime'].dt.month
taxi_df['day'] = taxi_df['pickup_datetime'].dt.day
taxi_df['day_of_week'] = taxi_df['pickup_datetime'].dt.weekday
taxi_df['is_weekend'] = (taxi_df['day_of_week']>=5).astype('int32')
# Trip duration: difference of the raw integer timestamps, divided by 1000.
# NOTE(review): the resulting unit depends on the datetime resolution each
# backend parses into (pandas typically uses nanoseconds, making this
# microseconds); confirm what cudf's 'date' dtype yields before treating
# `diff` as seconds.
taxi_df['diff'] = taxi_df['dropoff_datetime'].astype('int64') - taxi_df['pickup_datetime'].astype('int64')
taxi_df['diff']=(taxi_df['diff']/1000).astype('int64')
# drop the raw timestamps now that the features above have been derived
taxi_df = taxi_df.drop(['pickup_datetime', 'dropoff_datetime'], axis=1)
# add trip direction features
# Boolean flags for eastward / northward travel.  The *85 and *111 factors
# look like km-per-degree scalings; multiplying by a positive constant does
# not change the sign, so they do not affect the outcome of the > 0 test.
taxi_df['toward_east'] = ((taxi_df["dropoff_longitude"] - taxi_df["pickup_longitude"]) * 85) > 0
taxi_df['toward_north'] = ((taxi_df["dropoff_latitude"] - taxi_df["pickup_latitude"]) * 111) > 0

In [None]:
%%time
# add haversine distance feature
# Haversine distance formula taken from Michael Dunn's StackOverflow post:
# https://stackoverflow.com/questions/4913349/haversine-formula-in-python-bearing-and-distance-between-two-gps-points
# Both branches add a `hav_distance` column: the great-circle distance in
# kilometres (Earth radius 6371 km) between pickup and dropoff points.
if mode == 'cpu':
    def haversine_distance(x_1, y_1, x_2, y_2):
        """Return the great-circle distance in km, vectorised with NumPy.

        x_1/x_2 are latitudes and y_1/y_2 longitudes, in degrees.  Each may be
        a scalar or an array-like column; the result has the broadcast shape.
        """
        # Promote to float64 first: the source columns are float32, and small
        # coordinate differences lose precision at single precision.
        x_1, y_1, x_2, y_2 = (pi / 180 * np.asarray(v, dtype=np.float64)
                              for v in (x_1, y_1, x_2, y_2))

        dlon = y_2 - y_1
        dlat = x_2 - x_1
        a = np.sin(dlat / 2) ** 2 + np.cos(x_1) * np.cos(x_2) * np.sin(dlon / 2) ** 2

        c = 2 * np.arcsin(np.sqrt(a))
        r = 6371  # Radius of earth in kilometers
        return c * r

    # One vectorised call over whole columns instead of a per-row
    # DataFrame.apply(axis=1), which runs a Python function for every row.
    taxi_df['hav_distance'] = haversine_distance(taxi_df['pickup_latitude'],
                                                 taxi_df['pickup_longitude'],
                                                 taxi_df['dropoff_latitude'],
                                                 taxi_df['dropoff_longitude'])

elif mode == 'gpu':
    # Row kernel for cudf's apply_rows: iterates over the input columns and
    # writes each result into the `hav_distance` output column.
    def haversine_distance(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude, hav_distance):
        for i, (x_1, y_1, x_2, y_2) in enumerate(zip(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude)):

            x_1 = pi/180 * x_1
            y_1 = pi/180 * y_1
            x_2 = pi/180 * x_2
            y_2 = pi/180 * y_2

            dlon = y_2 - y_1
            dlat = x_2 - x_1
            a = sin(dlat/2)**2 + cos(x_1) * cos(x_2) * sin(dlon/2)**2

            c = 2 * asin(sqrt(a))
            r = 6371 # Radius of earth in kilometers

            hav_distance[i] = c * r

    taxi_df = taxi_df.apply_rows(haversine_distance,
                               incols=['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude'],
                               outcols=dict(hav_distance=np.float64),
                               kwargs=dict())
    

In [None]:
# Report how many rows survived filtering and feature engineering.
print('Dataset Row Count: {}'.format(len(taxi_df)))
taxi_df.head()

# Train and Evaluate a Regression Model

In [None]:
%%time
# Separate the target (fare_amount) from the features, then hold out 20% of
# the rows as a test set using the shared random seed.
y = taxi_df.pop('fare_amount')
split = train_test_split(taxi_df, y, test_size=.2, random_state=random_state)
X_df_train, X_df_test, y_df_train, y_df_test = split
del split

In [None]:
%%time
# Fit an ordinary-least-squares model on the training split and print R^2 on
# the held-out split.
# scikit-learn removed LinearRegression's `normalize` argument (deprecated in
# 1.0, removed in 1.2), so it is passed only to cuML.  With an intercept, OLS
# predictions are invariant to per-feature rescaling (in exact arithmetic),
# so omitting it on the CPU path does not change the model's predictions.
lr_params = {'fit_intercept': True}
if mode == 'gpu':
    lr_params['normalize'] = True
lr_model = LinearRegression(**lr_params)
lr_model.fit(X_df_train, y_df_train)
y_hat = lr_model.predict(X_df_test)
print(r2_score(y_df_test, y_hat))
# release the split copies before refitting on the full data set
del X_df_train, X_df_test, y_df_train, y_df_test

# Save Model

In [None]:
%%time
# train model on full data set
# `normalize` is only passed to cuML: scikit-learn >= 1.2 no longer accepts
# it, and OLS predictions with an intercept do not depend on it.
lr_params = {'fit_intercept': True}
if mode == 'gpu':
    lr_params['normalize'] = True
lr_model_full = LinearRegression(**lr_params)
lr_model_full.fit(taxi_df, y)

# Save Model
# NOTE: the model is pickled twice: pickle.dumps produces bytes, and those
# bytes are pickled again into the file, so loading it back requires
# pickle.loads(pickle.load(handle)).  Kept as-is in case a consumer elsewhere
# already expects that format.  Only unpickle files from trusted sources.
mdl = pickle.dumps(lr_model_full)
with open('%s_taxi_model.pickle' % mode, 'wb') as handle:
    pickle.dump(mdl, handle)

In [None]:
# Wall-clock time since the data-loading cell started, tagged with the backend.
print('{} run time: {}'.format(mode, datetime.now() - start_time))