In [0]:
%run ../DataProcesing/Transversal/config

In [0]:
%run ../DataProcesing/Transversal/utils



In [0]:
# Load the gold training table (name supplied by the config notebook) and
# discard the event_date column before modelling.
df = spark.table(Gold_Train_Model_Dataset).drop("event_date")

# 80/20 train/test partition with a fixed seed.
train, test = df.randomSplit(weights=[0.8, 0.2], seed=666)

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler


# --- Categorical features: string -> index -> one-hot vector ---------------
# Both indexers are configured with handleInvalid="keep".
neighborhood_indexer = StringIndexer(
    inputCol="neighborhood",
    outputCol="neighborhood_index",
    handleInvalid="keep",
)
day_indexer = StringIndexer(
    inputCol="day_number",
    outputCol="day_number_index",
    handleInvalid="keep",
)

neighborhood_encoder = OneHotEncoder(
    inputCol="neighborhood_index",
    outputCol="neighborhood_ohe",
)
day_encoder = OneHotEncoder(
    inputCol="day_number_index",
    outputCol="day_number_ohe",
)

# --- Numeric features: every remaining column except the categoricals,
# the label (quantity_products) and event_date ------------------------------
numeric_cols = [
    name
    for name in train.columns
    if name not in ("neighborhood", "day_number", "quantity_products", "event_date")
]

# Pack the numeric columns into one vector, then standardise that vector.
vec_numeric = VectorAssembler(
    inputCols=numeric_cols,
    outputCol="numeric_features_vec",
)
scaler = StandardScaler(
    inputCol="numeric_features_vec",
    outputCol="numeric_features_scaled",
)

# --- Final feature vector consumed by the regressors ------------------------
assembler = VectorAssembler(
    inputCols=["numeric_features_scaled", "neighborhood_ohe", "day_number_ohe"],
    outputCol="features",
)

In [0]:
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator


# Candidate regressors, keyed by display name. All read the assembled
# "features" vector and predict "quantity_products"; the tree-based models
# get a fixed seed.
models = dict(
    LinearRegression=LinearRegression(
        featuresCol="features",
        labelCol="quantity_products",
    ),
    RandomForest=RandomForestRegressor(
        featuresCol="features",
        labelCol="quantity_products",
        seed=666,
    ),
    GradientBoostedTrees=GBTRegressor(
        featuresCol="features",
        labelCol="quantity_products",
        seed=666,
    ),
)

# One evaluator per metric; the Spark metric name is the lower-cased key
# ("RMSE" -> "rmse", "MAE" -> "mae", "R2" -> "r2").
evaluators = {
    metric_key: RegressionEvaluator(
        labelCol="quantity_products",
        predictionCol="prediction",
        metricName=metric_key.lower(),
    )
    for metric_key in ("RMSE", "MAE", "R2")
}

In [0]:
from pyspark.ml import Pipeline
from mlflow.models.signature import infer_signature
import mlflow
import mlflow.spark


# Train, evaluate and register every candidate model.
#
# Outputs of this cell:
#   results        -- {model name: {"RMSE": ..., "MAE": ..., "R2": ...}},
#                     computed on the `test` split.
#   trained_models -- {model name: fitted PipelineModel}.
results = {}
trained_models = {}

for model_name, model in models.items():
    print(f"\nEntrenando {model_name}")

    # Same preprocessing stages for every candidate; only the final
    # estimator changes between iterations.
    pipeline = Pipeline(stages=[
        neighborhood_indexer,
        day_indexer,
        neighborhood_encoder,
        day_encoder,
        vec_numeric,
        scaler,
        assembler,
        model,
    ])

    pipeline_model = pipeline.fit(train)
    trained_models[model_name] = pipeline_model

    # Score the hold-out split and compute every configured metric on it.
    predictions = pipeline_model.transform(test)
    metrics = {
        metric_name: evaluator.evaluate(predictions)
        for metric_name, evaluator in evaluators.items()
    }
    results[model_name] = metrics

    # Model signature: input = training columns without the label,
    # output = the "prediction" column produced by the fitted pipeline.
    predictions_train = pipeline_model.transform(train)
    signature = infer_signature(
        train.drop("quantity_products"),
        predictions_train.select("prediction"),
    )

    with mlflow.start_run(run_name=f"{model_name}_run"):
        # Record the hold-out metrics in the same run that registers the
        # model, so each registered version can be traced back to its
        # evaluation results. Keys are prefixed to make the split explicit.
        mlflow.log_metrics({f"test_{name}": value for name, value in metrics.items()})

        mlflow.spark.log_model(
            pipeline_model,
            artifact_path="model",
            registered_model_name=f"{model_name}_v2",
            signature=signature,
        )


Entrenando LinearRegression


Downloading artifacts:   0%|          | 0/75 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

2025/08/01 01:26:43 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Downloading artifacts:   0%|          | 0/75 [00:00<?, ?it/s]



Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Successfully registered model 'unalwater_v2.default.linearregression_v2'.


Downloading artifacts:   0%|          | 0/79 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/79 [00:00<?, ?it/s]

🔗 Created version '1' of model 'unalwater_v2.default.linearregression_v2': https://dbc-e9915cc8-6c3e.cloud.databricks.com/explore/data/models/unalwater_v2/default/linearregression_v2/version/1?o=841556636100288



Entrenando RandomForest


Downloading artifacts:   0%|          | 0/80 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

2025/08/01 01:29:25 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Downloading artifacts:   0%|          | 0/80 [00:00<?, ?it/s]



Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Successfully registered model 'unalwater_v2.default.randomforest_v2'.


Downloading artifacts:   0%|          | 0/84 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/84 [00:00<?, ?it/s]

🔗 Created version '1' of model 'unalwater_v2.default.randomforest_v2': https://dbc-e9915cc8-6c3e.cloud.databricks.com/explore/data/models/unalwater_v2/default/randomforest_v2/version/1?o=841556636100288



Entrenando GradientBoostedTrees


Downloading artifacts:   0%|          | 0/80 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

2025/08/01 01:32:18 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Downloading artifacts:   0%|          | 0/80 [00:00<?, ?it/s]



Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Successfully registered model 'unalwater_v2.default.gradientboostedtrees_v2'.


Downloading artifacts:   0%|          | 0/84 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/84 [00:00<?, ?it/s]

🔗 Created version '1' of model 'unalwater_v2.default.gradientboostedtrees_v2': https://dbc-e9915cc8-6c3e.cloud.databricks.com/explore/data/models/unalwater_v2/default/gradientboostedtrees_v2/version/1?o=841556636100288


In [0]:
# Fixed-width comparison table: header row, then one row per trained model.
print(f"{'Modelo':<20} {'RMSE':<10} {'MAE':<10} {'R2':<10}")

for model_name, metrics in results.items():
    print(
        f"{model_name:<20} {metrics['RMSE']:<10.2f} {metrics['MAE']:<10.2f} "
        f"{metrics['R2']:<10.4f}"
    )

# Rank the (name, metrics) pairs by ascending RMSE; the first entry wins.
sorted_models = sorted(results.items(), key=lambda pair: pair[1]['RMSE'])
best_model_name, _best_metrics = sorted_models[0]
best_model = trained_models[best_model_name]

print(f"\nMEJOR MODELO: {best_model_name}")

Modelo               RMSE       MAE        R2        
LinearRegression     194.07     147.42     0.8991    
RandomForest         206.55     142.68     0.8857    
GradientBoostedTrees 279.40     179.56     0.7909    

MEJOR MODELO: LinearRegression


In [0]:
# Score the whole dataset `df` (the frame that was split into train/test)
# with the selected pipeline.
best_predictions = best_model.transform(df)

# calculate_bias_by_neighborhood is expected to come from the utils notebook
# loaded via %run; its exact output is not visible here -- TODO confirm.
bias_by_neighborhood = calculate_bias_by_neighborhood(
    dataset=best_predictions,
)

In [0]:
# Persist the selected pipeline in MLflow's Spark flavor under the volume path.
# NOTE(review): MLflow appears to refuse saving into an existing non-empty
# directory, so re-running this cell may fail -- confirm and clear the
# target beforehand if needed.
mlflow.spark.save_model(best_model, "/Volumes/unalwater_v2/default/files/model_test/")
