# **New York Taxi Fare Prediction**

In [None]:
!pwd
!ls
!python --version
#Hadoop Installation
!wget https://dlcdn.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xvzf spark-3.4.1-bin-hadoop3.tgz
!pip install findspark

In [None]:
import os

import findspark

# Tell findspark where the Spark distribution lives. This presumably is the
# directory extracted by the install cell (assumes it ran in /content).
os.environ.update({"SPARK_HOME": "/content/spark-3.4.1-bin-hadoop3"})
# Put pyspark from SPARK_HOME on sys.path so `import pyspark` works.
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession if one exists; otherwise create it with this app name.
spark = (
    SparkSession.builder
    .appName("NYC Taxi Fare Prediction")
    .getOrCreate()
)
# Sanity check that the session is up.
print(spark.sparkContext.appName)

NYC Taxi Fare Prediction


In [None]:
from google.colab import drive

# Mount Google Drive so the dataset under /content/drive/MyDrive is readable by Spark.
MOUNT_POINT = '/content/drive'
drive.mount(MOUNT_POINT)

Mounted at /content/drive


In [None]:
# Load the raw trips CSV from Drive. The header row supplies the column names.
# inferSchema makes Spark pick column types from the data, at the cost of an extra pass.
df = spark.read.options(header=True, inferSchema=True).csv(
    "/content/drive/MyDrive/Big Data Project/NYC CSV.csv"
)

In [None]:
# Preview the first 20 rows, truncating long cell values (Spark's defaults, spelled out).
df.show(n=20, truncate=True)

+-------+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+
|    key|fare_amount|    pickup_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|
+-------+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+
|26:21.0|        4.5|2009-06-15 17:26:21|      -73.844311|      40.721319|        -73.84161|       40.712278|              1|
|52:16.0|       16.9|2010-01-05 16:52:16|      -74.016048|      40.711303|       -73.979268|       40.782004|              1|
|35:00.0|        5.7|2011-08-18 00:35:00|      -73.982738|       40.76127|       -73.991242|       40.750562|              2|
|30:42.0|        7.7|2012-04-21 04:30:42|       -73.98713|      40.733143|       -73.991567|       40.758092|              1|
|51:00.0|        5.3|2010-03-09 07:51:00|      -73.968095|      40.768008|       -73.956655|       40.783762|         

In [None]:
# Check the column types chosen by inferSchema (e.g. pickup_datetime should be a timestamp).
df.printSchema()

root
 |-- key: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, hour, dayofweek,count, when, isnan
from pyspark.sql.types import FloatType, DoubleType
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from geopy.distance import great_circle
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from scipy.stats import iqr

Null-value detection (rows with nulls are dropped later, after the trip distance is computed)

In [None]:
def col_numeric(df, col_name):
    """Return True when ``col_name`` is a floating-point column of ``df``.

    Only FloatType and DoubleType qualify; any other column type returns False.
    The null-count cell uses this to decide whether ``isnan`` can be applied.
    """
    field_type = df.schema[col_name].dataType
    return isinstance(field_type, (DoubleType, FloatType))

In [None]:
def _null_count_expr(c):
    """Build an aggregate counting the missing values of column ``c``.

    Every column counts nulls. Floating-point columns (per ``col_numeric``)
    also count NaN, because ``isnan`` only makes sense for those types.
    The aggregate is aliased with the column's own name.
    """
    is_missing = col(c).isNull()
    if col_numeric(df, c):
        is_missing = isnan(c) | is_missing
    return count(when(is_missing, c)).alias(c)


conditions = [_null_count_expr(c) for c in df.columns]

In [None]:
# One-row DataFrame with the number of missing values (null, plus NaN for
# floating-point columns) in each column.
# Keep the DataFrame itself: DataFrame.show() prints and returns None, so
# assigning its result left `null_counts` as None.
null_counts = df.select(conditions)
null_counts.show()

+---+-----------+---------------+----------------+---------------+-----------------+----------------+---------------+
|key|fare_amount|pickup_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|
+---+-----------+---------------+----------------+---------------+-----------------+----------------+---------------+
|  0|          0|              0|               0|              0|               10|              10|              0|
+---+-----------+---------------+----------------+---------------+-----------------+----------------+---------------+



Outlier detection and removal (IQR rule on `fare_amount`)

In [None]:
from pyspark.sql.functions import col, lit

import pyspark.sql.functions as F

# IQR (Tukey fence) filter on the target: keep fares within
# [Q1 - 1.5*IQR, Q3 + 1.5*IQR].
# relativeError=0 asks approxQuantile for exact quantiles, which is costly on large data.
q1, q3 = df.approxQuantile("fare_amount", [0.25, 0.75], 0)
fare_iqr = q3 - q1


lower_bound = q1 - 1.5 * fare_iqr
upper_bound = q3 + 1.5 * fare_iqr

# The lower fence can fall below zero (whenever q1 < 1.5 * IQR). That would keep
# zero/negative fares, which are not valid trip prices, so also require a positive fare.
# NOTE(review): this cell reassigns `df`, so re-running it recomputes the
# quantiles on already-filtered data and filters again.
df = df.filter(
    (col("fare_amount") > lit(0.0))
    & (col("fare_amount") >= lit(lower_bound))
    & (col("fare_amount") <= lit(upper_bound))
)


Trip distance (great-circle distance in km)

In [None]:
def calculate_distance(pickup_lat, pickup_long, dropoff_lat, dropoff_long):
    """Great-circle distance in kilometres between pickup and dropoff points.

    Returns None when any coordinate is missing, or when a latitude is outside
    [-90, 90] or a longitude outside [-180, 180]. NaN also fails the range
    check, so it yields None too. Used as a Spark UDF, where None becomes null.
    """
    if any(v is None for v in (pickup_lat, pickup_long, dropoff_lat, dropoff_long)):
        return None

    lats_ok = all(-90 <= lat <= 90 for lat in (pickup_lat, dropoff_lat))
    longs_ok = all(-180 <= lon <= 180 for lon in (pickup_long, dropoff_long))
    if not (lats_ok and longs_ok):
        return None

    return great_circle((pickup_lat, pickup_long), (dropoff_lat, dropoff_long)).kilometers

In [None]:
# Wrap the Python distance function as a row-wise Spark UDF returning DoubleType.
# A None return becomes a SQL null.
udf_calculate_distance = udf(f=calculate_distance, returnType=DoubleType())

In [None]:
# trip_distance (km) via the Python UDF. It is null for missing or out-of-range
# coordinates; those rows are dropped in the next cell.
distance_col = udf_calculate_distance(
    col("pickup_latitude"),
    col("pickup_longitude"),
    col("dropoff_latitude"),
    col("dropoff_longitude"),
)
df = df.withColumn("trip_distance", distance_col)

In [None]:
# Drop every row with a null in any column. This covers the null dropoff
# coordinates found earlier and the null trip_distance values from the UDF.
df = df.dropna(how="any")

In [None]:
# Temporal features from the pickup timestamp: hour of day, and day of week as
# numbered by Spark's dayofweek (1 = Sunday ... 7 = Saturday).
df = df.withColumns({
    "pickup_hour": hour(col("pickup_datetime")),
    "pickup_day_of_week": dayofweek(col("pickup_datetime")),
})

In [None]:
# Spot-check the engineered columns on a handful of rows.
df.show(n=5)

+-------+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+------------------+-----------+------------------+
|    key|fare_amount|    pickup_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|     trip_distance|pickup_hour|pickup_day_of_week|
+-------+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+------------------+-----------+------------------+
|26:21.0|        4.5|2009-06-15 17:26:21|      -73.844311|      40.721319|        -73.84161|       40.712278|              1|  1.03076539115918|         17|                 2|
|52:16.0|       16.9|2010-01-05 16:52:16|      -74.016048|      40.711303|       -73.979268|       40.782004|              1|  8.45014553289593|         16|                 3|
|35:00.0|        5.7|2011-08-18 00:35:00|      -73.982738|       40.76127|       -73.991242|       40.750562|           

Feature assembly (VectorAssembler)

In [None]:
# Model inputs: raw coordinates, passenger count, distance and time features,
# packed into a single vector column for Spark ML.
feature_cols = [
    "pickup_longitude", "pickup_latitude",
    "dropoff_longitude", "dropoff_latitude",
    "passenger_count", "trip_distance",
    "pickup_hour", "pickup_day_of_week",
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="assembledFeatures")
df_assembled = assembler.transform(df)

Scaling the assembled features


In [None]:
# Scale each feature to unit standard deviation without centering (withMean=False).
# NOTE(review): the scaler is fit on the whole dataset, before the train/test split
# made below, so test rows contribute to the std estimates. Fit on the training
# split only for strict hygiene.
scaler = StandardScaler(
    inputCol="assembledFeatures",
    outputCol="scaledFeatures",
    withMean=False,
    withStd=True,
)
sc_md = scaler.fit(df_assembled)
df_scaled = sc_md.transform(df_assembled)

# **Train/test split and model evaluation**

In [None]:
# 80/20 train/test split with a fixed seed, shared by all models below.
train_data, test_data = df_scaled.randomSplit(weights=[0.8, 0.2], seed=42)

# **Random Forest**

In [None]:
# Random forest on the scaled features.
# An explicit seed makes bootstrap sampling and per-node feature subsetting
# reproducible. Without it, pyspark derives the default seed from Python's string
# hash, which is randomized per process, so metrics can change between kernel sessions.
rf = RandomForestRegressor(featuresCol="scaledFeatures", labelCol="fare_amount", seed=42)
rf_model = rf.fit(train_data)

In [None]:
# Score the held-out split. This adds a "prediction" column next to fare_amount.
predictions = rf_model.transform(dataset=test_data)

In [None]:
# One evaluator per metric, each comparing "prediction" against the true fare.
evaluator, mse_evaluator, r2_evaluator = (
    RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName=metric)
    for metric in ("rmse", "mse", "r2")
)

In [None]:
# Random-forest test metrics: RMSE, MSE and R^2, in that order.
rmse, mse, r2_score = (
    ev.evaluate(predictions) for ev in (evaluator, mse_evaluator, r2_evaluator)
)
for label, value in (
    ("Root Mean Squared Error (RMSE) on test data =", rmse),
    ("Mean Squared Error (MSE) on test data =", mse),
    ("R-squared on test data =", r2_score),
):
    print(label, value)


Root Mean Squared Error (RMSE) on test data = 2.4518007194210765
Mean Squared Error (MSE) on test data = 6.011326767753709
R-squared on test data = 0.6502862763351845


# **Linear Regression**

In [None]:
# Linear-regression baseline (default parameters), trained on the same
# scaled features and training split as the random forest.
lr = LinearRegression(labelCol="fare_amount", featuresCol="scaledFeatures")
lr_model = lr.fit(train_data)

In [None]:
# Score the held-out split with the linear model.
predictions_lin = lr_model.transform(dataset=test_data)

In [None]:
# Metric evaluators for the linear model (same label/prediction columns as before).
evaluator_rmse, evaluator_mse, evaluator_r2 = (
    RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName=metric)
    for metric in ("rmse", "mse", "r2")
)

In [None]:
# Linear-regression test metrics. These overwrite the random-forest rmse/mse/r2_score.
rmse, mse, r2_score = (
    ev.evaluate(predictions_lin) for ev in (evaluator_rmse, evaluator_mse, evaluator_r2)
)

In [None]:
# NOTE(review): the saved output below implies a test-label variance of about 106
# (MSE / (1 - R^2)), versus about 17.2 for the RF and GBT outputs on the same test
# split. It was likely produced from a different kernel state; re-run before comparing.
for label, value in (
    ("Root Mean Squared Error (RMSE) on test data =", rmse),
    ("Mean Squared Error (MSE) on test data =", mse),
    ("R-squared on test data =", r2_score),
):
    print(label, value)

Root Mean Squared Error (RMSE) on test data = 9.728702690435124
Mean Squared Error (MSE) on test data = 94.64765603887963
R-squared on test data = 0.10607634764581153


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor

# Gradient-boosted trees, capped at 10 boosting iterations, trained on the
# same scaled features and split as the other models.
gbt = GBTRegressor(labelCol="fare_amount", featuresCol="scaledFeatures", maxIter=10)
gbt_model = gbt.fit(train_data)

In [None]:
# Score the held-out split with the boosted model.
gbt_predictions = gbt_model.transform(dataset=test_data)

In [None]:
# Metric evaluators for the GBT model, created in rmse / r2 / mse order.
gbt_evaluator_rmse, gbt_evaluator_r2, gbt_evaluator_mse = (
    RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName=metric)
    for metric in ("rmse", "r2", "mse")
)

In [None]:
# GBT test metrics. r2_score and mse overwrite the earlier models' values.
gbt_rmse, r2_score, mse = (
    ev.evaluate(gbt_predictions)
    for ev in (gbt_evaluator_rmse, gbt_evaluator_r2, gbt_evaluator_mse)
)

In [None]:
# Report GBT metrics with the same labels, order (RMSE, MSE, R^2) and full precision
# as the random-forest and linear-regression cells, so the three models compare directly.
# The previous "%g" formatting rounded to 6 significant digits and used different labels.
print("Root Mean Squared Error (RMSE) on test data =", gbt_rmse)
print("Mean Squared Error (MSE) on test data =", mse)
print("R-squared on test data =", r2_score)

rmse = 2.31859
R-squared on test data = 0.687256
Mean Squared Error (MSE) on test data = 5.37585
