## Define a function to get the real distance between to lat/long points
- Manhattan distance should be useful, but I think we can do better with real distance
- Here we compare a manual calculation to the geopy library

In [1]:
from math import sin, cos, sqrt, atan2, radians
import geopy.distance

def geo_manhattan_distance(lat1, lat2, long1, long2):
    """
    returns the manhattan distance between two geo points
    """
    return abs(lat2 - lat1) + abs(long2 - long1)

def geopy_dist(coord1, coord2):
    try:
        return geopy.distance.distance(coord1, coord2).kilometers
    except:
        return -1

def haversine(lat1, lon1, lat2, lon2, m_const=3958.8):
    lat1, lon1, lat2, lon2 = map(abs, [lat1, lon1, lat2, lon2])
    lat1, lon1, lat2, lon2 = map(np.deg2rad, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1 
    dlon = lon2 - lon1 
    a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
    c = 2 * np.arcsin(np.sqrt(a)) 
    mi = m_const * c
    return mi

# filter functions to reduce the dataset
def within_boundary(dataframe, boundary):
    return (dataframe['pickup_longitude'] >= boundary[0]) & (dataframe['pickup_longitude'] <= boundary[1]) & \
            (dataframe['pickup_latitude'] >= boundary[2]) & (dataframe['pickup_latitude'] <= boundary[3]) & \
            (dataframe['dropoff_longitude'] >= boundary[0]) & (dataframe['dropoff_longitude'] <= boundary[1]) & \
            (dataframe['dropoff_latitude'] >= boundary[2]) & (dataframe['dropoff_latitude'] <= boundary[3])

def not_at_airport(dataframe):
    return ~((dataframe['pickup_to_jfk'] < 1) | (dataframe['dropoff_to_jfk'] < 1)) & \
            ~((dataframe['pickup_to_laguardia'] < 1) | (dataframe['dropoff_to_laguardia'] < 1))


def has_passengers(dataframe):
    return (dataframe['passenger_count'] != 0)
            

## Import our dataset
- our dataset consists of several files:
    - train.csv: our training data
    - test.csv: our testing data
    - sample_submissions.csv: A sample submission file in the correct format (columns key and fare_amount). This dummy file 'predicts' fare_amount to be $11.35 for all rows, which is the mean fare_amount from the training set.

In [2]:
import pandas as pd
import os
import sys
import numpy as np
import random
from sklearn.model_selection import train_test_split

TOTAL_ROWS = 55423855

DATA_FILES_PATH = 'projectDataFiles/'

NYC_COORD = (40.7128, 74.0060)
JFK_COORD = (40.6413, 73.7781)
LAGUARDIA_COORD = (40.7769, 73.8740)

# NYC Boundaries
NYC_BOUNDARY = (-74.5, -72.8, 40.5, 41.8) # in format WEST, EAST, NORTH, SOUTH

# training data types
TRAINING_TYPES = {
    'fare_amount': 'float32',
    'pickup_datetime': 'str',
    'pickup_longitude': 'float32',
    'pickup_latitude': 'float32',
    'dropoff_longitude': 'float32',
    'dropoff_latitude': 'float32',
    'passenger_count': 'uint8'
}

COLUMNS = list(TRAINING_TYPES.keys()) + ['real_dist']

FEATURES = [item for item in COLUMNS if item != 'fare_amount']

LABEL = 'fare_amount'

def import_training_dataset_limit(file_path, row_limit=100000):
    """
    function to import the dataset into a pandas dataframe.

    Takes a row limit to limit the number of rows read.
    """
    if row_limit:
        return process_df(pd.read_csv(file_path, nrows=row_limit))
    else:
        return process_df(pd.read_csv(file_path))


def get_df_list(file_path, chunksize=1000000):
    df_list = []
    for df_chunk in pd.read_csv(file_path, chunksize=chunksize, dtype=TRAINING_TYPES):
        df_chunk = process_df(df_chunk)
        df_list.append(df_chunk)
    return df_list
        
def process_df(dataframe, train_data=True):
    pd.set_option('use_inf_as_na', True)
    dataframe['pickup_datetime'] = dataframe['pickup_datetime'].str.slice(0, 16)
    dataframe['pickup_datetime'] = pd.to_datetime(dataframe['pickup_datetime'], utc=True, format='%Y-%m-%d %H:%M')
    # the distance between the pickup and dropoff points
    dataframe['real_dist'] = haversine(dataframe['pickup_latitude'], dataframe['pickup_longitude'], dataframe['dropoff_latitude'], dataframe['dropoff_longitude'])

    # add the deconstructed date
    dataframe['hour'] = dataframe['pickup_datetime'].dt.hour
    dataframe['day'] = dataframe['pickup_datetime'].dt.day
    dataframe['month'] = dataframe['pickup_datetime'].dt.month
    dataframe['year'] = dataframe['pickup_datetime'].dt.year

    # add the distances to the airports
    dataframe['pickup_to_jfk'] = haversine(dataframe['pickup_latitude'], dataframe['pickup_longitude'], JFK_COORD[0], JFK_COORD[1])
    dataframe['dropoff_to_jfk'] = haversine(dataframe['dropoff_latitude'], dataframe['dropoff_longitude'], JFK_COORD[0], JFK_COORD[1])
    dataframe['pickup_to_laguardia'] = haversine(dataframe['pickup_latitude'], dataframe['pickup_longitude'], LAGUARDIA_COORD[0], LAGUARDIA_COORD[1])
    dataframe['dropoff_to_laguardia'] = haversine(dataframe['dropoff_latitude'], dataframe['dropoff_longitude'], LAGUARDIA_COORD[0], LAGUARDIA_COORD[1])

    # distance from center of new york
    dataframe['dropoff_from_center'] = haversine(dataframe['dropoff_latitude'], dataframe['dropoff_longitude'], NYC_COORD[0], NYC_COORD[1])
    dataframe['pickup_from_center'] = haversine(dataframe['pickup_latitude'], dataframe['pickup_longitude'], NYC_COORD[0], NYC_COORD[1])

    # adding fare per mile
    if train_data:
        dataframe['fare_per_mile'] = dataframe['fare_amount'] / dataframe['real_dist']
        
    # limit to the boundary
    dataframe = dataframe[within_boundary(dataframe, NYC_BOUNDARY) & has_passengers(dataframe) & not_at_airport(dataframe)].copy()

    # drop uneccessary columns
    dataframe.drop(['key', 'pickup_datetime', 'pickup_longitude', 'pickup_latitude', 'dropoff_latitude', 'dropoff_longitude'], axis=1, inplace=True)
    dataframe.dropna(axis=1, how='any', inplace=True)
    return dataframe

def read_feathered_data(file_path):
    return pd.read_feather(file_path)

def feather_dataset(dataframe, file_out):
    dataframe.to_feather(file_out)

# import the dataset as a list of chunks, from here we can do our processing at a chunk level
print('Importing Datasets...')
# DATA_LIST = get_df_list(f'{DATA_FILES_PATH}train.csv')

# train_split = int(len(DATA_LIST) * 0.8)

# random.shuffle(DATA_LIST)

# TRAINING_LIST = DATA_LIST[:train_split]

# TEST = pd.concat(DATA_LIST[train_split:])

# TRAINING_LIST[0].head()

# import the dataset for testing 
DF = import_training_dataset_limit(f'{DATA_FILES_PATH}train.csv')

DF.head()

TRAIN, TEST = train_test_split(DF, test_size=0.2)

TRAIN.head()
    

Importing Datasets...


Unnamed: 0,fare_amount,passenger_count,real_dist,hour,day,month,year,pickup_to_jfk,dropoff_to_jfk,pickup_to_laguardia,dropoff_to_laguardia,dropoff_from_center,pickup_from_center
11478,20.5,2,2.686101,16,27,9,2014,13.915098,12.568811,7.379848,4.717852,3.968604,2.067722
49496,17.0,1,2.755679,11,18,4,2013,13.413561,13.053351,5.564542,7.511679,0.950833,3.698974
63585,17.0,1,3.115377,12,28,6,2015,13.928647,14.060379,7.214093,5.106556,5.152266,2.273114
826,10.5,1,1.3957,17,20,3,2012,12.97242,14.12278,5.719373,7.063343,2.698994,3.069815
12389,7.7,1,0.580834,3,21,1,2012,12.552576,13.063415,6.213803,6.272833,2.468924,2.174333


In [3]:
TEST.head()

Unnamed: 0,fare_amount,passenger_count,real_dist,hour,day,month,year,pickup_to_jfk,dropoff_to_jfk,pickup_to_laguardia,dropoff_to_laguardia,dropoff_from_center,pickup_from_center
47970,8.0,2,0.803903,5,11,8,2014,13.468622,12.877165,5.484062,5.537908,3.212159,3.863885
61484,10.9,1,3.502995,0,3,9,2009,12.871857,14.346217,5.049871,4.178435,7.282764,3.809082
77936,6.1,1,0.0,8,21,8,2010,13.542083,13.542083,5.869239,5.869239,3.455426,3.455426
17298,7.7,1,1.31555,20,2,5,2009,13.177572,12.557696,8.724756,7.417214,0.794867,0.520713
67942,15.0,2,2.765729,17,21,10,2013,13.494769,12.30682,6.141811,7.51241,0.751082,3.059539


## Perform a SGD partial fit
- SGD stands for stochastic gradient descent
- Here we are feeding our chunks into the partial fit

In [4]:
from sklearn.linear_model import SGDRegressor
from sklearn.preprocessing import StandardScaler

def sgd_train(chunk_list, loss="squared_loss"):
    my_sgd_regressor = SGDRegressor(loss=loss)
    my_sgd_regressor.n_iter = np.ceil(10**6 / len(TEST[LABEL]))
    scaler = StandardScaler()
    for chunk in chunk_list:
        X_train = chunk[chunk.columns.difference([LABEL])]
        scaler.fit(X_train)
        my_sgd_regressor.partial_fit(scaler.transform(X_train), chunk[LABEL])
    X_test = TEST[TEST.columns.difference([LABEL])]
    y_predict = my_sgd_regressor.predict(scaler.transform(X_test))
    return y_predict

print('Getting SGD predictions...')
#Y_PREDICT_SGD = sgd_train(TRAINING_LIST)
Y_PREDICT_SGD = sgd_train([TRAIN])

Getting SGD predictions...


In [5]:
from sklearn.ensemble import RandomForestRegressor
from functools import reduce

def gen_rf(X_train, y_train):
    rf = RandomForestRegressor(n_estimators = 2, verbose=3)
    rf.fit(X_train, y_train)
    return rf

def combine_rf(a, b):
    a.estimators_ += b.estimators_
    a.n_estimators = len(a.estimators_)
    return a

def rf_train(chunk_list):
    rf_list = [gen_rf(chunk[chunk.columns.difference([LABEL])], chunk[LABEL]) for chunk in chunk_list]
    rf_total = reduce(combine_rf, rf_list)
    y_predict = rf_total.predict(TEST[TEST.columns.difference([LABEL])])
    return y_predict

def rf_train_warm_start(chunk_list, n_estimators = 5):
    rf = RandomForestRegressor(n_estimators = n_estimators, verbose=2, warm_start=True)
    for index, chunk in enumerate(chunk_list):
        X_train = chunk[chunk.columns.difference([LABEL])]
        rf.fit(X_train, chunk[LABEL])
        if index != len(chunk_list) - 1:
            rf.n_estimators += n_estimators
    X_test = TEST[TEST.columns.difference([LABEL])]
    y_predict = rf.predict(X_test)
    return y_predict
    

print('Getting RF Predictions...')
#Y_PREDICT_RF = rf_train_warm_start(TRAINING_LIST)
Y_PREDICT_RF = rf_train_warm_start([TRAIN], n_estimators=50)

Getting RF Predictions...
building tree 1 of 50


[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    0.6s remaining:    0.0s


building tree 2 of 50
building tree 3 of 50
building tree 4 of 50
building tree 5 of 50
building tree 6 of 50
building tree 7 of 50
building tree 8 of 50
building tree 9 of 50
building tree 10 of 50
building tree 11 of 50
building tree 12 of 50
building tree 13 of 50
building tree 14 of 50
building tree 15 of 50
building tree 16 of 50
building tree 17 of 50
building tree 18 of 50
building tree 19 of 50
building tree 20 of 50
building tree 21 of 50
building tree 22 of 50
building tree 23 of 50
building tree 24 of 50
building tree 25 of 50
building tree 26 of 50
building tree 27 of 50
building tree 28 of 50
building tree 29 of 50
building tree 30 of 50
building tree 31 of 50
building tree 32 of 50
building tree 33 of 50
building tree 34 of 50
building tree 35 of 50
building tree 36 of 50
building tree 37 of 50
building tree 38 of 50
building tree 39 of 50
building tree 40 of 50
building tree 41 of 50
building tree 42 of 50
building tree 43 of 50
building tree 44 of 50
building tree 45 of

[Parallel(n_jobs=1)]: Done  50 out of  50 | elapsed:   32.6s finished
[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=1)]: Done  50 out of  50 | elapsed:    0.3s finished


In [6]:
from sklearn import metrics
import numpy as np

def calc_rmse(y_test, y_prediction):
    # Calculating "Mean Square Error" (MSE):
    mse = metrics.mean_squared_error(y_test, y_prediction)

    # Using numpy sqrt function to take the square root and calculate "Root Mean Square Error" (RMSE)
    return np.sqrt(mse)

print(f'SGB RMSE: {calc_rmse(TEST[LABEL], Y_PREDICT_SGD)}')
print(f'RF RMSE: {calc_rmse(TEST[LABEL], Y_PREDICT_RF)}')

SGB RMSE: 7.099097980021045
RF RMSE: 3.7180514405873137
