In [1]:
import math
import os
import pickle
import torch
import torch.nn.functional as F


## Helper functions: CIFAR-10 loading with max pooling


In [2]:
def unpickle(file_path: str):
    with open(file_path, "rb") as fo:
        d = pickle.load(fo, encoding="bytes")
    return d

def load_cifar10_with_maxpool(base_dir: str, pool_times: int = 2):
    batch_files = [f for f in os.listdir(base_dir) if "data_batch" in f]
    batch_files = sorted(batch_files)
    train_batches = [unpickle(os.path.join(base_dir, f)) for f in batch_files]
    test_batch = unpickle(os.path.join(base_dir, "test_batch"))

    X_train_list = []
    y_train_list = []
    for batch in train_batches:
        X_batch = torch.tensor(batch[b"data"], dtype=torch.float32)
        y_batch = torch.tensor(batch[b"labels"], dtype=torch.long)
        X_train_list.append(X_batch)
        y_train_list.append(y_batch)
    X_train = torch.cat(X_train_list, dim=0)
    y_train = torch.cat(y_train_list, dim=0)

    X_test = torch.tensor(test_batch[b"data"], dtype=torch.float32)
    y_test = torch.tensor(test_batch[b"labels"], dtype=torch.long)

    X_train = X_train.view(-1, 3, 32, 32) / 255.0
    X_test = X_test.view(-1, 3, 32, 32) / 255.0

    for _ in range(pool_times):
        X_train = F.max_pool2d(X_train, kernel_size=2)
        X_test = F.max_pool2d(X_test, kernel_size=2)

    X_train_flat = X_train.view(X_train.size(0), -1)
    X_test_flat = X_test.view(X_test.size(0), -1)

    return X_train_flat, y_train, X_test_flat, y_test


## Decision Tree implementation (classification and regression)


In [3]:
class DecisionTree:
    def __init__(
        self,
        depth: int = 0,
        max_depth: int = None,
        classification: bool = True,
    ):
        self.depth = depth
        self.max_depth = max_depth
        self.feature_index = None
        self.threshold = None
        self.left = None
        self.right = None
        self.is_leaf = False
        self.predicted_value = None
        self.classification = classification

    def fit(self, X, y):
        self.feature_index = None
        self.threshold = None

        if y.unique().numel() == 1:
            self.is_leaf = True
            self.predicted_value = y[0].item()
            return

        if self.max_depth is not None and self.depth >= self.max_depth:
            self.is_leaf = True
            self.predicted_value = self._get_prediction_value(y)
            return

        if self.classification:
            self._fit_classification(X, y)
        else:
            self._fit_regression(X, y)

    def predict(self, X):
        dtype = torch.long if self.classification else torch.float32

        if self.is_leaf:
            return torch.full((X.shape[0],), self.predicted_value, dtype=dtype)

        left_mask = X[:, self.feature_index] <= self.threshold
        right_mask = X[:, self.feature_index] > self.threshold

        y_pred = torch.empty(X.shape[0], dtype=dtype)
        y_pred[left_mask] = self.left.predict(X[left_mask]).to(dtype)
        y_pred[right_mask] = self.right.predict(X[right_mask]).to(dtype)

        return y_pred

    def _fit_classification(self, X, y):
        (
            self.feature_index,
            self.threshold,
            best_left_mask,
            best_right_mask,
        ) = self._find_best_split(X, y, self._gini)

        if self.feature_index is None:
            self.is_leaf = True
            self.predicted_value = self._get_prediction_value(y)
            return

        self.left = DecisionTree(
            depth=self.depth + 1,
            max_depth=self.max_depth,
            classification=self.classification,
        )
        self.right = DecisionTree(
            depth=self.depth + 1,
            max_depth=self.max_depth,
            classification=self.classification,
        )

        self.left.fit(X[best_left_mask], y[best_left_mask])
        self.right.fit(X[best_right_mask], y[best_right_mask])

    def _fit_regression(self, X, y):
        (
            self.feature_index,
            self.threshold,
            best_left_mask,
            best_right_mask,
        ) = self._find_best_split(X, y, self._mse)

        if self.feature_index is None:
            self.is_leaf = True
            self.predicted_value = self._get_prediction_value(y)
            return

        self.left = DecisionTree(
            depth=self.depth + 1,
            max_depth=self.max_depth,
            classification=self.classification,
        )
        self.right = DecisionTree(
            depth=self.depth + 1,
            max_depth=self.max_depth,
            classification=self.classification,
        )

        self.left.fit(X[best_left_mask], y[best_left_mask])
        self.right.fit(X[best_right_mask], y[best_right_mask])

    def _find_best_split(self, X, y, criterion_fn):
        num_features = X.shape[1]
        best_score = float("inf")
        best_feature = None
        best_threshold = None
        best_left_mask = None
        best_right_mask = None

        for feature in range(num_features):
            thresholds = torch.unique(X[:, feature])

            for ts in thresholds:
                left_mask = X[:, feature] <= ts
                right_mask = X[:, feature] > ts

                if not left_mask.any() or not right_mask.any():
                    continue

                y_left = y[left_mask]
                y_right = y[right_mask]

                score = criterion_fn(y_left, y_right)

                if score < best_score:
                    best_score = score
                    best_feature = feature
                    best_threshold = ts.item()
                    best_left_mask = left_mask
                    best_right_mask = right_mask

        return best_feature, best_threshold, best_left_mask, best_right_mask

    def _get_prediction_value(self, y):
        if self.classification:
            return y.mode()[0].item()
        return y.float().mean().item()

    @staticmethod
    def _gini_impurity(group):
        if group.numel() == 0:
            return 0.0
        _, counts = torch.unique(group, return_counts=True)
        probs = counts.float() / counts.sum()
        return 1.0 - torch.sum(probs ** 2).item()

    def _gini(self, y_left, y_right):
        total_samples = y_left.numel() + y_right.numel()
        gini_left = self._gini_impurity(y_left)
        gini_right = self._gini_impurity(y_right)
        return (
            (y_left.numel() / total_samples) * gini_left
            + (y_right.numel() / total_samples) * gini_right
        )

    def _mse(self, y_left, y_right):
        total_samples = y_left.numel() + y_right.numel()
        if total_samples == 0:
            return 0.0

        y_left_f = y_left.float()
        y_right_f = y_right.float()

        mse_left = (y_left_f - y_left_f.mean()).pow(2).mean().item()
        mse_right = (y_right_f - y_right_f.mean()).pow(2).mean().item()

        return (
            (y_left.numel() / total_samples) * mse_left
            + (y_right.numel() / total_samples) * mse_right
        )


## Random Forest implementation


In [4]:
class RandomForest:
    def __init__(
        self,
        n_estimators: int = 100,
        max_depth: int = None,
        max_features = None,
        bootstrap: bool = True,
        classification: bool = True,
        random_state: int = None,
    ):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.max_features = max_features
        self.bootstrap = bootstrap
        self.classification = classification
        self.random_state = random_state
        self.trees = []
        self.features_per_tree = []

    def _get_n_sub_features(self, n_features: int) -> int:
        if self.max_features is None:
            if self.classification:
                k = int(math.sqrt(n_features))
            else:
                k = n_features
        elif isinstance(self.max_features, float):
            k = int(self.max_features * n_features)
        else:
            k = int(self.max_features)
        k = max(1, k)
        k = min(n_features, k)
        return k

    def fit(self, X: torch.Tensor, y: torch.Tensor):
        if self.random_state is not None:
            torch.manual_seed(self.random_state)

        n_samples, n_features = X.shape
        n_sub_features = self._get_n_sub_features(n_features)

        self.trees = []
        self.features_per_tree = []

        for _ in range(self.n_estimators):
            if self.bootstrap:
                indices = torch.randint(0, n_samples, (n_samples,))
            else:
                indices = torch.randperm(n_samples)

            X_sample = X[indices]
            y_sample = y[indices]

            feature_indices = torch.randperm(n_features)[:n_sub_features]
            X_sample_sub = X_sample[:, feature_indices]

            tree = DecisionTree(
                max_depth=self.max_depth,
                classification=self.classification,
            )
            tree.fit(X_sample_sub, y_sample)

            self.trees.append(tree)
            self.features_per_tree.append(feature_indices)

    def predict(self, X: torch.Tensor) -> torch.Tensor:
        if len(self.trees) == 0:
            raise RuntimeError("RandomForest has not been fitted yet.")

        all_preds = []
        for tree, feature_indices in zip(self.trees, self.features_per_tree):
            X_sub = X[:, feature_indices]
            preds = tree.predict(X_sub)
            all_preds.append(preds)

        preds_stack = torch.stack(all_preds, dim=0)

        if self.classification:
            values, _ = torch.mode(preds_stack, dim=0)
            return values

        return preds_stack.float().mean(dim=0)


## Gradient Boosting implementation


In [5]:
import torch

class GradientBoosting:
    def __init__(
        self,
        n_estimators: int = 100,
        learning_rate: float = 0.1,
        max_depth: int = 3,
        classification: bool = True,
        random_state: int | None = None,
    ):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.classification = classification
        self.random_state = random_state
        self.trees = None
        self.init_prediction = None
        self.num_classes = None

    def fit(self, X: torch.Tensor, y: torch.Tensor):
        if self.random_state is not None:
            torch.manual_seed(self.random_state)

        self.trees = None
        self.init_prediction = None
        self.num_classes = None

        if not self.classification:
            y_float = y.float()
            self.num_classes = 1
            self.init_prediction = y_float.mean().item()
            F_vals = torch.full_like(y_float, self.init_prediction, dtype=torch.float32)
            self.trees = []
            for _ in range(self.n_estimators):
                residuals = y_float - F_vals
                tree = DecisionTree(
                    max_depth=self.max_depth,
                    classification=False,
                )
                tree.fit(X, residuals)
                update = tree.predict(X).float()
                F_vals = F_vals + self.learning_rate * update
                self.trees.append(tree)
            return

        y_long = y.long()
        unique = torch.unique(y_long)
        self.num_classes = int(unique.numel())

        if self.num_classes == 1:
            self.init_prediction = unique[0].item()
            self.trees = []
            return

        if self.num_classes == 2:
            y_float = y_long.float()
            p = y_float.mean()
            eps = 1e-6
            p = torch.clamp(p, eps, 1 - eps)
            self.init_prediction = torch.log(p / (1 - p)).item()
            F_vals = torch.full_like(y_float, self.init_prediction, dtype=torch.float32)
            self.trees = []
            for _ in range(self.n_estimators):
                p_hat = torch.sigmoid(F_vals)
                residuals = y_float - p_hat
                tree = DecisionTree(
                    max_depth=self.max_depth,
                    classification=False,
                )
                tree.fit(X, residuals)
                update = tree.predict(X).float()
                F_vals = F_vals + self.learning_rate * update
                self.trees.append(tree)
            return

        num_samples = y_long.shape[0]
        num_classes = self.num_classes
        y_one_hot = torch.zeros(num_samples, num_classes, dtype=torch.float32)
        indices = torch.arange(num_samples)
        y_one_hot[indices, y_long] = 1.0
        scores = torch.zeros(num_samples, num_classes, dtype=torch.float32)
        self.init_prediction = 0.0
        self.trees = [[None for _ in range(self.n_estimators)] for _ in range(num_classes)]
        for m in range(self.n_estimators):
            probs = torch.softmax(scores, dim=1)
            residuals = y_one_hot - probs
            for k in range(num_classes):
                tree = DecisionTree(
                    max_depth=self.max_depth,
                    classification=False,
                )
                tree.fit(X, residuals[:, k])
                update = tree.predict(X).float()
                scores[:, k] = scores[:, k] + self.learning_rate * update
                self.trees[k][m] = tree

    def _raw_predict(self, X: torch.Tensor) -> torch.Tensor:
        if not self.classification:
            if self.trees is None or len(self.trees) == 0:
                raise RuntimeError("GradientBoosting has not been fitted yet.")
            num_samples = X.shape[0]
            F_vals = torch.full((num_samples,), self.init_prediction, dtype=torch.float32)
            for tree in self.trees:
                F_vals = F_vals + self.learning_rate * tree.predict(X).float()
            return F_vals

        if self.num_classes is None:
            raise RuntimeError("GradientBoosting has not been fitted yet.")

        if self.num_classes == 1:
            num_samples = X.shape[0]
            return torch.full((num_samples,), float(self.init_prediction), dtype=torch.float32)

        if self.num_classes == 2:
            if self.trees is None or len(self.trees) == 0:
                raise RuntimeError("GradientBoosting has not been fitted yet.")
            num_samples = X.shape[0]
            F_vals = torch.full((num_samples,), self.init_prediction, dtype=torch.float32)
            for tree in self.trees:
                F_vals = F_vals + self.learning_rate * tree.predict(X).float()
            return F_vals

        num_samples = X.shape[0]
        num_classes = self.num_classes
        scores = torch.zeros(num_samples, num_classes, dtype=torch.float32)
        for m in range(self.n_estimators):
            for k in range(num_classes):
                tree = self.trees[k][m]
                update = tree.predict(X).float()
                scores[:, k] = scores[:, k] + self.learning_rate * update
        return scores

    def predict(self, X: torch.Tensor) -> torch.Tensor:
        outputs = self._raw_predict(X)
        if not self.classification:
            return outputs
        if self.num_classes == 1:
            num_samples = outputs.shape[0]
            return torch.full((num_samples,), int(self.init_prediction), dtype=torch.long)
        if self.num_classes == 2:
            proba_pos = torch.sigmoid(outputs)
            return (proba_pos >= 0.5).long()
        probs = torch.softmax(outputs, dim=1)
        return torch.argmax(probs, dim=1)

    def predict_proba(self, X: torch.Tensor) -> torch.Tensor:
        if not self.classification:
            raise RuntimeError("predict_proba is only available for classification.")
        outputs = self._raw_predict(X)
        if self.num_classes == 1:
            num_samples = outputs.shape[0]
            return torch.ones(num_samples, 1, dtype=torch.float32)
        if self.num_classes == 2:
            proba_pos = torch.sigmoid(outputs)
            proba_neg = 1.0 - proba_pos
            return torch.stack([proba_neg, proba_pos], dim=1)
        probs = torch.softmax(outputs, dim=1)
        return probs


## Regression 


In [6]:
from sklearn.metrics import r2_score, mean_absolute_error, mean_absolute_percentage_error

def get_regression_metrics(y_pred, y_true) -> None:
    for m in [r2_score, mean_absolute_error, mean_absolute_percentage_error]:
        print(f'Metric {m.__name__}')
        print(f'Result {m(y_pred, y_true)}')
        print()

In [7]:
import pandas as pd
from sklearn.model_selection import train_test_split

data = pd.read_csv("london_merged.csv")
data.drop(columns=['timestamp'], inplace=True)

X, y = data.drop(columns=['cnt']).values, data.cnt.values

X_train_np_reg, X_test_np_reg, y_train_np_reg, y_test_np_reg = train_test_split(
    X, y, random_state=42, test_size=0.2
)

X_train_reg = torch.tensor(X_train_np_reg, dtype=torch.float32)
X_test_reg = torch.tensor(X_test_np_reg, dtype=torch.float32)
y_train_reg = torch.tensor(y_train_np_reg, dtype=torch.float32)
y_test_reg = torch.tensor(y_test_np_reg, dtype=torch.float32)\

print("Training DT")

dt_reg = DecisionTree(max_depth=4, classification=False)
dt_reg.fit(X_train_reg, y_train_reg)
y_pred_dt_reg = dt_reg.predict(X_test_reg)

get_regression_metrics(y_pred_dt_reg, y_test_np_reg)

print("Training RF")

rf_reg = RandomForest(
    n_estimators=20,
    max_depth=6,
    max_features=None,
    bootstrap=True,
    classification=False,
    random_state=42,
)
rf_reg.fit(X_train_reg, y_train_reg)
y_pred_rf_reg = rf_reg.predict(X_test_reg)

get_regression_metrics(y_pred_rf_reg, y_test_np_reg)

print("Training GB")

gb_reg = GradientBoosting(
    n_estimators=30,
    learning_rate=0.1,
    max_depth=3,
    classification=False,
    random_state=42,
)
gb_reg.fit(X_train_reg, y_train_reg)
y_pred_gb_reg = gb_reg.predict(X_test_reg)

get_regression_metrics(y_pred_gb_reg, y_test_np_reg)

Training DT
Metric r2_score
Result -1.7970409393310547

Metric mean_absolute_error
Result 686.2468872070312

Metric mean_absolute_percentage_error
Result 0.6829274892807007

Training RF
Metric r2_score
Result -1.3947644233703613

Metric mean_absolute_error
Result 658.37158203125

Metric mean_absolute_percentage_error
Result 0.6617386937141418

Training GB
Metric r2_score
Result -2.0063259601593018

Metric mean_absolute_error
Result 668.3765869140625

Metric mean_absolute_percentage_error
Result 0.65778648853302



## Binary classification task and model comparison (DT, RF, GB)

We restrict CIFAR-10 to classes 0 and 1 to use Gradient Boosting for binary classification and compare models.


In [18]:
from sklearn.metrics import f1_score, recall_score, precision_score

def get_classification_metrics(y_pred, y_true) -> None:
    for m in [f1_score, recall_score, precision_score]:
        print(f'Metric {m.__name__}')
        print(f'Result {m(y_pred, y_true, average='weighted')}')
        print()

In [None]:
base_dir = "cifar-10-batches-py"
X_train_flat, y_train, X_test_flat, y_test = load_cifar10_with_maxpool(base_dir, pool_times=2)

n_train = 10000
perm = torch.randperm(X_train_flat.size(0))[:n_train]
X_train_small = X_train_flat[perm]
y_train_small = y_train[perm]

dt_mc = DecisionTree(max_depth=4, classification=True)
dt_mc.fit(X_train_small, y_train_small)
y_pred_dt_mc = dt_mc.predict(X_test_flat)

get_classification_metrics(y_pred_dt_mc, y_test)

rf_mc = RandomForest(
    n_estimators=20,
    max_depth=6,
    max_features=None,
    bootstrap=True,
    classification=True,
    random_state=42,
)
rf_mc.fit(X_train_small, y_train_small)
y_pred_rf_mc = rf_mc.predict(X_test_flat)

get_classification_metrics(y_pred_rf_mc, y_test)

gb_mc = GradientBoosting(
    n_estimators=20,
    learning_rate=0.1,
    max_depth=3,
    classification=True,
    random_state=42,
)
gb_mc.fit(X_train_small, y_train_small)
y_pred_gb_mc = gb_mc.predict(X_test_flat)

get_classification_metrics(y_pred_gb_mc, y_test)


Metric f1_score
Result 0.25184079174073193

Metric recall_score
Result 0.2317

Metric precision_score
Result 0.3291374

