In [1]:
import numpy as np
import pandas as pd
import scipy
import matplotlib.pyplot as plt
import datetime
import math
from sklearn.datasets import dump_svmlight_file

# initialize spark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.appName("taxi_task").getOrCreate()

In [2]:
# this section is for helper functions, wrappers and so
def compute_distance(start_lat, start_lon, end_lat, end_lon):
    """ returns distance between the points in km """
    R = 6373.0
    lon1 = math.radians(start_lon)
    lat1 = math.radians(start_lat)
    lat2 = math.radians(end_lat)
    lon2 = math.radians(end_lon)

    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = math.sin(dlat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2)**2
    return round(R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)), 2)


def datetime_wrapper(datetime_string):
    return datetime.datetime.strptime(datetime_string, '%Y-%m-%d %H:%M:%S')


def convert_date_to_numbers(df, column_name, custom_format):
    """ datetime to float.timestamp """
    df[column_name] = df[column_name].apply(lambda x:datetime.datetime.strptime(x, custom_format).timestamp())

    
def from_timestamp_to_date(timestamp, date_type=False):
    """ reverse encoding of the datetime column """
    res = datetime.datetime.fromtimestamp(timestamp)
    return res if date_type else str(res)
    
    
def convert_date_to_features(df, column_name, prefix):
    """
    Converts datetime into set of features
    features: day of the week, day of the month, month, hour
    """
    df['{}_week_day'.format(prefix)] = df[column_name].apply(lambda x: datetime_wrapper(x).isoweekday()) 
    df['{}_month'.format(prefix)] = df[column_name].apply(lambda x: datetime_wrapper(x).month) 
    df['{}_hour'.format(prefix)] = df[column_name].apply(lambda x: datetime_wrapper(x).time().hour) 
    df['{}_time_min'.format(prefix)] = df[column_name].apply(lambda x: datetime_wrapper(x).time().minute) 

    
def create_groups(df, column, step):
    """creates a list of groups with assigned distance"""
    # get max, get min, create {goup_id:()}
    max_dist = np.amax(df[column])
    min_dist = np.amin(df[column])
    groups = [(min_dist + i*step, min_dist + step+i*step) for i in range(0, int(max_dist/step))]
    return {i+1:groups[i] for i in range(0,len(groups))}


def encode_group(variable, groups):
    """ technically is a copmosed feature """
    for key, value in groups.items():
        if value[0] <= variable < value[1]:
            return int(key)
        else:
            -1

            
def vendor_code(x):
    """
    encoding created manually with the help of:
    pd.DataFrame({'count' : original_data.groupby( ["vendor_id"] ).size()}).reset_index()
    """
    coding = {'Quito':1, 'Quito Cabify Executive':2, 'Quito Cabify Lite':3, 'Quito UberX':4}
    if x in coding.keys():
        return coding[x]
    else:
        return -1
            
            
def encode_vendor(df, column_to_update):
    """ replace string features with numeric """
    df[column_to_update] = df[column_to_update].apply(lambda x: vendor_code(x))
            
            
def add_groupping_feature(df, group, new_column, base_column):
    """ adds column based on group """
    df[new_column] = df[base_column].apply(lambda x: encode_group(x, group)) 
            

def split_data(df, split_size, seed=None, fixed_split=False):
    """ random split """
    rows = df.shape[0]
    if not fixed_split:
        if seed is not None:
            train = df.sample(frac=split_size, random_state=seed)
        else:
            train = df.sample(frac=split_size)
        test = df.drop(train.index)
    else:
        train = df[:math.floor(rows * split_size)]
        test = df[math.floor(rows * split_size):]
    return train, test


def convert_to_libsvm(df_features, flat_array_target, path_to_save):
    """ write dataframe to txt in libsvm form """
    dump_svmlight_file(df_features, flat_array_target, path_to_save, zero_based=False)


def train_and_predict(train_set, test_set, seed, trees=10, depth=20, output=None):
    rf = RandomForestRegressor(numTrees=trees, maxDepth=depth, seed=seed)
    model = rf.fit(train_set)
    predictions = model.transform(test_set)
    predictions.select("prediction", "label", "features").show(10)
    if output is not None:
        predictions_formatted = np.array(predictions.select('prediction').collect()).tolist()
        import json
        with open("tmp/{}".format(output), 'w') as fp:
            json.dump({"predict":predictions_formatted}, fp)
    return predictions
    
    
def collect_error(prediction, expected_value):
    pred = np.array(prediction.select('prediction').collect())
    MSE = np.sum(np.square(np.subtract(expected_value, pred))) / expected_value.shape[0]
    mean = np.mean(expected_value)
    total = np.sum(np.square(np.subtract(expected_value, mean)))
    explained = np.sum(np.square(np.subtract(expected_value, pred)))
    return np.sqrt(MSE), 1 - (explained/total)
    
    
def combined_steps(original_data, target_column, seed, trees=10, depth=20, skip_save=False, 
                   train_set=None, test_set=None, output=None, fixed_split=True):
    # split
    train, test = split_data(original_data, 0.8, fixed_split=fixed_split)

    # format
    train_target = train[[target_column]]
    train_features = train.drop([target_column], axis=1, inplace=False)
    test_target = test[[target_column]]
    test_features = test.drop([target_column], axis=1, inplace=False)

    if not skip_save:
        convert_to_libsvm(train_features.as_matrix(), train_target.as_matrix().ravel(), 'data/train_set.txt')
        convert_to_libsvm(test_features.as_matrix(), test_target.as_matrix().ravel(), 'data/test_set.txt')
    train_set = spark.read.format("libsvm").load("data/train_set.txt")
    test_set = spark.read.format("libsvm").load("data/test_set.txt")

    # train and predict
    predictions = train_and_predict(train_set, test_set, seed, trees, depth, output)
      
    #collect error
    mse, r_2 = collect_error(predictions, test_target.as_matrix())
    print("MSE is {}".format(mse))
    print("r_2 is {}".format(r_2))
    return predictions, mse, r_2
        

In [3]:
original_data = pd.read_csv("data/raw.csv", encoding='utf-8')

convert_date_to_numbers(original_data, 'pickup_datetime', '%Y-%m-%d %H:%M:%S')
convert_date_to_numbers(original_data, 'dropoff_datetime', '%Y-%m-%d %H:%M:%S')
encode_vendor(original_data, 'vendor_id')
original_data = original_data.drop(['id', 'store_and_fwd_flag', 'dropoff_datetime'], axis=1, inplace=False)
# let's also scale all to minutes and km (humanly it will be asier to see the problems if any left)
original_data['trip_duration'] = original_data['trip_duration'].apply(lambda x: float("{0:.2f}".format(x / 60)))
original_data['wait_min'] = original_data['wait_sec'].apply(lambda x: float("{0:.2f}".format(x / 60)))
original_data['dist_km'] = original_data['dist_meters'].apply(lambda x: float("{0:.2f}".format(x / 1000)))
original_data.drop(['wait_sec', 'dist_meters'],axis=1, inplace=True)


original_data.head()


Unnamed: 0,vendor_id,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,trip_duration,wait_min,dist_km
0,1,1474105000.0,-78.503922,-0.232824,-78.549447,-0.361363,33.75,5.77,24.23
1,1,1474171000.0,-78.440306,0.006789,-78.490786,-0.104836,23.17,7.6,16.68
2,1,1474007000.0,-78.469551,-0.143636,-78.470277,-0.131083,2171.95,4.83,1.99
3,1,1474178000.0,-78.558076,-0.278241,-78.43022,-0.201934,32.22,5.03,25.15
4,1,1474186000.0,-78.521818,-0.20848,-78.504558,-0.222462,15.8,5.23,4.62


In [4]:
# Three is definetly something wrong with the data. The outliers are insane -> pre-cleaning is required 
# This section makes sense regarding distance traveled
# since we are talking about city rides we should throw out all distances bigger than diameter of the city
# i've taken bounding box from opencage as the measure:
# "bounds": {
#         "northeast": {
#           "lat": -0.3586514,
#           "lng": -78.480172
#         },
#         "southwest": {
#           "lat": -0.462567,
#           "lng": -78.5506486
#         }
#       },
# although the shape of the city is very much not circle, i still will take it as an approximation
# distance computed via http://boulter.com/gps/distance. I could have implemented the function, 
# but it's one time use
# diameter = 13.91km = 13910m
# people might take taxi to the airport, which is approx 45km away from center -> thresold = 2*(45000-13910/2) = 76090 
original_data = original_data[original_data['dist_km'] <= 76090 / 1000]
# original_data = original_data[original_data['dist_km'] >= 500 / 1000]
max_dist = np.amax(original_data['dist_km'])
min_dist = np.amin(original_data['dist_km'])
avg_dist = np.average(original_data['dist_km'])
print("Raw distance in km. Max:{}, avg:{}, min:{} km".format(max_dist, avg_dist, min_dist))
groups = create_groups(original_data, 'dist_km', 6)
add_groupping_feature(original_data, groups, 'dist_group', 'dist_km')
pd.DataFrame({'count' : original_data.groupby( ["dist_group"] ).size()}).reset_index()


Raw distance in km. Max:75.48, avg:6.055341867096454, min:0.01 km


Unnamed: 0,dist_group,count
0,1.0,21629
1,2.0,6962
2,3.0,2012
3,4.0,760
4,5.0,380
5,6.0,212
6,7.0,122
7,8.0,82
8,9.0,32
9,10.0,27


In [5]:
# Here trip_duartion is cleaned
original_data = original_data[original_data['trip_duration'] <= 4*60]# saw a negative number where it should not be
original_data = original_data[original_data['trip_duration'] >= 5] # I don;t think calling a taxi for 5 minutes ride is a thing
max_time = np.amax(original_data['trip_duration'])
min_time = np.amin(original_data['trip_duration'])
avg_time = np.average(original_data['trip_duration'])
print("Raw time in s. Max:{}, avg:{}, min:{}".format(max_time, avg_time, min_time))
groups = create_groups(original_data, 'trip_duration', 15)
add_groupping_feature(original_data, groups, 'trip_duration_group', 'trip_duration')
pd.DataFrame({'count' : original_data.groupby( ["trip_duration_group"] ).size()}).reset_index()


Raw time in s. Max:239.43, avg:24.119570977432026, min:5.0


Unnamed: 0,trip_duration_group,count
0,1.0,16288
1,2.0,5581
2,3.0,1842
3,4.0,788
4,5.0,417
5,6.0,250
6,7.0,179
7,8.0,132
8,9.0,113
9,10.0,86


In [6]:
# we still can see very long taxi rides, 
# but we can't throw them out yet -> one of the additional features actually will take care of it 

In [7]:
# wait_sec causes very weird problems -> clean it
# let's have a sanity check and do not allow waiting larger that duration
original_data = original_data[original_data['wait_min'] < original_data['trip_duration']]
original_data = original_data[original_data['wait_min'] >= 5]
original_data = original_data[original_data['wait_min'] <= 3*60]
max_time = np.amax(original_data['wait_min'])
min_time = np.amin(original_data['wait_min'])
avg_time = np.average(original_data['wait_min'])
print("Raw time in min. Max:{}, avg:{}, min:{}".format(max_time, avg_time, min_time))

# the biggest waiting time is about 3 Hours, which can happen unfortunately 
groups = create_groups(original_data, 'wait_min', 10)
add_groupping_feature(original_data, groups, 'waiting_group', 'wait_min')
pd.DataFrame({'count' : original_data.groupby( ["waiting_group"] ).size()}).reset_index()

Raw time in min. Max:179.43, avg:14.033071020547363, min:5.0


Unnamed: 0,waiting_group,count
0,1.0,9094
1,2.0,1456
2,3.0,452
3,4.0,220
4,5.0,138
5,6.0,92
6,7.0,64
7,8.0,50
8,9.0,47
9,10.0,28


In [8]:
original_data.drop(['waiting_group', 'dist_group', 'trip_duration_group'], axis=1, inplace=True)
print("{} variables given to the model".format(original_data.shape[1]))
original_data.head()


9 variables given to the model


Unnamed: 0,vendor_id,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,trip_duration,wait_min,dist_km
0,1,1474105000.0,-78.503922,-0.232824,-78.549447,-0.361363,33.75,5.77,24.23
1,1,1474171000.0,-78.440306,0.006789,-78.490786,-0.104836,23.17,7.6,16.68
3,1,1474178000.0,-78.558076,-0.278241,-78.43022,-0.201934,32.22,5.03,25.15
4,1,1474186000.0,-78.521818,-0.20848,-78.504558,-0.222462,15.8,5.23,4.62
5,1,1472272000.0,-78.509202,-0.194839,-78.518764,-0.228794,25.17,10.5,5.82


In [9]:
predictions, MSE, r_2 = combined_steps(original_data, 'wait_min', 42, 100, 8, False)

+------------------+------+--------------------+
|        prediction| label|            features|
+------------------+------+--------------------+
|36.771297458577074|  67.0|(8,[0,1,2,3,4,5,6...|
|12.311912212649261|  6.73|(8,[0,1,2,3,4,5,6...|
| 9.146151168208458|  5.97|(8,[0,1,2,3,4,5,6...|
|13.056215779239103| 12.57|(8,[0,1,2,3,4,5,6...|
| 55.99799072293616|118.95|(8,[0,1,2,3,4,5,6...|
| 55.99799072293616|118.95|(8,[0,1,2,3,4,5,6...|
| 9.179673247802718|   5.5|(8,[0,1,2,3,4,5,6...|
|  7.54440628965364|  5.93|(8,[0,1,2,3,4,5,6...|
|10.431819014278478| 11.17|(8,[0,1,2,3,4,5,6...|
|10.252380014277664| 15.83|(8,[0,1,2,3,4,5,6...|
+------------------+------+--------------------+
only showing top 10 rows

MSE is 10.80004374770733
r_2 is 0.567790915246017


In [10]:
# simple analysis and same trick as in the previous example
data = original_data.copy()
data['pickup_datetime'] = data['pickup_datetime'].apply(lambda x: from_timestamp_to_date(x))
convert_date_to_features(data, 'pickup_datetime', 'pick')
data.drop(['pickup_datetime'], axis=1, inplace=True)  # 'pick_time_min'
print("{} variables given to the model".format(data.shape[1]))
data.head()


12 variables given to the model


Unnamed: 0,vendor_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,trip_duration,wait_min,dist_km,pick_week_day,pick_month,pick_hour,pick_time_min
0,1,-78.503922,-0.232824,-78.549447,-0.361363,33.75,5.77,24.23,6,9,9,32
1,1,-78.440306,0.006789,-78.490786,-0.104836,23.17,7.6,16.68,7,9,4,3
3,1,-78.558076,-0.278241,-78.43022,-0.201934,32.22,5.03,25.15,7,9,5,57
4,1,-78.521818,-0.20848,-78.504558,-0.222462,15.8,5.23,4.62,7,9,7,58
5,1,-78.509202,-0.194839,-78.518764,-0.228794,25.17,10.5,5.82,6,8,4,19


In [11]:
predictions, MSE, r_2 = combined_steps(original_data=data, target_column='wait_min', seed=42, trees=100, depth=8, skip_save=False)


+------------------+------+--------------------+
|        prediction| label|            features|
+------------------+------+--------------------+
| 35.15021411564223|  67.0|(11,[0,1,2,3,4,5,...|
|11.012338174781698|  6.73|(11,[0,1,2,3,4,5,...|
| 9.318797224035038|  5.97|(11,[0,1,2,3,4,5,...|
| 12.85191960709612| 12.57|(11,[0,1,2,3,4,5,...|
|50.836190524238255|118.95|(11,[0,1,2,3,4,5,...|
|50.836190524238255|118.95|(11,[0,1,2,3,4,5,...|
| 9.667977478764813|   5.5|(11,[0,1,2,3,4,5,...|
| 7.421413264284455|  5.93|(11,[0,1,2,3,4,5,...|
| 10.84115216577494| 11.17|(11,[0,1,2,3,4,5,...|
|10.755804907164173| 15.83|(11,[0,1,2,3,4,5,...|
+------------------+------+--------------------+
only showing top 10 rows

MSE is 10.774298902038707
r_2 is 0.5698490354054475


In [12]:
# datetime extraction did not bring us too much in this form, thus let's use a bit different approach
# differently encoded features
# day of the week: 1 if working day, -1 if weekend 
# day of the month is taken out -> we will use the holiday features
# hour -> use 3 numbers: 1 (rush hour), 0 (working hour/evening), -1 - also depends on day of the week
# some information gathred: http://www.expat.com/forum/viewtopic.php?id=730124
# holiday -> 0 if a normal day, 1 if a holiday (information is static, taken from www.officeholidays.com)
# for this presentation i didn't consider "compensated days"
HOLIDAYS = [
    datetime.date(2016, 1, 1),
    datetime.date(2016, 2, 8),
    datetime.date(2016, 2, 9),
    datetime.date(2016, 3, 25),
    datetime.date(2016, 3, 27),
    datetime.date(2016, 5, 1),
    datetime.date(2016, 5, 27),
    datetime.date(2016, 8, 10),
    datetime.date(2016, 11, 2),
    datetime.date(2016, 11, 3),
    datetime.date(2016, 12, 6),
    datetime.date(2016, 12, 25),
    datetime.date(2017, 1, 1),
    datetime.date(2017, 2, 27),
    datetime.date(2017, 2, 28),
    datetime.date(2017, 4, 14),
    datetime.date(2017, 4, 16),
    datetime.date(2017, 5, 1),
    datetime.date(2017, 5, 24),
    datetime.date(2017, 8, 10),
    datetime.date(2017, 10, 9),
    datetime.date(2017, 11, 2),
    datetime.date(2017, 11, 3),
    datetime.date(2017, 12, 6),
    datetime.date(2017, 12, 25)
]

# Seasons/encoding: winter:4, spring/autumn:1, summer:2

def additional_time_features(timestamp):
    """
    1. Detects holiday -> sets appropriate weight to a variable 'holiday'
    2. Detects day of the week
    3. Detects hour
    4. based on the above collected information "traffic" feature is estimated
    if complex_depedency is chosen, day of the week and 

    """
    date_obj = from_timestamp_to_date(timestamp, True)
    return 1 if date_obj in HOLIDAYS else 0


In [13]:
data1 = original_data.copy()
data1['pickup_datetime'] = data1['pickup_datetime'].apply(lambda x: from_timestamp_to_date(x))
convert_date_to_features(data1, 'pickup_datetime', 'pick')
convert_date_to_numbers(data1, 'pickup_datetime', '%Y-%m-%d %H:%M:%S')
holidays = data1['pickup_datetime'].apply(lambda x: additional_time_features(x)).sum()
print("data set contains {} rides during holidays examples".format(holidays))
data1.drop(['pickup_datetime'], axis=1, inplace=True)
# Next feature won't work, since we don't have any rides on the holidays (weird?)

data set contains 0 rides during holidays examples


In [14]:
# According to some internet surfing people actually take city-to-city taxi rides quite often
# I actually think we should use 2 differently trained models for in-city and city-to-city rides 
# due to differences in the infrastructure (speed, distance duh, obstacles) and different distributions of the samples
# Quito is about 40 km (25 mi) long and 5 km (3.1 mi) at its widest -> 
# the next feature construction will work on coordinates and will pack within the bounding box
# the coordinates were set manually with the help of https://www.openstreetmap.org/
# bound_quito = np.array([
#     [-0.0742, -78.4499],
#     [-0.0745, -78.5447],
#     [-0.3619, -78.5145],
#     [-0.3557, -78.6185]
# ])


# simple hack - if dist from pick-up/drop to the center is larger than Radius+delta (28km) then the ride does not count
def in_city_trip(start_lat, start_lon):
    max_distance = 20  # km distance
    actual_distance = compute_distance(start_lat, start_lon, -0.2156, -78.5172)
    return 1 if actual_distance <= max_distance else 0

In [15]:
# clean up data and retrain the model 
data2 = data1.copy()
data2['pickup_coordinate'] = data2.apply(lambda x: in_city_trip(x.pickup_latitude, x.pickup_longitude), axis=1)
data2['dropoff_coordinate'] = data2.apply(lambda x: in_city_trip(x.dropoff_latitude, x.dropoff_longitude), axis=1)
data2 = data2[data2['pickup_coordinate'] == 1]
data2 = data2[data2['dropoff_coordinate'] == 1]
data2.drop(['pickup_coordinate', 'dropoff_coordinate'], axis=1, inplace=True)
# data2.drop(['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude'], axis=1, inplace=True)
print("{} variables given to the model".format(data2.shape[1]))


12 variables given to the model


In [16]:
data2.head()

Unnamed: 0,vendor_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,trip_duration,wait_min,dist_km,pick_week_day,pick_month,pick_hour,pick_time_min
0,1,-78.503922,-0.232824,-78.549447,-0.361363,33.75,5.77,24.23,6,9,9,32
3,1,-78.558076,-0.278241,-78.43022,-0.201934,32.22,5.03,25.15,7,9,5,57
4,1,-78.521818,-0.20848,-78.504558,-0.222462,15.8,5.23,4.62,7,9,7,58
5,1,-78.509202,-0.194839,-78.518764,-0.228794,25.17,10.5,5.82,6,8,4,19
6,1,-78.481542,-0.154317,-78.51842,-0.191547,22.35,8.5,7.59,7,9,10,8


In [17]:
predictions, MSE, r_2 = combined_steps(original_data=data2, target_column='wait_min', seed=42, trees=100, depth=8, skip_save=False)

+------------------+------+--------------------+
|        prediction| label|            features|
+------------------+------+--------------------+
| 7.937915087798783|  5.83|(11,[0,1,2,3,4,5,...|
|34.248656883819656|  67.0|(11,[0,1,2,3,4,5,...|
| 9.143328010281062|  5.97|(11,[0,1,2,3,4,5,...|
|12.655303382328492| 12.57|(11,[0,1,2,3,4,5,...|
| 53.16464171397478|118.95|(11,[0,1,2,3,4,5,...|
| 53.16464171397478|118.95|(11,[0,1,2,3,4,5,...|
| 9.370034323110355|   5.5|(11,[0,1,2,3,4,5,...|
|7.5911905674825535|  5.93|(11,[0,1,2,3,4,5,...|
|11.909439958033065| 11.17|(11,[0,1,2,3,4,5,...|
|10.296696675596515| 15.83|(11,[0,1,2,3,4,5,...|
+------------------+------+--------------------+
only showing top 10 rows

MSE is 10.504103769212618
r_2 is 0.5867303380353371


In [None]:
# TODO check the bounding box?