In [0]:
%run ../ConfigFolder/ConfigSAS

# Notebook Description

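This notebook analyzes the dataset and trains machine learning models to predict outcomes. It covers data preprocessing, exploratory data analysis and visualization, and model training and evaluation, and it is organized so that each step and its results can be followed in order. The main code cell below loads the `g-model-madrid-min-features-combined` gold-layer Delta table, derives `year` and `quarter` from `period`, assembles a feature vector, trains a gradient-boosted tree regressor on `price`, and logs the metrics and fitted model to MLflow.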



In [0]:
# 1) Load Delta DataFrame
# `generate_path` and `spark` are not defined in this cell; presumably they come
# from the %run config notebook and the notebook runtime — TODO confirm.
model_combined_Path = generate_path(
    'g-model-madrid-min-features-combined',
    'goldlayer',
)
df_model_combined = (
    spark.read
    .format("delta")
    .load(model_combined_Path)
)

# 2) Extract 'year' and 'month' from 'period'
from pyspark.sql.functions import ceil, col, substring, when

# NOTE(review): assumes 'period' is laid out as YYYYMM (e.g. 201803) — confirm
# against the gold table. The previous code read substring(period, 6, 2), which
# for that layout yields only the LAST digit of the month ("3", "6", "9", and
# "2" for December). That made February collide with Q4 and turned every other
# month into a null quarter. Both month digits are read here instead
# (Spark's substring position is 1-based).
df_model_combined = (
    df_model_combined
    .withColumn("year", substring(col("period"), 1, 4))
    .withColumn("month", substring(col("period"), 5, 2))
)

# 3) Derive 'quarter' (based on 'month') and drop 'month' and 'period'
# quarter = ceil(month / 3): 01-03 -> 1, 04-06 -> 2, 07-09 -> 3, 10-12 -> 4.
# A month that cannot be parsed as an integer yields a null quarter.
# The result is cast back to string so the column keeps the type the previous
# when()-chain produced; the int cast happens in the type-casting step.
df_model_combined = df_model_combined.withColumn(
    "quarter",
    ceil(col("month").cast("int") / 3).cast("string"),
).drop("month", "period")

# 4) Cast columns to appropriate types
# Each entry is (output column, source column, Spark SQL type). Only
# 'isparking' is read from a differently named source column, so it is added
# as a new column while the others are replaced in place.
column_casts = [
    ('bathrooms', 'bathrooms',                     'int'),
    ('isparking', 'isparkingspaceincludedinprice', 'int'),
    ('latitude',  'latitude',                      'float'),
    ('longitude', 'longitude',                     'float'),
    ('price',     'price',                         'float'),
    ('rooms',     'rooms',                         'int'),
    ('size',      'size',                          'float'),
    ('year',      'year',                          'int'),
    ('quarter',   'quarter',                       'int'),
]

# Apply the casts one at a time, in the listed order.
df_cleaned = df_model_combined
for output_name, source_name, spark_type in column_casts:
    df_cleaned = df_cleaned.withColumn(output_name, col(source_name).cast(spark_type))

# Show the resulting schema so the cast types can be checked by eye.
df_cleaned.printSchema()

# 5) Create feature vector with VectorAssembler
from pyspark.ml.feature import VectorAssembler
feature_cols = ['bathrooms', 'isparking', 'latitude', 'longitude', 'rooms', 'size', 'year', 'quarter']

# handleInvalid='skip' silently drops rows where any input feature is null/NaN.
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol='features',
    handleInvalid='skip'
)

# 6) Transform DataFrame to obtain 'features' and 'price'
# The assembler only guards the feature columns. The label is filtered
# separately because a null/NaN 'price' (e.g. a value the float cast could not
# parse) would otherwise reach the regressor and fail the fit.
df_prepared = (
    assembler.transform(df_cleaned)
    .select('features', 'price')
    .dropna(subset=['price'])
)

# 7) Split into training and test sets
# 80/20 split; the fixed seed keeps the split repeatable for the same input.
train_data, test_data = df_prepared.randomSplit([0.8, 0.2], seed=42)

# 8) Train Gradient Boosting model
import mlflow
import mlflow.spark
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# NOTE: despite the `xgb` naming, this is Spark ML's GBTRegressor.
xgb = GBTRegressor(featuresCol='features', labelCol='price', maxIter=100, maxDepth=5)

with mlflow.start_run(run_name="boost_model_v1") as run:
    # Train
    xgb_model = xgb.fit(train_data)

    # Predict on the held-out split
    xgb_predictions = xgb_model.transform(test_data)

    # Evaluate with RMSE, MAE, and R2
    evaluator_rmse = RegressionEvaluator(labelCol='price', predictionCol='prediction', metricName='rmse')
    evaluator_mae  = RegressionEvaluator(labelCol='price', predictionCol='prediction', metricName='mae')
    evaluator_r2   = RegressionEvaluator(labelCol='price', predictionCol='prediction', metricName='r2')

    rmse_xgb = evaluator_rmse.evaluate(xgb_predictions)
    mae_xgb  = evaluator_mae.evaluate(xgb_predictions)
    r2_xgb   = evaluator_r2.evaluate(xgb_predictions)

    # Log metrics in MLflow
    mlflow.log_metrics({
        "rmse_xgb": rmse_xgb,
        "mae_xgb":  mae_xgb,
        "r2_xgb":   r2_xgb,
    })

    # Log parameters. Hyperparameters are read back from the estimator instead
    # of being retyped as literals, so the logged values cannot drift from what
    # was actually trained.
    mlflow.log_params({
        "seed": 42,  # seed used for the train/test randomSplit; keep in sync
        "feature_cols": feature_cols,
        "maxIter": xgb.getMaxIter(),
        "maxDepth": xgb.getMaxDepth(),
    })

    # Log the model
    mlflow.spark.log_model(xgb_model, "gbt-model")

    print(f"[GBT]  RMSE={rmse_xgb}, MAE={mae_xgb}, R2={r2_xgb}")