# Assignment 3
## Group name: ID2214-5
### Project members: 
[Francesco Luce, luce@kth.se]

[Leandro Duarte, leandrod@kth.se]

[Stefano Bosoppi, bosoppi@kth.se]

### Declaration:
By submitting this assignment, it is hereby declared that all group members listed above have contributed to the solution, either with code that appear in the final solution below, or with code that has been evaluated and compared to the final solution, but for some reason has been excluded. It is also declared that all project members fully understand all parts of the final solution and can explain it upon request.

It is furthermore declared that the code below is a contribution by the project members only, and specifically that no part of the solution has been copied from any other source (except for lecture slides at the course ID2214/FID3214), no part of the solution has been provided by someone not listed as a project member above, and no part of the solution has been generated by a system.

It is furthermore declared that the submitted assignment will not be shared during the course, with any individual other than the group members listed above and teachers of the course ID2214/FID3214. In particular, the assignment will not be uploaded to any public repository. The submitted assignment can be shared after the course only if written consent has been provided by the course responsible of ID2214/FID3214.

It is furthermore declared that it has been understood that no other library/package than the Python 3 standard library, NumPy, pandas, time and sklearn.tree, may be used in the solution for this assignment.

### Instructions
All parts of the assignment starting with number 1 below are mandatory. Satisfactory solutions
will give 1 point (in total). If they in addition are good (all parts work more or less 
as they should), completed on time (submitted before the deadline in Canvas) and according
to the instructions, together with satisfactory solutions of all parts of the assignment starting 
with number 2 below, then the assignment will receive 2 points (in total).

Note that you do not have to develop the code directly within the notebook
but may instead copy the comments and test cases to a more convenient development environment
and when everything works as expected, you may paste your functions into this
notebook, do a final testing (all cells should succeed) and submit the whole notebook 
(a single file) in Canvas (do not forget to fill in your group number and names above).

## Load NumPy, pandas, time and DecisionTreeClassifier from sklearn.tree

In [26]:
import numpy as np
import pandas as pd
import time
import sklearn
from numpy.matlib import zeros
from sklearn.tree import DecisionTreeClassifier

In [27]:
from platform import python_version

print(f"Python version: {python_version()}")
print(f"NumPy version: {np.__version__}")
print(f"Pandas version: {pd.__version__}")
print(f"sklearn version: {sklearn.__version__}")

Python version: 3.11.9
NumPy version: 2.1.2
Pandas version: 2.2.3
sklearn version: 1.5.2


## Reused functions from Assignment 1

In [28]:
# Copy and paste functions from Assignment 1 here that you need for this assignment
def create_column_filter(df: pd.DataFrame) -> tuple[pd.DataFrame, list[str]]:
    """
    Creates a filtered dataframe by removing columns with only missing values or one unique value.
    Keeps CLASS and ID columns.
    """
    # Copy df
    df_copy = df.copy()

    # filter with CLASS and ID if they exist
    column_filter = [col for col in df.columns if col in ['CLASS', 'ID']]

    # Check each column
    for col in df.columns:
        if col not in ['CLASS', 'ID']:
            unique_vals = df[col].dropna().unique()
            if len(unique_vals) > 1:  # Keep if more than 1 unique non-null value
                column_filter.append(col)

    return df_copy[column_filter], column_filter

def apply_column_filter(df: pd.DataFrame, column_filter: list[str]) -> pd.DataFrame:
    """
    Applies a column filter to keep only specified columns.
    """
    return df.copy()[column_filter]

def minmax_normalize(
        col: pd.Series, min_val: float = None, max_val: float = None
) -> tuple[pd.Series, tuple[float, float]]:
    """
    Returns MinMax-normalized `col` and the min and max values, respectively, used for normalization.
    If values for min and/or max are provided, they are used, otherwise they are derived from `col`.
    """
    norm_col = col.copy()

    col_min = col.min() if min_val is None else min_val
    col_max = col.max() if max_val is None else max_val

    norm_col = (norm_col - col_min) / (col_max - col_min)

    return norm_col, (col_min, col_max)


def zscore_normalize(
        col: pd.Series, mean_val: float = None, std_val: float = None
) -> tuple[pd.Series, tuple[float, float]]:
    """
    Returns z-normalized `col` and the mean and standard deviation values, respectively, used for normalization.
    If values for mean and/or standard deviation are provided, they are used, otherwise they are dervied from `col`.
    """
    norm_col = col.copy()

    col_mean = col.mean() if mean_val is None else mean_val
    col_std = col.std() if std_val is None else std_val

    norm_col = (norm_col - col_mean) / col_std

    return norm_col, (col_mean, col_std)


def get_normalizer(normalizationtype: str):
    """
    Returns the normalizer function corresponding to the provided type.
    Accepted types are "minmax" and "zscore".
    """
    match normalizationtype:
        case "minmax":
            return minmax_normalize
        case "zscore":
            return zscore_normalize
        case _:
            raise Exception(f'Normalization type "{normalizationtype}" not supported.')


def create_normalization(
        df: pd.DataFrame, normalizationtype: str = "minmax"
) -> tuple[pd.DataFrame, dict[str, tuple[str, float, float]]]:
    """
    Normalizes `df`'s columns (excluding "CLASS" and "ID") with the normalization type provided.
    Returns the normalized dataframe and a dictionary associating each column with the normalization type and the parameters used by the corresponding normalizer.
    """
    new_df = df.copy()
    normalization = {}

    normalizer = get_normalizer(normalizationtype)

    columns = set(new_df.columns).difference({"CLASS", "ID"})
    for col in columns:
        norm_col, params = normalizer(new_df[col])
        new_df[col] = norm_col

        normalization[col] = tuple([normalizationtype] + [val for val in params])

    return new_df, normalization


def apply_normalization(
        df: pd.DataFrame, normalization: dict[str, tuple[str, float, float]]
) -> pd.DataFrame:
    """
    Normalizes `df`'s column (excluding "CLASS" and "ID") using the normalization type and parameters specified in `normalization`.
    """
    new_df = df.copy()
    columns = set(new_df.columns).difference({"CLASS", "ID"})

    for col in columns:
        col_dets = normalization[col]

        normalizer = get_normalizer(col_dets[0])

        norm_col, _ = normalizer(new_df[col], *col_dets[1:])

        new_df[col] = norm_col

    return new_df

def create_imputation(df: pd.DataFrame) -> tuple[pd.DataFrame, dict]:
    """
    Create imputation values and apply them to missing values in dataframe.
    """

    df_copy = df.copy()
    imputation = {}

    for col in df.columns:
        if col in ["CLASS", "ID"]:
            continue

        # Handle numeric columns
        if pd.api.types.is_numeric_dtype(df[col]):
            fill_value = df[col].mean()
            if pd.isna(fill_value):  # All values missing
                fill_value = 0

        # Handle categorical/object columns
        elif df[col].dtype == 'category':
            fill_value = (
                df[col].mode().iloc[0]
                if not df[col].mode().empty
                else df[col].cat.categories[0]
            )
        else:  # object type
            fill_value = df[col].mode().iloc[0] if not df[col].mode().empty else ""

        df_copy[col] = df_copy[col].fillna(fill_value)
        imputation[col] = fill_value

    return df_copy, imputation


def apply_imputation(df: pd.DataFrame, imputation: dict) -> pd.DataFrame:
    """
    Apply existing imputation values to missing values in dataframe.
    """
    df_copy = df.copy()

    for col, value in imputation.items():
        if col in df_copy.columns:
            df_copy[col] = df_copy[col].fillna(value)

    return df_copy

def create_one_hot(df: pd.DataFrame) -> tuple[pd.DataFrame, dict]:
    """
    Create one-hot encoding for categorical features.
    """

    df_copy = df.copy()
    one_hot = {}

    for col in df.columns:
        if col in ["CLASS", "ID"]:
            continue

        # Only process object or category columns
        if df[col].dtype not in ["object", "category"]:
            continue

        # Get unique categories
        categories = df[col].unique()
        one_hot[col] = categories

        # Create one-hot encoded columns
        for category in categories:
            new_col_name = f"{col}_{category}"
            df_copy[new_col_name] = (df[col] == category).astype(float)

        # Drop original column
        df_copy.drop(columns=[col], inplace=True)

    return df_copy, one_hot


def apply_one_hot(df: pd.DataFrame, one_hot: dict) -> pd.DataFrame:
    """
    Apply one-hot encoding using existing categories.
    """

    df_copy = df.copy()

    for col, categories in one_hot.items():
        if col not in df_copy.columns:
            continue

        # Create one-hot encoded columns
        for category in categories:
            new_col_name = f"{col}_{category}"
            df_copy[new_col_name] = (df[col] == category).astype(float)

        # Drop original column
        df_copy.drop(columns=[col], inplace=True)

    return df_copy

def create_bins(
        df: pd.DataFrame, nobins: int = 10, bintype: str = "equal-width"
) -> tuple[pd.DataFrame, dict]:
    """
    Create bins for numeric features and apply discretization.
    """

    df_copy = df.copy()
    binning = {}

    for col in df.columns:
        if col in ["CLASS", "ID"]:
            continue

        # Only process numeric columns
        if not np.issubdtype(df[col].dtype, np.number):
            continue

        # Create bins based on bintype
        if bintype == "equal-width":
            discretized, bins = pd.cut(df[col], bins=nobins, labels=False, retbins=True)
        else:  # equal-size
            discretized, bins = pd.qcut(
                df[col], q=nobins, labels=False, retbins=True, duplicates="drop"
            )

        # Adjust bin edges
        bins[0] = -np.inf
        bins[-1] = np.inf

        # Store bins and update column
        binning[col] = bins
        df_copy[col] = pd.Categorical(discretized, categories=range(nobins))

    return df_copy, binning


def apply_bins(df: pd.DataFrame, binning: dict) -> pd.DataFrame:
    """
    Apply existing bins to numeric features.
    """

    df_copy = df.copy()

    for col, bins in binning.items():
        if col not in df_copy.columns:
            continue

        nobins = len(bins) - 1  # number of bins is one less than number of thresholds
        discretized = pd.cut(df_copy[col], bins=bins, labels=False)
        df_copy[col] = pd.Categorical(discretized, categories=range(nobins))

    return df_copy

def brier_score(df:pd.DataFrame, correctlabels:list) -> float:
    squared_errors = []

    for i, label in enumerate(correctlabels):
        # Create the true vector (ideal prediction)
        true_vector = np.zeros(len(df.columns))
        true_vector[np.where(df.columns == label)[0][0]] = 1

        # Calculate the squared error for the current prediction
        prediction = df.iloc[i].values
        squared_error = np.sum((prediction - true_vector) ** 2)
        squared_errors.append(squared_error)

    brier_score = np.mean(squared_errors)
    return brier_score

def feature_auc(
    scores_performance: list[tuple[float, int, int]], tot_tp: int, tot_fp: int
) -> float:
    """
    Returns the area under the ROC curve for a specific feature.
    
    Args:
        scores_performance: List of (score, true_positive, false_positive) tuples
        tot_tp: Total number of positive cases
        tot_fp: Total number of negative cases
    """
    auc_c = 0
    cov_tp = 0

    for s, tp_s, fp_s in scores_performance:

        if fp_s == 0:
            cov_tp += tp_s
        elif tp_s == 0:
            auc_c += (cov_tp / tot_tp) * (fp_s / tot_fp)
        else:
            auc_c += (cov_tp / tot_tp) * (fp_s / tot_fp) + (tp_s / tot_tp) * (
                fp_s / tot_fp
            ) * 0.5
            cov_tp += tp_s

    return auc_c


def auc(df: pd.DataFrame, correctlabels: list[int]) -> float:
    """
    Returns the weighted area under the ROC curve of a dataframe of scores, given the correct labels.
    """
    assert len(df) == len(
        correctlabels
    ), "Number of correct labels must match number of rows in DataFrame"

    correctlabels: np.ndarray = np.asarray(correctlabels)

    classes, counts = np.unique(correctlabels, return_counts=True)
    counts = counts / len(correctlabels)
    class_freqs = {cls: cnt for cls, cnt in zip(classes, counts)}

    auc = 0

    for cls in df.columns:
        tps = (correctlabels == cls).astype(int)
        fps = (correctlabels != cls).astype(int)

        scores_performance = [(s, tp, fp) for s, tp, fp in zip(df[cls], tps, fps)]
        scores_performance.sort(key=lambda x: x[0], reverse=True)

        auc += class_freqs.get(cls, 0) * feature_auc(scores_performance, sum(tps), sum(fps))

    return auc

def accuracy(df: pd.DataFrame, correctlabels:list) -> float:
    # Retrieving column names for max values, ties are resolved with first value
    predicted_labels = df.idxmax(axis=1)
    n_correct = sum(pred == correct for pred,correct in zip(predicted_labels, correctlabels))
    return n_correct / df.shape[0]



## 1. Define the class RandomForest

In [29]:
# Define the class RandomForest with three functions __init__, fit and predict (after the comments):
#
# Input to __init__: 
# self - the object itself
#
# Output from __init__:
# <nothing>
# 
# This function does not return anything but just initializes the following attributes of the object (self) to None:
# column_filter, imputation, one_hot, labels, model
#
# Input to fit:
# self      - the object itself
# df        - a dataframe (where the column names "CLASS" and "ID" have special meaning)
# no_trees  - no. of trees in the random forest (default = 100)
#
# Output from fit:
# <nothing>
#
# The result of applying this function should be:
#
# self.column_filter - a column filter (see Assignment 1) from df
# self.imputation    - an imputation mapping (see Assignment 1) from df
# self.one_hot       - a one-hot mapping (see Assignment 1) from df
# self.labels        - a (sorted) list of the categories of the "CLASS" column of df
# self.model         - a random forest, consisting of no_trees trees, where each tree is generated from a bootstrap sample
#                      and the number of evaluated features is log2|F| where |F| is the total number of features
#                      (for details, see lecture slides)
#
# Note that the function does not return anything but just assigns values to the attributes of the object.
#
# Hint 1: First create the column filter, imputation and one-hot mappings
#
# Hint 2: Then get the class labels and the numerical values (as an ndarray) from the dataframe after dropping the class labels 
#
# Hint 3: Generate no_trees classification trees, where each tree is generated using DecisionTreeClassifier 
#         from a bootstrap sample (see lecture slides), e.g., generated by np.random.choice (with replacement) 
#         from the row numbers of the ndarray, and where a random sample of the features are evaluated in
#         each node of each tree, of size log2(|F|), where |F| is the total number of features;
#         see the parameter max_features of DecisionTreeClassifier
#
# Input to predict:
# self - the object itself
# df   - a dataframe
# 
# Output from predict:
# predictions - a dataframe with class labels as column names and the rows corresponding to
#               predictions with estimated class probabilities for each row in df, where the class probabilities
#               are the averaged probabilities output by each decision tree in the forest
#
# Hint 1: Drop any "CLASS" and "ID" columns of the dataframe first and then apply column filter, imputation and one_hot
#
# Hint 2: Iterate over the trees in the forest to get the prediction of each tree by the method predict_proba(X) where 
#         X are the (numerical) values of the transformed dataframe; you may get the average predictions of all trees,
#         by first creating a zero-matrix with one row for each test instance and one column for each class label, 
#         to which you add the prediction of each tree on each iteration, and then finally divide the prediction matrix
#         by the number of trees.
#
# Hint 3: You may assume that each bootstrap sample that was used to generate each tree has included all possible
#         class labels and hence the prediction of each tree will contain probabilities for all class labels
#         (in the same order). Note that this assumption may be violated, and this limitation will be addressed 
#         in the next part of the assignment. 

class RandomForest:
    def __init__(self):
        self.column_filter: list[str] = None
        self.imputation: dict = None
        self.one_hot: dict = None
        self.labels: np.ndarray = None
        self.model: list[DecisionTreeClassifier] = None
        
    def fit(self, df: pd.DataFrame, no_trees: int = 100) -> None:
        # Preprocessing
        new_df, self.column_filter = create_column_filter(df)
        new_df, self.imputation = create_imputation(new_df)
        new_df, self.one_hot = create_one_hot(new_df)
        self.labels = new_df["CLASS"].unique()
        
        # Split labels from training data
        X = new_df.drop(columns=["CLASS"])
        y = new_df["CLASS"]
        
        self.model = []
        for no_tree in range(no_trees):
            # Sample with replacement
            sampled_index = np.random.choice(X.index, size=len(X.index),replace=True)
            X_bootstrap = X.iloc[sampled_index].values
            y_bootstrap = y.iloc[sampled_index].values
            
            # Fit and store tree
            tree = DecisionTreeClassifier(max_features="log2")
            tree.fit(X_bootstrap, y_bootstrap)
            self.model.append(tree)
            
    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        # Preprocessing
        new_df = apply_column_filter(df, self.column_filter)
        new_df = apply_imputation(new_df, self.imputation)
        new_df = apply_one_hot(new_df, self.one_hot)
        
        # Ignore ID and Class columns
        X = new_df.drop(columns=["ID", "CLASS"], errors="ignore").values
        probability_sum = np.zeros((X.shape[0], len(self.labels)))
        for tree in self.model:
            # Estimate class probabilities
            prediction = tree.predict_proba(X)
            probability_sum += prediction
        # Average probabilities
        predictions = pd.DataFrame(data=probability_sum / len(self.model), columns=self.labels)
        
        return predictions

In [30]:
# Test your code (leave this part unchanged, except for if auc is undefined)

train_df = pd.read_csv("tic-tac-toe_train.csv")

test_df = pd.read_csv("tic-tac-toe_test.csv")

rf = RandomForest()

t0 = time.perf_counter()
rf.fit(train_df)
print("Training time: {:.2f} s.".format(time.perf_counter()-t0))

test_labels = test_df["CLASS"]

t0 = time.perf_counter()
predictions = rf.predict(test_df)

print("Testing time: {:.2f} s.".format(time.perf_counter()-t0))

print("Accuracy: {:.4f}".format(accuracy(predictions,test_labels)))
print("AUC: {:.4f}".format(auc(predictions,test_labels))) # Comment this out if not implemented in assignment 1
print("Brier score: {:.4f}".format(brier_score(predictions,test_labels))) # Comment this out if not implemented in assignment 1

Training time: 0.12 s.
Testing time: 0.01 s.
Accuracy: 0.9186
AUC: 0.9899
Brier score: 0.1797


In [31]:
train_labels = train_df["CLASS"]
predictions = rf.predict(train_df)
print("Accuracy on training set: {0:.4f}".format(accuracy(predictions,train_labels)))
print("AUC on training set: {0:.4f}".format(auc(predictions,train_labels))) # Comment this out if not implemented in assignment 1
print("Brier score on training set: {0:.4f}".format(brier_score(predictions,train_labels))) # Comment this out if not implemented in assignment 1

Accuracy on training set: 1.0000
AUC on training set: 1.0000
Brier score on training set: 0.0201


### Comment on assumptions, things that do not work properly, etc.


## 2a. Handling trees with non-aligned predictions

In [32]:
# Define a revised version of the class RandomForest with the same input and output as described in part 1 above,
# where the predict function is able to handle the case where the individual trees are trained on bootstrap samples
# that do not include all class labels in the original training set. This leads to that the class probabilities output
# by the individual trees in the forest do not refer to the same set of class labels.
#
# Hint 1: The categories obtained with <pandas series>.cat.categories are sorted in the same way as the class labels
#         of a DecisionTreeClassifier; the latter are obtained by <DecisionTreeClassifier>.classes_ 
#         The problem is that classes_ may not include all possible labels, and hence the individual predictions 
#         obtained by <DecisionTreeClassifier>.predict_proba may be of different length or even if they are of the same
#         length do not necessarily refer to the same class labels. You may assume that each class label that is not included
#         in a bootstrap sample should be assigned zero probability by the tree generated from the bootstrap sample. 
#
# Hint 2: Create a mapping from the complete (and sorted) set of class labels l0, ..., lk-1 to a set of indexes 0, ..., k-1,
#         where k is the number of classes
#
# Hint 3: For each tree t in the forest, create a (zero) matrix with one row per test instance and one column per class label,
#         to which one column is added at a time from the output of t.predict_proba 
#
# Hint 4: For each column output by t.predict_proba, its index i may be used to obtain its label by t.classes_[i];
#         you may then obtain the index of this label in the ordered list of all possible labels from the above mapping (hint 2); 
#         this index points to which column in the prediction matrix the output column should be added to 

class RandomForest:
    """
    A Random Forest classifier that handles bootstrap samples missing class labels.
    Implements preprocessing, training with bootstrap samples, and probability-based predictions
    while properly handling cases where trees may not have seen all possible classes.
    """
    def __init__(self):
        # Storage for preprocessing transformations and model
        self.column_filter = None  # Columns to keep after filtering
        self.imputation = None     # Values for filling missing data
        self.one_hot = None        # Mapping for categorical variables
        self.labels = None         # All possible class labels (sorted)
        self.model = []            # Collection of decision trees

    def fit(self, df: pd.DataFrame, no_trees: int = 100):
        """
        Trains the random forest using bootstrap samples, handling class imbalance
        and potential missing classes in individual trees.
        
        Key steps:
        1. Preprocess data (filtering, imputation, one-hot encoding)
        2. Create bootstrap samples
        3. Train trees with random feature subsets
        """
        # Store complete set of possible classes for later prediction mapping
        self.labels = sorted(df["CLASS"].unique())
        
        # Apply preprocessing pipeline while preserving CLASS column for training
        filtered_df, self.column_filter = create_column_filter(df)
        imputed_df, self.imputation = create_imputation(filtered_df)
        one_hot_df, self.one_hot = create_one_hot(imputed_df)
        
        # Ensure consistent class label ordering using categorical type
        y = pd.Series(df["CLASS"]).astype(pd.CategoricalDtype(categories=self.labels))
        
        # Extract features, explicitly removing non-feature columns
        X = one_hot_df.drop(columns=["CLASS", "ID"], errors='ignore').values
        
        # Calculate feature subset size for random feature selection (log2|F|)
        n_features = X.shape[1]
        max_features = max(1, int(np.log2(n_features)))
        
        # Train individual trees on bootstrap samples
        for _ in range(no_trees):
            # Create bootstrap sample with replacement
            bootstrap_idx = np.random.choice(len(X), size=len(X), replace=True)
            X_bootstrap = X[bootstrap_idx]
            y_bootstrap = y.iloc[bootstrap_idx]
            
            # Train tree with random feature subset
            tree = DecisionTreeClassifier(max_features=max_features)
            tree.fit(X_bootstrap, y_bootstrap)
            self.model.append(tree)

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Generates probability predictions handling trees that may have missing classes.
        Uses a counting mechanism to properly average probabilities only for classes
        that were present in each tree's training data.
        """
        # Apply same preprocessing as during training
        df_processed = apply_column_filter(df, self.column_filter)
        df_processed = apply_imputation(df_processed, self.imputation)
        df_processed = apply_one_hot(df_processed, self.one_hot)
        
        # Extract features for prediction
        X = df_processed.drop(columns=["CLASS", "ID"], errors='ignore').values
        n_samples = len(X)
        n_classes = len(self.labels)
        
        # Map class labels to matrix column indices
        label_to_idx = {label: idx for idx, label in enumerate(self.labels)}
        
        # Initialize matrices for probability accumulation
        predictions_sum = np.zeros((n_samples, n_classes))     # Sum of probabilities
        predictions_count = np.zeros((n_samples, n_classes))   # Count of predictions per class
        
        # Aggregate predictions from all trees
        for tree in self.model:
            tree_proba = tree.predict_proba(X)
            
            # Map each tree's predictions to the correct class columns
            for i, class_label in enumerate(tree.classes_):
                idx = label_to_idx[class_label]
                predictions_sum[:, idx] += tree_proba[:, i]    # Add probabilities
                predictions_count[:, idx] += 1                 # Track number of predictions
        
        # Calculate average probabilities, handling missing classes
        with np.errstate(divide='ignore', invalid='ignore'):
            final_predictions = np.divide(predictions_sum, predictions_count)
            final_predictions = np.nan_to_num(final_predictions, 0)  # Set 0 prob for missing classes
        
        return pd.DataFrame(final_predictions, columns=self.labels)

In [37]:
# Test your code (leave this part unchanged, except for if auc is undefined)

train_df = pd.read_csv("anneal_train.csv")

test_df = pd.read_csv("anneal_test.csv")

rf = RandomForest()

t0 = time.perf_counter()
rf.fit(train_df)
print("Training time: {:.2f} s.".format(time.perf_counter()-t0))

test_labels = test_df["CLASS"]

t0 = time.perf_counter()
predictions = rf.predict(test_df)
print("Testing time: {:.2f} s.".format(time.perf_counter()-t0))

print("Accuracy: {:.4f}".format(accuracy(predictions,test_labels)))
print("AUC: {:.4f}".format(auc(predictions,test_labels))) # Comment this out if not implemented in assignment 1
print("Brier score: {:.4f}".format(brier_score(predictions,test_labels))) # Comment this out if not implemented in assignment 1

Training time: 0.09 s.
Testing time: 0.01 s.
Accuracy: 0.9532
AUC: 0.9690
Brier score: 0.0999


## 2b. Estimate predictive performance using out-of-bag predictions

In [34]:
# Define an extended version of the class RandomForest with the same input and output as described in part 2a above,
# where the results of the fit function also should include:
# self.oob_acc - the accuracy estimated on the out-of-bag predictions, i.e., the fraction of training instances for
#                which the given (correct) label is the same as the predicted label when using only trees for which
#                the instance is out-of-bag
#
# Hint 1: You may first create a zero matrix with one row for each training instance and one column for each class label
#         and one zero vector to allow for storing aggregated out-of-bag predictions and the number of out-of-bag predictions
#         for each training instance, respectively. By "aggregated out-of-bag predictions" is here meant the sum of all
#         predicted probabilities (one sum per class and instance). These sums should be divided by the number of predictions
#         (stored in the vector) in order to obtain a single class probability distribution per training instance.
#         This distribution is considered to be the out-of-bag prediction for each instance, and e.g., the class that
#         receives the highest probability for each instance can be compared to the correct label of the instance,
#         when calculating the accuracy using the out-of-bag predictions.
#
# Hint 2: After generating a tree in the forest, iterate over the indexes that were not included in the bootstrap sample
#         and add a prediction of the tree to the out-of-bag prediction matrix and update the count vector
#
# Hint 3: Note that the input to predict_proba has to be a matrix; from a single vector (row) x, a matrix with one row
#         can be obtained by x[None,:]
#
# Hint 4: Finally, divide each row in the out-of-bag prediction matrix with the corresponding element of the count vector
#
#         For example, assuming that we have two class labels, then we may end up with the following matrix:
#
#         2 4
#         4 4
#         5 0
#         ...
#
#         and the vector (no. of predictions) (6, 8, 5, ...)
#
#         The resulting class probability distributions are:
#
#         0.333... 0.666...
#         0.5 0.5
#         1.0 0
class RandomForest:
    def __init__(self) -> None:
        self.column_filter: list[str] = None
        self.imputation: dict = None
        self.one_hot: dict = None
        self.labels: np.ndarray = None
        self.model: list[DecisionTreeClassifier] = None

        # Support attributes
        self.label_to_index: dict[int, int] = None

    def fit(self, df: pd.DataFrame, no_trees: int = 100) -> None:
        new_df, self.column_filter = create_column_filter(df)
        new_df, self.imputation = create_imputation(new_df)
        new_df, self.one_hot = create_one_hot(new_df)
        new_df["CLASS"] = new_df["CLASS"].astype("category")

        self.labels = new_df["CLASS"].cat.categories
        self.label_to_index = {label: i for i, label in enumerate(self.labels)}

        X = new_df.drop(columns=["ID", "CLASS"], errors="ignore").values
        y = new_df["CLASS"].values

        oob_predictions = np.zeros((X.shape[0], len(self.labels)))
        oob_counts = np.zeros(X.shape[0])

        self.model = []
        bootstrap_indices = np.random.choice(
            X.shape[0], size=(no_trees, X.shape[0]), replace=True
        )

        for indices in bootstrap_indices:
            X_bootstrap, y_bootstrap = X[indices], y[indices]
            oob_indices = np.setdiff1d(np.arange(X.shape[0]), indices)

            model = DecisionTreeClassifier(max_features="log2")
            tree = model.fit(X_bootstrap, y_bootstrap)
            self.model.append(tree)

            x_oob = np.take(X, oob_indices, axis=0)
            tree_probs = tree.predict_proba(x_oob)
            label_indices = [self.label_to_index[label] for label in tree.classes_]

            # Update OOB prediction matrix and counts
            oob_predictions[oob_indices[:, None], label_indices] += tree_probs
            oob_counts[oob_indices] += [1] * len(oob_indices)

        # Avoid division by zero for instances with no OOB predictions
        valid_oob = oob_counts > 0
        oob_predictions[valid_oob] /= oob_counts[valid_oob, None]

        # Determine out-of-bag accuracy
        oob_predictions = pd.DataFrame(oob_predictions[valid_oob], columns=self.labels)
        self.oob_acc = accuracy(oob_predictions, y[valid_oob])

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        new_df = apply_column_filter(df, self.column_filter)
        new_df = apply_imputation(new_df, self.imputation)
        new_df = apply_one_hot(new_df, self.one_hot)
        new_df = new_df.drop(columns=["ID", "CLASS"], errors="ignore")

        X = new_df.values
        y_pred = np.zeros((X.shape[0], len(self.labels)))

        for tree in self.model:
            pred = np.zeros((X.shape[0], len(self.labels)))
            indices = [self.label_to_index[label] for label in tree.classes_]

            pred[:, indices] = tree.predict_proba(X)

            y_pred += pred

        y_pred /= len(self.model)

        return pd.DataFrame(y_pred, columns=self.labels)

In [35]:
# Test your code (leave this part unchanged, except for if auc is undefined)

train_df = pd.read_csv("anneal_train.csv")

test_df = pd.read_csv("anneal_test.csv")

rf = RandomForest()

t0 = time.perf_counter()
rf.fit(train_df)
print("Training time: {:.2f} s.".format(time.perf_counter()-t0))

print("OOB accuracy: {:.4f}".format(rf.oob_acc))

test_labels = test_df["CLASS"]

t0 = time.perf_counter()
predictions = rf.predict(test_df)
print("Testing time: {:.2f} s.".format(time.perf_counter()-t0))

print("Accuracy: {:.4f}".format(accuracy(predictions,test_labels)))
print("AUC: {:.4f}".format(auc(predictions,test_labels))) # Comment this out if not implemented in assignment 1
print("Brier score: {:.4f}".format(brier_score(predictions,test_labels))) # Comment this out if not implemented in assignment 1

Training time: 0.08 s.
OOB accuracy: 0.9443
Testing time: 0.01 s.
Accuracy: 0.9510
AUC: 0.9700
Brier score: 0.1000


In [36]:
train_labels = train_df["CLASS"]
rf = RandomForest()
rf.fit(train_df)
predictions = rf.predict(train_df)
print("Accuracy on training set: {0:.2f}".format(accuracy(predictions,train_labels)))
print("AUC on training set: {0:.2f}".format(auc(predictions,train_labels)))
print("Brier score on training set: {0:.2f}".format(brier_score(predictions,train_labels)))

Accuracy on training set: 1.00
AUC on training set: 1.00
Brier score on training set: 0.01


### Comment on assumptions, things that do not work properly, etc.