In [None]:
%%capture
!pip install pyspark_dist_explore
!pip install chart-studio
!pip install -q kaggle

In [None]:
# Work With Files
from google.cloud import storage
import os

# Useful libraries:
from time import time
import numpy as np
import pandas as pd
import math

# To Plot:
import matplotlib.pyplot as plt
import seaborn as sns
import chart_studio.plotly as py
import plotly.graph_objs as go

# Pyspark Lib:
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark import SparkContext, SparkConf

from pyspark_dist_explore import hist

# Preprocess:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import MinMaxScaler

# Pysparl ML:
from pyspark.ml.regression import LinearRegression

In [None]:
PATH_BUCKET = 'gs://nyc_comp_bk/'
PATH_DATA = '/home/ubuntu/NYC_Taxi/data/'

In [None]:
os.chdir('/home/ubuntu/NYC_Taxi/')

In [None]:
class Work_On_Bucket():
    
    def __init__(self, bucket_name):
        # Get access to the bucket:
        storage_client = storage.Client()
        self.bucket = storage_client.get_bucket(bucket_name)
        
    def get_file_from_bucket(self, file_name, save_path):
        # Download the file:
        blob = self.bucket.blob(file_name)
        blob.download_to_filename(''.join([save_path, file_name]))
            
    def upload_file_to_bucket(self, file_name, folder_path):
        # Upload the File
        object_to_save = self.bucket.blob(file_name)
        object_to_save.upload_from_filename(folder_path + file_name)


### Get the Dataset:

In [None]:
Bucket = Work_On_Bucket('nyc_comp_bk')

In [None]:
# Set kaggle:
! mkdir ~/.kaggle
Bucket.get_file_from_bucket('kaggle.json', '/home/ubuntu/NYC_Taxi/')
! cp /home/ubuntu/NYC_Taxi/kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Download The Dataset
!kaggle competitions download -c new-york-city-taxi-fare-prediction

# Unzip the Files
! unzip new-york-city-taxi-fare-prediction.zip -d /home/ubuntu/NYC_Taxi/data/
! rm new-york-city-taxi-fare-prediction.zip

# Upload databses to bucket:
print('Start Uploding!')
Bucket.upload_file_to_bucket('train.csv', PATH_DATA)
Bucket.upload_file_to_bucket('test.csv', PATH_DATA)
print('Succesfully Uploaded!')

### Preliminary Steps (Load + Checks):

In [None]:
# Load Data:
train = spark.read.load(PATH_BUCKET+"train.csv", format="csv", inferSchema="true", header="true")
test = spark.read.load(PATH_BUCKET+"test.csv", format="csv", inferSchema="true", header="true")

# Load Test (Because pyspark changes the timestamp):
Bucket.get_file_from_bucket('test.csv', '')
original_test = pd.read_csv('test.csv')

In [None]:
# Get DB shape:
ncol = len(train.columns)
nrow = train.count()
print("The shape of the dataset is {:d} rows by {:d} columns".format(nrow, ncol))

In [None]:
# Get the schema:
train.printSchema()

In [None]:
# Show some basic Statistics:
stats = train.select(train.columns[1:]).describe()
stats.toPandas()

In [None]:
# Check Nulls:
for c in train.columns[2:]:
    nans = train.where(col(c).isNull()).count()
    print('{:s}: {:d}'.format(c, nans))

In [None]:
# Remove Rows with Missing Values:
train = train.na.drop(how='any')

In [None]:
# Check Duplicates:
print('The Duplicates are: {:d}'.format(train.count()-train.distinct().count()))

In [None]:
# Remove Duplicates:
train = train.distinct()

### Work With the Fare Price (Dependent Variale):

Checking the statistics, it is clear that some values are not possible, I am going to remove negatives values and all values that are lower that 2.50 dollars (https://nymag.com/nymetro/urban/features/taxi/n_20286/). Hence, I will keep all values greather than 2.50 dollars. By taking into account the max value, it is clear that some outlayers are present, nobady would want to spend 93,963 dollars for a trip. Working with the quantiles it is clear that there is somenthing wrong with the fare amount, the median is 999.9 dollars. As starting point let's remove what I am pretty sure that is an outlayer, namely everything over the fourth quntile, which represents all values greater than 93963.36 dollars. Computing again the quantiles now the fourth quantile is 999.99 dollars, Checking the other variables associated with these high fare amounts, it comes up that some of them doesn't make any sense: no changes in lat and long pick up and drop off, too small journey with one or two people for a so high price, missing values for lat and long and so on. I am going to keep al observations with fear amount less than 999.99 dollars, hopping that with the EDA I can clean more the data.

In [None]:
# Get statistics:
train.select('fare_amount').describe().toPandas()

In [None]:
# Remove Values lower than 2.50$:
train = train.filter('fare_amount > 2.50')

In [None]:
# Compute Quantiles:
train.approxQuantile('fare_amount', [0.25, 0.50, 0.75, 0.975], 0.25)

In [None]:
# Remove Outlayers:
train = train.filter('fare_amount < 93963.36')

In [None]:
# Compute Again Quantiles
train.approxQuantile('fare_amount', [0.25, 0.50, 0.75, 0.975], 0.25)

In [None]:
# Remove other Outlayers:
train = train.filter('fare_amount < 999.99')

In [None]:
# Check Quantiles:
train.approxQuantile('fare_amount', [0.25, 0.50, 0.75, 0.975], 0.25)

### Create My Base Line:

As Base Line I am going to use a Multiple Linear Regression that takes as input all the scaled (mean=0, sd=1) numerical variables.
As result I get an RMSE = 9.40719 on the Test.

In [None]:
NUMERICAL = ['pickup_longitude',
            'pickup_latitude',
            'dropoff_longitude',
            'dropoff_latitude',
            'passenger_count']
TARGET = 'fare_amount'

In [None]:
# Get the feature Vector:
assembler = VectorAssembler(inputCols=NUMERICAL, outputCol="features")
train_df = assembler.transform(train)
train_df = train_df.select('features', TARGET)

In [None]:
# Scale Data:
scaler = StandardScaler(inputCol="features", outputCol="ScaledFeatures")
scalerModel = scaler.fit(train_df)
train_df = scalerModel.transform(train_df)
train_df = train_df.select('ScaledFeatures', TARGET)

In [None]:
# Run the Linear Regression:
lr = LinearRegression(featuresCol="ScaledFeatures", labelCol=TARGET, maxIter=10)
lr_model = lr.fit(train_df)

# Print Stats:
training_result = lr_model.summary
print("***** Training Set *****")
print("RMSE: {:.3f}".format(training_result.rootMeanSquaredError))
print("MAE: {:.3f}".format(training_result.meanAbsoluteError))
print("R2: {:.3f}".format(training_result.r2))
print("***** Training Set *****")

In [None]:
# Prepare the test:
test_df = assembler.transform(test.select(NUMERICAL))
test_df = test_df.select('features')
test_df = scalerModel.transform(test_df)

# Make Predictions:
predictions = lr_model.transform(test_df).select('prediction').withColumnRenamed('prediction','fare_amount').toPandas()

# Prepare the Submission:
submission = pd.concat([original_test['key'], predictions['fare_amount']], axis=1)
submission.to_csv('submission.csv', index=False)

# Submit:
!kaggle competitions submit -c new-york-city-taxi-fare-prediction -f submission.csv -m "First Submission"
!kaggle competitions submissions -c new-york-city-taxi-fare-prediction

### EDA

#### 1) Latitude and Longitude

From the study of the quantiles it is clear that is plenty of outlayers, because the NYC coordinates are (lat=40.730610 lon=-73.935242). Even working with quantiles, removing anything out of the I and III quartile things don't change so much. Therefore, I opt for anothe option, I draw a triangle around the area of interest, a triangle because NY City is on an icleand which reminds a triangle, but the area is bigger than the iceland, to take into acocunt possible trips from the countriside. Then I remove all observations for which the pick up position or drop off position outside the area.
To decide whether a point is in the triangle I walk clockwise or counterclockwise around the triangle and project the point onto the segment I am crossing by using the dot product. Finally, check that the vector created is on the same side for each of the triangle's segments.

The it is possible to compute new features that can be useful both for the model and to remove more outlayers, namely the Absolute Distance, the Haversinee Distance and the Minkowski Distance (that for p=1 is the Manhattan and for p=2 is the Euclidean distance). All the distances are expressed in kilometers. To obtain some of them I need to know to how much km corresponds the variation of 1 degree for each coordinates and it turns put that for Latitude it is 111 Km and for Longitude is 85 Km. Checking the statistics of th emetrics it is clear that there are many outlayers. To drop them I am going to use a kind of voting, for each observation I get the mean of manhattan, euclidean and haversine distances metrics and remove the observation if its distance mean is less than 0.5 (namel less than 500m trip).

Exploiting the function to compute the haversian distance (which is the more realistic) I compute the distanes from the airports, both for pick up and drop off and I create a column that says whether the pick up or dorp off happened near by the airport. It is possible that there the costs are higher or maybe fixed. It is possible to check using a groupby that the average price is far away higher than the one for a standard trip. Furthermore, for rhe most of them the standard deviation shows that maybe the prices are not fixed or anyway that the prices can change up to +/-30$ for the most volatile airport (EWR).

Then I suppose that a there must be a relationship between fear amount and length of the trip (it maybe also connected with duration). Hence, I create a variable tha distinguish among trips under the mean and over the mean. It seems that my hypothesis is confirmed.

In [None]:
# Check I, II and III Quantiles:
np.array([train.approxQuantile('pickup_longitude', [0.25, 0.50, 0.75], 0.25),
          train.approxQuantile('pickup_latitude', [0.25, 0.50, 0.75], 0.25),
          train.approxQuantile('dropoff_longitude', [0.25, 0.50, 0.75], 0.25),
          train.approxQuantile('dropoff_latitude', [0.25, 0.50, 0.75], 0.25)])

In [None]:
# Write function to check if a point is a triangle:

def point_in_triangle(lat, long):
    """
    phuclv (https://stackoverflow.com/questions/2049582/how-to-determine-if-a-point-is-in-a-2d-triangle)
    Returns True if the point is inside the triangle
    and returns False if it falls outside.
    - The argument *point* is a tuple with two elements
    containing the X,Y coordinates respectively.
    - The argument *triangle* is a tuple with three elements each
    element consisting of a tuple of X,Y coordinates.
    """
    # Unpack arguments
    x = lat
    y = long
    ax, ay = (39.74274655286439, -75.26399018397262)
    bx, by = (41.58840050439113, -75.18708588709761)
    cx, cy = (41.07279488244855, -71.76484467616011)
    # Segment A to B
    side_1 = (x - bx) * (ay - by) - (ax - bx) * (y - by)
    # Segment B to C
    side_2 = (x - cx) * (by - cy) - (bx - cx) * (y - cy)
    # Segment C to A
    side_3 = (x - ax) * (cy - ay) - (cx - ax) * (y - ay)
    # All the signs must be positive or all negative
    return (side_1 < 0.0) == (side_2 < 0.0) == (side_3 < 0.0)

def is_in_area(lat1, lon1, lat2, lon2):
    
    pick = point_in_triangle(lat1, lon1)
    drop = point_in_triangle(lat2, lon2)
    
    return all([pick, drop])

is_in_area_udf = F.udf(is_in_area, BooleanType())

In [None]:
# Remove any point outside the given area:
train = train.withColumn('keep', is_in_area_udf( col('pickup_latitude'), col('pickup_longitude'), col('dropoff_latitude'), col('dropoff_longitude') ) )
train = train.filter("keep == True")
train = train.drop('keep')

In [None]:
train.select(col('pickup_latitude'), col('pickup_longitude'), col('dropoff_latitude'), col('dropoff_longitude')).describe().show()

In [None]:
# Write Functions needed to Create Coordinates Features

class Coordinates_Transform():

    def __init__(self, df):

        self.df = df
        self.cols = df.columns
        self.haversine_udf = F.udf(Coordinates_Transform.haversine_stat, DoubleType())
        self.direction_udf = F.udf(Coordinates_Transform.calculate_dir, DoubleType())
        self.airports_udf = F.udf(Coordinates_Transform.identify_airports, StringType())

    @staticmethod
    def haversine_stat(pick_lat, drop_lat, pick_long, drop_long):

        longit_a, latit_a, longit_b, latit_b = map(math.radians, [pick_long,  pick_lat, drop_long, drop_lat])
        dist_longit = longit_b - longit_a
        dist_latit = latit_b - latit_a
        # Calculate area
        area = math.sin(dist_latit/2)**2 + math.cos(latit_a) * math.cos(latit_b) * math.sin(dist_longit/2)**2
        # Calculate the central angle
        central_angle = 2 * math.asin(math.sqrt(area))
        RADIUS = 6371
        # Calculate Distance
        distance = central_angle * RADIUS
        return round(distance, 3)

    def add_dist_metrics(self):

        # Absolute Lat e Long Distance:
        self.df = self.df.withColumn( 'abs_dist_longitude', F.round(F.abs( col('pickup_longitude') - col('dropoff_longitude') ) * 85, 3 ) )
        self.df = self.df.withColumn( 'abs_dist_latitude', F.round( F.abs( col('pickup_latitude') - col('dropoff_latitude') ) * 111 , 3 ) )

        # Manhattan Distance:
        self.df = self.df.withColumn( 'manhattan_dist', F.round( col('abs_dist_longitude') + col('abs_dist_latitude'), 3 ) )

        # Euclidean Distance:
        self.df = self.df.withColumn( 'euclidean_dist',  F.round( F.sqrt( col('abs_dist_longitude')**2 + col('abs_dist_latitude')**2 ), 3 ) )

        # Haversine Distance:
        self.df = self.df.withColumn( 'haversine_dist', 
                                     self.haversine_udf( col('pickup_latitude'), col('dropoff_latitude'), col('pickup_longitude'), col('dropoff_longitude') ) ) 

        return self.df
    
    @staticmethod
    def calculate_dir(lat1, lon1, lat2, lon2, euclid):
        # To improve cmputational time, use the df column ad hoc
        # I cannot do it because I have compute the abs

        delta_lon = lon2 - lon1
        delta_lat = lat2 - lat1
        
        return math.atan2( delta_lat, delta_long) * 180/math.pi
        '''
        if delta_lon > 0:
            return (180/math.pi) * math.asin( delta_lat/(euclid+0.001) )
        elif delta_lon < 0 and delta_lat > 0:
            return 180 - (180/math.pi) * math.asin( delta_lat/(euclid+0.001) )
        elif delta_lon < 0 and delta_lat < 0:
            return -180 - (180/math.pi) * math.asin( delta_lat/(euclid+0.001) )
        '''
    def direction(self):
        self.df = self.df.withColumn('direction', self.direction_udf( col('pickup_latitude'), col('pickup_longitude'), col('dropoff_latitude'), 
                                                                     col('dropoff_longitude'), col('euclidean_dist') ))
        return self.df

    @staticmethod
    def identify_airports(pick_lat, drop_lat, pick_long, drop_long):

        # Set the coordinates of airports:
        JFK_LAT = 40.641766
        JFK_LON = -73.780968

        LGR_LAT = 40.773013
        LGR_LON = -73.870229

        EWR_LAT = 40.689531
        EWR_LON = -74.174462

        # Compute distances:
        pick_up_jfk = Coordinates_Transform.haversine_stat(JFK_LAT, pick_lat, JFK_LON, pick_long)
        drop_off_jfk = Coordinates_Transform.haversine_stat(drop_lat, JFK_LAT, drop_long, JFK_LON)

        pick_up_lgr = Coordinates_Transform.haversine_stat(LGR_LAT, pick_lat, LGR_LON, pick_long)
        drop_off_lgr = Coordinates_Transform.haversine_stat(drop_lat, LGR_LAT, drop_long, LGR_LON)

        pick_up_ewr = Coordinates_Transform.haversine_stat(EWR_LAT, pick_lat, EWR_LON, pick_long)
        drop_off_ewr = Coordinates_Transform.haversine_stat(drop_lat, EWR_LAT, drop_long, EWR_LON)

        # Assign a value:
        if pick_up_jfk < 1:
            return 'PICK_JFK'
        elif drop_off_jfk < 1:
            return 'DROP_JFK'
        if pick_up_lgr < 1:
            return 'PICK_LGR'
        elif drop_off_lgr < 1:
            return 'DROP_LGR'
        if pick_up_ewr < 1:
            return 'PICK_EWR'
        elif drop_off_ewr < 1:
            return 'DROP_EWR'
        else:
            return 'NO_AIRPORT'
  
    def airports(self):

        self.df = self.df.withColumn('airport', self.airports_udf( col('pickup_latitude'), col('dropoff_latitude'), col('pickup_longitude'), col('dropoff_longitude') ))
        return self.df
    
    def long_short_trip(self):
        
        mean_val = self.df.agg(F.avg(col('haversine_dist'))).collect()[0][0]
        self.df = self.df.withColumn( 'long_short', F.when(col('haversine_dist') <= mean_val, 'SHORT').otherwise('LONG') )
        return self.df


In [None]:
# Add Distance metrics:
coordTransform = Coordinates_Transform(train)
train = coordTransform.add_dist_metrics()

In [None]:
# Let's Check df:
train.select('abs_dist_longitude', 'abs_dist_latitude', 'manhattan_dist', 'euclidean_dist', 'haversine_dist').describe().toPandas()

In [None]:
# Let's remove outlayers:
DIST_METRICS = ['manhattan_dist', 'euclidean_dist', 'haversine_dist']
n = len(DIST_METRICS)
row_mean  = (sum(col(x) for x in DIST_METRICS) / n).alias("mean_dist")
train = train.where( (row_mean >= 0.5) )

In [None]:
train.select('abs_dist_longitude', 'abs_dist_latitude', 'manhattan_dist', 'euclidean_dist', 'haversine_dist').describe().toPandas()

In [None]:
# Add Directions:
train = coordTransform.direction()

In [None]:
fig, ax = plt.subplots()
hist(ax, train.select('direction'), bins = 180)
plt.show()

In [None]:
# Let's Add when starting point or arrival is at the airport:
train = coordTransform.airports()

In [None]:
# Check avg price for the airports:
train.select('airport','fare_amount').groupBy('airport').agg(F.mean('fare_amount'), F.stddev('fare_amount'), F.count('fare_amount')).sort('avg(fare_amount)').show()

In [None]:
# Add a binning for the length, based on the haversine distance:
train = coordTransform.long_short_trip()

In [None]:
# Check avg price for binned trip length:
train.select('long_short' ,'fare_amount').groupBy('long_short').agg(F.mean('fare_amount'), F.stddev('fare_amount'), F.count('fare_amount')).show()