# Business Problem 1: Hourly Fare Amount Prediction


In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
from pyspark.sql.functions import col, lower, trim, date_format, dayofweek, month, hour, minute, datediff, count, avg, max, min, to_date, to_timestamp, sum, when
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.regression import RandomForestRegressor

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

In [2]:
spark = SparkSession.builder.appName('FullPredict')\
        .config("spark.executor.cores",'16')\
        .config("spark.executor.memory", '60g')\
        .getOrCreate()

## Data Processing

In [3]:
# load data
df2021 = spark.read.option("header", "true").option("inferSchema", "true").csv("gs://project_hyrn/2021_Yellow_Taxi_Trip_Data_full.csv")
df2020 =spark.read.option("header", "true").option("inferSchema", "true").csv("gs://project_hynr2/2020_Yellow_Taxi_Trip_Data_full.csv")
df2019 =spark.read.option("header", "true").option("inferSchema", "true").csv("gs://project_hynr2/2019_Yellow_Taxi_Trip_Data_full.csv")

                                                                                

In [4]:
df2021 = df2021.withColumn("Datayear", lit(2021))
df2020 = df2020.withColumn("Datayear", lit(2020))
df2019 = df2019.withColumn("Datayear", lit(2019))

# merge data
full_df = df2021.union(df2020).union(df2019)
full_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Datayear: integer (nullable = false)



In [5]:
# partition

#display number of records by partition
def displaypartitions(df):
    #number of records by partition
    num = df.rdd.getNumPartitions()
    print("Partitions:", num)
    df.withColumn("partitionId", F.spark_partition_id()).groupBy("partitionId").count()\
        .orderBy(F.asc("count"))\
        .show(num)

In [6]:
#number of partitions
full_df.rdd.getNumPartitions()

103

In [7]:
#re-partition the data to evenly distributed across 103 partitions
full_df = full_df.repartition(64)
displaypartitions(full_df)



Partitions: 64




+-----------+-------+
|partitionId|  count|
+-----------+-------+
|         46|2186696|
|         45|2186698|
|         44|2186700|
|         43|2186702|
|         42|2186705|
|         41|2186706|
|         40|2186708|
|         39|2186710|
|         38|2186711|
|         36|2186713|
|         37|2186713|
|         35|2186713|
|         34|2186713|
|         33|2186714|
|         32|2186715|
|         31|2186716|
|         30|2186717|
|         29|2186718|
|         28|2186721|
|         27|2186725|
|         26|2186726|
|         25|2186727|
|         24|2186729|
|         23|2186730|
|         22|2186731|
|         20|2186735|
|         21|2186735|
|         19|2186736|
|         18|2186737|
|         17|2186738|
|         16|2186738|
|         15|2186740|
|         14|2186742|
|         13|2186743|
|         11|2186745|
|         12|2186745|
|         10|2186747|
|          8|2186748|
|          9|2186748|
|          7|2186749|
|          6|2186751|
|          5|2186753|
|         

                                                                                

In [8]:
# change to date type
full_df = full_df.withColumn("parsed_pickup_datetime", to_timestamp("tpep_pickup_datetime", "MM/dd/yyyy hh:mm:ss a"))
full_df = full_df.withColumn("parsed_dropoff_datetime", to_timestamp("tpep_dropoff_datetime", "MM/dd/yyyy hh:mm:ss a"))

# Extract the hour, month, day from pickup time
full_df = full_df.withColumn("pickup_hour", hour("parsed_pickup_datetime"))
full_df = full_df.withColumn("pickup_month", month("parsed_pickup_datetime"))
full_df = full_df.withColumn("pickup_dayofweek", dayofweek("parsed_pickup_datetime"))

In [9]:
# Calculate trip time
full_df = full_df.withColumn("Dur", datediff("parsed_dropoff_datetime", "parsed_pickup_datetime") * 24 * 60 +hour("parsed_dropoff_datetime")* 60-hour("parsed_pickup_datetime")*60
+ minute("parsed_dropoff_datetime") - minute("parsed_pickup_datetime"))

In [10]:
# Calculate speed
full_df = full_df.withColumn("speed", col("Dur")/col("trip_distance"))

In [11]:
# Create a dummy variable of weekday
full_df = full_df.withColumn("pickup_weekday", when((full_df["pickup_dayofweek"] >= 2) & (full_df["pickup_dayofweek"] <= 6), 1).otherwise(0))

In [12]:
# Create a season variable
full_df = full_df.withColumn(
    "season",
    when((full_df.pickup_month <= 2) | (full_df.pickup_month == 12), 1)
    .when((full_df.pickup_month >= 3) & (full_df.pickup_month <= 5), 2)
    .when((full_df.pickup_month >= 6) & (full_df.pickup_month <= 8), 3)
    .otherwise(4),
)

Helena: According to the dustribution of trip distances (very right skewed) and the fact that it takes about 30 miles to drive across the whole New York City, we decided to use 30 as the number to split the trips into short or long distance trips.
Will need to adjust threshold when using FULL Datase

In [13]:
# Create a dummy variable of long_trip
full_df = full_df.withColumn("long_trip", when(full_df["trip_distance"] >= 30, 1).otherwise(0))

In [14]:
full_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Datayear: integer (nullable = false)
 |-- parsed_pickup_datetime: timestamp (nullable = true)
 |-- parsed_dropoff_datetime: timestamp (nullable = true)
 |-- pickup_hour:

In [15]:
full_df.show(10)

23/05/18 22:07:19 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+--------+----------------------+-----------------------+-----------+------------+----------------+---+-----------------+--------------+------+---------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Datayear|parsed_pickup_datetime|parsed_dropoff_datetime|pickup_hour|pickup_month|pickup_dayofweek|Dur|            speed|pickup_weekday|season|long_trip|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+

                                                                                

In [15]:
# Filter for fare_amount >=3.0, starting fare
full_df = full_df.filter(condition = col("fare_amount") >= 3.0)


In [16]:
# Filter out extreme points
full_df = full_df.filter((col("trip_distance") >= 0) & (col("trip_distance") < 500) & (col("fare_amount") < 400))

In [17]:
# Check date variables: Looks good
full_df.describe('Datayear', 'pickup_month','pickup_dayofweek').show()



+-------+------------------+-----------------+------------------+
|summary|          Datayear|     pickup_month|  pickup_dayofweek|
+-------+------------------+-----------------+------------------+
|  count|         138867891|        138867891|         138867891|
|   mean| 2019.616798018485|6.255678211459264| 4.129873694128472|
| stddev|0.8229403454606475|3.620464463384807|1.9425838227962384|
|    min|              2019|                1|                 1|
|    max|              2021|               12|                 7|
+-------+------------------+-----------------+------------------+



                                                                                

In [22]:
# Check for missing values in a fare column: No missing values
full_df.select([count(when(col("fare_amount").isNull(), "fare_amount")).alias("missing_fare_amount")]).show()



+-------------------+
|missing_fare_amount|
+-------------------+
|                  0|
+-------------------+



                                                                                

In [18]:
# log fare amount, tip amount, trip distance
from pyspark.sql.functions import log1p

# Add a new column with log-transformed fare_amount values
full_df = full_df.withColumn("lfare_amount", log1p(full_df["fare_amount"]))
full_df = full_df.withColumn("ltip_amount", log1p(full_df["tip_amount"]))
full_df = full_df.withColumn("ltrip_distance", log1p(full_df["trip_distance"]))

# Drop the original fare_amount column
# full_df = full_df.drop("fare_amount","tip_amount","trip_distance")

In [19]:
# Drop useless variables
full_df = full_df.drop('total_amount', 'tpep_pickup_datetime','tpep_dropoff_datetime', 'parsed_pickup_datetime', 'parsed_dropoff_datetime')


In [20]:
full_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Datayear: integer (nullable = false)
 |-- pickup_hour: integer (nullable = true)
 |-- pickup_month: integer (nullable = true)
 |-- pickup_dayofweek: integer (nullable = true)
 |-- Dur: integer (nullable = true)
 |-- speed: double (nullable = true)
 |-- pickup_weekday: integer (nullable = false)
 |-- season: integer (

In [21]:
# Delete missing values and choose only those virtual values according to the official variable description
filtered_df = full_df.filter(col('VendorID').isin([1,2]))
filtered_df = filtered_df.filter(col('store_and_fwd_flag').isin(['N', 'Y']))
filtered_df = filtered_df.filter(col('RatecodeID').isin([1,2,3,4,5,6]))
filtered_df = filtered_df.filter(col('payment_type').isin([1,2,3,4,5]))

In [22]:
# Convert store_and_fwd_flag to a dummy variable
filtered_df = filtered_df.withColumn("store_and_fwd_flag", when(filtered_df.store_and_fwd_flag == "Y", 1).otherwise(0))

In [23]:
# only save the log values
filtered_df = filtered_df.drop('trip_distance', 'fare_amount', 'tip_amount')

In [None]:
# Check missing values before one-hot encoding

# Specify the categorical variables
categorical_vars = ['VendorID', 'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'DOLocationID', 'payment_type', 'season','pickup_dayofweek','Datayear']

# Iterate over each categorical variable
for var in categorical_vars:
    # Count the occurrences of each value
    value_counts = filtered_df.groupBy(var).count().orderBy(col('count').desc())
    
    # Identify missing values
    missing_count = filtered_df.filter(col(var).isNull() | (col(var) == '')).count()
    
    # Display the results
    print(f"Variable: {var}")
    value_counts.show()
    print(f"Missing values: {missing_count}")
    print()


In [33]:
# drop_df = filtered_df.dropna()

#for column in drop_df.columns:
#    drop_df = drop_df.filter(col(column).isNotNull())

In [24]:
filtered_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: integer (nullable = false)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Datayear: integer (nullable = false)
 |-- pickup_hour: integer (nullable = true)
 |-- pickup_month: integer (nullable = true)
 |-- pickup_dayofweek: integer (nullable = true)
 |-- Dur: integer (nullable = true)
 |-- speed: double (nullable = true)
 |-- pickup_weekday: integer (nullable = false)
 |-- season: integer (nullable = false)
 |-- long_trip: integer (nullable = false)
 |-- lfare_amount: double (nullable = true)
 |-- ltip_amount: doubl

## Model

In [28]:
# Split the data into training and testing sets
train_data, test_data = filtered_df.randomSplit([0.7, 0.3], seed=27)

In [29]:
# One-hot encoding
categorical_columns = ['VendorID', 'RatecodeID', 'PULocationID', 'DOLocationID', 'payment_type',
                       'season','pickup_dayofweek','Datayear'] # categorical columns

indexers = [
  StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c), handleInvalid="keep")
  for c in categorical_columns
]

encoders = [
  OneHotEncoder(inputCol=indexer.getOutputCol(),
                outputCol="{0}_encoded".format(indexer.getOutputCol()))
  for indexer in indexers
]

assemblerInputs = [encoder.getOutputCol() for encoder in encoders] + ['passenger_count','store_and_fwd_flag',
                                                                      'extra', 'mta_tax', 'tolls_amount', 'improvement_surcharge', 
                                                                      'congestion_surcharge', 'pickup_hour', 'pickup_month',
                                                                      'Dur', 'pickup_weekday', 'long_trip', 
                                                                      'ltip_amount', 'ltrip_distance', 'speed']

vectorAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features", handleInvalid="keep")


### Lasso

In [30]:
# Define the Lasso model (Linear regression with L1 regularization is equivalent to Lasso)
lasso = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=1, featuresCol="features", labelCol="lfare_amount")

# Define a pipeline model
pipeline_lasso = Pipeline(stages=indexers + encoders + [vectorAssembler, lasso])

# Define a grid of hyperparameters to test:
paramGrid_lasso = ParamGridBuilder() \
    .addGrid(lasso.regParam, [0.1, 0.3, 0.5]) \
    .addGrid(lasso.elasticNetParam, [0.8, 1.0]) \
    .build()

# Define cross-validation:
crossval_lasso = CrossValidator(estimator=pipeline_lasso,
                          estimatorParamMaps=paramGrid_lasso,
                          evaluator=RegressionEvaluator(),
                          numFolds=5)

# Run cross-validation, and choose the best set of parameters:
cvModel_lasso = crossval_lasso.fit(train_data)


23/05/19 00:52:07 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/05/19 00:52:07 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/05/19 00:52:08 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/19 00:52:08 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/19 00:52:08 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/19 00:52:08 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/19 00:52:08 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/19 00:52:08 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/19 00:52:08 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/19 00

IllegalArgumentException: label does not exist. Available: VendorID, passenger_count, RatecodeID, store_and_fwd_flag, PULocationID, DOLocationID, payment_type, extra, mta_tax, tolls_amount, improvement_surcharge, congestion_surcharge, Datayear, pickup_hour, pickup_month, pickup_dayofweek, Dur, speed, pickup_weekday, season, long_trip, lfare_amount, ltip_amount, ltrip_distance, CrossValidator_e29d1c28e796_rand, VendorID_indexed, RatecodeID_indexed, PULocationID_indexed, DOLocationID_indexed, payment_type_indexed, season_indexed, pickup_dayofweek_indexed, Datayear_indexed, VendorID_indexed_encoded, RatecodeID_indexed_encoded, PULocationID_indexed_encoded, DOLocationID_indexed_encoded, payment_type_indexed_encoded, season_indexed_encoded, pickup_dayofweek_indexed_encoded, Datayear_indexed_encoded, features, prediction

23/05/19 00:55:20 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/19 00:55:20 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/19 00:55:20 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/19 00:55:20 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/19 00:55:20 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/19 00:55:20 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/19 00:55:20 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/19 00:55:20 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/19 00:55:20 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
23/05/19 00:58:18 ERROR breeze.optimize.OWLQN: Failure! Resettin

In [None]:
# Make predictions on the test data
predictions_lasso = cvModel_lasso.transform(test_data)


In [None]:
# Evaluate the Lasso model
evaluator = RegressionEvaluator()
rmse = evaluator.evaluate(predictions_lasso)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [None]:
# Visualization

# Extract the cross-validation results
cvModel_lasso = cv_model.avgMetrics

# Extract the regularization parameters from the parameter grid
reg_params = [param[lasso.regParam] for param in param_grid]

# Plot the cross-validation results
plt.figure(figsize=(8, 6))
plt.plot(reg_params, cv_results, marker='o')
plt.xscale('log')
plt.xlabel('Regularization Parameter')
plt.ylabel('RMSE')
plt.title('Cross-Validation Results for Lasso')
plt.grid(True)
plt.show()

### Ransom Forest

In [None]:
# Extract the features selected by Lasso
coefficients = cvModel_lasso.bestModel.stages[-1].coefficients
selected_features = [assemblerInputs[i] for i in range(len(coefficients)) if coefficients[i] != 0]

# Build a new VectorAssembler with the selected features
vectorAssembler_rf = VectorAssembler(inputCols=selected_features, outputCol="features")

In [65]:
# Create a VectorAssembler to combine the features into a single feature vector
#assembler = VectorAssembler(inputCols=[ ], outputCol='features') # Use feature selected from lasso

# Create a Random Forest regressor
rf = RandomForestRegressor(featuresCol='features', labelCol='lfare_amount')

# Define the parameter grid for hyperparameter tuning
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50,500,1000]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .addGrid(rf.minInfoGain, [0.0, 0.1, 0.2]) \
    .build()

# Create the evaluator
evaluator = RegressionEvaluator(labelCol='lfare_amount', predictionCol='prediction', metricName='rmse')

# Create a CrossValidator
cv = CrossValidator(estimator=rf,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5)  # Specify the number of folds for cross-validation

# Create a pipeline
pipeline = Pipeline(stages=[assembler, cv])

# Fit the pipeline to the data
model = pipeline.fit(train)

# Make predictions
predictions = model.transform(test)

# Evaluate the model using the evaluator
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE):", rmse)

IllegalArgumentException: passenger_count does not exist. Available: features, lfare_amount