# Big Data Analytics Project - Airbnb Pricing Prediction <br>

### Group 63 <br>
André Lourenço - 20240743 <br>
Fábio Dos Santos - 20240678 <br>
Rafael Borges - 20240497 <br>
Rui Reis - 20240854 <br>
Victor Silva - 20240663 

## 1. Import Libraries

In [0]:
from pyspark.sql.functions import col, year, to_date, avg, regexp_replace, current_date, datediff, countDistinct, sum, round, when, lit
from pyspark.sql import functions as F
import matplotlib.pyplot as plt
import seaborn as sns
from functools import reduce
import numpy as np
from sklearn.cluster import KMeans
import matplotlib.patches as mpatches
import pandas as pd 


from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import (
    VectorAssembler,
    StringIndexer,
    OneHotEncoder,
    StandardScaler,
    Imputer,
    RobustScaler
)
from pyspark.ml.regression import RandomForestRegressor, LinearRegression, GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.param.shared import Param
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import when

import csv
import os
from datetime import datetime
from pyspark.ml.param import Param # To get param names correctly

# --- Configuration for Logging ---
LOG_FILE_PATH = "model_run_log_generic.csv"
LOG_HEADER = [
    "timestamp",
    "run_type",      # e.g., "cv_trial", "final_test_evaluation"
    "model_name",    # Name of the estimator (e.g., RandomForestRegressor, LinearRegression)
    "params_json",   # All hyperparameters for the trial as a JSON string
    "cv_metric_name",
    "cv_metric_value",
    "test_rmse",
    "test_r2",
    "notes"
]

# Import json for serializing params
import json



In [0]:
def log_run_data(log_path, header, data_dict):
    """Appends a dictionary of data as a new row to a CSV file.
    Creates the file with a header if it doesn't exist.
    """
    file_exists = os.path.isfile(log_path)
    try:
        with open(log_path, 'a', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=header)
            if not file_exists:
                writer.writeheader()
            # Ensure all header fields are present in data_dict, with None if missing
            row_to_write = {h: data_dict.get(h) for h in header}
            writer.writerow(row_to_write)
    except IOError:
        print(f"IOError: Could not write to log file {log_path}")
    except Exception as e:
        print(f"Error during logging: {e}")

In [0]:
%run "/Project/utils" 

In [0]:
%run "/Project/transformers" 

## 2. Data Load

In [0]:
from pyspark.sql import functions as F

df = (
    spark.read
         .option("header", "true")      # keep column names
         .option("inferSchema", "true") # let Spark inspect values & choose types
         .option("samplingRatio", "1")  # (optional) scan 100 % of the rows
         .csv("dbfs:/FileStore/tables/Listings_cleaned")
)

display(df.limit(5))


host_is_superhost,host_total_listings_count,host_has_profile_pic,host_identity_verified,neighbourhood,property_type,room_type,accommodates,bedrooms,price,minimum_nights,review_scores_rating,instant_bookable,host_days_active,host_years_active,amenities_length,living_entertainment,kitchen_dining,bedroom,bathroom,baby_family,laundry_cleaning,safety_security,outdoor_garden,heating_cooling,travel_access,wellness_leisure,workspace_tech,guest_services,misc_essentials
0,1,1,0,Buttes-Montmartre,Entire apartment,Entire place,2,1,53,2,100,0,3354,9,5,0,1,0,0,0,1,0,0,1,1,0,1,0,0
0,1,1,1,Buttes-Montmartre,Entire apartment,Entire place,2,1,120,2,100,0,2627,7,8,0,1,0,1,0,2,0,0,1,1,0,1,0,1
0,1,1,0,Elysee,Entire apartment,Entire place,2,1,89,2,100,0,2383,6,6,1,1,0,0,0,1,0,0,1,1,0,1,0,0
0,1,1,1,Vaugirard,Entire apartment,Entire place,2,1,58,2,100,0,2609,7,5,1,1,0,0,0,0,0,0,1,1,0,1,0,0
0,1,1,0,Passy,Entire apartment,Entire place,2,1,60,2,100,0,2247,6,12,1,1,0,1,0,2,0,0,1,1,0,1,0,1


In [0]:
df_shape(df)

63750 rows × 30 columns


# PIPELINE 

In [0]:
summarize_nulls_and_dtype(df, only_missing=True)

column                         |    # nulls | dtype
---------------------------------------------------
bedrooms                       |     13,293 | int
review_scores_rating           |     16,199 | int


In [0]:
categorical_cols = [
    #'property_type', 
    'room_type', 
    'neighbourhood',
]

numeric_cols = [
    'host_total_listings_count',
    'accommodates', 
    'bedrooms', 
    'minimum_nights', 
    'review_scores_rating', 
    'host_days_active',
    'host_years_active',
    'amenities_length', 
    "living_entertainment", 
    "kitchen_dining", 
    "bedroom", 
    "bathroom", 
    "baby_family", 
    "laundry_cleaning", 
    "safety_security", 
    "outdoor_garden", 
    "heating_cooling", 
    "travel_access", 
    "wellness_leisure", 
    "workspace_tech", 
    "guest_services", 
    "misc_essentials",
]

binary_cols = [
    "host_is_superhost",
    "host_has_profile_pic",
    "host_identity_verified",
    "instant_bookable"
]

label_col = "price"

# 4. Train/test split
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# 5. Build pipeline stages (applied per CV fold)
stages = []

# 5a. Categorical processing
for cat in categorical_cols:
    indexer = StringIndexer(inputCol=cat, outputCol=f"{cat}_idx")
    ohe = OneHotEncoder(inputCol=f"{cat}_idx", outputCol=f"{cat}_ohe")
    stages += [indexer, ohe]

# 5b. Numeric imputation
current_numeric_cols = list(numeric_cols) # Make a copy to modify
processed_cols_map = {col: col for col in numeric_cols}


imputer_input_actual = ["review_scores_rating", "bedrooms"] 
imputer_output_actual = [f"{c}_imputed" for c in imputer_input_actual]
imputer = Imputer(inputCols=imputer_input_actual, outputCols=imputer_output_actual).setStrategy("median")
stages.append(imputer)

# Update dict
for orig_col, imputed_col in zip(imputer_input_actual, imputer_output_actual):
    processed_cols_map[orig_col] = imputed_col

# 5c. Winsorization

winsor_input_original_names = ["bedrooms", "minimum_nights"] # Original names
winsor_input_current_names = [processed_cols_map[c] for c in winsor_input_original_names]
winsor_output = [f"{processed_cols_map[c]}_wins" for c in winsor_input_original_names]

winsor = Winsorizer(
    inputCols=winsor_input_current_names,
    outputCols=winsor_output,
    lowerQuantile=0.05,
    upperQuantile=0.95
)
stages.append(winsor)


# Update dict 
for orig_col, wins_col in zip(winsor_input_original_names, winsor_output):
    processed_cols_map[orig_col] = wins_col

final_numeric_features = list(processed_cols_map.values())

# 5e. Assemble features
onehot_cols  = [f"{c}_ohe" for c in categorical_cols]


# Final feature input list
feature_inputs = final_numeric_features + onehot_cols + binary_cols
assembler = VectorAssembler(inputCols=feature_inputs, outputCol="assembled_features")
stages.append(assembler)

# 5f. Feature scaling ( Unnecessary for tree based models )
#scaler = RobustScaler(inputCol="assembled_features", outputCol="features")
#stages.append(scaler)

# 6. Estimator: RandomForestRegressor (or swap LinearRegression)
# regressor = RandomForestRegressor(featuresCol="features", labelCol=label_col, seed=42)

regressor =  GBTRegressor(featuresCol="assembled_features", labelCol=label_col, seed=42)
stages.append(regressor)

# 7. Pipeline creation
pipeline = Pipeline(stages=stages)

# 8. Hyperparameter grid for Random Forest
# paramGrid = ParamGridBuilder() \
#     .addGrid(regressor.numTrees, [20, 50]) \
#     .addGrid(regressor.maxDepth, [5, 10]) \
#     .build()

# 8. Hyperparameter grid for GBTRegressor
paramGrid = ParamGridBuilder() \
    .addGrid(regressor.maxIter, [20, 50]) \
    .addGrid(regressor.maxDepth, [3, 5]) \
    .addGrid(regressor.stepSize, [0.1, 0.05]) \
    .build()

# 9. Cross-validation setup
evaluator = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName="rmse")
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=2,
    seed = 42 
)

# 10. Model fitting
cv_model = crossval.fit(train_df)

# 11. Best model & params
best_model = cv_model.bestModel
best_rf = best_model.stages[-1]
print(f"Best numTrees: {best_rf.getNumTrees}")
print(f"Best maxDepth: {best_rf.getOrDefault('maxDepth')}")

# 12. Test set evaluation
preds = best_model.transform(test_df)
rmse = evaluator.evaluate(preds)
r2 = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName="r2").evaluate(preds)
print(f"Test RMSE = {rmse:.4f}, R2 = {r2:.4f}")

Best numTrees: 50
Best maxDepth: 5
Test RMSE = 47.9895, R2 = 0.5303
