In [1]:
!pip install numpy pandas scipy scikit-learn optuna




[notice] A new release of pip is available: 25.0.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


1. Dataset Loading and Preprocessing

First, you need to load your .arff datasets and prepare them for scikit-learn

In [6]:
import pandas as pd
import numpy as np
from scipy.io import arff
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

def load_and_preprocess_arff(filepath, target_column_name=None):
    """
    Loads an ARFF file, identifies features and target, and prepares data.

    Args:
        filepath (str): Path to the .arff file.
        target_column_name (str, optional): Name of the target column.
                                            If None, assumes last column is target.

    Returns:
        tuple: X (features DataFrame), y (target Series), feature_names, categorical_features_idx
    """
    data, meta = arff.loadarff('datasets/covtype-normalized.arff')
    df = pd.DataFrame(data)

    # Decode byte strings to standard strings for all columns
    for col in df.columns:
        if df[col].dtype == 'object': # Check if dtype is object, which means it might contain byte strings
            try:
                # Attempt to decode, if it fails, it might be a different object type or mixed
                df[col] = df[col].apply(lambda x: x.decode('utf-8') if isinstance(x, bytes) else x)
            except (UnicodeDecodeError, AttributeError):
                # Handle cases where not all are bytes or if decoding fails
                pass

    if target_column_name is None:
        target_column_name = df.columns[-1]

    X = df.drop(columns=[target_column_name])
    y = df[target_column_name]

    # Identify categorical and numerical features
    categorical_features = X.select_dtypes(include=['object', 'category']).columns
    numerical_features = X.select_dtypes(include=np.number).columns

    # Get indices for ColumnTransformer
    feature_names = X.columns.tolist()
    categorical_features_idx = [feature_names.index(col) for col in categorical_features]

    print(f"Loaded {filepath}. Shape: {df.shape}")
    print(f"Target column: {target_column_name}")
    print(f"Categorical features: {list(categorical_features)}")
    print(f"Numerical features: {list(numerical_features)}")

    return X, y, feature_names, categorical_features_idx, df.columns.tolist() # Return all column names too

def get_preprocessor(numerical_features, categorical_features):
    """
    Creates a column transformer for preprocessing.
    """
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', 'passthrough', numerical_features), # No transformation for numerical
            ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)
        ],
        remainder='passthrough' # Keep other columns if any, or 'drop' them
    )
    return preprocessor

2. Optuna Objective Function (with Partial RF Evaluation)

This is the core of your HPO logic for a single trial. It defines how a Random Forest is trained partially and how its performance is reported.

In [7]:
import optuna
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.metrics import accuracy_score, mean_squared_error, roc_auc_score
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold # For classification
from sklearn.model_selection import KFold # For regression
import warnings

# Suppress ConvergenceWarning from scikit-learn, which might occur with warm_start
warnings.filterwarnings("ignore", category=UserWarning, module='sklearn')

def objective(trial, X_train_raw, y_train_encoded, X_val_raw, y_val_encoded,
              task_type='classification', preprocessor=None, max_n_estimators_upper_bound=1000):
    """
    Optuna objective function for Random Forest HPO with multi-fidelity evaluation.

    Args:
        trial (optuna.trial.Trial): Current Optuna trial object.
        X_train_raw (pd.DataFrame): Raw training features.
        y_train_encoded (np.array): Encoded training target.
        X_val_raw (pd.DataFrame): Raw validation features.
        y_val_encoded (np.array): Encoded validation target.
        task_type (str): 'classification' or 'regression'.
        preprocessor (ColumnTransformer): Preprocessing pipeline.
        max_n_estimators_upper_bound (int): Max possible n_estimators for a full run.
                                            Used to define fidelity levels.

    Returns:
        float: Validation metric for the trial.
    """
    # 1. Hyperparameter Sampling
    n_estimators = trial.suggest_int("n_estimators", 50, max_n_estimators_upper_bound)
    max_depth = trial.suggest_int("max_depth", 2, 32)
    min_samples_leaf = trial.suggest_int("min_samples_leaf", 1, 20)
    max_features_val = trial.suggest_categorical("max_features", ['sqrt', 'log2', 0.5, 0.7, 1.0])

    if task_type == 'classification':
        # For classification, adjust criterion and potentially class_weight
        criterion = trial.suggest_categorical("criterion", ["gini", "entropy"])
        class_weight = trial.suggest_categorical("class_weight", [None, "balanced"])
        model_class = RandomForestClassifier
    else: # Regression
        criterion = trial.suggest_categorical("criterion", ["squared_error", "absolute_error"])
        class_weight = None # Not applicable for regression
        model_class = RandomForestRegressor

    # Instantiate the base model once
    rf_model = model_class(
        n_estimators=1, # Start with 1, warm_start will add more
        max_depth=max_depth,
        min_samples_leaf=min_samples_leaf,
        max_features=max_features_val,
        criterion=criterion,
        class_weight=class_weight,
        oob_score=True, # Critical for our strategy
        random_state=42,
        warm_start=True, # Critical for partial evaluation
        n_jobs=-1 # Use all available cores
    )

    # Preprocess data if a preprocessor is provided
    if preprocessor:
        X_train_processed = preprocessor.fit_transform(X_train_raw)
        X_val_processed = preprocessor.transform(X_val_raw)
    else:
        X_train_processed = X_train_raw
        X_val_processed = X_val_raw


    # 2. Fidelity Schedule and Partial Evaluation
    # Define fidelity steps as a percentage of the suggested n_estimators for this trial
    # Using small steps to simulate fine-grained learning curve
    fidelity_steps = [int(n_estimators * p) for p in [0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]]
    fidelity_steps = sorted(list(set([max(1, s) for s in fidelity_steps]))) # Ensure at least 1 estimator

    for step_n_estimators in fidelity_steps:
        if step_n_estimators > rf_model.n_estimators: # Only add if more estimators are needed
            rf_model.n_estimators = step_n_estimators # Update n_estimators
            rf_model.fit(X_train_processed, y_train_encoded) # Incremental fit due to warm_start=True

        # Get OOB score - OOB score is available if oob_score=True and n_estimators > 1
        # For the very first step (1 estimator), OOB score might not be reliable or available
        if rf_model.n_estimators > 1 and hasattr(rf_model, 'oob_score_'):
            current_oob_score = rf_model.oob_score_
        else:
            # Fallback for very early steps or if OOB score isn't directly meaningful yet
            # Can use a placeholder or simply avoid reporting for these steps if pruner handles it
            current_oob_score = np.nan # Or a base performance

        # 3. Report intermediate result to Optuna.
        # It's crucial to report the metric that the pruner expects to minimize/maximize.
        # For accuracy/AUC, we maximize. For error (RMSE, OOB error), we minimize.
        # Let's assume we want to maximize some score. If OOB_score is an error, invert it.
        # Here we'll report OOB score directly and let the pruner handle min/max (or convert it)
        if task_type == 'classification':
            # OOB_score_ for RandomForestClassifier is accuracy. So, maximize.
            trial.report(current_oob_score, step=step_n_estimators)
        else:
            # OOB_score_ for RandomForestRegressor is R^2. So, maximize.
            trial.report(current_oob_score, step=step_n_estimators)


        # 4. Check for pruning. The CustomPruner will receive this report.
        if trial.should_prune():
            print(f"Trial {trial.number} pruned at {step_n_estimators} estimators (OOB: {current_oob_score:.4f}).")
            raise optuna.exceptions.TrialPruned()

    # Final evaluation on validation set after full training
    y_pred = rf_model.predict(X_val_processed)

    if task_type == 'classification':
        # For binary classification, use predict_proba for AUC
        if hasattr(rf_model, 'predict_proba') and len(np.unique(y_val_encoded)) == 2:
            y_proba = rf_model.predict_proba(X_val_processed)[:, 1]
            final_metric = roc_auc_score(y_val_encoded, y_proba)
            metric_name = "AUC"
        else: # Multi-class or if proba not available
            final_metric = accuracy_score(y_val_encoded, y_pred)
            metric_name = "Accuracy"
    else: # Regression
        final_metric = -mean_squared_error(y_val_encoded, y_pred) # Minimize MSE, so negate for Optuna (maximize)
        metric_name = "Negative MSE"

    print(f"Trial {trial.number} completed with {metric_name}: {final_metric:.4f}")
    return final_metric

3. Custom Optuna Pruner Implementation

This is where your "Performance Prediction from Partial Evaluation" and "Pruning Logic" come together.

In [8]:
import optuna
import numpy as np
import pandas as pd
from collections import defaultdict
from sklearn.linear_model import LinearRegression # Simple model for illustration

class RandomForestMultiFidelityPruner(optuna.pruners.BasePruner):
    """
    Custom Optuna pruner for Random Forest Hyperparameter Optimization
    using a multi-fidelity approach and learning curve prediction.
    """
    def __init__(self,
                 min_intermediate_steps=5, # Minimum number of reported steps before considering pruning
                 pruning_quantile=0.25,     # Prune if predicted performance is below this quantile of best
                 grace_period_ratio=0.1,    # Proportion of n_estimators_max to wait before aggressive pruning
                 predictive_model_type='linear'): # 'linear' or 'simple_extrapolation'
        self.min_intermediate_steps = min_intermediate_steps
        self.pruning_quantile = pruning_quantile
        self.grace_period_ratio = grace_period_ratio
        self.predictive_model_type = predictive_model_type

        # Store historical OOB scores and steps for performance prediction model
        # Structure: {trial_id: [(step1, oob_score1), (step2, oob_score2), ...]}
        self.trial_learning_curves = defaultdict(list)
        self.completed_trial_performances = [] # Store final performances of completed trials

    def prune(self, study: optuna.study.Study, trial: optuna.trial.FrozenTrial) -> bool:
        trial_id = trial.number
        current_step = trial.last_step
        current_value = trial.value

        # Store current intermediate value for learning curve
        if current_step is not None and current_value is not None:
            self.trial_learning_curves[trial_id].append((current_step, current_value))

        # Do not prune during the grace period or if too few steps have been reported
        if current_step < self.grace_period_ratio * trial.params.get('n_estimators', 1000) or \
           len(self.trial_learning_curves[trial_id]) < self.min_intermediate_steps:
            return False

        # Get historical data for the current trial's learning curve
        current_lc_data = self.trial_learning_curves[trial_id]
        steps = np.array([s for s, _ in current_lc_data])
        values = np.array([v for _, v in current_lc_data])
        
        # Ensure we have enough data points to fit a predictor
        if len(steps) < 2: # Need at least 2 points for a simple line or curve
            return False

        # Predict final performance based on partial evaluation
        predicted_final_performance = self._predict_final_performance(steps, values, trial.params.get('n_estimators', 1000))

        # Get best completed trial's performance so far
        # Optuna stores objective values. We need to handle maximization vs minimization.
        # By default, Optuna aims to minimize. If our objective returns a higher-is-better metric,
        # we need to consider 'study.best_value' carefully.
        # Assuming our objective returns a higher-is-better metric (e.g., AUC, Accuracy),
        # so study.best_value is the max.
        best_objective_value = study.best_value

        # Only prune if there are completed trials to compare against
        if best_objective_value is None:
            return False

        # Pruning logic: Compare predicted performance to a quantile of best so far
        # We need to compute a threshold based on the distribution of past performances,
        # or relative to the best_objective_value.
        # For maximization: Prune if predicted_final_performance is significantly worse than best_objective_value.
        
        # A simple approach: prune if predicted performance is X% worse than the best.
        # Or, if predicted performance is below a certain quantile of *all completed trials' final performances*.
        
        # For this example, let's use a simpler heuristic:
        # Prune if predicted performance is below the (pruning_quantile) quantile of the best
        # of completed trials' final performances.
        
        # NOTE: This pruner needs to be "aware" if Optuna is maximizing or minimizing.
        # For maximization (like AUC or Accuracy), a lower value is worse.
        # For minimization (like OOB error directly, or MSE), a higher value is worse.
        # Assume our objective always returns a metric that Optuna maximizes (e.g., -MSE or Accuracy)
        
        # To get the best completed trials, we iterate and filter
        completed_trial_values = [
            t.value for t in study.trials
            if t.state == optuna.trial.TrialState.COMPLETE and t.value is not None
        ]
        
        if not completed_trial_values:
            return False # No completed trials to compare against yet

        # This part requires careful thought:
        # If maximizing, we want to prune trials whose P_pred is "too low".
        # If minimizing, we want to prune trials whose P_pred is "too high".
        # Optuna's best_value is always "best" according to its direction.
        
        # Let's assume the objective always returns a metric to be maximized (e.g., Accuracy, AUC, negative MSE).
        # So, lower values are worse.
        threshold = np.quantile(completed_trial_values, self.pruning_quantile) # e.g., 25th percentile of completed trials
        
        # Apply a safety margin for the uncertainty trap, especially if few trials are complete
        # This margin could be dynamic, e.g., wider if standard deviation of values is high
        safety_margin = 0.0 # Placeholder for more sophisticated margin logic
        
        # If predicted_final_performance is worse than the threshold, prune.
        if predicted_final_performance < (threshold - safety_margin):
            return True
        
        return False

    def _predict_final_performance(self, steps, values, max_n_estimators):
        """
        Predicts the final performance based on observed learning curve.
        """
        if self.predictive_model_type == 'linear':
            # Simple linear regression on steps vs values
            # This is a very basic model and might not capture RF learning curve well
            model = LinearRegression()
            model.fit(steps.reshape(-1, 1), values)
            predicted_value = model.predict(np.array([[max_n_estimators]]))[0]
            
            # Simple heuristic: If prediction goes beyond observed range in a "bad" direction, cap it.
            # For maximization: predicted_value should not be lower than min observed, nor higher than max observed by much
            # For minimization: predicted_value should not be higher than max observed, nor lower than min observed by much
            if max_n_estimators > steps[-1]: # If extrapolating
                # If maximizing, ensure prediction doesn't sharply decline
                if predicted_value < values[-1] and (values[-1] - predicted_value) > np.std(values):
                    predicted_value = values[-1] # Cap it to last observed value if it's a sharp decline

            return predicted_value

        elif self.predictive_model_type == 'simple_extrapolation':
            # Assumes performance plateaus or improves slowly after a certain point.
            # Averages last few observations to predict final.
            if len(values) >= 3: # Need at least last 3 points for averaging
                return np.mean(values[-3:])
            else:
                return values[-1] # Fallback to last observed value

        # You would implement more complex models here, e.g., curve fitting for exponential decay/saturation
        # For RF OOB, a common pattern is to fit an exponential decay or similar saturation curve.
        # Example for curve fitting (requires scipy.optimize.curve_fit):
        # from scipy.optimize import curve_fit
        # def learning_curve_func(x, a, b, c): # e.g., a * exp(-b*x) + c for minimization
        #     return a * np.exp(-b * x) + c
        # try:
        #     popt, pcov = curve_fit(learning_curve_func, steps, values, p0=[1, 0.1, np.mean(values)])
        #     return learning_curve_func(max_n_estimators, *popt)
        # except RuntimeError:
        #     return values[-1] # Fallback if curve_fit fails

        return values[-1] # Default fallback

4. Running the Optuna Study

This brings all the pieces together.

In [9]:
from sklearn.model_selection import train_test_split, StratifiedKFold # Import StratifiedKFold
from sklearn.preprocessing import LabelEncoder
import functools # For partial function application

# --- Configuration Parameters ---
N_TRIALS = 100 # Number of HPO trials
RANDOM_STATE = 42
DATASET_PATH = 'your_dataset.arff' # Update this
TARGET_COLUMN = 'Rings' # Update this for your dataset
TASK_TYPE = 'regression' # 'classification' or 'regression'

# --- Load and Preprocess Data ---
# Load the raw data (replace with your actual file loading)
X_raw, y_raw, feature_names, categorical_features_idx, all_cols = load_and_preprocess_arff(
    DATASET_PATH, target_column_name=TARGET_COLUMN
)

# Encode target variable if classification and not already numeric
if TASK_TYPE == 'classification' and (y_raw.dtype == 'object' or y_raw.dtype == 'category'):
    le = LabelEncoder()
    y_encoded = le.fit_transform(y_raw)
    print(f"Encoded target classes: {le.classes_}")
else:
    y_encoded = y_raw

# Create the preprocessor
numerical_feats = X_raw.select_dtypes(include=np.number).columns.tolist()
categorical_feats = X_raw.select_dtypes(include=['object', 'category']).columns.tolist()
data_preprocessor = get_preprocessor(numerical_feats, categorical_feats)

# Split into training and validation sets for the objective function
# Note: For robust evaluation in a full research paper, you'd typically use K-Fold Cross-Validation
# outside the Optuna study, and Optuna's objective would run on internal validation sets
# or average K-Fold results. For a single train/val split for quick demonstration:
X_train_split, X_val_split, y_train_split, y_val_split = train_test_split(
    X_raw, y_encoded, test_size=0.2, random_state=RANDOM_STATE,
    stratify=y_encoded if TASK_TYPE == 'classification' else None
)


# --- Initialize Custom Pruner ---
custom_pruner = RandomForestMultiFidelityPruner(
    min_intermediate_steps=5,
    pruning_quantile=0.25, # Prune if predicted performance is in the bottom 25% of completed trials
    grace_period_ratio=0.1, # Don't prune for first 10% of n_estimators
    predictive_model_type='linear' # or 'simple_extrapolation'
)

# --- Create Optuna Study ---
# Set direction to 'maximize' if your objective returns Accuracy, AUC, or -MSE (higher is better)
# Set direction to 'minimize' if your objective returns Error (e.g., OOB_error or MSE directly, lower is better)
study = optuna.create_study(
    direction="maximize" if TASK_TYPE == 'classification' else "maximize", # Assuming we maximize accuracy/AUC or -MSE
    pruner=custom_pruner,
    sampler=optuna.samplers.TPESampler(seed=RANDOM_STATE),
    study_name="rf_multi_fidelity_hpo_study",
    storage="sqlite:///rf_hpo.db" # Optional: Persist study results to a DB
)

# --- Wrap the objective function with fixed arguments ---
# Using functools.partial to pass fixed arguments to the objective
objective_with_args = functools.partial(
    objective,
    X_train_raw=X_train_split,
    y_train_encoded=y_train_split,
    X_val_raw=X_val_split,
    y_val_encoded=y_val_split,
    task_type=TASK_TYPE,
    preprocessor=data_preprocessor,
    max_n_estimators_upper_bound=1000 # Example, make sure this matches your search space's upper bound
)

# --- Start the Optimization ---
print(f"Starting Optuna study with {N_TRIALS} trials...")
study.optimize(objective_with_args, n_trials=N_TRIALS, show_progress_bar=True)

# --- Print Results ---
print("\nOptimization finished.")
print(f"Best trial: {study.best_trial.value:.4f}")
print("Best hyperparameters:")
for key, value in study.best_trial.params.items():
    print(f"  {key}: {value}")

# You can also access trial details
pruned_trials = study.get_trials(deepcopy=False, states=[optuna.trial.TrialState.PRUNED])
complete_trials = study.get_trials(deepcopy=False, states=[optuna.trial.TrialState.COMPLETE])
print(f"Number of pruned trials: {len(pruned_trials)}")
print(f"Number of complete trials: {len(complete_trials)}")

KeyError: "['Rings'] not found in axis"

5. Evaluation and Results Saving

After the study, you'll want to save and analyze your results.

In [None]:
import joblib # For saving models and preprocessors
import time

def evaluate_final_model(best_params, X_train_full, y_train_full, X_test_full, y_test_full,
                         task_type='classification', preprocessor=None):
    """
    Trains the best model from HPO on the full training data and evaluates on a test set.
    """
    start_time = time.time()

    # Preprocess full data
    if preprocessor:
        X_train_processed = preprocessor.fit_transform(X_train_full)
        X_test_processed = preprocessor.transform(X_test_full)
    else:
        X_train_processed = X_train_full
        X_test_processed = X_test_full

    # Re-instantiate the best model
    if task_type == 'classification':
        model = RandomForestClassifier(random_state=42, n_jobs=-1, **best_params)
    else:
        model = RandomForestRegressor(random_state=42, n_jobs=-1, **best_params)

    model.fit(X_train_processed, y_train_full)
    fit_time = time.time() - start_time

    # Evaluate
    y_pred = model.predict(X_test_processed)

    if task_type == 'classification':
        final_metric = accuracy_score(y_test_full, y_pred)
        metric_name = "Accuracy"
        if hasattr(model, 'predict_proba') and len(np.unique(y_test_full)) == 2:
            y_proba = model.predict_proba(X_test_processed)[:, 1]
            auc_score = roc_auc_score(y_test_full, y_proba)
            print(f"Final Model AUC: {auc_score:.4f}")
    else:
        final_metric = mean_squared_error(y_test_full, y_pred)
        metric_name = "MSE"

    print(f"\n--- Final Model Evaluation ---")
    print(f"Metric ({metric_name}): {final_metric:.4f}")
    print(f"Training time for final model: {fit_time:.2f} seconds")

    return model, final_metric


# --- Example of full training and evaluation ---
# Assuming you have loaded X_raw, y_encoded, data_preprocessor from before

# Final split for evaluation
X_train_full, X_test_full, y_train_full, y_test_full = train_test_split(
    X_raw, y_encoded, test_size=0.2, random_state=RANDOM_STATE,
    stratify=y_encoded if TASK_TYPE == 'classification' else None
)

if study.best_trial:
    best_params = study.best_trial.params
    final_model, final_score = evaluate_final_model(
        best_params, X_train_full, y_train_full, X_test_full, y_test_full,
        task_type=TASK_TYPE, preprocessor=data_preprocessor
    )

    # Save the best model and preprocessor
    joblib.dump(final_model, 'best_rf_model.pkl')
    joblib.dump(data_preprocessor, 'data_preprocessor.pkl')
    print("Best model and preprocessor saved.")

# --- Analysis of Optuna Study Results ---
# You can use Optuna's visualization tools
# import optuna.visualization as ov
# # Requires plotly and kaleido
# # pip install plotly kaleido

# # Plot optimization history
# fig1 = ov.plot_optimization_history(study)
# fig1.show()
# # Save the plot
# # fig1.write_image("optimization_history.png")

# # Plot intermediate values
# fig2 = ov.plot_intermediate_values(study)
# fig2.show()
# # fig2.write_image("intermediate_values.png")

# # Plot parameter importance
# fig3 = ov.plot_param_importances(study)
# fig3.show()
# # fig3.write_image("param_importances.png")

# You can also manually extract data for custom plots (e.g., speedup vs. accuracy from Table 3)