# Classification Comparison Project

## Imports

In [1]:

import time
import numpy as np
import os
import joblib
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import umap
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split, GridSearchCV, cross_val_score
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
from sklearn.tree import DecisionTreeClassifier
from datasets import load_dataset
    

  from .autonotebook import tqdm as notebook_tqdm


## Dataset Utilities

In [2]:
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from datasets import load_dataset
import numpy as np


# `load_mnist(test_size=0.2, val_size=0.2, random_state=42)` -->
#   - Loads the MNIST dataset and returns a dictionary with 'train', 'val', and 'test' splits plus a 'metadata' entry.
def load_mnist(test_size=0.2, val_size=0.2, random_state=42):
    """Fetch MNIST from OpenML and split it into train/val/test sets.

    Args:
        test_size: Fraction of all samples held out as the test split.
        val_size: Fraction of the remaining (non-test) samples held out as
            the validation split.
        random_state: Seed passed to both ``train_test_split`` calls.

    Returns:
        A dict with ``"train"``, ``"val"`` and ``"test"`` entries, each an
        ``(X, y)`` tuple, and a ``"metadata"`` dict holding ``input_shape``
        (channels, height, width), ``num_classes``, ``num_samples`` and
        ``dataset_name``.
    """
    bunch = fetch_openml("mnist_784", version=1)

    # The features/labels may arrive as pandas objects or as plain arrays;
    # normalise both to NumPy, with integer labels.
    features = bunch["data"]
    if hasattr(features, "to_numpy"):
        features = features.to_numpy()

    labels = bunch["target"]
    if hasattr(labels, "to_numpy"):
        labels = labels.astype(int).to_numpy()
    else:
        labels = labels.astype(int)

    # Each row is a flattened square, single-channel image.
    edge = int(np.sqrt(features.shape[1]))
    metadata = {
        "input_shape": (1, edge, edge),  # (channels, height, width)
        "num_classes": len(np.unique(labels)),
        "num_samples": features.shape[0],
        "dataset_name": "MNIST",
    }

    # First carve off the test split, then divide the rest into train/val.
    X_rest, X_test, y_rest, y_test = train_test_split(
        features, labels, test_size=test_size, random_state=random_state
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_rest, y_rest, test_size=val_size, random_state=random_state
    )

    return {
        "train": (X_train, y_train),
        "val": (X_val, y_val),
        "test": (X_test, y_test),
        "metadata": metadata,
    }


# `load_tiny_imagenet(test_size=0.2, val_size=0.2, random_state=42)` -->
#   - Loads the tiny-imagenet dataset and returns a dictionary with 'train', 'val', and 'test' splits plus a 'metadata' entry.
def load_tiny_imagenet(test_size=0.2, val_size=0.2, random_state=42):
    """Load tiny-imagenet from the Hugging Face hub and split it three ways.

    Only the hub's ``"train"`` split is read; the train/val/test sets returned
    here are carved out of it with ``train_test_split``.

    Args:
        test_size: Fraction of all samples held out as the test split.
        val_size: Fraction of the remaining (non-test) samples held out as
            the validation split.
        random_state: Seed passed to both ``train_test_split`` calls.

    Returns:
        A dict with ``"train"``, ``"val"`` and ``"test"`` entries, each an
        ``(X, y)`` tuple where every row of ``X`` is one image flattened in
        (height, width, channel) order, and a ``"metadata"`` dict holding
        ``input_shape`` (channels, height, width), ``num_classes``,
        ``num_samples`` and ``dataset_name``.
    """
    dataset = load_dataset("zh-plus/tiny-imagenet")

    def process_image(img):
        # Force three channels so every image yields the same array shape.
        if img.mode != "RGB":
            img = img.convert("RGB")
        return np.array(img)

    # Walk the split once and collect images and labels together, instead of
    # iterating (and fetching every record) twice.
    images = []
    labels = []
    for record in dataset["train"]:
        images.append(process_image(record["image"]))
        labels.append(record["label"])
    X_full = np.array(images)
    y_full = np.array(labels)

    # Analyze dataset properties
    num_samples = X_full.shape[0]
    input_shape = (3, X_full.shape[1], X_full.shape[2])  # (channels, height, width)
    num_classes = len(np.unique(y_full))

    # Flatten each (H, W, C) image into a single row
    X_full_flat = X_full.reshape(X_full.shape[0], -1)

    # Split into train+val and test
    X_temp, X_test, y_temp, y_test = train_test_split(
        X_full_flat, y_full, test_size=test_size, random_state=random_state
    )

    # Split train+val into train and val
    X_train, X_val, y_train, y_val = train_test_split(
        X_temp, y_temp, test_size=val_size, random_state=random_state
    )

    return {
        "train": (X_train, y_train),
        "val": (X_val, y_val),
        "test": (X_test, y_test),
        "metadata": {
            "input_shape": input_shape,
            "num_classes": num_classes,
            "num_samples": num_samples,
            "dataset_name": "tiny-imagenet",
        },
    }


## CNN Model

In [3]:
import time
import numpy as np
import os
import joblib
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import umap


class CNNModel(nn.Module):
    """Two-convolution CNN classifier that returns per-class log-probabilities.

    Layer order: conv(3x3, 32) -> ReLU -> conv(3x3, 64) -> ReLU ->
    max-pool(2x2) -> dropout(0.25) -> flatten -> linear(128) -> ReLU ->
    dropout(0.5) -> linear(num_classes) -> log-softmax.

    Args:
        in_channels: Number of channels in the input images.
        num_classes: Number of output classes.
        input_size: Optional ``(height, width)`` of the input images. When
            given, the first fully connected layer is built in the
            constructor, so its weights are part of ``parameters()``
            immediately. When omitted, that layer is built on the first
            forward pass; an optimizer created before that first pass will
            NOT contain its weights, so run one forward pass first.
    """

    def __init__(self, in_channels=1, num_classes=10, input_size=None):
        super().__init__()

        # Layer 1: First Convolutional Layer
        # Output channels: 32, kernel 3x3, stride 1, padding 1.
        # With padding=1 the spatial size is preserved (H x W -> H x W).
        self.conv1 = nn.Conv2d(in_channels, 32, 3, 1, padding=1)

        # Layer 2: Second Convolutional Layer
        # Input channels: 32, output channels: 64, kernel 3x3, stride 1,
        # padding 1. Spatial size is again preserved.
        self.conv2 = nn.Conv2d(32, 64, 3, 1, padding=1)

        # Dropout Layers
        # Layer 3: First Dropout Layer (25% dropout)
        self.dropout1 = nn.Dropout(0.25)
        # Layer 5: Second Dropout Layer (50% dropout)
        self.dropout2 = nn.Dropout(0.5)

        # Layer 4: First Fully Connected Layer
        # Its input width depends on the image size: 64 channels times the
        # pooled (floor-halved) height and width.
        if input_size is None:
            self.fc1 = None  # built on the first forward pass
        else:
            height, width = input_size
            self.fc1 = nn.Linear(64 * (height // 2) * (width // 2), 128)

        # Layer 6: Second Fully Connected Layer (Output Layer)
        # Input size: 128 neurons
        # Output size: Number of Classes
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return log-probabilities of shape ``(batch, num_classes)``."""
        # Convolutional blocks
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)  # 2x2 max pooling halves height and width

        # First Dropout
        x = self.dropout1(x)

        # Flatten all dimensions except batch
        x = torch.flatten(x, 1)

        # Build fc1 now if the constructor was not told the input size.
        if self.fc1 is None:
            self.fc1 = nn.Linear(x.shape[1], 128).to(x.device)

        # Fully connected head
        x = F.relu(self.fc1(x))
        x = self.dropout2(x)
        x = self.fc2(x)

        return F.log_softmax(x, dim=1)


class CNN:
    """Training/evaluation wrapper around ``CNNModel``.

    ``input_shape``, ``num_classes`` and ``dataset_name`` are filled in by
    ``train`` (from the dataset metadata); ``input_shape`` and ``num_classes``
    are also restored by ``load``.
    """

    def __init__(self, epochs=10, batch_size=64):
        self.model = None
        self.epochs = epochs
        self.batch_size = batch_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.dataset_name = "Unknown"
        self.base_path = "../out"
        self.input_shape = None
        self.num_classes = None

    # `_generate_paths()` -->
    #   - Generate the paths for saving models and visualizations
    def _generate_paths(self):
        """Create the output directories and return ``(model_path, umap_path)``."""
        model_dir = f"{self.base_path}/trained_models"
        viz_dir = f"{self.base_path}/visualizations"
        os.makedirs(model_dir, exist_ok=True)
        os.makedirs(viz_dir, exist_ok=True)

        # File names encode the epoch count and dataset name
        base_name = f"CNN_{self.epochs}epochs_{self.dataset_name}"

        model_path = f"{model_dir}/{base_name}.joblib"
        umap_path = f"{viz_dir}/{base_name}_umap.png"

        return model_path, umap_path

    # `_preprocess_data(X)` -->
    #   - Preprocess data for training
    def _preprocess_data(self, X):
        """Return ``X`` as a float32 ``(N, C, H, W)`` array scaled by 1/255.

        Accepted layouts (``C, H, W`` come from ``self.input_shape``):
          * ``(N, H*W*C)`` flat rows, assumed flattened in (H, W, C) order as
            ``load_tiny_imagenet`` in this file does. For single-channel data
            this is the same as a plain reshape.
          * ``(N, H, W)`` when there is exactly one channel.
          * ``(N, H, W, C)`` channel-last images.
          * ``(N, C, H, W)`` channel-first images (returned unchanged apart
            from the dtype/scale conversion).

        Raises:
            ValueError: If ``X`` matches none of the layouts above.
        """
        channels, height, width = self.input_shape
        channel_first = (channels, height, width)
        channel_last = (height, width, channels)
        X = np.asarray(X)

        if X.ndim == 2:
            # Undo the channel-last flattening, then move channels forward.
            X = X.reshape(X.shape[0], height, width, channels).transpose(0, 3, 1, 2)
        elif X.ndim == 3 and channels == 1:
            X = X[:, np.newaxis, :, :]
        elif X.ndim == 4 and X.shape[1:] != channel_first:
            if X.shape[1:] != channel_last:
                raise ValueError(
                    f"Cannot map data of shape {X.shape} onto input shape {channel_first}"
                )
            X = X.transpose(0, 3, 1, 2)  # (N, H, W, C) -> (N, C, H, W)
        elif X.ndim != 4:
            raise ValueError(
                f"Cannot map data of shape {X.shape} onto input shape {channel_first}"
            )

        # Normalize pixel values to [0,1] (assumes 8-bit pixel intensities)
        return np.ascontiguousarray(X.astype(np.float32) / 255.0)

    # `train(X_train, y_train, X_val, y_val, metadata)` -->
    #   - Train the CNN model
    def train(self, X_train, y_train, X_val, y_val, metadata):
        """Fit a fresh ``CNNModel`` on the training data.

        Args:
            X_train: Training pixels in any layout ``_preprocess_data`` accepts.
            y_train: Integer class labels for ``X_train``.
            X_val: Validation pixels. Accepted for interface compatibility;
                not used by the training loop.
            y_val: Validation labels. Not used, see ``X_val``.
            metadata: Dict with ``dataset_name``, ``input_shape`` (C, H, W)
                and ``num_classes``.

        Returns:
            ``self``, to allow call chaining.
        """
        # Set metadata from dataset
        self.dataset_name = metadata["dataset_name"]
        self.input_shape = tuple(metadata["input_shape"])
        self.num_classes = metadata["num_classes"]

        # Initialize model with correct parameters
        self.model = CNNModel(
            in_channels=self.input_shape[0], num_classes=self.num_classes
        ).to(self.device)

        # The model may build its first fully connected layer on the first
        # forward pass. Run one dummy sample through it BEFORE creating the
        # optimizer, otherwise that layer's weights are missing from
        # `parameters()` and are never updated.
        self.model.eval()
        with torch.no_grad():
            self.model(torch.zeros(1, *self.input_shape, device=self.device))

        # Preprocess data and convert to tensors
        inputs = torch.from_numpy(self._preprocess_data(X_train))
        targets = torch.from_numpy(np.asarray(y_train).astype(np.int64))

        # Create data loader
        train_loader = DataLoader(
            TensorDataset(inputs, targets), batch_size=self.batch_size, shuffle=True
        )

        # CrossEntropyLoss applies log-softmax itself; the model already emits
        # log-probabilities, and log-softmax of log-probabilities is a no-op,
        # so the loss value is unaffected.
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(self.model.parameters())

        for epoch in range(self.epochs):
            self.model.train()
            train_loss = 0
            pbar = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{self.epochs}")

            for batches_seen, (data, target) in enumerate(pbar, start=1):
                data, target = data.to(self.device), target.to(self.device)
                optimizer.zero_grad()
                output = self.model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()

                # Show the running mean loss of the current epoch
                pbar.set_postfix({"loss": f"{train_loss / batches_seen:.4f}"})

        return self

    # `evaluate(X_test, y_test)` -->
    #   - Evaluate the model accuracy and prediction speed
    def evaluate(self, X_test, y_test):
        """Return a dict with ``accuracy`` and ``prediction_speed`` (seconds)."""
        accuracy = self._evaluate_accuracy(X_test, y_test)
        prediction_speed = self._measure_prediction_speed(X_test)

        return {"accuracy": accuracy, "prediction_speed": prediction_speed}

    # `predict(X, batch_size=16)` -->
    #   - Make predictions using the trained model
    def predict(self, X, batch_size=16):
        """Return the predicted class index for every row of ``X``.

        The data is pushed through the model ``batch_size`` samples at a time
        so that activations for the whole set never have to fit in memory at
        once.

        Returns:
            A 1-D NumPy integer array with one prediction per sample.
        """
        self.model.eval()
        inputs = torch.from_numpy(self._preprocess_data(X))

        chunks = []
        with torch.no_grad():
            for start in range(0, inputs.shape[0], batch_size):
                batch = inputs[start : start + batch_size].to(self.device)
                chunks.append(self.model(batch).argmax(dim=1).cpu())

        if not chunks:
            return np.empty(0, dtype=np.int64)
        return torch.cat(chunks).numpy()

    # `generate_umap(X, predictions)` -->
    #   - Generate and save UMAP visualization
    def generate_umap(self, X, predictions):
        """Project ``X`` to 2-D with UMAP and save a scatter plot coloured by prediction."""
        _, umap_path = self._generate_paths()

        # UMAP expects one flat feature vector per sample
        X_flat = X.reshape(X.shape[0], -1)

        reducer = umap.UMAP(n_neighbors=15, min_dist=0.1, n_components=2, n_jobs=-1)
        X_umap = reducer.fit_transform(X_flat)

        plt.figure(figsize=(12, 10), dpi=300)
        scatter = sns.scatterplot(
            x=X_umap[:, 0],
            y=X_umap[:, 1],
            hue=predictions,
            palette="tab10",
            alpha=0.8,
            s=100,
            edgecolor="black",
            linewidth=0.5,
            legend="full",
        )

        scatter.legend(title="Class", fontsize=12)
        plt.title("UMAP Visualization of Predictions", fontsize=14, pad=20)
        plt.xlabel("UMAP Component 1", fontsize=12)
        plt.ylabel("UMAP Component 2", fontsize=12)
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(umap_path, dpi=300, bbox_inches="tight")
        plt.close()

    # `save()` -->
    #   - Save the model to file
    def save(self):
        """Dump the model and its configuration with joblib; return the file path."""
        model_path, _ = self._generate_paths()
        # Save both model and configuration
        config = {
            "model": self.model,
            "input_shape": self.input_shape,
            "num_classes": self.num_classes,
        }
        joblib.dump(config, model_path)
        return model_path

    # `load(filename=None)` -->
    #   - Load the model from file
    def load(self, filename=None):
        """Restore the model and configuration written by ``save``.

        Args:
            filename: File to read. Defaults to the path ``save`` would use
                for the current ``epochs`` and ``dataset_name``.

        Returns:
            ``self``, to allow call chaining.
        """
        if filename is None:
            filename, _ = self._generate_paths()

        # NOTE: joblib files are pickle-based; only load files you trust.
        config = joblib.load(filename)

        # Set model and configuration
        self.model = config["model"]
        self.input_shape = config["input_shape"]
        self.num_classes = config["num_classes"]

        # Inputs are moved to self.device before inference, so the model has
        # to live there as well.
        if self.model is not None:
            self.model.to(self.device)

        print("Loaded model configuration:")
        print(f"Input shape: {self.input_shape}")
        print(f"Number of classes: {self.num_classes}")

        return self

    # `_evaluate_accuracy(X_test, y_test)` -->
    #   - Helper method to evaluate model accuracy
    def _evaluate_accuracy(self, X_test, y_test):
        """Return the fraction of samples in ``X_test`` classified correctly.

        Raises:
            ValueError: If ``X_test`` contains no samples.
        """
        self.model.eval()

        # Preprocess the data and convert to PyTorch tensors (labels are cast
        # to integers the same way `train` does)
        inputs = torch.from_numpy(self._preprocess_data(X_test))
        targets = torch.from_numpy(np.asarray(y_test).astype(np.int64))

        test_loader = DataLoader(
            TensorDataset(inputs, targets), batch_size=self.batch_size
        )

        correct = 0
        total = 0

        with torch.no_grad():
            for data, target in tqdm(test_loader):
                data, target = data.to(self.device), target.to(self.device)
                predicted = self.model(data).argmax(dim=1)
                total += target.size(0)
                correct += (predicted == target).sum().item()

        if total == 0:
            raise ValueError("Cannot compute accuracy on an empty test set")
        return correct / total

    # `_measure_prediction_speed(X_test, n_trials=100)` -->
    #   - Helper method to measure prediction speed
    def _measure_prediction_speed(self, X_test, n_trials=100):
        """Return the mean wall-clock seconds for one full pass over ``X_test``.

        Each trial runs the whole set through the model in chunks of
        ``self.batch_size`` (including the host-to-device copy).

        Raises:
            ValueError: If ``n_trials`` is not positive.
        """
        if n_trials <= 0:
            raise ValueError("n_trials must be a positive integer")

        self.model.eval()

        # Preprocess once, outside the timed region
        inputs = torch.from_numpy(self._preprocess_data(X_test))
        on_cuda = self.device.type == "cuda"

        total_time = 0.0
        with torch.no_grad():
            for _ in tqdm(range(n_trials)):
                start_time = time.perf_counter()
                for offset in range(0, inputs.shape[0], self.batch_size):
                    batch = inputs[offset : offset + self.batch_size]
                    self.model(batch.to(self.device))
                if on_cuda:
                    # CUDA kernels run asynchronously; wait for them so the
                    # measurement covers the actual computation.
                    torch.cuda.synchronize()
                total_time += time.perf_counter() - start_time

        return total_time / n_trials


## Classification Tree

In [4]:
import time
import joblib
import os
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import accuracy_score
from sklearn.model_selection import GridSearchCV, cross_val_score
from sklearn.tree import DecisionTreeClassifier
from tqdm import tqdm
import umap


class ClassificationTree:
    """Wrapper around scikit-learn's ``DecisionTreeClassifier``.

    Supports plain training, grid-search training, cost-complexity pruning,
    evaluation, UMAP plotting and joblib persistence.
    """

    def __init__(self, base_max_depth=None, random_state=42):
        self.base_max_depth = base_max_depth
        self.random_state = random_state
        self.model = None
        self.dataset_name = "Unknown"
        self.is_grid_search = False
        self.base_path = "../out"
        # (X_train, y_train) from the most recent train()/grid_train() call.
        # Kept by reference (no copy) so prune() can refit on the training
        # data; it is not written to disk by save().
        self._train_data = None

    # `_generate_paths()` -->
    #   - Generate the paths for saving models and visualizations
    def _generate_paths(self):
        """Create the output directories and return ``(model_path, umap_path)``."""
        model_dir = f"{self.base_path}/trained_models"
        viz_dir = f"{self.base_path}/visualizations"
        os.makedirs(model_dir, exist_ok=True)
        os.makedirs(viz_dir, exist_ok=True)

        # Generate the base name for files
        base_name = f"ClassificationTree{'_GridSearch' if self.is_grid_search else ''}_{self.dataset_name}"

        # Generate complete paths
        model_path = f"{model_dir}/{base_name}.joblib"
        umap_path = f"{viz_dir}/{base_name}_umap.png"

        return model_path, umap_path

    # `train(X_train, y_train)` -->
    #   - Directly train the model without grid search
    def train(self, X_train, y_train):
        """Fit a tree limited to ``base_max_depth``; return ``self``."""
        self.is_grid_search = False
        self._train_data = (X_train, y_train)

        self.model = DecisionTreeClassifier(
            max_depth=self.base_max_depth, random_state=self.random_state
        )
        self.model.fit(X_train, y_train)

        return self

    # `grid_train(X_train, y_train)` -->
    #   - Train the model using grid search
    def grid_train(self, X_train, y_train):
        """Select hyperparameters with 5-fold grid search; return ``self``."""
        self.is_grid_search = True
        self._train_data = (X_train, y_train)

        # Define the parameter grid
        param_grid = {
            "max_depth": [2, 4, 8, 16, 32, None],
            "min_samples_split": [2, 5, 10],
            "min_samples_leaf": [1, 2, 4, 8],
            "criterion": ["gini", "entropy"],
        }

        # Initialize the base classifier (its max_depth is overridden by the
        # grid values during the search)
        base_classifier = DecisionTreeClassifier(
            max_depth=self.base_max_depth, random_state=self.random_state
        )

        grid_search = GridSearchCV(
            estimator=base_classifier,
            param_grid=param_grid,
            cv=5,
            n_jobs=-1,
            verbose=3,
        )
        grid_search.fit(X_train, y_train)

        # Keep the best model found by the search
        self.model = grid_search.best_estimator_

        return self

    # `prune(X_val, y_val)` -->
    #   - Prune the model using cost complexity pruning
    def prune(self, X_val, y_val):
        """Choose a cost-complexity pruning strength and refit the tree with it.

        When training data from ``train``/``grid_train`` is available, every
        candidate ``ccp_alpha`` is fitted on the training data (keeping the
        current model's hyperparameters) and scored on the held-out
        validation data. When it is not (for example after ``load``), the
        candidates are compared by 5-fold cross-validation on the validation
        data and the winner is fitted on it.

        Args:
            X_val: Validation features.
            y_val: Validation labels.

        Returns:
            ``self``, with ``self.model`` replaced by the best pruned tree.

        Raises:
            RuntimeError: If no model has been trained or loaded yet.
        """
        if self.model is None:
            raise RuntimeError(
                "prune() requires a trained model; call train() or grid_train() first"
            )

        has_training_data = self._train_data is not None
        if has_training_data:
            X_fit, y_fit = self._train_data
        else:
            X_fit, y_fit = X_val, y_val

        # Candidate alphas along the pruning path of a tree with the current
        # model's hyperparameters
        alphas = self.model.cost_complexity_pruning_path(X_fit, y_fit).ccp_alphas
        base_params = self.model.get_params()

        best_classifier = None
        best_score = float("-inf")

        for alpha in tqdm(alphas):
            # Floating-point round-off can yield tiny negative alphas, which
            # the estimator rejects; clamp them to zero.
            candidate = DecisionTreeClassifier(
                **{**base_params, "ccp_alpha": max(float(alpha), 0.0)}
            )

            if has_training_data:
                candidate.fit(X_fit, y_fit)
                score = candidate.score(X_val, y_val)
            else:
                score = cross_val_score(candidate, X_val, y_val, cv=5).mean()

            # `>=` keeps the later candidate on ties; the path's alphas are
            # documented as increasing, so ties go to the more heavily pruned
            # (simpler) tree.
            if score >= best_score:
                best_classifier = candidate
                best_score = score

        if best_classifier is not None:
            if not has_training_data:
                # cross_val_score only fits clones; fit the winner itself.
                best_classifier.fit(X_val, y_val)
            self.model = best_classifier

        return self

    # `evaluate(X_test, y_test, cv=5)` -->
    #   - Evaluate the model using:
    #       - Cross-validation score
    #       - Accuracy score
    #       - Prediction speed
    def evaluate(self, X_test, y_test, cv=5):
        """Return ``cv_score``, ``accuracy`` and ``prediction_speed`` for the test set.

        ``accuracy`` and ``prediction_speed`` use the fitted model as is.
        ``cv_score`` is the mean of ``cross_val_score`` run with this
        estimator on ``X_test``/``y_test``.
        """
        cv_score = cross_val_score(self.model, X_test, y_test, cv=cv).mean()
        predictions = self.model.predict(X_test)
        accuracy = accuracy_score(y_test, predictions)
        prediction_speed = self._measure_prediction_speed(X_test)

        return {
            "cv_score": cv_score,
            "accuracy": accuracy,
            "prediction_speed": prediction_speed,
        }

    # `predict(X)` -->
    #   - Predict the class labels for the given data
    def predict(self, X):
        return self.model.predict(X)

    # `generate_umap(X, predictions)` -->
    #   - Generate UMAP visualization
    def generate_umap(self, X, predictions):
        """Project ``X`` to 2-D with UMAP and save a scatter plot coloured by prediction."""
        # Get save path
        _, umap_path = self._generate_paths()

        # Initialize UMAP reducer
        reducer = umap.UMAP(n_neighbors=15, min_dist=0.1, n_components=2, n_jobs=-1)
        X_umap = reducer.fit_transform(X)

        # Initialize plot
        plt.figure(figsize=(12, 10), dpi=300)
        scatter = sns.scatterplot(
            x=X_umap[:, 0],
            y=X_umap[:, 1],
            hue=predictions,
            palette="tab10",
            alpha=0.8,
            s=100,
            edgecolor="black",
            linewidth=0.5,
            legend="full",
        )

        # Initialize legend
        scatter.legend(title="Class", fontsize=12)
        plt.title("UMAP Visualization of Predictions", fontsize=14, pad=20)
        plt.xlabel("UMAP Component 1", fontsize=12)
        plt.ylabel("UMAP Component 2", fontsize=12)
        plt.grid(True, alpha=0.3)
        plt.tight_layout()

        if umap_path:
            # Save plot
            plt.savefig(umap_path, dpi=300, bbox_inches="tight")
        plt.close()

    # `save()` -->
    #    - Save the model to a file
    def save(self):
        """Dump the fitted estimator with joblib; return the file path."""
        model_path, _ = self._generate_paths()

        joblib.dump(self.model, model_path)
        return model_path

    # `load(filename=None)` -->
    #    - Load the model from a file
    def load(self, filename=None):
        """Restore an estimator written by ``save``; return ``self``.

        Args:
            filename: File to read. Defaults to the path ``save`` would use
                for the current ``dataset_name`` and grid-search flag.
        """
        if filename is None:
            # Generate paths if filename is not provided
            filename, _ = self._generate_paths()

        # NOTE: joblib files are pickle-based; only load files you trust.
        self.model = joblib.load(filename)
        return self

    # `_measure_prediction_speed(X_test, n_trials=100)` -->
    #    - Helper function to measure average prediction speed
    def _measure_prediction_speed(self, X_test, n_trials=100):
        """Return the mean wall-clock seconds of ``predict(X_test)`` over ``n_trials`` runs.

        Raises:
            ValueError: If ``n_trials`` is not positive.
        """
        if n_trials <= 0:
            raise ValueError("n_trials must be a positive integer")

        total_time = 0.0
        for _ in tqdm(range(n_trials)):
            # perf_counter is monotonic and high-resolution, unlike time.time
            start_time = time.perf_counter()
            self.model.predict(X_test)
            total_time += time.perf_counter() - start_time
        return total_time / n_trials


## Main Execution (MNIST)

In [None]:


def main():
    """Train, save and evaluate the decision tree and the CNN on MNIST."""
    print("<< === Loading Data === >>")
    print("--> Loading MNIST")
    data = load_mnist()

    # Separate data into sets
    X_train, y_train = data["train"]
    X_val, y_val = data["val"]
    X_test, y_test = data["test"]
    metadata = data["metadata"]

    # ---------- Classification Tree ---------- #
    print("<< === Classification Tree === >>")
    clf_tree = ClassificationTree(base_max_depth=3)
    clf_tree.dataset_name = "MNIST"

    # NOTE: no load() here. Loading a saved tree right before training fails
    # when no model file exists yet, and the loaded model would be replaced
    # by train() anyway. To reuse a saved model, call load() instead of the
    # training and pruning steps.
    print("--> Training Model")
    clf_tree.train(X_train, y_train)

    print("--> Pruning Model")
    clf_tree.prune(X_val, y_val)

    print("--> Saving Model")
    clf_tree.save()

    print("--> Running Predictions")
    predictions = clf_tree.predict(X_test)

    print("--> Generating UMAP")
    clf_tree.generate_umap(X_test, predictions)

    print("--> Evaluating Model")
    clf_tree_evaluation = clf_tree.evaluate(X_test, y_test)

    print("\n----- RESULTS -----")
    print(f"Accuracy: {clf_tree_evaluation['accuracy']}")

    # --------- CNN Deep Learning ---------- #
    print("<< === CNN Deep Learning === >>")
    cnn = CNN(epochs=20, batch_size=64)
    cnn.dataset_name = "MNIST"

    print("--> Training Model")
    cnn.train(X_train, y_train, X_val, y_val, metadata)

    print("--> Saving Model")
    cnn.save()

    print("--> Running Predictions")
    cnn_predictions = cnn.predict(X_test)

    print("--> Generating UMAP")
    cnn.generate_umap(X_test, cnn_predictions)

    print("--> Evaluating Model")
    cnn_evaluation = cnn.evaluate(X_test, y_test)

    print("\n----- RESULTS -----")
    print(f"Accuracy: {cnn_evaluation['accuracy']}")


if __name__ == "__main__":
    main()


## Main Execution (tiny-imagenet -> ISSUE HERE)

In [None]:


def main():
    """Train, save and evaluate the decision tree and the CNN on tiny-imagenet.

    NOTE(review): the captured output of this cell stops after
    "--> Training Model" for the classification tree; fitting the tree on the
    flattened raw pixels is presumably the slow step -- confirm by timing it.
    """
    print("<< === Loading Data === >>")
    print("--> Loading tiny-imagenet")
    splits = load_tiny_imagenet()

    # Unpack the three (features, labels) pairs and the dataset description
    train_X, train_y = splits["train"]
    val_X, val_y = splits["val"]
    test_X, test_y = splits["test"]
    dataset_info = splits["metadata"]

    # ---------- Classification Tree ---------- #
    print("<< === Classification Tree === >>")
    tree = ClassificationTree(base_max_depth=3)
    tree.dataset_name = "tiny-imagenet"

    print("--> Training Model")
    tree.train(train_X, train_y)

    print("--> Pruning Model")
    tree.prune(val_X, val_y)

    print("--> Saving Model")
    tree.save()

    print("--> Running Predictions")
    tree_predictions = tree.predict(test_X)

    print("--> Generating UMAP")
    tree.generate_umap(test_X, tree_predictions)

    print("--> Evaluating Model")
    tree_report = tree.evaluate(test_X, test_y)

    print("\n----- RESULTS -----")
    print(f"Accuracy: {tree_report['accuracy']}")

    # --------- CNN Deep Learning ---------- #
    print("<< === CNN Deep Learning === >>")
    network = CNN(epochs=20, batch_size=64)
    network.dataset_name = "tiny-imagenet"

    print("--> Training Model")
    network.train(train_X, train_y, val_X, val_y, dataset_info)

    print("--> Saving Model")
    network.save()

    print("--> Running Predictions")
    network_predictions = network.predict(test_X)

    print("--> Generating UMAP")
    network.generate_umap(test_X, network_predictions)

    print("--> Evaluating Model")
    network_report = network.evaluate(test_X, test_y)

    print("\n----- RESULTS -----")
    print(f"Accuracy: {network_report['accuracy']}")


if __name__ == "__main__":
    main()


<< === Loading Data === >>
--> Loading tiny-imagenet
<< === Classification Tree === >>
--> Training Model
