In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

# Initialize Spark session
spark = SparkSession.builder.appName("PricePrediction").getOrCreate()

# Load the pre-processed data from the trusted folder
trusted_data_path = "gs://expedia-flight-prices/Trusted/itineraries_processed.parquet"
df = spark.read.parquet(trusted_data_path)

# Sample 50% of the data (fraction = 0.5)
df = df.sample(fraction=0.3, seed=123)

# Select features and target column
numeric_columns = ["travelDurationMinutes", "elapsedDays", "totalTravelDistance"]
categorical_columns = ["startingAirport_index", "destinationAirport_index"]  # Already indexed columns
target_column = "totalFare"

# If there is an existing "features" column, rename it to avoid conflict
df = df.withColumnRenamed("features", "old_features") if "features" in df.columns else df

# Assemble feature columns into a single feature vector
assembler = VectorAssembler(
    inputCols=categorical_columns + numeric_columns,  # Excluding "scaled_features" for feature importance
    outputCol="features"  # This can be renamed to something else if "features" already exists
)

df = assembler.transform(df)

# Define the Random Forest Regressor model
rf = RandomForestRegressor(featuresCol="features", labelCol=target_column)

# Select the feature columns and target column
features = categorical_columns + numeric_columns  # excluding "scaled_features"

# Split the data into training (80%) and testing (20%) sets
train_data, test_data = df.randomSplit([0.8, 0.2], seed=123)

# Set up cross-validation with hyperparameter tuning
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

# Regression evaluators for totalFare
rmse_evaluator = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="rmse")
mae_evaluator = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="mae")
r2_evaluator = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="r2")

cv = CrossValidator(estimator=rf,
                    estimatorParamMaps=paramGrid,
                    evaluator=rmse_evaluator,  # Evaluator for RMSE
                    numFolds=3)

# Train the model using cross-validation
cvModel = cv.fit(train_data)

# Make predictions on the test set
predictions = cvModel.transform(test_data)

# Evaluate the model using RMSE, MAE, and R2
rmse = rmse_evaluator.evaluate(predictions)
mae = mae_evaluator.evaluate(predictions)
r2 = r2_evaluator.evaluate(predictions)

# Print the evaluation metrics
print(f"Root Mean Squared Error (RMSE) for Total Fare = {rmse}")
print(f"Mean Absolute Error (MAE) for Total Fare = {mae}")
print(f"R-Squared (R2) for Total Fare = {r2}")

# Get feature importance from the trained model
rf_model = cvModel.bestModel  # Best model after cross-validation

# Extract feature importances
feature_importances = rf_model.featureImportances

# Map feature importance to the feature names
feature_names = categorical_columns + numeric_columns
importance_map = dict(zip(feature_names, feature_importances))

# Print the feature importances
print("Feature Importances:")
for feature, importance in importance_map.items():
    print(f"{feature}: {importance}")

# Save the trained model to the /models folder
model_output_path = "gs://expedia-flight-prices/Models/random_forest_model"
cvModel.bestModel.write().overwrite().save(model_output_path)
print(f"Trained model saved to {model_output_path}")


24/12/02 23:01:27 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/12/02 23:06:17 WARN DAGScheduler: Broadcasting large task binary with size 1533.3 KiB
24/12/02 23:08:35 WARN DAGScheduler: Broadcasting large task binary with size 1533.3 KiB
24/12/02 23:08:55 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/12/02 23:09:18 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB
24/12/02 23:09:43 WARN DAGScheduler: Broadcasting large task binary with size 6.1 MiB
24/12/02 23:10:11 WARN DAGScheduler: Broadcasting large task binary with size 1317.7 KiB
24/12/02 23:10:12 WARN DAGScheduler: Broadcasting large task binary with size 9.0 MiB
24/12/02 23:10:42 WARN DAGScheduler: Broadcasting large task binary with size 1795.1 KiB
24/12/02 23:10:45 WARN DAGScheduler: Broadcasting large task binary with size 12.7 MiB
24/12/02 23:11:17 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/12/0

24/12/03 00:31:18 WARN DAGScheduler: Broadcasting large task binary with size 37.4 MiB
24/12/03 00:32:57 WARN DAGScheduler: Broadcasting large task binary with size 6.6 MiB
24/12/03 00:37:28 WARN DAGScheduler: Broadcasting large task binary with size 1540.6 KiB
24/12/03 00:39:28 WARN DAGScheduler: Broadcasting large task binary with size 1540.7 KiB
24/12/03 00:39:49 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/12/03 00:40:10 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
24/12/03 00:40:33 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
24/12/03 00:41:00 WARN DAGScheduler: Broadcasting large task binary with size 1340.4 KiB
24/12/03 00:41:01 WARN DAGScheduler: Broadcasting large task binary with size 9.1 MiB
24/12/03 00:41:32 WARN DAGScheduler: Broadcasting large task binary with size 1849.5 KiB
24/12/03 00:41:34 WARN DAGScheduler: Broadcasting large task binary with size 13.0 MiB
24/12/03 00:42:03 WARN DAGScheduler: Bro

Root Mean Squared Error (RMSE) for Total Fare = 141.2332206468747
Mean Absolute Error (MAE) for Total Fare = 95.96636981753346
R-Squared (R2) for Total Fare = 0.4798803287902762
Feature Importances:
startingAirport_index: 0.18798516127023326
destinationAirport_index: 0.18063121218590714
travelDurationMinutes: 0.22117496704529838
elapsedDays: 0.010817843625748598
totalTravelDistance: 0.39939081587281255


24/12/03 01:38:42 WARN TaskSetManager: Stage 753 contains a task of very large size (2778 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Trained model saved to gs://expedia-flight-prices/Models/random_forest_model
