Para rodar o MNIST, tens que rodar o arquivo MNIST.ipynb. Se não quiser pode rodar todas as células menos a 5 e a 6.

In [16]:
import pickle
import time

import matplotlib.pyplot as plt
import numpy as np
from mnist1d.data import get_dataset_args, make_dataset
from tqdm import tqdm

np.random.seed(42)
# matplotlib.use('TkAgg')


class Activation:
    @staticmethod
    def relu(z):
        return np.maximum(0, z)

    @staticmethod
    def relu_derivative(z):
        return np.where(z > 0, 1, 0)

    @staticmethod
    def sigmoid(z):
        return 1 / (1 + np.exp(-z))

    @staticmethod
    def sigmoid_derivative(z):
        s = Activation.sigmoid(z)
        return s * (1 - s)

    @staticmethod
    def tanh(z):
        return np.tanh(z)

    @staticmethod
    def tanh_derivative(z):
        return 1 - np.tanh(z) ** 2

    @staticmethod
    def LeakyReLU(z, alpha=0.01):
        return np.maximum(alpha * z, z)

    @staticmethod
    def LeakyReLU_derivative(z, alpha=0.01):
        return np.where(z > 0, 1, alpha)

    @staticmethod
    def linear(z):
        return z

    @staticmethod
    def linear_derivative(z):
        return np.ones_like(z)

    @staticmethod
    def softmax(z):
        # Ficou brabo, not stable
        # (https://stackoverflow.com/questions/40575841/numpy-calculate-the-derivative-of-the-softmax-function)
        # So we subtract the maximum value from the input to make it stable
        exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))
        return exp_z / np.sum(exp_z, axis=1, keepdims=True)

    @staticmethod
    def softmax_derivative(z):
        # Compute softmax for z
        # s = Activation.softmax(z)  # shape (batch_size, num_classes)
        return 1


class Loss:
    @staticmethod
    def MSE(y, y_hat):
        return 1 / y.shape[0] * (y - y_hat).T @ (y - y_hat)

    def MSE_derivative(y, y_hat):
        return y_hat - y

    @staticmethod
    def BCE(y, y_hat):
        return -1 / y.shape[0] * (y * np.log(y_hat) + (1 - y) * np.log(1 - y_hat))

    def BCE_derivative(y, y_hat):
        return (y_hat - y) / (y_hat * (1 - y_hat))

    @staticmethod
    def CrossEntropy(y, y_hat):
        y_hat = np.clip(y_hat, 1e-15, 1 - 1e-15)
        return -1 / y.shape[0] * np.sum(y * np.log(y_hat))

    def CrossEntropy_derivative(y, y_hat):
        return y_hat - y


class Initialization:
    @staticmethod
    def xavier(size):
        input = size[0]
        stddev = np.sqrt(1 / input)
        return np.random.normal(0, stddev, size=size)

    @staticmethod
    def he(size):
        fan_in = size[0]
        stddev = np.sqrt(2 / fan_in)
        return np.random.normal(0, stddev, size=size)

    @staticmethod
    def normal(size):
        return np.random.randn(*size)

    @staticmethod
    def uniform(size):
        return np.random.uniform(size=size)

    @staticmethod
    def zeros(size):
        return np.zeros(size)

    @staticmethod
    def explode(size):
        return 1000000 * np.random.randn(*size)

    @staticmethod
    def vanish(size):
        return np.random.normal(size=size) / 1000000


class Optimizer:
    def __init__(self, learning_rate, network):
        self.learning_rate = learning_rate
        self.grads = network.grads
        self.weights = network.weights
        self.biases = network.biases

    def zero_grad(self):
        return {key: np.zeros_like(value) for key, value in self.grads.items()}


class SGD(Optimizer):
    def __init__(self, learning_rate, network):
        super().__init__(learning_rate, network)

    def step(self):
        for i in range(len(self.weights)):
            self.weights[i] = self.weights[i] - self.learning_rate * self.grads[f"dw{i + 1}"]
            self.biases[i] = self.biases[i] - self.learning_rate * self.grads[f"db{i + 1}"]

        return self.weights, self.biases


class Adam(Optimizer):
    def __init__(self, learning_rate, network, beta1=0.9, gamma=0.999, epsilon=1e-8):
        super().__init__(learning_rate, network)
        self.beta1 = beta1
        self.gamma = gamma
        self.epsilon = epsilon
        # Initializing as zeros so that we can have momentum and RMSprop for each parameter
        self.m_w = [np.zeros_like(w) for w in self.weights]
        self.v_w = [np.zeros_like(w) for w in self.weights]
        self.m_b = [np.zeros_like(b) for b in self.biases]
        self.v_b = [np.zeros_like(b) for b in self.biases]
        self.t = 0

    def step(self):
        self.t += 1
        for i in range(len(self.weights)):
            # Update for weights
            self.m_w[i] = self.beta1 * self.m_w[i] + (1 - self.beta1) * self.grads[f"dw{i + 1}"]
            self.v_w[i] = self.gamma * self.v_w[i] + (1 - self.gamma) * np.square(
                self.grads[f"dw{i + 1}"]
            )
            m_w_hat = self.m_w[i] / (1 - self.beta1**self.t)
            v_w_hat = self.v_w[i] / (1 - self.gamma**self.t)
            self.weights[i] = self.weights[i] - self.learning_rate * m_w_hat / (
                np.sqrt(v_w_hat) + self.epsilon
            )

            # Update for biases
            self.m_b[i] = self.beta1 * self.m_b[i] + (1 - self.beta1) * self.grads[f"db{i + 1}"]
            self.v_b[i] = self.gamma * self.v_b[i] + (1 - self.gamma) * np.square(
                self.grads[f"db{i + 1}"]
            )
            m_b_hat = self.m_b[i] / (1 - self.beta1**self.t)
            v_b_hat = self.v_b[i] / (1 - self.gamma**self.t)
            self.biases[i] = self.biases[i] - self.learning_rate * m_b_hat / (
                np.sqrt(v_b_hat) + self.epsilon
            )

        return self.weights, self.biases


class MLP:
    def __init__(self, neurons, activations, weight_initialization) -> None:
        self.MLP_DEPTH = len(neurons) - 1
        self.neurons = neurons
        if len(activations) == 1:
            self.activations = [activations[0] for _ in range(self.MLP_DEPTH)]
        elif len(activations) == 2 and self.MLP_DEPTH > 2:
            self.activations = [activations[0] for _ in range(self.MLP_DEPTH)]
            self.activations[-1] = activations[1]
        else:
            self.activations = activations

        self.activation_derivatives = [
            getattr(Activation, f"{activation.__name__}_derivative")
            for activation in self.activations
        ]

        if isinstance(weight_initialization, list):
            # Direct weight assignment
            self.weights = weight_initialization
        else:
            # Use initialization method
            self.weights = [
                np.squeeze(weight_initialization((neurons[i + 1], neurons[i])))
                for i in range(self.MLP_DEPTH)
            ]

        self.biases = [np.zeros((neurons[i + 1])) for i in range(self.MLP_DEPTH)]

        self.cache = {}
        self.grads = {}

    def forward(self, input):
        self.cache["a0"] = a = input

        for i in range(self.MLP_DEPTH):
            z = a @ self.weights[i].T + self.biases[i]
            a = self.activations[i](z)

            self.cache[f"w{i + 1}"] = self.weights[i]
            self.cache[f"z{i + 1}"] = z
            self.cache[f"a{i + 1}"] = a

        return a

    def backward(self, y, loss):
        BATCH_SIZE = y.shape[0]

        self.loss_derivative = getattr(Loss, f"{loss.__name__}_derivative")

        # Output layer
        dz = self.loss_derivative(y, self.cache[f"a{self.MLP_DEPTH}"])
        da = self.activation_derivatives[self.MLP_DEPTH - 1](self.cache[f"z{self.MLP_DEPTH}"])
        dl = dz * da

        self.grads[f"dw{self.MLP_DEPTH}"] = (
            (1 / BATCH_SIZE) * dl.T @ self.cache[f"a{self.MLP_DEPTH - 1}"]
        )
        self.grads[f"db{self.MLP_DEPTH}"] = (1 / BATCH_SIZE) * np.sum(dz, axis=0, keepdims=True)

        for i in range(self.MLP_DEPTH, 1, -1):
            if len(dl.shape) == 1:
                dl = dl.reshape(-1, 1)
            if len(self.cache[f"w{i}"].shape) == 1:
                self.cache[f"w{i}"] = self.cache[f"w{i}"].reshape(1, -1)

            dz = dl @ self.cache[f"w{i}"]

            dz *= self.activation_derivatives[i - 1](self.cache[f"a{i - 1}"])

            self.grads[f"dw{i - 1}"] = (1 / BATCH_SIZE) * dz.T @ self.cache[f"a{i - 2}"]
            self.grads[f"db{i - 1}"] = (1 / BATCH_SIZE) * np.sum(dz, axis=0, keepdims=True)
            dl = dz

    def save(self, filepath):
        """Save the model weights and biases to a file."""
        model_data = {
            "weights": self.weights,
            "biases": self.biases,
            "neurons": self.neurons,
            "activations": [act.__name__ for act in self.activations],
        }
        with open(filepath, "wb") as f:
            pickle.dump(model_data, f)
        print(f"Model saved to {filepath}")

    @classmethod
    def load(cls, filepath, weight_initialization):
        """Load a model from a file."""
        with open(filepath, "rb") as f:
            model_data = pickle.load(f)

        # Recreate activation functions from their names
        activations = [getattr(Activation, act_name) for act_name in model_data["activations"]]

        # Create a new instance of the model
        model = cls(model_data["neurons"], activations, weight_initialization)

        # Load the weights and biases
        model.weights = model_data["weights"]
        model.biases = model_data["biases"]

        print(f"Model loaded from {filepath}")
        return model

In [17]:
class Dataset:
    def __init__(
        self,
        train_set,
        train_labels,
        test_set,
        test_labels,
        val_split=0.15,
        num_classes=10,
    ):
        self.train_set = train_set
        self.test_set = test_set
        self.num_classes = num_classes

        # One-hot encode the labels
        self.train_labels = self.one_hot_encode(train_labels)
        self.test_labels = self.one_hot_encode(test_labels)

        # Create validation set
        val_size = int(len(train_set) * val_split)
        self.val_set = train_set[-val_size:]
        self.val_labels = self.train_labels[-val_size:]
        self.train_set = train_set[:-val_size]
        self.train_labels = self.train_labels[:-val_size]

    def one_hot_encode(self, labels):
        # Ensure labels is a 1D array
        labels = np.array(labels).ravel()
        # Check if labels are in the correct range
        if np.min(labels) < 0 or np.max(labels) >= self.num_classes:
            raise ValueError(f"Labels must be in the range 0 to {self.num_classes - 1}")
        # Create one-hot encoded array
        one_hot = np.zeros((labels.size, self.num_classes))
        one_hot[np.arange(labels.size), labels] = 1
        return one_hot

    def create_batches(self, batch_size, dataset="train"):
        if dataset == "train":
            data, labels = self.train_set, self.train_labels
        elif dataset == "val":
            data, labels = self.val_set, self.val_labels
        elif dataset == "test":
            data, labels = self.test_set, self.test_labels
        else:
            raise ValueError("dataset must be 'train', 'val', or 'test'")

        indices = np.arange(len(data))
        np.random.shuffle(indices)
        for start_idx in range(0, len(data), batch_size):
            batch_indices = indices[start_idx : start_idx + batch_size]
            yield data[batch_indices], labels[batch_indices]

    def get_test_data(self):
        return self.test_set, self.test_labels

    def get_val_data(self):
        return self.val_set, self.val_labels

In [18]:
def update_plots_grad_actv(ax1, ax2, weight_grads, activations):
    # Clear the previous plots
    ax1.clear()
    ax2.clear()

    # --- Update Weight Histogram on ax1 ---
    # Combine all layer weights into a single array for the histogram
    # print(len(activations))
    all_grads = np.concatenate([w.flatten() for w in weight_grads])
    all_activations = np.concatenate([a.flatten() for a in activations])

    # Plot histogram of all weights combined
    ax1.hist(all_grads, bins=50, color="blue", alpha=0.7)

    # Add title and labels
    ax1.set_title("Weight Grads Distribution")
    ax1.set_xlabel("Weight values")
    ax1.set_ylabel("Frequency")

    # --- Plot Loss over Epochs on ax2 ---
    ax2.hist(all_activations, bins=50, color="red", alpha=0.7)

    # Add title and labels
    ax2.set_title("Activations Distribution")
    ax2.set_xlabel("Activation values")
    ax2.set_ylabel("Frequency")

    # Adjust layout to prevent overlap
    plt.tight_layout()

    # Draw the updated plots
    fig = ax1.figure  # Get the figure object associated with the axes
    fig.canvas.draw()
    fig.canvas.flush_events()


def update_plots_loss_acc(bx1, bx2, loss_history, acc_history, total_epochs):
    # Clear the previous plots
    bx1.clear()
    bx2.clear()

    # --- Plot Loss over Epochs on bx2 ---
    bx1.plot(loss_history, color="red")

    # Add title and labels
    bx1.set_title("Loss over Epochs")
    bx1.set_xlabel("Epoch")
    bx1.set_ylabel("Loss")

    bx1.set_xlim(0, total_epochs)  # Set x-axis limits to number of epochs

    # --- Plot accuracy over Epochs on ax2 ---
    bx2.plot(acc_history, color="red")

    # Add title and labels
    bx2.set_title("Accuracy over Epochs in the Validation Data")
    bx2.set_xlabel("Epoch")
    bx2.set_ylabel("Accuracy")

    bx2.set_xlim(0, total_epochs)  # Set x-axis limits to number of epochs
    bx2.set_ylim(0, 1)  # Set y-axis limits to 0-1

    # Adjust layout to prevent overlap
    plt.tight_layout()

    # Draw the updated plots
    fig = bx1.figure  # Get the figure object associated with the axes
    fig.canvas.draw()
    fig.canvas.flush_events()

    # Optional: Pause to allow the plot to update
    # plt.pause(0.01)

In [19]:
def test_whole_set(model, test_data, test_labels, loss_fn):
    # @model_eval
    y_hat = model.forward(test_data)
    total_loss = loss_fn(test_labels, y_hat)

    predictions = np.argmax(y_hat, axis=1)
    true_labels = np.argmax(test_labels, axis=1)
    accuracy = np.mean(predictions == true_labels)

    return accuracy, total_loss


def train(
    model,
    dataset,
    epochs,
    optimizer,
    loss_fn,
    batch_size=32,
    save_path="./models/best_model.pkl",
    plot=True,
):
    metrics = {}
    metrics["train_loss"] = []
    metrics["val_loss"] = []
    metrics["val_acc"] = []
    metrics["network_grads"] = []
    metrics["network_activations"] = []
    metrics["train_time"] = []
    best_val_acc = 0

    if plot:
        # Set up the plot outside the loop with three subplots
        plt.ion()  # Turn on interactive mode for live updating plots
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
        fig1, (bx1, bx2) = plt.subplots(1, 2, figsize=(14, 6))

    for epoch in tqdm(range(epochs)):
        batch_loss = 0

        for X_batch, y_batch in dataset.create_batches(batch_size, "train"):
            start = time.time()
            pred = model.forward(X_batch)
            batch_loss += loss_fn(y_batch, pred)
            model.backward(y_batch, loss_fn)
            model.weights, model.biases = optimizer.step()
            metrics["network_grads"].append([v for v in model.grads.values()])
            metrics["network_activations"].append(
                [v for k, v in model.cache.items() if "a" in k and "0" not in k]
            )
            if plot:
                update_plots_grad_actv(
                    ax1,
                    ax2,
                    metrics["network_grads"][-1],
                    metrics["network_activations"][-1],
                )

        metrics["train_time"].append(time.time() - start)
        # Validate on validation set
        val_acc, val_loss = test_whole_set(model, *dataset.get_val_data(), loss_fn)
        metrics["val_loss"].append(val_loss)
        metrics["val_acc"].append(val_acc)
        metrics["train_loss"].append(batch_loss / (len(dataset.train_set) // batch_size))
        if plot:
            update_plots_loss_acc(bx1, bx2, metrics["train_loss"], metrics["val_acc"], epochs - 1)
        print(
            f"Epoch {epoch} Train Loss: {metrics['train_loss'][-1]:.4f}, Val Acc: {val_acc:.4f}, Val Loss: {val_loss:.4f}"
        )

        # Save the best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            model.save(save_path)
            print(f"New best model saved with validation accuracy: {best_val_acc:.4f}")

    return metrics

In [20]:
train_set = np.load("train_data.npy")
train_labels = np.load("train_labels.npy")
test_set = np.load("test_data.npy")
test_labels = np.load("test_labels.npy")

dataset = Dataset(train_set, train_labels, test_set, test_labels)

defaults = get_dataset_args()
data = make_dataset(defaults)

X_train, y_train = data["x"], data["y"].reshape(-1, 1)
X_val, y_val = data["x_test"], data["y_test"].reshape(-1, 1)


mnist_1d = Dataset(X_train, y_train, X_val, y_val)

FileNotFoundError: [Errno 2] No such file or directory: 'train_data.npy'

In [None]:
EPOCHS = 5
BATCH_SIZE = 64
loss_fn = Loss.CrossEntropy
MODEL = MLP([784, 512, 10], [Activation.relu, Activation.softmax], Initialization.he)
optim = Adam(4e-3, MODEL)
training_metrics = train(MODEL, dataset, EPOCHS, optim, loss_fn, BATCH_SIZE, plot=False)

 20%|██        | 1/5 [00:03<00:15,  3.95s/it]

Epoch 0 Train Loss: 2.6343, Val Acc: 0.9006, Val Loss: 1.5518
Model saved to ./models/best_model.pkl
New best model saved with validation accuracy: 0.9006


 40%|████      | 2/5 [00:07<00:10,  3.60s/it]

Epoch 1 Train Loss: 1.2271, Val Acc: 0.8963, Val Loss: 0.8487


 60%|██████    | 3/5 [00:11<00:07,  3.84s/it]

Epoch 2 Train Loss: 0.6356, Val Acc: 0.9141, Val Loss: 0.5030
Model saved to ./models/best_model.pkl
New best model saved with validation accuracy: 0.9141


 80%|████████  | 4/5 [00:15<00:04,  4.11s/it]

Epoch 3 Train Loss: 0.4925, Val Acc: 0.9006, Val Loss: 0.5398


100%|██████████| 5/5 [00:21<00:00,  4.24s/it]

Epoch 4 Train Loss: 0.4703, Val Acc: 0.9043, Val Loss: 0.4095





In [None]:
EPOCHS = 2
BATCH_SIZE = 64
loss_fn = Loss.CrossEntropy
MODEL = MLP(
    [40, 512, 256, 128, 127, 10],
    [Activation.relu, Activation.softmax],
    Initialization.he,
)
optim = Adam(1e-3, MODEL)
training_metrics = train(MODEL, mnist_1d, EPOCHS, optim, loss_fn, BATCH_SIZE)

 50%|█████     | 1/2 [00:08<00:08,  8.09s/it]

Epoch 0 Train Loss: 2.1075, Val Acc: 0.3550, Val Loss: 1.6645
Model saved to ./models/best_model.pkl
New best model saved with validation accuracy: 0.3550


100%|██████████| 2/2 [00:15<00:00,  7.99s/it]

Epoch 1 Train Loss: 1.4582, Val Acc: 0.4300, Val Loss: 1.4615
Model saved to ./models/best_model.pkl
New best model saved with validation accuracy: 0.4300





In [None]:
loaded_model = MLP.load("models/best_model.pkl", Initialization.he)

test_acc, test_loss = test_whole_set(loaded_model, *mnist_1d.get_test_data(), loss_fn)
print(f"Test Accuracy: {test_acc:.4f}, Test Loss: {test_loss:.4f}")

Model loaded from models/best_model.pkl
Test Accuracy: 0.4490, Test Loss: 1.4374
