## Lab 5
In this lab, you will implement the random forest algorithm to build models for regression.

In [1]:
import numpy as np
import pandas as pd
import seaborn as sbs
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

Import the data in 'ENB2012_data.csv', split it into training, testing and validation sets.

In [None]:
#your code here
data = pd.read_csv('ENB2012_data.csv')

X = data[['X1', 'X2', 'X3', 'X4', 'X5', 'X6', 'X7', 'X8']].values
y1 = data['Y1'].values  # Heating Load
y2 = data['Y2'].values  # Cooling Load

y_combined = np.column_stack((y1, y2))

# Split data into training, testing, and validation sets
X_train_full, X_temp, y_combined_train_full, y_combined_temp = train_test_split(X, y_combined, train_size=0.8, random_state=42)

X_val, X_test, y_combined_val, y_combined_test = train_test_split(X_temp, y_combined_temp, test_size=0.5, random_state=42)

# Unpack
y1_train_full, y2_train_full = y_combined_train_full[:, 0], y_combined_train_full[:, 1]
y1_val, y2_val = y_combined_val[:, 0], y_combined_val[:, 1]
y1_test, y2_test = y_combined_test[:, 0], y_combined_test[:, 1]

print(f"Dataset loaded. X shape: {X.shape}, Y1 shape: {y1.shape}, Y2 shape: {y2.shape}")
print(f"Training set size: {X_train_full.shape[0]} samples")
print(f"Validation set size: {X_val.shape[0]} samples")
print(f"Testing set size: {X_test.shape[0]} samples")

Dataset loaded. X shape: (768, 8), Y1 shape: (768,), Y2 shape: (768,)
Training set size: 614 samples
Validation set size: 77 samples
Testing set size: 77 samples


Built the function you need to train a normal decision tree:

In [None]:
#your code goes here
def bootstrap(X, num_bags=10, random_seed=0):
    """
    Given a dataset and a number of bags, sample the dataset with replacement.
    This function returns a list of indices with compatible dimensionality.

    Parameters
    ----------
    X : ndarray
        A dataset
    num_bags : int, default 10
        The number of bags to create
    random_seed : int, default 0
        Seed for the random number generator to ensure reproducibility.

    Returns
    -------
    list of ndarray
        The list contains `num_bags` integer one-dimensional ndarrays.
        Each of these contains the indices corresponding to the sampled datapoints in `X`.

    Notes
    -----
    * The number of datapoints in each bag will match the number of datapoints in the given dataset.
    """
    rng = np.random.default_rng(random_seed)
    num_samples = len(X)
    bags = []
    for _ in range(num_bags):
        # Generate indices with replacement
        indices = rng.choice(num_samples, size=num_samples, replace=True)
        bags.append(indices)
    return bags


def aggregate_regression(preds):
    """
    Aggregate predictions by several estimators.

    Parameters
    ----------
    preds : list of ndarray
        Predictions from multiple estimators. All ndarrays in this list should have the same
        dimensionality.

    Return
    ------
    ndarray
        The mean of the predictions.
    """
    # Calculate the mean across all predictions
    return np.mean(preds, axis=0)

def regression_criterion(region):
    """
    Implements the sum of squared error criterion in a region.

    Parameters
    ----------
    region : ndarray
        Array of shape (N,) containing the values of the target values
        for N datapoints in the training set.

    Returns
    -------
    float
        The sum of squared error.

    Note
    ----
    The error for an empty region should be infinity (use: float("inf")).
    This avoids creating empty regions.
    """
    if len(region) == 0:
        return float("inf")

    if isinstance(region, pd.DataFrame) or isinstance(region, pd.Series):
        region = region.values

    mean_value = np.mean(region)
    squared_errors = (region - mean_value) ** 2

    return np.sum(squared_errors)


def split_region(region_data, feature_index, tau):
    """
    Given a region, splits it based on the feature indicated by
    `feature_index`, the region will be split in two, where
    one side will contain all points with the feature with values
    lower than `tau`, and the other split will contain the
    remaining datapoints.

    Parameters
    ----------
    region_data : array of size (n_samples, n_features + 1)
        a partition of the dataset (or the full dataset) to be split,
        including the target variable as the last column.
    feature_index : int
        the index of the feature (column of the region array) used to make this partition
    tau : float
        The threshold used to make this partition

    Return
    ------
    left_partition : array
        datapoints in `region_data` where feature <= `tau`
    right_partition : array
        datapoints in `region_data` where feature > `tau`
    """
    if isinstance(region_data, pd.DataFrame):
        feature_values = region_data.iloc[:, feature_index].values
    else:
        feature_values = region_data[:, feature_index]

    left_indices = np.where(feature_values <= tau)[0]
    right_indices = np.where(feature_values > tau)[0]

    left_partition = region_data[left_indices]
    right_partition = region_data[right_indices]

    return left_partition, right_partition


class TreeNode:
    """
    Represents a node in the Decision Tree.
    """
    def __init__(self, data, feature_idx, feature_val, sq_error, is_leaf=False):
        self.data = data
        self.feature_idx = feature_idx
        self.feature_val = feature_val
        self.sq_error = sq_error
        self.left = None
        self.right = None
        self._is_leaf = is_leaf

    @property
    def is_leaf(self):
        """
        Determines if the node is a leaf node.
        A node is a leaf if it was explicitly marked as such, or if it has no children.
        """
        return self._is_leaf or (self.left is None and self.right is None)


class DecisionTree:
    """
    Custom Decision Tree Regressor implementation.
    """
    def __init__(self, criterion=regression_criterion, max_depth=None, min_samples_leaf=1, num_features=None, random_state=None):
        """
        Initializes the DecisionTree Regressor.

        Parameters
        ----------
        criterion : function, default regression_criterion
            The function used to evaluate the quality of a split (e.g., sum of squared errors).
        max_depth : int, default None
            The maximum depth of the tree. If None, then nodes are expanded until
            all leaves are pure or until all leaves contain less than min_samples_leaf samples.
        min_samples_leaf : int, default 1
            The minimum number of samples required to be at a leaf node.
        num_features : int or float or str, default None
            The number of features to consider when looking for the best split:
            - If int, then consider `num_features` features at each split.
            - If float, then `num_features` is a fraction and `int(num_features * n_features)` features are considered.
            - If "auto", then `num_features=sqrt(n_features)`.
            - If "sqrt", then `num_features=sqrt(n_features)`.
            - If "log2", then `num_features=log2(n_features)`.
            - If None, then all features are considered.
        random_state : int, default None
            Controls the random seed for reproducibility of feature selection within the tree.
        """
        self.max_depth = max_depth
        self.min_samples_leaf = min_samples_leaf
        self.criterion = criterion
        self.tree = None
        self.num_features = num_features
        self.random_state = random_state

    def fit(self, X_train, y_train):
        """
        Builds the decision tree from the training data.

        Parameters
        ----------
        X_train : ndarray
            Training features.
        y_train : ndarray
            Training target values.
        """
        train_data = np.column_stack((X_train, y_train))
        self.tree = self.create_tree(train_data, current_depth=0)

    def predict(self, X_test):
        """
        Predicts target values for new data using the trained Decision Tree.

        Parameters
        ----------
        X_test : ndarray
            Features of the new data points.

        Returns
        -------
        ndarray
            Predicted target values.
        """
        if self.tree is None:
            raise RuntimeError("Decision Tree not fitted. Call fit() before predict().")

        predictions = np.array([self.predict_sample(x, self.tree) for x in X_test])
        return predictions

    def create_tree(self, data, current_depth):
        """
        Recursively creates the decision tree.

        Parameters
        ----------
        data : ndarray
            The subset of data for the current node (features and target).
        current_depth : int
            The current depth of the node in the tree.

        Returns
        -------
        TreeNode
            The created node (either internal or leaf).
        """
        # Calculate sum of squared error for the current node's data
        current_node_sq_error = self.criterion(data[:, -1])

        # Stopping criteria:
        # 1. Max depth reached
        if self.max_depth is not None and current_depth >= self.max_depth:
            return TreeNode(data, None, None, current_node_sq_error, is_leaf=True)

        # 2. Minimum samples per leaf reached
        if len(data) <= self.min_samples_leaf:
            return TreeNode(data, None, None, current_node_sq_error, is_leaf=True)

        # Find the best split for the current data
        split_data_tuple, split_feature_idx, split_feature_val, best_split_sq_error = self.find_bestsplit(data)

        # 3. No valid split found or no improvement
        if split_feature_idx is None:
            return TreeNode(data, None, None, current_node_sq_error, is_leaf=True)

        # Create the current node
        node = TreeNode(data, split_feature_idx, split_feature_val, current_node_sq_error)

        # 4. Check if splits result in regions smaller than min_samples_leaf
        # This check ensures that even if a split is found, if it leads to
        # children nodes that are too small, we make the current node a leaf.
        left_data, right_data = split_data_tuple
        if len(left_data) < self.min_samples_leaf or len(right_data) < self.min_samples_leaf:
             return TreeNode(data, None, None, current_node_sq_error, is_leaf=True)

        # Recursively build left and right children
        node.left = self.create_tree(left_data, current_depth + 1)
        node.right = self.create_tree(right_data, current_depth + 1)

        return node

    def find_bestsplit(self, data):
        """
        Finds the best feature and threshold to split the given data.

        Parameters
        ----------
        data : ndarray
            The subset of data for which to find the best split.

        Returns
        -------
        tuple
            (regions, best_feature_index, best_tau, current_node_sq_error)
            regions : tuple of ndarrays (left_partition, right_partition)
            best_feature_index : int or None
            best_tau : float or None
            current_node_sq_error : float (sum of squared error for the current node's data)
        """
        if isinstance(data, pd.DataFrame):
            data = data.values
        if isinstance(data, pd.Series):
            data = data.values

        best_sq_error = float('inf')
        best_feature_index = None
        best_tau = None
        regions = None

        n_samples, n_total_cols = data.shape
        n_features = n_total_cols - 1  # Exclude target column (last column)

        # Calculate sum of squared error for the current node's data before splitting
        current_node_sq_error = self.criterion(data[:, -1])

        # Determine the actual number of features to sample for this split
        num_features_for_split = n_features  # Default to all features if num_features is None or invalid
        if self.num_features is not None:
            if isinstance(self.num_features, int):
                num_features_for_split = min(self.num_features, n_features)
            elif isinstance(self.num_features, float):
                num_features_for_split = int(self.num_features * n_features)
            elif self.num_features == "auto" or self.num_features == "sqrt":
                num_features_for_split = int(np.sqrt(n_features))
            elif self.num_features == "log2":
                num_features_for_split = int(np.log2(n_features))

        # Handle case where num_features_for_split might be 0 or negative
        if num_features_for_split <= 0:
            return None, None, None, current_node_sq_error # No valid split possible

        # Randomly select a subset of features for this split
        rng_split = np.random.default_rng(self.random_state)
        all_feature_indices = np.arange(n_features)
        selected_feature_indices = rng_split.choice(all_feature_indices, size=num_features_for_split, replace=False)

        for feature_index in selected_feature_indices:  # Iterate only over selected features
            feature_values = data[:, feature_index]
            possible_taus = np.unique(feature_values)

            # Avoid splitting on features with only one unique value (no meaningful split)
            if len(possible_taus) < 2:
                continue

            for tau in possible_taus:
                left, right = split_region(data, feature_index, tau) # Use the global split_region function

                # Ensure splits are not empty before evaluating criterion
                if len(left) == 0 or len(right) == 0:
                    continue  # Skip if split creates empty regions

                sq_error = self.criterion(left[:, -1]) + self.criterion(right[:, -1])

                if sq_error < best_sq_error:
                    best_sq_error = sq_error
                    best_feature_index = feature_index
                    best_tau = tau
                    regions = (left, right)

        # If no split was found that improves the criterion (best_sq_error remains inf)
        # or if the best split found is not better than the current node's error (no gain)
        if best_feature_index is None or best_sq_error >= current_node_sq_error:
            return None, None, None, current_node_sq_error  # Indicate no valid split found

        return regions, best_feature_index, best_tau, current_node_sq_error

    def predict_sample(self, sample, node):
        """
        Recursively predicts the target value for a single sample.

        Parameters
        ----------
        sample : ndarray
            A single data point (features only).
        node : TreeNode
            The current node in the decision tree.

        Returns
        -------
        float
            The predicted target value for the sample.
        """
        if node.is_leaf:
            # For a leaf node, the prediction is the mean of the target values in that node's data
            return np.mean(node.data[:, -1])

        # Traverse the tree based on the feature and threshold
        if sample[node.feature_idx] <= node.feature_val: # Changed to <= as per split_region
            if node.left is None: # Handle cases where a branch might be None (e.g., due to max_depth or min_samples_leaf)
                return np.mean(node.data[:, -1]) # Return current node's mean if child is None
            return self.predict_sample(sample, node.left)
        else:
            if node.right is None: # Handle cases where a branch might be None
                return np.mean(node.data[:, -1])
            return self.predict_sample(sample, node.right)

    def print_tree(self):
        """Prints the structure of the trained decision tree."""
        self._print_tree(self.tree)

    def _print_tree(self, node, depth=0, prefix=""):
        """Helper function for recursively printing the tree."""
        if node is None:
            return
        indent = "  " * depth
        if node.is_leaf:
            print(f"{indent}{prefix}Leaf: Predict {np.mean(node.data[:, -1]):.2f}, Samples: {len(node.data)}, SSE = {node.sq_error:.2f}")
        else:
            print(f"{indent}{prefix}Node: Feature {node.feature_idx}, Threshold {node.feature_val:.2f}, SSE = {node.sq_error:.2f}")
            self._print_tree(node.left, depth + 1, prefix="L--> ")
            self._print_tree(node.right, depth + 1, prefix="R--> ")

Buit a function that train a random forest and predicts the electric loads of a new point given a trained random forest

In [4]:
#your code goes here
class RandomForest:
    """
    Custom Random Forest Regressor implementation.
    """
    def __init__(self, num_features=None, min_samples_leaf=1, max_depth=None, num_estimators=10, random_state=None):
        """
        Initializes the RandomForest Regressor.

        Parameters
        ----------
        num_features : int or float or str, default None
            The number of features to consider when looking for the best split in each tree.
            Passed to the underlying DecisionTreeRegressor.
        min_samples_leaf : int, default 1
            The minimum number of samples required to be at a leaf node for each tree.
        max_depth : int, default None
            The maximum depth of each decision tree in the forest.
        num_estimators : int, default 10
            The number of decision trees (estimators) in the forest.
        random_state : int, default None
            Controls the random seed for reproducibility of the overall Random Forest.
            Used for bootstrapping and for passing unique seeds to individual trees.
        """
        self.num_features = num_features
        self.min_samples_leaf = min_samples_leaf
        self.max_depth = max_depth
        self.num_estimators = num_estimators
        self.random_state = random_state
        self.estimators = []  # List to store trained DecisionTree models

    def fit(self, X_train, y_train):
        """
        Trains the Random Forest model.

        Parameters
        ----------
        X_train : ndarray
            Training features.
        y_train : ndarray
            Training target values.
        """
        self.estimators = []  # Clear any existing estimators
        rng = np.random.default_rng(self.random_state)  # Random number generator for reproducibility

        # Generate all bags of indices upfront
        all_bag_indices = bootstrap(X_train, num_bags=self.num_estimators, random_seed=self.random_state)

        for i in range(self.num_estimators):
            # Get the data for the current bag
            bag_indices = all_bag_indices[i]
            bag_X = X_train[bag_indices]
            bag_y = y_train[bag_indices]

            # Build and train a custom DecisionTree for the current bag
            # Pass a unique random state to each tree for its internal feature selection
            tree_random_state = rng.integers(0, 1000000) # Use a large range for seeds
            rf_estimator = DecisionTree(
                max_depth=self.max_depth,
                min_samples_leaf=self.min_samples_leaf,
                num_features=self.num_features,
                random_state=tree_random_state
            )
            rf_estimator.fit(bag_X, bag_y)
            self.estimators.append(rf_estimator)

    def predict(self, X_test):
        """
        Predicts target values for new data using the trained Random Forest.

        Parameters
        ----------
        X_test : ndarray
            Features of the new data points.

        Returns
        -------
        ndarray
            Predicted target values.
        """
        if not self.estimators:
            raise RuntimeError("Random Forest not fitted. Call fit() before predict().")

        predictions = []
        for estimator in self.estimators:
            # Get predictions from each individual tree
            pred = estimator.predict(X_test)
            predictions.append(pred)
        # Aggregate predictions using the mean
        return aggregate_regression(predictions)

In [5]:
def rmse(y_true, y_pred):
    """
    Calculates the Root Mean Squared Error (RMSE).

    Parameters
    ----------
    y_true : ndarray
        Actual target values.
    y_pred : ndarray
        Predicted target values.

    Returns
    -------
    float
        The RMSE value.
    """
    # Ensure y_true and y_pred are numpy arrays for consistent operations
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    return np.sqrt(np.mean((y_true - y_pred) ** 2))

Builld functions to evaluate your predictions, use this functions to find optimal hyperparameters using also our validation set.

In [6]:
#(you are training two models, so you need to different sets of hyperparameters for each)
#your code goes here
param_grid = {
    'num_estimators': [10, 20, 50],
    'max_depth': [5, 10, 15, None],
    'min_samples_leaf': [1, 5, 10],
    'num_features': [0.5, 0.7, 1.0]  # Fraction of features to consider
}

best_rmse_y1 = float('inf')
best_params_y1 = {}

# Iterate through all combinations of hyperparameters
for n_est in param_grid['num_estimators']:
    for m_depth in param_grid['max_depth']:
        for min_leaf in param_grid['min_samples_leaf']:
            for n_feat in param_grid['num_features']:
                print(f"Testing Y1: n_est={n_est}, max_depth={m_depth}, min_leaf={min_leaf}, n_feat={n_feat}")
                rf_model = RandomForest(
                    num_estimators=n_est,
                    max_depth=m_depth,
                    min_samples_leaf=min_leaf,
                    num_features=n_feat,
                    random_state=42 # Consistent random state for the whole RF
                )
                rf_model.fit(X_train_full, y1_train_full)
                y1_pred_val = rf_model.predict(X_val)
                current_rmse_y1 = rmse(y1_val, y1_pred_val)

                if current_rmse_y1 < best_rmse_y1:
                    best_rmse_y1 = current_rmse_y1
                    best_params_y1 = {
                        'num_estimators': n_est,
                        'max_depth': m_depth,
                        'min_samples_leaf': min_leaf,
                        'num_features': n_feat
                    }
                print(f"  Validation RMSE Y1: {current_rmse_y1:.4f}")

print(f"\nOptimal Hyperparameters for Y1: {best_params_y1}")
print(f"Best Validation RMSE for Y1: {best_rmse_y1:.4f}")


print("\n--- Hyperparameter Tuning for Y2 (Cooling Load) ---")
best_rmse_y2 = float('inf')
best_params_y2 = {}

# Iterate through all combinations of hyperparameters for Y2
for n_est in param_grid['num_estimators']:
    for m_depth in param_grid['max_depth']:
        for min_leaf in param_grid['min_samples_leaf']:
            for n_feat in param_grid['num_features']:
                print(f"Testing Y2: n_est={n_est}, max_depth={m_depth}, min_leaf={min_leaf}, n_feat={n_feat}")
                rf_model = RandomForest(
                    num_estimators=n_est,
                    max_depth=m_depth,
                    min_samples_leaf=min_leaf,
                    num_features=n_feat,
                    random_state=42
                )
                rf_model.fit(X_train_full, y2_train_full)
                y2_pred_val = rf_model.predict(X_val)
                current_rmse_y2 = rmse(y2_val, y2_pred_val)

                if current_rmse_y2 < best_rmse_y2:
                    best_rmse_y2 = current_rmse_y2
                    best_params_y2 = {
                        'num_estimators': n_est,
                        'max_depth': m_depth,
                        'min_samples_leaf': min_leaf,
                        'num_features': n_feat
                    }
                print(f"  Validation RMSE Y2: {current_rmse_y2:.4f}")

print(f"\nOptimal Hyperparameters for Y2: {best_params_y2}")
print(f"Best Validation RMSE for Y2: {best_rmse_y2:.4f}")


Testing Y1: n_est=10, max_depth=5, min_leaf=1, n_feat=0.5
  Validation RMSE Y1: 1.4125
Testing Y1: n_est=10, max_depth=5, min_leaf=1, n_feat=0.7
  Validation RMSE Y1: 1.2922
Testing Y1: n_est=10, max_depth=5, min_leaf=1, n_feat=1.0
  Validation RMSE Y1: 1.0542
Testing Y1: n_est=10, max_depth=5, min_leaf=5, n_feat=0.5
  Validation RMSE Y1: 1.5310
Testing Y1: n_est=10, max_depth=5, min_leaf=5, n_feat=0.7
  Validation RMSE Y1: 1.4216
Testing Y1: n_est=10, max_depth=5, min_leaf=5, n_feat=1.0
  Validation RMSE Y1: 1.1948
Testing Y1: n_est=10, max_depth=5, min_leaf=10, n_feat=0.5
  Validation RMSE Y1: 1.9456
Testing Y1: n_est=10, max_depth=5, min_leaf=10, n_feat=0.7
  Validation RMSE Y1: 1.7748
Testing Y1: n_est=10, max_depth=5, min_leaf=10, n_feat=1.0
  Validation RMSE Y1: 1.6447
Testing Y1: n_est=10, max_depth=10, min_leaf=1, n_feat=0.5
  Validation RMSE Y1: 1.1828
Testing Y1: n_est=10, max_depth=10, min_leaf=1, n_feat=0.7
  Validation RMSE Y1: 1.0210
Testing Y1: n_est=10, max_depth=10, mi

Train a final random forest using your optimal hyperparameters and both the training and validation sets. Predict for the datapoints in the testing set and evaluate your final predictions.

In [7]:
#your code goes here
# Combine training and validation sets for final training
X_train_val = np.vstack((X_train_full, X_val))
y1_train_val = np.concatenate((y1_train_full, y1_val))
y2_train_val = np.concatenate((y2_train_full, y2_val))

# Train final Random Forest model for Y1 with optimal hyperparameters
print("\nTraining final model for Y1 (Heating Load)...")
final_rf_y1 = RandomForest(random_state=42, **best_params_y1)
final_rf_y1.fit(X_train_val, y1_train_val)

# Predict on the testing set for Y1
y1_pred_test = final_rf_y1.predict(X_test)
final_rmse_y1 = rmse(y1_test, y1_pred_test)
print(f"Final Test RMSE for Y1 (Heating Load): {final_rmse_y1:.4f}")

# Train final Random Forest model for Y2 with optimal hyperparameters
print("\nTraining final model for Y2 (Cooling Load)...")
final_rf_y2 = RandomForest(random_state=42, **best_params_y2)
final_rf_y2.fit(X_train_val, y2_train_val)

# Predict on the testing set for Y2
y2_pred_test = final_rf_y2.predict(X_test)
final_rmse_y2 = rmse(y2_test, y2_pred_test)
print(f"Final Test RMSE for Y2 (Cooling Load): {final_rmse_y2:.4f}")


Training final model for Y1 (Heating Load)...
Final Test RMSE for Y1 (Heating Load): 0.4483

Training final model for Y2 (Cooling Load)...
Final Test RMSE for Y2 (Cooling Load): 1.9465
