In [None]:
import numpy as np
import pandas as pd
from sklearn.linear_model import Ridge
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PolynomialFeatures


def tune_ridge_over_feature_sets(
    X_train_scaled,
    y_train,
    feature_sets,
    cv,
    scoring,
    alpha_grid=None,
    random_state=42
):
    """
    Tune Ridge regression over multiple feature sets using GridSearchCV.

    For every feature set a grid search over ``alpha`` and ``fit_intercept``
    is run (refit on ``"r2"``) and the mean CV scores of the winning
    candidate are collected into one row.

    Parameters
    ----------
    X_train_scaled : pd.DataFrame
        Scaled training features (all features).
    y_train : pd.Series or np.ndarray
        Target variable for training.
    feature_sets : dict
        Dictionary mapping feature set names to lists of column names.
    cv : cross-validation splitter
        e.g., KFold instance.
    scoring : dict
        Multi-metric scoring dictionary for GridSearchCV. It must define the
        scorer names "r2", "rmse" and "mae", because the results are read
        from ``mean_test_r2`` / ``mean_test_rmse`` / ``mean_test_mae``.
    alpha_grid : list or None
        List of alpha values to test. Defaults to [0.1, 1.0, 10.0, 100.0].
    random_state : int
        Random state passed to the Ridge estimator.

    Returns
    -------
    results_df : pd.DataFrame
        One row per feature set with the columns "Feature Set", "Alpha",
        "fit_intercept", "r2", "rmse" and "mae". "rmse" and "mae" are the
        absolute values of the mean CV scores.
    best_row : pd.Series
        Row corresponding to the best-performing Ridge model (highest r2).
    best_model_name : str
        Human-readable name of the best Ridge model.

    Raises
    ------
    ValueError
        If ``feature_sets`` is empty.
    """

    if not feature_sets:
        # Without this guard the failure only surfaces later as an opaque
        # pandas error when ranking an empty results table.
        raise ValueError("feature_sets must contain at least one feature set")

    if alpha_grid is None:
        alpha_grid = [0.1, 1.0, 10.0, 100.0]

    ridge_param_grid = {
        "alpha": alpha_grid,
        "fit_intercept": [True, False]
    }

    ridge_results = []

    for set_name, features in feature_sets.items():
        X_loop = X_train_scaled[features]

        grid_ridge = GridSearchCV(
            estimator=Ridge(random_state=random_state),
            param_grid=ridge_param_grid,
            cv=cv,
            scoring=scoring,
            refit="r2"
        )

        grid_ridge.fit(X_loop, y_train)
        best_idx = grid_ridge.best_index_
        best_params = grid_ridge.best_params_

        ridge_results.append({
            "Feature Set": set_name,
            "Alpha": best_params["alpha"],
            # fit_intercept is part of the search space, so it has to be
            # reported for the winning model to be reproducible.
            "fit_intercept": best_params["fit_intercept"],
            "r2": grid_ridge.cv_results_["mean_test_r2"][best_idx],
            "rmse": abs(grid_ridge.cv_results_["mean_test_rmse"][best_idx]),
            "mae": abs(grid_ridge.cv_results_["mean_test_mae"][best_idx])
        })

    results_df = pd.DataFrame(ridge_results)
    best_row = results_df.sort_values("r2", ascending=False).iloc[0]

    best_model_name = f'Ridge (alpha={best_row["Alpha"]})'

    return results_df, best_row, best_model_name


In [None]:
def tune_ridge(
    X_train_scaled,
    y_train,
    feature_set=feature_set,
    cv=kf,
    scoring=scoring,
    alpha_grid=None,
    random_state=42
):
    """
    Tune Ridge regression over feature sets with GridSearchCV.

    Note that the defaults for ``feature_set``, ``cv`` and ``scoring`` are
    the notebook-level objects ``feature_set``, ``kf`` and ``scoring`` as
    they exist when this cell is executed.

    Parameters
    ----------
    X_train_scaled : pd.DataFrame
        Training features; indexed with each feature set's column list.
    y_train : pd.Series or np.ndarray
        Training target.
    feature_set : dict
        Maps a feature set name to the list of columns it uses.
    cv : cross-validation splitter
        Passed straight to GridSearchCV.
    scoring : dict
        Multi-metric scoring for GridSearchCV; must define the scorer names
        "r2", "rmse" and "mae".
    alpha_grid : sequence or None
        Alpha values to search. Defaults to [0.1, 1.0, 10.0, 100.0].
    random_state : int
        Random state passed to the Ridge estimator.

    Returns
    -------
    ridge_feat_df : pd.DataFrame
        One row per feature set: "Feature Set", "Alpha", "fit_intercept",
        "r2", "rmse", "mae" ("rmse"/"mae" are absolute values of the mean
        CV scores).
    best_row : pd.Series
        The row with the highest r2.
    best_model_name : str
        Label of the form "Ridge (alpha=...)".

    Raises
    ------
    ValueError
        If ``feature_set`` is empty.
    """
    if not feature_set:
        raise ValueError("feature_set must contain at least one feature set")

    # Only the alpha values fall back to a default; the parameter grid itself
    # must always be built, otherwise a caller-supplied alpha_grid would leave
    # ridge_param_grid undefined.
    if alpha_grid is None:
        alpha_grid = [0.1, 1.0, 10.0, 100.0]

    ridge_param_grid = {
        "alpha": list(alpha_grid),
        "fit_intercept": [True, False]
    }

    ridge_feat_results = []

    for set_name, features in feature_set.items():
        loop_X = X_train_scaled[features]

        grid_ridge = GridSearchCV(
            Ridge(random_state=random_state),
            param_grid=ridge_param_grid,
            cv=cv,
            scoring=scoring,
            refit='r2'
        )

        grid_ridge.fit(loop_X, y_train)
        best_index = grid_ridge.best_index_
        best_params = grid_ridge.best_params_

        ridge_feat_results.append({
            "Feature Set": set_name,
            "Alpha": best_params['alpha'],
            # recorded so the selected model can be refit exactly
            "fit_intercept": best_params['fit_intercept'],
            "r2": grid_ridge.cv_results_['mean_test_r2'][best_index],
            "rmse": abs(grid_ridge.cv_results_['mean_test_rmse'][best_index]),
            "mae": abs(grid_ridge.cv_results_['mean_test_mae'][best_index]),
        })

    ridge_feat_df = pd.DataFrame(ridge_feat_results)
    best_row = ridge_feat_df.sort_values("r2", ascending=False).iloc[0]

    best_model_name = f'Ridge (alpha={best_row["Alpha"]})'

    return ridge_feat_df, best_row, best_model_name

In [None]:
# Run the Ridge search on the optimisation split. `tune_ridge` names its
# feature-set parameter `feature_set` (singular); passing `feature_sets=`
# raises a TypeError for an unexpected keyword argument.
ridge_feat_df, best_ridge_row, best_ridge_name = tune_ridge(
    X_train_scaled=X_train_opt_scaled,
    y_train=y_train_opt,
    feature_set=feature_set,
    cv=kf,
    scoring=scoring
)

ridge_feat_df

In [None]:
# Pick the Ridge row with the highest mean CV r2 and log it with the other
# per-estimator winners.
best_ridge_row = (
    ridge_feat_df
    .sort_values(by="r2", ascending=False)
    .iloc[0]
)

best_scores_opt.append(
    dict(
        Model=best_ridge_name,
        Feature_set=best_ridge_row["Feature Set"],
        **{metric: best_ridge_row[metric] for metric in ("r2", "rmse", "mae")},
    )
)

In [None]:
def tune_simple_linear(
    X_train_scaled,
    y_train,
    cv=5,
    scoring=scoring
):
    """
    Cross-validate a one-feature LinearRegression for every column.

    Each column of ``X_train_scaled`` is scored on its own with
    ``cross_validate``; the mean test r2 and the absolute values of the mean
    test rmse / mae are collected per feature.

    Returns
    -------
    results_df : pd.DataFrame
        One row per feature with "Feature", "r2", "rmse", "mae".
    best_row : pd.Series
        The row with the highest r2.
    best_model_name : str
        Label of the form "Simple Linear (<feature>)".
    """

    def _score_single_feature(feature_name):
        # double brackets keep the input two-dimensional
        cv_scores = cross_validate(
            LinearRegression(),
            X_train_scaled[[feature_name]],
            y_train,
            cv=cv,
            scoring=scoring,
        )
        return {
            "Feature": feature_name,
            "r2": cv_scores["test_r2"].mean(),
            "rmse": abs(cv_scores["test_rmse"].mean()),
            "mae": abs(cv_scores["test_mae"].mean()),
        }

    per_feature_scores = [
        _score_single_feature(feature_name)
        for feature_name in X_train_scaled.columns
    ]

    results_df = pd.DataFrame(per_feature_scores)
    ranked = results_df.sort_values("r2", ascending=False)
    best_row = ranked.iloc[0]

    return results_df, best_row, f"Simple Linear ({best_row['Feature']})"

In [None]:
# Look up the first tracked score whose model label starts with "Simple Linear"
best_simple_entry = next(
    entry
    for entry in best_scores_opt
    if entry["Model"].startswith("Simple Linear")
)

# For the simple model the "Feature_set" field holds a single column name
best_feature = best_simple_entry["Feature_set"]

# Refit the one-feature regression on the optimisation training split
model_simple_best = LinearRegression(fit_intercept=True)
model_simple_best.fit(X_train_opt_scaled[[best_feature]], y_train_opt)

# Register the fitted model together with the column it expects
best_models_dict["Simple Linear Regression"] = dict(
    model_obj=model_simple_best,
    features=[best_feature],
)

In [None]:
# Tabular view of the registry: one column per registered model
pd.DataFrame(data=best_models_dict)

In [None]:
def tune_multi_linear(
    X_train_scaled,
    y_train,
    feature_set=feature_set,
    cv=kf,
    scoring=scoring
):
    """
    Grid-search a multiple LinearRegression for every feature set.

    The only hyperparameter searched is ``fit_intercept``; each search is
    refit on "r2" and the winning candidate's mean CV scores are recorded.

    Returns
    -------
    results_df : pd.DataFrame
        One row per feature set with "Feature Set", "Num Features", "r2",
        "rmse", "mae" and "fit_intercept".
    best_row : pd.Series
        The row with the highest r2.
    best_model_name : str
        Always "Multiple Linear Regression".
    """

    intercept_grid = {"fit_intercept": [True, False]}

    def _search_one(set_name, features):
        search = GridSearchCV(
            estimator=LinearRegression(),
            param_grid=intercept_grid,
            cv=cv,
            scoring=scoring,
            refit="r2",
        )
        search.fit(X_train_scaled[features], y_train)

        winner = search.best_index_
        cv_table = search.cv_results_
        return {
            "Feature Set": set_name,
            "Num Features": len(features),
            "r2": cv_table["mean_test_r2"][winner],
            "rmse": abs(cv_table["mean_test_rmse"][winner]),
            "mae": abs(cv_table["mean_test_mae"][winner]),
            "fit_intercept": search.best_params_["fit_intercept"],
        }

    results_df = pd.DataFrame(
        [_search_one(set_name, features) for set_name, features in feature_set.items()]
    )
    ranked = results_df.sort_values("r2", ascending=False)
    best_row = ranked.iloc[0]

    return results_df, best_row, "Multiple Linear Regression"

In [None]:
def tune_polynomial(
    X_train_scaled,
    y_train,
    feature_set=feature_set,
    cv=kf,
    scoring=scoring
):
    """
    Tune Polynomial Regression (interaction-only) over feature sets.

    A ``PolynomialFeatures(interaction_only=True)`` -> ``LinearRegression``
    pipeline is grid-searched over the polynomial degree (1-4) and the
    regression's ``fit_intercept`` for every feature set, refit on "r2".

    Parameters
    ----------
    X_train_scaled : pd.DataFrame
        Training features; indexed with each feature set's column list.
    y_train : pd.Series or np.ndarray
        Training target.
    feature_set : dict
        Maps a feature set name to the list of columns it uses.
    cv : cross-validation splitter
        Passed straight to GridSearchCV.
    scoring : dict
        Multi-metric scoring for GridSearchCV; must define the scorer names
        "r2", "rmse" and "mae".

    Returns
    -------
    results_df : pd.DataFrame
        One row per feature set: "Feature Set", "Num Features", "Degree",
        "fit_intercept", "r2", "rmse", "mae" ("rmse"/"mae" are absolute
        values of the mean CV scores).
    best_row : pd.Series
        The row with the highest r2.
    best_model_name : str
        Label of the form "Polynomial (deg=...)".

    Raises
    ------
    ValueError
        If ``feature_set`` is empty.
    """

    if not feature_set:
        raise ValueError("feature_set must contain at least one feature set")

    poly_pipe = make_pipeline(
        PolynomialFeatures(interaction_only=True),
        LinearRegression()
    )

    # make_pipeline names each step after its lower-cased class name, hence
    # the "polynomialfeatures__" / "linearregression__" prefixes.
    param_grid = {
        "polynomialfeatures__degree": [1, 2, 3, 4],
        "linearregression__fit_intercept": [True, False]
    }

    poly_feat_results = []

    for set_name, features in feature_set.items():
        loop_X = X_train_scaled[features]

        grid_poly = GridSearchCV(
            poly_pipe,
            param_grid,
            cv=cv,
            scoring=scoring,
            refit="r2",
            n_jobs=-1
        )

        grid_poly.fit(loop_X, y_train)
        best_index = grid_poly.best_index_
        best_params = grid_poly.best_params_

        poly_feat_results.append({
            "Feature Set": set_name,
            "Num Features": len(features),
            "Degree": best_params["polynomialfeatures__degree"],
            # searched above, so it must be reported for the selected
            # pipeline to be rebuilt exactly
            "fit_intercept": best_params["linearregression__fit_intercept"],
            "r2": grid_poly.cv_results_["mean_test_r2"][best_index],
            "rmse": abs(grid_poly.cv_results_["mean_test_rmse"][best_index]),
            "mae": abs(grid_poly.cv_results_["mean_test_mae"][best_index]),
        })

    results_df = pd.DataFrame(poly_feat_results)
    best_row = results_df.sort_values("r2", ascending=False).iloc[0]
    best_model_name = f"Polynomial (deg={best_row['Degree']})"

    return results_df, best_row, best_model_name

In [None]:
# Register the refit Ridge model together with the columns it was trained on
best_models_dict[ridge_label] = dict(
    model_obj=model_ridge_best,
    features=best_ridge_feats,
)

# NOTE(review): bare names - each is evaluated in order and only the last one
# is echoed as the cell output; presumably leftover scratch, confirm before
# removing.
model_simple
model_multi
model_poly
model_lasso
model_ridge

In [None]:
def tune_lasso(
    X_train_scaled,
    y_train,
    feature_set=feature_set,
    cv=kf,
    scoring=scoring,
    alpha_grid=None,
    random_state=42
):
    """
    Tune Lasso Regression over feature sets.

    For every feature set a GridSearchCV over ``alpha`` and
    ``fit_intercept`` is run on ``Lasso(max_iter=10000)``, refit on "r2",
    and the winning candidate's mean CV scores are recorded.

    Parameters
    ----------
    X_train_scaled : pd.DataFrame
        Training features; indexed with each feature set's column list.
    y_train : pd.Series or np.ndarray
        Training target.
    feature_set : dict
        Maps a feature set name to the list of columns it uses.
    cv : cross-validation splitter
        Passed straight to GridSearchCV.
    scoring : dict
        Multi-metric scoring for GridSearchCV; must define the scorer names
        "r2", "rmse" and "mae".
    alpha_grid : list or None
        Alpha values to search. Defaults to [0.01, 0.1, 1.0, 10.0].
    random_state : int
        Random state passed to the Lasso estimator.

    Returns
    -------
    results_df : pd.DataFrame
        One row per feature set: "Feature Set", "Alpha", "fit_intercept",
        "r2", "rmse", "mae" ("rmse"/"mae" are absolute values of the mean
        CV scores).
    best_row : pd.Series
        The row with the highest r2.
    best_model_name : str
        Label of the form "Lasso (alpha=...)".

    Raises
    ------
    ValueError
        If ``feature_set`` is empty.
    """

    if not feature_set:
        raise ValueError("feature_set must contain at least one feature set")

    if alpha_grid is None:
        alpha_grid = [0.01, 0.1, 1.0, 10.0]

    param_grid = {
        "alpha": alpha_grid,
        "fit_intercept": [True, False]
    }

    lasso_feat_results = []

    for set_name, features in feature_set.items():
        loop_X = X_train_scaled[features]

        grid_lasso = GridSearchCV(
            Lasso(random_state=random_state, max_iter=10000),
            param_grid,
            cv=cv,
            scoring=scoring,
            refit="r2"
        )

        grid_lasso.fit(loop_X, y_train)
        best_index = grid_lasso.best_index_
        best_params = grid_lasso.best_params_

        lasso_feat_results.append({
            "Feature Set": set_name,
            "Alpha": best_params["alpha"],
            # searched above, so it must be reported for the selected model
            # to be refit exactly
            "fit_intercept": best_params["fit_intercept"],
            "r2": grid_lasso.cv_results_["mean_test_r2"][best_index],
            "rmse": abs(grid_lasso.cv_results_["mean_test_rmse"][best_index]),
            "mae": abs(grid_lasso.cv_results_["mean_test_mae"][best_index]),
        })

    results_df = pd.DataFrame(lasso_feat_results)
    best_row = results_df.sort_values("r2", ascending=False).iloc[0]
    best_model_name = f"Lasso (alpha={best_row['Alpha']})"

    return results_df, best_row, best_model_name

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression


def _best_score_entry(model_name, best_row, feature_key):
    """Build one best-CV-score record from a tuner's winning row."""
    return {
        "Model": model_name,
        "Feature_set": best_row[feature_key],
        "r2": best_row["r2"],
        "rmse": best_row["rmse"],
        "mae": best_row["mae"]
    }


def _selected_fit_intercept(best_row):
    """Return the fit_intercept recorded in a winning row, or True if absent."""
    return bool(best_row.get("fit_intercept", True))


def process_thermal_data(
    df: pd.DataFrame,
    target_col: str,
    f_set: dict,
    kf_split,
    score_dict: dict
):
    """
    Processes a subset of data (Heating / Cooling / Off) through the full
    optimization pipeline and returns the best models.

    Steps: an 80/20 train/test split (random_state=42), standard scaling
    fitted on the training part only, then one tuning run per estimator
    family (simple linear, multiple linear, polynomial, Lasso, Ridge). Each
    family's winning configuration is refit on the full scaled training set.

    Parameters
    ----------
    df : pd.DataFrame
        Data containing the feature columns and ``target_col``.
    target_col : str
        Name of the target column; every other column is used as a feature.
    f_set : dict
        Maps a feature set name to the list of columns it uses.
    kf_split : cross-validation splitter
        Passed to every tuning function as ``cv``.
    score_dict : dict
        Passed to every tuning function as ``scoring``.

    Returns
    -------
    best_models_dict : dict
        Trained final models with selected features. Each value is a dict
        with "model_obj" (fitted estimator) and "features" (column list).
    best_scores : list
        Best CV scores for each estimator
    X_test_scaled : pd.DataFrame
        Scaled test features
    y_test : pd.Series
        Test target values
    """

    # -----------------------------
    # 1. Split & scale
    # -----------------------------
    X = df.drop(columns=[target_col])
    y = df[target_col]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # The scaler is fitted on the training rows only and then applied to the
    # test rows, so no test statistics leak into training.
    scaler = StandardScaler()
    X_train_scaled = pd.DataFrame(
        scaler.fit_transform(X_train),
        columns=X.columns,
        index=X_train.index
    )

    X_test_scaled = pd.DataFrame(
        scaler.transform(X_test),
        columns=X.columns,
        index=X_test.index
    )

    # -----------------------------
    # 2. Run all tuning functions
    # -----------------------------
    best_scores = []
    best_models_dict = {}

    # --- Simple Linear ---
    _, simple_best, simple_name = tune_simple_linear(
        X_train_scaled,
        y_train,
        cv=kf_split,
        scoring=score_dict
    )
    best_scores.append(_best_score_entry(simple_name, simple_best, "Feature"))

    simple_features = [simple_best["Feature"]]
    model_simple = LinearRegression()
    model_simple.fit(X_train_scaled[simple_features], y_train)

    best_models_dict["Simple Linear"] = {
        "model_obj": model_simple,
        "features": simple_features
    }

    # --- Multiple Linear ---
    _, multi_best, multi_name = tune_multi_linear(
        X_train_scaled,
        y_train,
        feature_set=f_set,
        cv=kf_split,
        scoring=score_dict
    )
    best_scores.append(_best_score_entry(multi_name, multi_best, "Feature Set"))

    multi_features = f_set[multi_best["Feature Set"]]
    model_multi = LinearRegression(
        fit_intercept=_selected_fit_intercept(multi_best)
    )
    model_multi.fit(X_train_scaled[multi_features], y_train)

    best_models_dict["Multi Linear"] = {
        "model_obj": model_multi,
        "features": multi_features
    }

    # --- Polynomial ---
    _, poly_best, poly_name = tune_polynomial(
        X_train_scaled,
        y_train,
        feature_set=f_set,
        cv=kf_split,
        scoring=score_dict
    )
    best_scores.append(_best_score_entry(poly_name, poly_best, "Feature Set"))

    # tune_polynomial does not hand back its fitted search, so the winning
    # pipeline is rebuilt here; storing None would make any later
    # model.predict(...) call fail. The pipeline performs the polynomial
    # expansion itself, so it predicts directly from the raw feature columns.
    poly_features = f_set[poly_best["Feature Set"]]
    model_poly = make_pipeline(
        PolynomialFeatures(
            degree=int(poly_best["Degree"]),
            interaction_only=True
        ),
        LinearRegression(fit_intercept=_selected_fit_intercept(poly_best))
    )
    model_poly.fit(X_train_scaled[poly_features], y_train)

    best_models_dict["Polynomial"] = {
        "model_obj": model_poly,
        "features": poly_features
    }

    # --- Lasso ---
    _, lasso_best, lasso_name = tune_lasso(
        X_train_scaled,
        y_train,
        feature_set=f_set,
        cv=kf_split,
        scoring=score_dict
    )
    best_scores.append(_best_score_entry(lasso_name, lasso_best, "Feature Set"))

    lasso_features = f_set[lasso_best["Feature Set"]]
    model_lasso = Lasso(
        alpha=lasso_best["Alpha"],
        # honour the searched fit_intercept when the tuner reports it
        fit_intercept=_selected_fit_intercept(lasso_best),
        max_iter=10000,
        random_state=42
    )
    model_lasso.fit(X_train_scaled[lasso_features], y_train)

    best_models_dict["Lasso"] = {
        "model_obj": model_lasso,
        "features": lasso_features
    }

    # --- Ridge ---
    _, ridge_best, ridge_name = tune_ridge(
        X_train_scaled,
        y_train,
        feature_set=f_set,
        cv=kf_split,
        scoring=score_dict
    )
    best_scores.append(_best_score_entry(ridge_name, ridge_best, "Feature Set"))

    ridge_features = f_set[ridge_best["Feature Set"]]
    model_ridge = Ridge(
        alpha=ridge_best["Alpha"],
        # honour the searched fit_intercept when the tuner reports it
        fit_intercept=_selected_fit_intercept(ridge_best),
        random_state=42
    )
    model_ridge.fit(X_train_scaled[ridge_features], y_train)

    best_models_dict["Ridge"] = {
        "model_obj": model_ridge,
        "features": ridge_features
    }

    # -----------------------------
    # 3. Return everything needed downstream
    # -----------------------------
    return best_models_dict, best_scores, X_test_scaled, y_test


In [None]:

    # NOTE(review): indented fragment that mirrors the simple-linear score
    # bookkeeping inside process_thermal_data, but appends to `best_results`.
    # No visible cell assigns `best_results`, `simple_name` or `simple_best`
    # at notebook level - presumably leftover scratch; confirm whether this
    # cell should be removed.
    best_results.append({
        "Model": simple_name,
        "Feature_set": simple_best["Feature"],
        "r2": simple_best["r2"],
        "rmse": simple_best["rmse"],
        "mae": simple_best["mae"]
    })

In [None]:
# Heating subset: tune the two tree ensembles, then register each fitted
# model with the columns of its winning feature set.
rf_df_h, rf_best_h, rf_model_h = tune_rf(
    X_train_heat_scaled, y_train_heat, feature_set, kf, scoring
)
gb_df_h, gb_best_h, gb_model_h = tune_gb(
    X_train_heat_scaled, y_train_heat, feature_set, kf, scoring
)

# `is_optimized` marks these entries for the evaluation cell
best_models_dict["Optimized Random Forest (Heating)"] = dict(
    model_obj=rf_model_h,
    features=feature_set[rf_best_h["Feature Set"]],
    is_optimized=True,
)

best_models_dict["Optimized Gradient Boosting (Heating)"] = dict(
    model_obj=gb_model_h,
    features=feature_set[gb_best_h["Feature Set"]],
    is_optimized=True,
)

In [None]:
# Score every registered model on the held-out test data and rank by R2.
test_results = []

for name, info in best_models_dict.items():
    model = info['model_obj']
    feats = info['features']

    # An entry registered without a fitted estimator would otherwise fail
    # below with an AttributeError that does not say which model is missing.
    if model is None:
        raise ValueError(
            f"No fitted estimator stored for {name!r}; "
            "refit it before evaluating on the test set."
        )

    # 1. Select the correct X_test data source
    if info.get('is_optimized'):
        X_test_input = X_test_opt_scaled
    else:
        X_test_input = X_test_scaled

    # 2. Prepare the features for prediction
    # Baseline polynomial models need the separately fitted `poly` expansion
    if info.get('is_poly_baseline'):
        data_to_predict = poly.transform(X_test_input[feats])
    # All other models predict straight from their feature columns
    else:
        data_to_predict = X_test_input[feats]

    # 3. Predict on the TEST set
    y_pred = model.predict(data_to_predict)

    # 4. Calculate metrics
    # NOTE(review): every model is scored against y_test_opt, including those
    # fed from X_test_scaled - assumes both test matrices share the same rows;
    # confirm.
    test_results.append({
        "Model": name,
        "Optimized": info.get('is_optimized'),
        "R2": r2_score(y_test_opt, y_pred),
        "RMSE": np.sqrt(mean_squared_error(y_test_opt, y_pred)),
        "MAE": mean_absolute_error(y_test_opt, y_pred)
    })

# Convert to DataFrame and sort
test_results_df = (
    pd.DataFrame(test_results)
    .sort_values("R2", ascending=False)
    .reset_index(drop=True)
)

test_results_df