In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col, unix_timestamp, dayofweek, hour, month, udf
from pyspark.sql.types import FloatType
import math

In [2]:
# Obtain the Spark session for this notebook (an already-running one is reused)
spark = (
    SparkSession.builder
    .appName("RF and GBT Model")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/15 11:25:16 INFO SparkEnv: Registering MapOutputTracker
24/05/15 11:25:16 INFO SparkEnv: Registering BlockManagerMaster
24/05/15 11:25:16 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/05/15 11:25:16 INFO SparkEnv: Registering OutputCommitCoordinator
----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 41606)
Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/miniconda3/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/miniconda3/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/o

In [3]:
# Read the processed training CSV from GCS; the first row is the header and
# column types are inferred from the file contents
data = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('gs://228bucket/processed_train_2.0.csv/processed_dataset_2.0.csv')
)

                                                                                

In [4]:
# Keep roughly 10% of the rows, sampled without replacement (the default);
# the fixed seed keeps the draw repeatable
data = data.sample(fraction=0.1, seed=42)

In [5]:
# Number of rows left after the 10% sampling step
data.count()

                                                                                

4283391

In [6]:
# Column names and inferred types of the loaded data
data.printSchema()

root
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)



In [7]:
# Summary statistics (count, mean, stddev, min, max) per column.
# NOTE(review): the recorded output shows extreme values (fare up to 900,
# passenger_count up to 208, latitudes near -74) - presumably unfiltered
# outliers; confirm whether they should be removed before training.
data.describe().show()

24/05/15 11:28:43 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+-----------------+------------------+------------------+------------------+------------------+------------------+
|summary|      fare_amount|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|   passenger_count|
+-------+-----------------+------------------+------------------+------------------+------------------+------------------+
|  count|          4283391|           4283391|           4283391|           4283391|           4283391|           4283391|
|   mean|11.45865025863937|-72.37123187140257|39.841392686071345|-72.36926431473576|39.839520175316125|1.6920607994927384|
| stddev|9.901330284884985|10.910671410207991|  6.44615301071534|10.914090016025693| 6.452158642910057|1.3258650378198835|
|    min|              0.0|       -128.107047|        -74.031213|       -121.925747|        -74.177085|                 1|
|    max|            900.0|        148.498615|          81.48045|        148.498615|         83.283332|               208|
+-------+-------

                                                                                

In [8]:
# Normalise the pickup timestamp, then derive calendar features from it.
# NOTE(review): the schema printed earlier already reports pickup_datetime as a
# timestamp, so the unix_timestamp round-trip looks redundant - confirm before
# removing it.
data = (
    data
    .withColumn(
        "pickup_datetime",
        unix_timestamp(col("pickup_datetime"), "yyyy-MM-dd'T'HH:mm:ss.SSSX").cast("timestamp"),
    )
    .withColumn("pickup_hour", hour(col("pickup_datetime")))
    .withColumn("day_of_week", dayofweek(col("pickup_datetime")))
    .withColumn("month", month(col("pickup_datetime")))
)

def calculate_distance(lat1, lon1, lat2, lon2):
    """Return the haversine (great-circle) distance between two points in km.

    Args:
        lat1, lon1: Latitude and longitude of the first point, in degrees.
        lat2, lon2: Latitude and longitude of the second point, in degrees.

    Returns:
        The distance in kilometres as a float, 0.0 when both points are
        identical, or None when any coordinate is None (so a null input
        yields a null result instead of failing the whole Spark task).
    """
    # Nullable columns reach a Python UDF as None; math.radians(None) would raise.
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None

    if lat1 == lat2 and lon1 == lon2:
        return 0.0

    R = 6371.0  # Radius of the Earth in kilometers
    lat1, lon1, lat2, lon2 = map(math.radians, (lat1, lon1, lat2, lon2))  # degrees -> radians
    dlat = lat2 - lat1
    dlon = lon2 - lon1

    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # near-antipodal points, which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, max(0.0, a))

    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return float(R * c)

# Wrap the Python distance function as a Spark UDF returning a float column
udf_calculate_distance = udf(calculate_distance, FloatType())

# Add the pickup -> dropoff distance; argument order is (lat1, lon1, lat2, lon2)
data = data.withColumn(
    "distance",
    udf_calculate_distance(
        *[
            col(name)
            for name in (
                "pickup_latitude",
                "pickup_longitude",
                "dropoff_latitude",
                "dropoff_longitude",
            )
        ]
    ),
)

In [9]:
# Preview the first five rows, including the derived time and distance columns
data.show(5)

[Stage 8:>                                                          (0 + 1) / 1]

+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+-----------+-----------+-----+---------+
|fare_amount|    pickup_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|passenger_count|pickup_hour|day_of_week|month| distance|
+-----------+-------------------+----------------+---------------+-----------------+----------------+---------------+-----------+-----------+-----+---------+
|        9.0|2012-12-03 13:10:00|      -74.006462|      40.726713|       -73.993078|       40.731628|              1|         13|          2|   12|1.2532315|
|       10.5|2010-09-07 13:18:00|      -73.985382|      40.747858|       -73.978377|        40.76207|              1|         13|          3|    9|1.6868613|
|        4.9|2010-12-06 12:29:00|      -74.000632|      40.747473|       -73.986672|       40.740577|              1|         12|          2|   12|1.4039582|
|        7.0|2014-05-01 09:12:00|      -73.966203|  

                                                                                

In [10]:
# Schema after feature engineering (pickup_hour, day_of_week, month, distance added)
data.printSchema()

root
 |-- fare_amount: double (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- distance: float (nullable = true)



In [11]:
# Columns fed to the models as predictors
feature_columns = [
    "passenger_count",
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude",
    "distance",
    "pickup_hour",
    "day_of_week",
    "month",
]

# Pack the predictor columns into one vector column named "features"
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)

# 70% / 30% train-test split with a fixed seed
training_data, testing_data = data.randomSplit(weights=[0.7, 0.3], seed=42)

In [22]:
# Min-max scaler that reads "features" and writes "scaledFeatures"
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Learn the scaling ranges from the training split only
scaler_model = scaler.fit(training_data)

# Apply the fitted scaler to both splits (training first, then testing)
training_data_scaled, testing_data_scaled = (
    scaler_model.transform(split) for split in (training_data, testing_data)
)

                                                                                

## Random Forest Model

In [23]:
# Random Forest regressor on the scaled feature vector, predicting fare_amount
rf = RandomForestRegressor(
    featuresCol="scaledFeatures",
    labelCol="fare_amount",
    numTrees=100,
    seed=42,
)

# Assembler -> scaler -> RF pipeline definition. It is only constructed in this
# cell; the fit below is called on the estimator directly with pre-scaled data.
pipeline = Pipeline(stages=[assembler, scaler, rf])

# Fit on the scaled training split and score the scaled testing split
rf_model = rf.fit(training_data_scaled)
predictions = rf_model.transform(testing_data_scaled)

# RMSE on the held-out split
evaluator = RegressionEvaluator(
    labelCol="fare_amount",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = {:.2f}".format(rmse))

# R2 on the held-out split
evaluator_r2 = RegressionEvaluator(
    labelCol="fare_amount",
    predictionCol="prediction",
    metricName="r2",
)
r2 = evaluator_r2.evaluate(predictions)
print("R-squared (R2) on test data = {:.2f}".format(r2))

                                                                                

Root Mean Squared Error (RMSE) on test data = 5.58




R-squared (R2) on test data = 0.68


                                                                                

In [25]:
# Range of the label in the test predictions, computed in a single aggregation
# pass (the previous two separate max/min jobs each re-ran the model transform).
fare_bounds = predictions.selectExpr(
    "max(fare_amount) AS max_fare",
    "min(fare_amount) AS min_fare",
).first()
max_fare = fare_bounds["max_fare"]
min_fare = fare_bounds["min_fare"]

# An empty prediction set yields null bounds; treat that as a zero range.
fare_range = (max_fare - min_fare) if max_fare is not None else 0.0

# Range-normalised RMSE; NaN instead of a ZeroDivisionError when the label has
# no spread. The range is driven by the most extreme fares, so a single outlier
# can make this ratio look small.
normalized_rmse = rmse / fare_range if fare_range else float("nan")

print("Root Mean Squared Error (RMSE) on test data = {:.2f}".format(rmse))
print("Normalized RMSE on test data = {:.4f}".format(normalized_rmse))



Root Mean Squared Error (RMSE) on test data = 5.58
Normalized RMSE on test data = 0.0117


                                                                                

In [26]:
from pyspark.ml.regression import RandomForestRegressor

# Destination for the fitted Random Forest model
model_path = "gs://228bucket/Models/rf_model"

# Persist the model, replacing anything already stored at that path
(
    rf_model
    .write()
    .overwrite()
    .save(model_path)
)

                                                                                

## GBT Model

In [27]:
# Gradient-boosted trees regressor on the scaled feature vector
gbt = GBTRegressor(featuresCol="scaledFeatures", labelCol="fare_amount", maxIter=100, seed=42)

# Assembler -> scaler -> GBT pipeline definition. It is only constructed in this
# cell; the fit below is called on the estimator directly with pre-scaled data.
pipeline = Pipeline(stages=[assembler, scaler, gbt])

# Fit on the scaled training split and score the scaled testing split
gbt_model = gbt.fit(training_data_scaled)
gbt_predictions = gbt_model.transform(testing_data_scaled)

# RMSE, computed with this cell's own evaluator. Previously the Random Forest
# cell's `evaluator` was called here, so the cell silently depended on that
# cell having been run (and on its metric staying "rmse").
gbt_evaluator = RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName="rmse")
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)

# R2
gbt_evaluator_r2 = RegressionEvaluator(labelCol="fare_amount", predictionCol="prediction", metricName="r2")
gbt_r2 = gbt_evaluator_r2.evaluate(gbt_predictions)
r2 = gbt_r2  # still assigned so any later cell that reads `r2` sees the same value as before
print("R-squared (R2) on test data = {:.2f}".format(gbt_r2))

# Range of the label in the test predictions, computed in one aggregation pass
# instead of two separate max/min jobs over the GBT predictions.
gbt_fare_bounds = gbt_predictions.selectExpr(
    "max(fare_amount) AS max_fare",
    "min(fare_amount) AS min_fare",
).first()
max_fare_gbt = gbt_fare_bounds["max_fare"]
min_fare_gbt = gbt_fare_bounds["min_fare"]

# An empty prediction set yields null bounds; treat that as a zero range.
fare_range_gbt = (max_fare_gbt - min_fare_gbt) if max_fare_gbt is not None else 0.0

# Range-normalised RMSE for GBT; NaN instead of a ZeroDivisionError when the
# label has no spread.
normalized_rmse_gbt = gbt_rmse / fare_range_gbt if fare_range_gbt else float("nan")

print("Root Mean Squared Error (RMSE) on test data = {:.2f}".format(gbt_rmse))
print("Normalized RMSE on test data = {:.4f}".format(normalized_rmse_gbt))

24/05/15 12:03:25 WARN YarnAllocator: Container from a bad node: container_1715754161652_0010_01_000016 on host: cluster-789c-w-1.us-central1-f.c.vertical-kayak-423108-t5.internal. Exit status: 143. Diagnostics: [2024-05-15 12:03:25.089]Container killed on request. Exit code is 143
[2024-05-15 12:03:25.089]Container exited with a non-zero exit code 143. 
[2024-05-15 12:03:25.090]Killed by external signal
.
24/05/15 12:03:25 ERROR YarnScheduler: Lost executor 15 on cluster-789c-w-1.us-central1-f.c.vertical-kayak-423108-t5.internal: Container from a bad node: container_1715754161652_0010_01_000016 on host: cluster-789c-w-1.us-central1-f.c.vertical-kayak-423108-t5.internal. Exit status: 143. Diagnostics: [2024-05-15 12:03:25.089]Container killed on request. Exit code is 143
[2024-05-15 12:03:25.089]Container exited with a non-zero exit code 143. 
[2024-05-15 12:03:25.090]Killed by external signal
.
24/05/15 12:03:25 WARN TaskSetManager: Lost task 0.0 in stage 1047.0 (TID 24576) (cluster-7

R-squared (R2) on test data = 0.76




Root Mean Squared Error (RMSE) on test data = 4.88
Normalized RMSE on test data = 0.0103


                                                                                

In [28]:
from pyspark.ml.regression import GBTRegressor

# Destination for the fitted GBT model
model_path = "gs://228bucket/Models/gbt_model"

# Persist the model, replacing anything already stored at that path
(
    gbt_model
    .write()
    .overwrite()
    .save(model_path)
)

                                                                                