## Random Forest

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split, KFold, RandomizedSearchCV
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
import numpy as np
import warnings
import joblib

warnings.filterwarnings("ignore")

# Load the processed listings (semicolon-separated CSV).
df = pd.read_csv('../data/casas_idealista_procesado.csv', sep=';')

# Cross-fill the two surface columns: when exactly one of m2_utiles /
# m2_construidos is missing, copy the value that is present into the gap.
if 'm2_utiles' in df.columns and 'm2_construidos' in df.columns:
    solo_utiles = df['m2_utiles'].isna() & df['m2_construidos'].notna()
    solo_construidos = df['m2_construidos'].isna() & df['m2_utiles'].notna()

    df.loc[solo_utiles, 'm2_utiles'] = df.loc[solo_utiles, 'm2_construidos']
    df.loc[solo_construidos, 'm2_construidos'] = df.loc[solo_construidos, 'm2_utiles']

# Drop rows where a surface value is still missing (i.e. both were absent).
df = df.dropna(subset=['m2_utiles', 'm2_construidos'])

# Drop rows without bedroom or bathroom counts.
df = df.dropna(subset=['habitaciones', 'banos'])

# A missing floor number is not dropped: it is encoded as 0.
df['planta_numero'] = df['planta_numero'].fillna(0)

# Fill the remaining NaN in numerical columns with the column mean.
# The result is assigned back instead of using `inplace=True` on the
# `df[column]` selection: that chained form modifies a temporary object and
# leaves `df` untouched when pandas Copy-on-Write is active.
# NOTE(review): this loop also covers 'Precio' (the target) and 'id' if they
# are numeric and contain NaN - confirm that mean-imputing the target is wanted.
for column in df.select_dtypes(include=['float64', 'int64']).columns:
    df[column] = df[column].fillna(df[column].mean())

# Fill NaN in categorical (object) columns with the literal 'Desconocido'.
for column in df.select_dtypes(include=['object']).columns:
    df[column] = df[column].fillna('Desconocido')

# Features are every column except the target and the identifier.
X = df.drop(['Precio', 'id'], axis=1)
y = df['Precio']

# One-hot encode the categorical features.
X_encoded = pd.get_dummies(X)

# 80/20 train/test split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(X_encoded, y, test_size=0.2, random_state=42)

# Manual search over max_depth: every depth is scored with 5-fold CV on the
# training split, and the depth with the lowest mean "clean" MSE wins.
max_depths = range(3, 20)
best_depth = None
best_score = float('inf')
best_model = None

# A validation row is kept only if its absolute residual is below
# outlier_threshold * std(absolute residuals of that fold).
# NOTE(review): rows are discarded using the model's own errors, so the
# resulting MSE is an optimistic estimate.
outlier_threshold = 1.5

kf = KFold(n_splits=5, shuffle=True, random_state=42)

for max_depth in max_depths:
    fold_scores = []

    for train_index, val_index in kf.split(X_train):
        X_train_fold, X_val_fold = X_train.iloc[train_index], X_train.iloc[val_index]
        y_train_fold, y_val_fold = y_train.iloc[train_index], y_train.iloc[val_index]

        rf_model = RandomForestRegressor(n_estimators=2000, max_depth=max_depth, random_state=42)
        rf_model.fit(X_train_fold, y_train_fold)

        y_pred_fold = rf_model.predict(X_val_fold)

        residuals = np.abs(y_val_fold - y_pred_fold)
        no_outliers = residuals < (outlier_threshold * residuals.std())

        # Reuse the predictions already computed for the kept rows. The
        # previous version trained a second forest with identical parameters,
        # seed and data just to predict these same rows again, which doubled
        # the training cost without changing any number.
        y_val_clean = y_val_fold[no_outliers]
        y_pred_clean = y_pred_fold[no_outliers.to_numpy()]
        mse_clean = mean_squared_error(y_val_clean, y_pred_clean)

        fold_scores.append(mse_clean)

    average_mse = np.mean(fold_scores)

    # The second clause makes sure the first evaluated depth is always recorded.
    if average_mse < best_score or best_score == float('inf'):
        best_score = average_mse
        best_depth = max_depth
        # This is the model fitted on the LAST fold only, not on all of X_train.
        best_model = rf_model

print(f'El mejor max_depth es {best_depth} con un MSE promedio de {best_score:.2f}')

# Randomized search over the remaining hyperparameters, with max_depth pinned
# to the value selected by the depth search.
param_dist = {
    "n_estimators": [500, 1000, 2000],
    "max_depth": [best_depth],
    "min_samples_split": [2, 5, 10],
    "min_samples_leaf": [1, 2, 4],
    # 1.0 means "use every feature", which is what "auto" meant for
    # RandomForestRegressor. The string "auto" was removed in scikit-learn 1.3,
    # so candidates using it fail to fit there.
    "max_features": [1.0, "sqrt", "log2"],
}

rf_opt_model = RandomForestRegressor(random_state=42)
# random_state fixes which 10 candidates are sampled, so reruns are
# reproducible like the rest of this cell (everything else is seeded with 42).
random_search = RandomizedSearchCV(
    rf_opt_model,
    param_distributions=param_dist,
    n_iter=10,
    cv=3,
    n_jobs=-1,
    random_state=42,
)
random_search.fit(X_train, y_train)

# With the default refit=True this estimator is already refit on the whole
# of X_train / y_train using the best sampled parameters.
best_rf = random_search.best_estimator_

# Evaluate the tuned forest on the held-out test split.
y_pred = best_rf.predict(X_test)
residuals_final = np.abs(y_test - y_pred)

# Keep only test rows whose absolute residual is below
# outlier_threshold * std(absolute residuals).
# NOTE(review): filtering the test set by the model's own errors makes the
# metrics below optimistic; they do not describe the full test split.
no_outliers_final = residuals_final < (outlier_threshold * residuals_final.std())

X_test_clean = X_test[no_outliers_final]
y_test_clean = y_test[no_outliers_final]

# best_rf is not refit here: RandomizedSearchCV (refit=True by default) already
# trained it on X_train / y_train, and a second fit with the same seed and data
# reproduces the same forest. For the same reason the predictions for the kept
# rows are taken from y_pred instead of calling predict() again.
y_pred_clean = y_pred[no_outliers_final.to_numpy()]
mse_clean_final = mean_squared_error(y_test_clean, y_pred_clean)
r2_clean_final = r2_score(y_test_clean, y_pred_clean)

print(f'MSE final sin outliers: {mse_clean_final}')
print(f'R^2 final sin outliers: {r2_clean_final}')

# Persist the model together with the encoded column order it was trained on.
joblib.dump(best_rf, '../data/random_forest_venta.pkl')
joblib.dump(X_encoded.columns.tolist(), '../data/columnas_entrenamiento_rf_venta.pkl')

## BAGGING

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split, KFold, RandomizedSearchCV
from sklearn.ensemble import BaggingRegressor
from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import mean_squared_error, r2_score
import numpy as np
import warnings
import joblib

warnings.filterwarnings("ignore")

# Load the processed listings (semicolon-separated CSV).
df_bagging = pd.read_csv('../data/casas_idealista_procesado.csv', sep=';')

# Cross-fill the two surface columns: when exactly one of m2_utiles /
# m2_construidos is missing, copy the value that is present into the gap.
if 'm2_utiles' in df_bagging.columns and 'm2_construidos' in df_bagging.columns:
    solo_utiles = df_bagging['m2_utiles'].isna() & df_bagging['m2_construidos'].notna()
    solo_construidos = df_bagging['m2_construidos'].isna() & df_bagging['m2_utiles'].notna()

    df_bagging.loc[solo_utiles, 'm2_utiles'] = df_bagging.loc[solo_utiles, 'm2_construidos']
    df_bagging.loc[solo_construidos, 'm2_construidos'] = df_bagging.loc[solo_construidos, 'm2_utiles']

# Drop rows where a surface value is still missing (i.e. both were absent).
df_bagging = df_bagging.dropna(subset=['m2_utiles', 'm2_construidos'])

# A missing floor number is not dropped: it is encoded as 0.
df_bagging['planta_numero'] = df_bagging['planta_numero'].fillna(0)

# Drop rows without bedroom or bathroom counts.
df_bagging = df_bagging.dropna(subset=['habitaciones', 'banos'])

# Fill NaN in the remaining numerical columns with the column mean. The m2
# columns are skipped; they have no NaN left after the dropna above.
# The result is assigned back instead of using `inplace=True` on the
# `df_bagging[column]` selection: that chained form modifies a temporary
# object and leaves the frame untouched when pandas Copy-on-Write is active.
for column in df_bagging.select_dtypes(include=['float64', 'int64']).columns:
    if column not in ['m2_utiles', 'm2_construidos']:
        df_bagging[column] = df_bagging[column].fillna(df_bagging[column].mean())

# Fill NaN in categorical (object) columns with the literal 'Desconocido'.
for column in df_bagging.select_dtypes(include=['object']).columns:
    df_bagging[column] = df_bagging[column].fillna('Desconocido')

# Features are every column except the target and the identifier.
X_bagging = df_bagging.drop(['Precio', 'id'], axis=1)
y_bagging = df_bagging['Precio']

# One-hot encode the categorical features.
X_bagging_encoded = pd.get_dummies(X_bagging)

# 80/20 train/test split with a fixed seed.
X_bagging_train, X_bagging_test, y_bagging_train, y_bagging_test = train_test_split(
    X_bagging_encoded, y_bagging, test_size=0.2, random_state=42
)

# Base learner for the ensemble: an unconstrained decision tree.
base_dt_bagging_model = DecisionTreeRegressor(random_state=42)

# Search space for the bagging meta-estimator.
param_dist = {
    "n_estimators": [50, 100, 200],
    "max_samples": [0.5, 0.7, 1.0],
    "max_features": [0.5, 0.7, 1.0],
    "bootstrap": [True, False],
}

bagging_model = BaggingRegressor(estimator=base_dt_bagging_model, random_state=42)

# random_state fixes which 10 candidates are sampled, so reruns are
# reproducible like the rest of this cell (everything else is seeded with 42).
random_search = RandomizedSearchCV(
    bagging_model,
    param_distributions=param_dist,
    n_iter=10,
    cv=3,
    n_jobs=-1,
    random_state=42,
)
random_search.fit(X_bagging_train, y_bagging_train)

# With the default refit=True this estimator is already refit on the whole
# training split using the best sampled parameters.
best_bagging_model = random_search.best_estimator_

# A validation row is kept only if its absolute residual is below
# outlier_threshold_bagging * std(absolute residuals of that fold).
# NOTE(review): rows are discarded using the model's own errors, and the
# hyperparameters were tuned on this same training split, so the MSE reported
# here is an optimistic estimate.
outlier_threshold_bagging = 1.5

kf_bagging = KFold(n_splits=5, shuffle=True, random_state=42)
best_bagging_score = float('inf')

bagging_results = []

for train_index, val_index in kf_bagging.split(X_bagging_train):
    X_train_bagging_fold, X_val_bagging_fold = (
        X_bagging_train.iloc[train_index],
        X_bagging_train.iloc[val_index],
    )
    y_train_bagging_fold, y_val_bagging_fold = (
        y_bagging_train.iloc[train_index],
        y_bagging_train.iloc[val_index],
    )

    # Refits the tuned estimator in place on this fold's training rows.
    best_bagging_model.fit(X_train_bagging_fold, y_train_bagging_fold)

    y_pred_bagging_fold = best_bagging_model.predict(X_val_bagging_fold)

    residuals_bagging = np.abs(y_val_bagging_fold - y_pred_bagging_fold)

    no_outliers_bagging = residuals_bagging < (
        outlier_threshold_bagging * residuals_bagging.std()
    )

    # Reuse the predictions already computed for the kept rows. The previous
    # version refit the same seeded estimator on the same fold and predicted
    # these rows again, which doubled the cost without changing any number.
    y_val_clean_bagging = y_val_bagging_fold[no_outliers_bagging]
    y_pred_clean_bagging = y_pred_bagging_fold[no_outliers_bagging.to_numpy()]
    mse_clean_bagging = mean_squared_error(y_val_clean_bagging, y_pred_clean_bagging)

    bagging_results.append(mse_clean_bagging)

    # Track the lowest single-fold MSE.
    if mse_clean_bagging < best_bagging_score:
        best_bagging_score = mse_clean_bagging

average_bagging_mse = np.mean(bagging_results)
print(f'El MSE promedio con Bagging es {average_bagging_mse:.2f}')

# Train the final model on the whole training split. This fit is required:
# the cross-validation loop above left the estimator fitted on a single fold.
best_bagging_model.fit(X_bagging_train, y_bagging_train)

y_pred_bagging = best_bagging_model.predict(X_bagging_test)

residuals_final_bagging = np.abs(y_bagging_test - y_pred_bagging)

# Keep only test rows whose absolute residual is below
# outlier_threshold_bagging * std(absolute residuals).
# NOTE(review): filtering the test set by the model's own errors makes the
# metrics below optimistic; they do not describe the full test split.
no_outliers_final_bagging = residuals_final_bagging < (
    outlier_threshold_bagging * residuals_final_bagging.std()
)
X_bagging_test_clean = X_bagging_test[no_outliers_final_bagging]
y_bagging_test_clean = y_bagging_test[no_outliers_final_bagging]

# The model is not fitted a second time on the identical data, and the
# predictions for the kept rows are taken from y_pred_bagging: the estimator
# is seeded, so repeating either step reproduces the same values.
y_pred_clean_bagging_final = y_pred_bagging[no_outliers_final_bagging.to_numpy()]
mse_clean_bagging_final = mean_squared_error(y_bagging_test_clean, y_pred_clean_bagging_final)
r2_clean_bagging_final = r2_score(y_bagging_test_clean, y_pred_clean_bagging_final)

print(f'MSE final sin outliers (Bagging): {mse_clean_bagging_final}')
print(f'R^2 final sin outliers (Bagging): {r2_clean_bagging_final}')

# Persist the model together with the encoded column order it was trained on.
joblib.dump(best_bagging_model, '../data/bagging_venta.pkl')
joblib.dump(X_bagging_encoded.columns.tolist(), '../data/columnas_entrenamiento_bagging_venta.pkl')

## GRADIENT BOOSTING

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split, KFold, RandomizedSearchCV
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_squared_error, r2_score
import numpy as np
import warnings
import joblib

warnings.filterwarnings("ignore")

# Load the processed listings (semicolon-separated CSV).
df_gb = pd.read_csv('../data/casas_idealista_procesado.csv', sep=';')

# Cross-fill the two surface columns: when exactly one of m2_utiles /
# m2_construidos is missing, copy the value that is present into the gap.
if 'm2_utiles' in df_gb.columns and 'm2_construidos' in df_gb.columns:
    solo_utiles = df_gb['m2_utiles'].isna() & df_gb['m2_construidos'].notna()
    solo_construidos = df_gb['m2_construidos'].isna() & df_gb['m2_utiles'].notna()

    df_gb.loc[solo_utiles, 'm2_utiles'] = df_gb.loc[solo_utiles, 'm2_construidos']
    df_gb.loc[solo_construidos, 'm2_construidos'] = df_gb.loc[solo_construidos, 'm2_utiles']

# A missing floor number is encoded as 0.
df_gb['planta_numero'] = df_gb['planta_numero'].fillna(0)

# Drop rows where a surface value is still missing (i.e. both were absent).
df_gb = df_gb.dropna(subset=['m2_utiles', 'm2_construidos'])

# Drop rows without bedroom or bathroom counts. 'planta_numero' is listed too,
# but it was filled with 0 above, so it never causes a row to be dropped here.
df_gb = df_gb.dropna(subset=['habitaciones', 'banos', 'planta_numero'])

# Fill NaN in the remaining numerical columns with the column mean. The m2
# columns are skipped; they have no NaN left after the dropna above.
# The result is assigned back instead of using `inplace=True` on the
# `df_gb[column]` selection: that chained form modifies a temporary object
# and leaves the frame untouched when pandas Copy-on-Write is active.
for column in df_gb.select_dtypes(include=['float64', 'int64']).columns:
    if column not in ['m2_utiles', 'm2_construidos']:
        df_gb[column] = df_gb[column].fillna(df_gb[column].mean())

# Fill NaN in categorical (object) columns with the literal 'Desconocido'.
for column in df_gb.select_dtypes(include=['object']).columns:
    df_gb[column] = df_gb[column].fillna('Desconocido')

# Features are every column except the target and the identifier.
X_gb = df_gb.drop(['Precio', 'id'], axis=1)
y_gb = df_gb['Precio']

# One-hot encode the categorical features.
X_gb_encoded = pd.get_dummies(X_gb)

# 80/20 train/test split with a fixed seed.
X_gb_train, X_gb_test, y_gb_train, y_gb_test = train_test_split(
    X_gb_encoded, y_gb, test_size=0.2, random_state=42
)

# Search space for the gradient boosting regressor.
param_dist_gb = {
    "n_estimators": [100, 200, 500],
    "learning_rate": [0.01, 0.05, 0.1],
    "max_depth": [3, 5, 10],
    "subsample": [0.7, 0.85, 1.0],
}

gb_model = GradientBoostingRegressor(random_state=42)

# random_state fixes which 10 candidates are sampled, so reruns are
# reproducible like the rest of this cell (everything else is seeded with 42).
random_search_gb = RandomizedSearchCV(
    gb_model,
    param_distributions=param_dist_gb,
    n_iter=10,
    cv=3,
    n_jobs=-1,
    random_state=42,
)
random_search_gb.fit(X_gb_train, y_gb_train)

# With the default refit=True this estimator is already refit on the whole
# training split using the best sampled parameters.
best_gb_model = random_search_gb.best_estimator_

# A validation row is kept only if its absolute residual is below
# outlier_threshold_gb * std(absolute residuals of that fold).
# NOTE(review): rows are discarded using the model's own errors, and the
# hyperparameters were tuned on this same training split, so the MSE reported
# here is an optimistic estimate.
outlier_threshold_gb = 1.5

kf_gb = KFold(n_splits=5, shuffle=True, random_state=42)
best_gb_score = float("inf")

gb_results = []

for train_index, val_index in kf_gb.split(X_gb_train):
    X_gb_train_fold, X_gb_val_fold = (
        X_gb_train.iloc[train_index],
        X_gb_train.iloc[val_index],
    )
    y_gb_train_fold, y_gb_val_fold = (
        y_gb_train.iloc[train_index],
        y_gb_train.iloc[val_index],
    )

    # Refits the tuned estimator in place on this fold's training rows.
    best_gb_model.fit(X_gb_train_fold, y_gb_train_fold)

    y_pred_gb_fold = best_gb_model.predict(X_gb_val_fold)

    residuals_gb = np.abs(y_gb_val_fold - y_pred_gb_fold)

    no_outliers_gb = residuals_gb < (outlier_threshold_gb * residuals_gb.std())

    # Reuse the predictions already computed for the kept rows. The previous
    # version refit the same seeded estimator on the same fold and predicted
    # these rows again, which doubled the cost without changing any number.
    y_val_clean_gb = y_gb_val_fold[no_outliers_gb]
    y_pred_clean_gb = y_pred_gb_fold[no_outliers_gb.to_numpy()]
    mse_clean_gb = mean_squared_error(y_val_clean_gb, y_pred_clean_gb)

    gb_results.append(mse_clean_gb)

    # Track the lowest single-fold MSE.
    if mse_clean_gb < best_gb_score:
        best_gb_score = mse_clean_gb

average_gb_mse = np.mean(gb_results)
print(f'El MSE promedio con Gradient Boosting es {average_gb_mse:.2f}')

# Final training pass on the complete training split, followed by a
# prediction for every row of the held-out test split.
best_gb_model.fit(X_gb_train, y_gb_train)
y_pred_gb = best_gb_model.predict(X_gb_test)

# Absolute prediction error per test row.
residuals_final_gb = np.abs(y_gb_test - y_pred_gb)

# Rows count as "clean" when their error stays below
# outlier_threshold_gb times the standard deviation of all errors.
gb_residual_cutoff = outlier_threshold_gb * residuals_final_gb.std()
no_outliers_final_gb = residuals_final_gb < gb_residual_cutoff
X_gb_test_clean, y_gb_test_clean = (
    X_gb_test[no_outliers_final_gb],
    y_gb_test[no_outliers_final_gb],
)

# Score the model on the clean subset only.
y_pred_clean_gb_final = best_gb_model.predict(X_gb_test_clean)
mse_clean_gb_final, r2_clean_gb_final = (
    mean_squared_error(y_gb_test_clean, y_pred_clean_gb_final),
    r2_score(y_gb_test_clean, y_pred_clean_gb_final),
)

print(f'MSE final sin outliers (Gradient Boosting): {mse_clean_gb_final}')
print(f'R^2 final sin outliers (Gradient Boosting): {r2_clean_gb_final}')

# Persist the fitted model, then the list of encoded training columns.
for gb_artifact, gb_artifact_path in (
    (best_gb_model, '../data/gb_venta.pkl'),
    (X_gb_encoded.columns.tolist(), '../data/columnas_entrenamiento_gb_venta.pkl'),
):
    joblib.dump(gb_artifact, gb_artifact_path)

## SVM

In [None]:
import numpy as np
from sklearn.svm import SVR
from sklearn.model_selection import RandomizedSearchCV, KFold, train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score
import pandas as pd
import warnings
import joblib

warnings.filterwarnings("ignore")

# Load the processed listings (semicolon-separated CSV).
# NOTE(review): the other cells in this file read
# '../data/casas_idealista_procesado.csv'; confirm that this path, relative to
# the current working directory, is intended.
df_svr = pd.read_csv('casas_idealista_procesado.csv', sep=';')

# Fill NaN in numerical columns with the column mean.
# The result is assigned back instead of using `inplace=True` on the
# `df_svr[column]` selection: that chained form modifies a temporary object
# and leaves the frame untouched when pandas Copy-on-Write is active.
# NOTE(review): this loop also covers 'Precio' (the target) and 'id' if they
# are numeric and contain NaN - confirm that mean-imputing the target is wanted.
for column in df_svr.select_dtypes(include=['float64', 'int64']).columns:
    df_svr[column] = df_svr[column].fillna(df_svr[column].mean())

# Fill NaN in categorical (object) columns with the literal 'Desconocido'.
for column in df_svr.select_dtypes(include=['object']).columns:
    df_svr[column] = df_svr[column].fillna('Desconocido')

# Features are every column except the target and the identifier.
X_svr = df_svr.drop(['Precio', 'id'], axis=1)
y_svr = df_svr['Precio']

# One-hot encode the categorical features.
X_svr_encoded = pd.get_dummies(X_svr)

# 80/20 train/test split with a fixed seed.
X_svr_train, X_svr_test, y_svr_train, y_svr_test = train_test_split(
    X_svr_encoded, y_svr, test_size=0.2, random_state=42
)

# Standardize features: the scaler is fitted on the training split only and
# then applied unchanged to the test split.
scaler_svr = StandardScaler()
X_svr_train_scaled = scaler_svr.fit_transform(X_svr_train)
X_svr_test_scaled = scaler_svr.transform(X_svr_test)

# Search space for the RBF-kernel SVR: C and gamma on logarithmic grids.
param_dist_svr = {
    "C": np.logspace(0, 3, 10),
    "kernel": ["rbf"],
    "gamma": np.logspace(-3, 2, 10),
}

svr_model = SVR()

# random_state fixes which 10 candidates are sampled, so reruns are
# reproducible like the seeded train/test split and KFold in this cell.
random_search_svr = RandomizedSearchCV(
    svr_model,
    param_distributions=param_dist_svr,
    n_iter=10,
    cv=3,
    n_jobs=-1,
    random_state=42,
)
random_search_svr.fit(X_svr_train_scaled, y_svr_train)

# With the default refit=True this estimator is already refit on the whole
# scaled training split using the best sampled parameters.
best_svr_model = random_search_svr.best_estimator_

# Cross-validation with outlier filtering.
# A validation row is kept only if its absolute residual is below
# outlier_threshold_svr * std(absolute residuals of that fold).
# NOTE(review): rows are discarded using the model's own errors, the
# hyperparameters were tuned on this same training split, and the features
# were scaled with statistics from all training rows, so the MSE reported here
# is an optimistic estimate.
outlier_threshold_svr = 1.5

kf_svr = KFold(n_splits=5, shuffle=True, random_state=42)
best_svr_score = float("inf")

svr_results = []

for train_index, val_index in kf_svr.split(X_svr_train_scaled):
    # The scaled features are a NumPy array (plain indexing); the target is
    # still a pandas Series (positional .iloc).
    X_train_svr_fold, X_val_svr_fold = (
        X_svr_train_scaled[train_index],
        X_svr_train_scaled[val_index],
    )
    y_train_svr_fold, y_val_svr_fold = (
        y_svr_train.iloc[train_index],
        y_svr_train.iloc[val_index],
    )

    # Refits the tuned estimator in place on this fold's training rows.
    best_svr_model.fit(X_train_svr_fold, y_train_svr_fold)

    y_pred_svr_fold = best_svr_model.predict(X_val_svr_fold)

    residuals_svr = np.abs(y_val_svr_fold - y_pred_svr_fold)

    no_outliers_svr = residuals_svr < (outlier_threshold_svr * residuals_svr.std())

    # Reuse the predictions already computed for the kept rows. The previous
    # version refit the same estimator on the same fold and predicted these
    # rows again, which doubled the cost without changing any number.
    y_val_clean_svr = y_val_svr_fold[no_outliers_svr]
    y_pred_clean_svr = y_pred_svr_fold[no_outliers_svr.to_numpy()]
    mse_clean_svr = mean_squared_error(y_val_clean_svr, y_pred_clean_svr)

    svr_results.append(mse_clean_svr)

    # Track the lowest single-fold MSE.
    if mse_clean_svr < best_svr_score:
        best_svr_score = mse_clean_svr

average_svr_mse = np.mean(svr_results)
print(f'El MSE promedio con SVR es {average_svr_mse:.2f}')

# Final training pass on the complete scaled training split, followed by a
# prediction for every row of the scaled test split.
best_svr_model.fit(X_svr_train_scaled, y_svr_train)
y_pred_svr = best_svr_model.predict(X_svr_test_scaled)

# Absolute prediction error per test row.
residuals_final_svr = np.abs(y_svr_test - y_pred_svr)

# Rows count as "clean" when their error stays below 1.5 times the standard
# deviation of all errors.
svr_residual_cutoff = 1.5 * residuals_final_svr.std()
no_outliers_final_svr = residuals_final_svr < svr_residual_cutoff
X_svr_test_clean, y_svr_test_clean = (
    X_svr_test_scaled[no_outliers_final_svr],
    y_svr_test[no_outliers_final_svr],
)

# Score the model on the clean subset only.
y_pred_clean_svr_final = best_svr_model.predict(X_svr_test_clean)
mse_clean_svr_final, r2_clean_svr_final = (
    mean_squared_error(y_svr_test_clean, y_pred_clean_svr_final),
    r2_score(y_svr_test_clean, y_pred_clean_svr_final),
)

print(f'MSE final sin outliers (SVR): {mse_clean_svr_final}')
print(f'R^2 final sin outliers (SVR): {r2_clean_svr_final}')

# Persist the model, the fitted scaler, and the encoded training columns,
# in that order.
for svr_artifact, svr_artifact_path in (
    (best_svr_model, 'svr_venta.pkl'),
    (scaler_svr, 'scaler_svr.pkl'),
    (X_svr_encoded.columns.tolist(), 'columnas_entrenamiento_svr_venta.pkl'),
):
    joblib.dump(svr_artifact, svr_artifact_path)
