<a href="https://colab.research.google.com/github/MichalSlowakiewicz/Deep-Neural-Network/blob/master/dnn_lab_2_backprop_student_version.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<center><img src='https://drive.google.com/uc?id=1_utx_ZGclmCwNttSe40kYA6VHzNocdET' height="60"></center>

AI TECH - Akademia Innowacyjnych Zastosowań Technologii Cyfrowych. Program Operacyjny Polska Cyfrowa na lata 2014-2020
<hr>

<center><img src='https://drive.google.com/uc?id=1BXZ0u3562N_MqCLcekI-Ens77Kk4LpPm'></center>

<center>
Projekt współfinansowany ze środków Unii Europejskiej w ramach Europejskiego Funduszu Rozwoju Regionalnego
Program Operacyjny Polska Cyfrowa na lata 2014-2020,
Oś Priorytetowa nr 3 "Cyfrowe kompetencje społeczeństwa" Działanie  nr 3.2 "Innowacyjne rozwiązania na rzecz aktywizacji cyfrowej"
Tytuł projektu:  „Akademia Innowacyjnych Zastosowań Technologii Cyfrowych (AI Tech)”
    </center>

# Laboratory Scenario 2 - Backpropagation and Gradient Checkpointing

In this lab scenario, you are given an implementation of a simple neural network, and your goal is to implement the backpropagation procedure for this network.  

To be more precise, the network inputs a tensor $x$ of shape `(MINI_BATCH_SIZE, 28*28)`, where each element of the batch represents a flattened grayscale image of shape `(28, 28)`.  
In exercise 1, you can assume that images in the minibatch are fed to the network one by one (as tensors of shape `(1, 28*28)` - single image and `(1, 10)` - image class).  
In exercise 2 you are asked to make the backpropagation work without this assumption, on whole mini-batches.  
In exercise 3, you will implement a technique called *gradient checkpointing*, that allows you to reduce the amount of memory used to store activations for backpropagation.

In [3]:
from pathlib import Path
from typing import Sequence, Iterator, cast
from typing_extensions import override

import numpy as np
import PIL.Image
from numpy.typing import NDArray
from tqdm.auto import tqdm

FloatNDArray = NDArray[np.float64]

## Loading the MNIST dataset

In [4]:
!wget --no-verbose -O mnist.npz https://s3.amazonaws.com/img-datasets/mnist.npz

2025-10-15 09:19:22 URL:https://s3.amazonaws.com/img-datasets/mnist.npz [11490434/11490434] -> "mnist.npz" [1]


In [5]:
def load_mnist(
    path: Path = Path('mnist.npz')
) -> tuple[FloatNDArray, FloatNDArray, FloatNDArray, FloatNDArray]:
    '''
    Load the MNIST dataset (grayscale 28 x 28 images of hand-written digits).

    Returns tuple of:
    - x_train: shape (N_train, H * W), grayscale values 0..1.
    - y_train: shape (N_train, 10), one-hot-encoded label, dtype float64.
    - x_test: shape (N_test, H * W), grayscale values 0..1.
    - y_train: shape (N_test, 10), one-hot-encoded label, dtype float64.

    More: https://en.wikipedia.org/wiki/MNIST_database
    '''
    with np.load(path) as f:
        x_train, _y_train = f['x_train'], f['y_train']
        x_test, _y_test = f['x_test'], f['y_test']

    H = W = 28
    N_train = len(x_train)
    N_test = len(x_test)
    assert x_train.shape == (N_train, H, W) and _y_train.shape == (N_train,)
    assert x_test.shape == (N_test, H, W) and _y_test.shape == (N_test,)

    x_train = x_train.reshape(N_train, H * W) / 255.0
    x_test = x_test.reshape(N_test, H * W) / 255.0

    y_train = np.zeros((N_train, 10), dtype=np.float64)
    y_train[np.arange(N_train), _y_train] = 1

    y_test = np.zeros((N_test, 10))
    y_test[np.arange(N_test), _y_test] = 1

    return x_train, y_train, x_test, y_test


x_train, y_train, x_test, y_test = load_mnist()

In [None]:
def to_pillow_image(x: FloatNDArray, scale: int = 10) -> PIL.Image.Image:
    '''Convert example of shape (28 * 28,) and values 0..1 to Pillow image.'''
    H = W = 28
    assert x.shape == (H * W,) and 0 <= x.min() <= x.max() <= 1
    x_int = (x * 255).astype(np.uint8).reshape(H, W)
    img = PIL.Image.fromarray(x_int, mode='L')
    return img.resize((H * scale, W * scale), PIL.Image.Resampling.NEAREST)


display(to_pillow_image(x_train[0]))
print(y_train[0])

## Exercise 1

In this exercise, your task is to fill in the gaps in this code by implementing the backpropagation algorithm.
Once done, you can run the network on the MNIST example and see how it performs.  
Feel free to play with the parameters. Your model should achieve 90%+ accuracy after a few epochs.  

Before you start you should note a few things:
+ `backprop` - is the function that you need to implement
+ `learning_step` - calls `backprop` to get the gradients for network parameters
+ The derivative of the loss is already computed by `cost_derivative`.
+ Your goal is to compute $\frac{d L\left(\text{model}(x), y\right)}{d p}$ for each parameter $p$ of the network


In [6]:
def sigmoid(z: FloatNDArray) -> FloatNDArray:
    return 1.0 / (1.0 + np.exp(-z))


def sigmoid_prime(z: FloatNDArray) -> FloatNDArray:
    '''Derivative of the sigmoid.'''
    return sigmoid(z) * (1 - sigmoid(z))

In [7]:
class Network:
    '''
    Multi-Layer Perceptron.

    A simple neural network with fully-connected layers and sigmoid activations.
    '''

    def __init__(self, sizes: Sequence[int] = (784, 30, 10)):
        '''
        Args:
        - sizes: sequence of layer widths [N^0, ... , N^last]
          These are lengths of activation vectors, where:
          - N^0 is input size: 784.
          - N^last is the number of classes into which we can classify each input: 10.
        '''
        self.sizes = list(sizes)

        # We initialize weights and biases with random normal distribution.

        # List of len(sizes) - 1 vectors of shape (N^1), (N^2), ..., (N^last).
        self.biases = [np.random.randn(n) for n in sizes[1:]]

        # List of len(sizes) - 1 matrices of shape (N^i, N^{i-1}).
        # Weights are indexed by target node first.
        self.weights = [
            np.random.randn(n_out, n_in) / np.sqrt(n_in)
            for n_in, n_out in zip(sizes[:-1], sizes[1:], strict=True)
        ]

    def feedforward(self, x: FloatNDArray) -> FloatNDArray:
        '''
        Run the network on a single case of shape (N^0,).

        Returns last layer activations, shape (N^last,), values 0..1.
        '''
        g = x
        for w, b in zip(self.weights, self.biases, strict=True):
            # TODO
            # f = # pre-activations
            # g = # activations
            g = x
            f =  w @ g + b
            g = sigmoid(f)
            pass
        return g

    def learning_step(
        self, x_mini_batch: FloatNDArray, y_mini_batch: FloatNDArray, learning_rate: float
    ) -> None:
        '''
        Update network parameters with a single mini-batch step of backpropagation and gradient descent.

        Args:
        - x_mini_batch: shape (B, N^0) where B is the mini-batch size.
        - y_mini_batch: shape (B, N^last).
        - learning_rate.
        '''
        # Accumulate gradients by running backprop one dataitem at a time (without vectorization).
        grads_w = [np.zeros(w.shape) for w in self.weights]
        grads_b = [np.zeros(b.shape) for b in self.biases]
        for x, y in zip(x_mini_batch, y_mini_batch):
            item_grads_w, item_grads_b = self.backprop(x, y)
            for i in range(len(grads_w)):
                grads_w[i] = grads_w[i] + item_grads_w[i]
            for i in range(len(grads_b)):
                grads_b[i] = grads_b[i] + item_grads_b[i]

        # Gradient descent step.
        self.weights = [
            w - grad_w * (learning_rate / len(x_mini_batch))
            for w, grad_w in zip(self.weights, grads_w, strict=True)
        ]
        self.biases = [
            b - grad_b * (learning_rate / len(x_mini_batch))
            for b, grad_b in zip(self.biases, grads_b, strict=True)
        ]

    def backprop(
        self, x: FloatNDArray, y: FloatNDArray
    ) -> tuple[list[FloatNDArray], list[FloatNDArray]]:
        '''
        Backpropagation for a single input (not vectorized).

        Args:
        - x: input features, shape (N^0).
        - y: target label (one-hot encoded), shape (N^last).

        Returns (grads_w, grads_b), where:
        - grads_w is a list of gradients over weights (shape (N^i, N^{i-1})), for each layer.
        - grads_b is a list of gradients over biases (shape (N^i)), for each layer.
        '''

        # Go forward, remembering all activations.
        # Pre-activations function, layer by layer, shapes (N^1), ..., (N^last).
        fs: list[FloatNDArray] = []
        # Activations (including inputs to the first layer), shapes (N^0), (N^1), ..., (N^last).
        gs: list[FloatNDArray] = [x]

        # TODO forward pass.
        for w, b in zip(self.weights, self.biases, strict=True):
            f = w @ gs[-1] + b
            fs.append(f)
            g = sigmoid(f)
            gs.append(g)

        assert [f.shape for f in fs] == [(n,) for n in self.sizes[1:]], f'Shape mismatch: {[f.shape for f in fs]} vs {self.sizes[1:]}'
        assert [g.shape for g in gs] == [(n,) for n in self.sizes], f'Shape mismatch: {[g.shape for g in gs]} vs {self.sizes}'

        # Now go backward from the final cost applying backpropagation.
        grad_g = self.cost_derivative(gs[-1], y)  # shape initially (N^last), then layer by layer.

        # TODO backward pass. May be useful:
        # - reversed(list(...)) can be used to iterate in reverse.
        # - list.reverse() reverses a list in-place.
        # - np.outer() computes an outer product (a_{i,j} = b_i c_j) very fast.

        grad_f_list = []
        for w, g in reversed(list(zip(self.weights, gs[1:], strict=True))):
            grad_f = grad_g * g * (1 - g)
            grad_f_list.append(grad_f)
            grad_g = w.T @ grad_f

        grad_f_list.reverse()


        grads_w = [np.outer(grad_f, g) for grad_f, g in zip(grad_f_list, gs[:-1], strict=True)]
        grads_b = grad_f_list


        # Now grads_w should have shapes (N^1, N^0), ..., (N^last, N^{last-1}).
        # Now grads_b should have shapes (N^1), ..., (N^last).
        for grad_b, b in zip(grads_b, self.biases, strict=True):
            assert grad_b.shape == b.shape, f'Shape mismatch: {grad_b.shape=} but {b.shape=}'
        for grad_w, w in zip(grads_w, self.weights, strict=True):
            assert grad_w.shape == w.shape, f'Shape mismatch: {grad_w.shape=} but {w.shape=}'

        return grads_w, grads_b

    def cost_derivative(self, g: FloatNDArray, y: FloatNDArray) -> FloatNDArray:
        '''
        Gradient of loss (MSE) over output activations, for a single sample.

        Args:
        - g: output activations, shape (N^last).
        - y: target values (one-hot encoded labels), shape (N^last).

        Returns gradients, shape (N^last).
        '''
        assert g.shape == y.shape, f'Shape mismatch: {g.shape=} but {y.shape=}'
        N_last, = g.shape
        return (2 / N_last) * (g - y.astype(np.float64))

    def evaluate(
        self, x_test_data: FloatNDArray, y_test_data: FloatNDArray
    ) -> np.float64:
        '''
        Compute accuracy: the ratio of correct answers for test_data.

        Args (here B is the number of test dataitems):
        - x_test_data: shape (B, N^0).
        - y_test_data: shape (B, N^last).
        '''
        test_results: list[bool] = []
        for x, y in zip(x_test_data, y_test_data):
            output_label: np.int64 = np.argmax(self.feedforward(x))
            target_label: np.int64 = np.argmax(y)
            test_results.append(output_label == target_label)

        return np.mean(test_results)

    def SGD(
        self,
        training_data: tuple[FloatNDArray, FloatNDArray],
        test_data: tuple[FloatNDArray, FloatNDArray] | None = None,
        epochs: int = 2,
        mini_batch_size: int = 100,
        learning_rate: float = 1.0,
    ) -> None:
        x_train, y_train = training_data
        for epoch in tqdm(range(epochs)):
            for i in range(x_train.shape[0] // mini_batch_size):
                i_begin = i * mini_batch_size
                i_end = (i + 1) * mini_batch_size
                self.learning_step(x_train[i_begin:i_end], y_train[i_begin:i_end], learning_rate)
            if test_data:
                x_test, y_test = test_data
                accuracy = self.evaluate(x_test, y_test)
                tqdm.write(f'Epoch: {epoch}, Accuracy: {accuracy * 100:.2f} %')
            else:
                tqdm.write(f'Epoch: {epoch}')

In [8]:
import numpy as np
from typing import Sequence
from tqdm import tqdm

def sigmoid(z: np.ndarray) -> np.ndarray:
    return 1 / (1 + np.exp(-z))

FloatNDArray = np.ndarray


class Network:
    '''
    Multi-Layer Perceptron — simple fully connected NN with sigmoid activations.
    '''

    def __init__(self, sizes: Sequence[int] = (784, 30, 10)):
        '''
        Args:
        - sizes: sequence of layer widths [N^0, ... , N^last]
        '''
        self.sizes = list(sizes)
        self.biases = [np.random.randn(n) for n in sizes[1:]]
        self.weights = [
            np.random.randn(n_out, n_in) / np.sqrt(n_in)
            for n_in, n_out in zip(sizes[:-1], sizes[1:], strict=True)
        ]

    def feedforward(self, x: FloatNDArray) -> FloatNDArray:
        g = x
        for w, b in zip(self.weights, self.biases, strict=True):
            f = w @ g + b
            g = sigmoid(f)
        return g

    def learning_step(
        self, x_mini_batch: FloatNDArray, y_mini_batch: FloatNDArray, learning_rate: float
    ) -> None:
        grads_w = [np.zeros_like(w) for w in self.weights]
        grads_b = [np.zeros_like(b) for b in self.biases]

        for x, y in zip(x_mini_batch, y_mini_batch):
            item_grads_w, item_grads_b = self.backprop(x, y)
            for i in range(len(grads_w)):
                grads_w[i] += item_grads_w[i]
                grads_b[i] += item_grads_b[i]

        n = len(x_mini_batch)
        self.weights = [w - (learning_rate / n) * dw for w, dw in zip(self.weights, grads_w, strict=True)]
        self.biases = [b - (learning_rate / n) * db for b, db in zip(self.biases, grads_b, strict=True)]


    def backprop(self, x: FloatNDArray, y: FloatNDArray):
        fs, gs = [], [x]

        # --- Forward pass ---
        for w, b in zip(self.weights, self.biases, strict=True):
            f = w @ gs[-1] + b
            fs.append(f)
            g = sigmoid(f)
            gs.append(g)

        # --- Backward pass ---
        grad_g = self.cost_derivative(gs[-1], y)
        grad_f_list = []

        for w, g in reversed(list(zip(self.weights, gs[1:], strict=True))):
            grad_f = grad_g * g * (1 - g)  # sigmoid'(f)
            grad_f_list.append(grad_f)
            grad_g = w.T @ grad_f

        grad_f_list.reverse()  # dopasowanie do kolejności warstw

        grads_w = [np.outer(grad_f, g_prev) for grad_f, g_prev in zip(grad_f_list, gs[:-1], strict=True)]
        grads_b = grad_f_list

        # sanity check
        for grad_b, b in zip(grads_b, self.biases, strict=True):
            assert grad_b.shape == b.shape, f"grad_b {grad_b.shape=} != b {b.shape=}"
        for grad_w, w in zip(grads_w, self.weights, strict=True):
            assert grad_w.shape == w.shape, f"grad_w {grad_w.shape=} != w {w.shape=}"

        return grads_w, grads_b

    def cost_derivative(self, g: FloatNDArray, y: FloatNDArray) -> FloatNDArray:
        assert g.shape == y.shape
        N_last, = g.shape
        return (2 / N_last) * (g - y.astype(np.float64))

    def evaluate(self, x_test_data: FloatNDArray, y_test_data: FloatNDArray) -> np.float64:
        results = [
            np.argmax(self.feedforward(x)) == np.argmax(y)
            for x, y in zip(x_test_data, y_test_data)
        ]
        return np.mean(results)

    def SGD(
        self,
        training_data: tuple[FloatNDArray, FloatNDArray],
        test_data: tuple[FloatNDArray, FloatNDArray] | None = None,
        epochs: int = 2,
        mini_batch_size: int = 100,
        learning_rate: float = 1.0,
    ) -> None:
        x_train, y_train = training_data
        for epoch in tqdm(range(epochs)):
            for i in range(x_train.shape[0] // mini_batch_size):
                i_begin = i * mini_batch_size
                i_end = (i + 1) * mini_batch_size
                self.learning_step(x_train[i_begin:i_end], y_train[i_begin:i_end], learning_rate)
            if test_data:
                x_test, y_test = test_data
                acc = self.evaluate(x_test, y_test)
                tqdm.write(f"Epoch {epoch}: Accuracy {acc * 100:.2f}%")
            else:
                tqdm.write(f"Epoch {epoch}")


In [None]:
%%time
# Smaller test on part of the dataset.
# The unvectorized version should take about 2s per epoch
# and achieve accuracy ~75% or more (for most executions).
network = Network([784, 30, 10])
network.SGD(
    (x_train[:3000], y_train[:3000]),
    test_data=(x_test[:3000], y_test[:3000]),
    epochs=5,
    mini_batch_size=100,
    learning_rate=10.0
)

In [None]:
%%time
# Full test.
# The unvectorized version should take about 20s per epoch
# and achieve accuracy ~90% or more.
network = Network([784, 30, 10])
network.SGD(
    (x_train, y_train),
    test_data=(x_test, y_test),
    epochs=10,
    mini_batch_size=100,
    learning_rate=5.0
)

## Exercise 2

Implement a "fully vectorized" version, i.e. one using matrix operations instead of going over examples one by one within a minibatch.


In [9]:
class NetworkVectorized:
    '''Multi-Layer Perceptron with vectorized (batched) methods.'''

    def __init__(self, sizes: Sequence[int] = (784, 30, 10)):
        '''
        Args:
        - sizes: sequence of layer widths [N^0, ... , N^last]
          These are lengths of activation vectors, where:
          - N^0 is input size: 784.
          - N^last is the number of classes into which we can classify each input: 10.
        '''
        self.sizes = list(sizes)

        # We initialize weights and biases with random normal distribution.

        # List of len(sizes) - 1 vectors of shape (N^1), (N^2), ..., (N^last)
        self.biases = [np.random.randn(n) for n in sizes[1:]]

        # List of len(sizes) - 1 matrices of shape (N^i, N^{i-1}).
        # Weights are indexed by target node first.
        self.weights = [
            np.random.randn(n_out, n_in) / np.sqrt(n_in)
            for n_in, n_out in zip(sizes[:-1], sizes[1:], strict=True)
        ]

    def feedforward(self, x: FloatNDArray) -> FloatNDArray:
        '''
        Run the network on a batch of cases of shape (B, N^0), values 0..1.

        Returns last layer activations, shape (B, N^last), values 0..1.
        '''
        g = x
        for w, b in zip(self.weights, self.biases, strict=True):
            # TODO
            f = g @ w.T + b
            g = sigmoid(f)
            pass
        return g

    def learning_step(self, x_mini_batch: FloatNDArray, y_mini_batch: FloatNDArray, learning_rate: float) -> None:
        '''
        Update network parameters with a single mini-batch step of backpropagation and gradient descent.

        Args:
        - x_mini_batch: shape (B, N^0) where B is mini_batch_size.
        - y_mini_batch: shape (B, N^last).
        - learning_rate.
        '''
        grads_w, grads_b = self.backprop_vectorized(x_mini_batch, y_mini_batch)

        # Gradient descent step.
        self.weights = [
            w - learning_rate * grad_w
            for w, grad_w in zip(self.weights, grads_w, strict=True)
        ]
        self.biases = [
            b - learning_rate * grad_b
            for b, grad_b in zip(self.biases, grads_b, strict=True)
        ]

    def backprop_vectorized(
        self, x: FloatNDArray, y: FloatNDArray
    ) -> tuple[list[FloatNDArray], list[FloatNDArray]]:
        '''Backpropagation for a mini-batch (vectorized).

        Args:
        - x: input, shape (B, N^0)
        - y: target label (one-hot encoded), shape (B, N^last)

        Returns (grads_w, grads_b), where:
        - grads_w: list of gradients over weights (shape (N^i, N^{i-1})), for each layer.
        - grads_b: list of gradients over biases (shape (N^i)), for each layer.
        '''
        B, N0 = x.shape
        assert N0 == self.sizes[0]

        # Go forward, remembering all activations.

        # Values after activation function (including inputs to the first layer),
        # shapes (B, N^0), (B, N^1), ..., (B, N^last).
        gs: list[FloatNDArray] = [x]

        # TODO
        for w, b in zip(self.weights, self.biases, strict=True):
            f = gs[-1] @ w.T + b
            g = sigmoid(f)
            gs.append(g)

        assert [g.shape for g in gs] == [(B, n) for n in self.sizes], \
            f'Shape mismatch: {[g.shape for g in gs]} vs {[(B, n) for n in self.sizes]}'

        # Now go backward from the final cost applying backpropagation.
        grad_g = self.cost_derivative(gs[-1], y)  # shape initially (B, N^last), then layer by layer.

        # TODO
        grads_w = []
        grads_b = []
        """for w, g in reversed(list(zip(self.weights, self.biases, strict=True))):
          grad_f = grad_g*g(1-g)
          grads_w.append(np.outer(grad_f, gs[-1]))
          grads_b.append(grad_f)
          grad_g = w.T @ grad_f"""

        for layer_idx in reversed(range(len(self.weights))):
          w = self.weights[layer_idx]
          g = gs[layer_idx +1]
          g_prev = gs[layer_idx]

          grad_f = grad_g*g*(1-g)  #(B,Nout)
          grad_b = np.mean(grad_f, axis =0) #(Nout,)
          grad_w =grad_f.T @ g_prev / B #(Nout,Nin)
          grad_g = grad_f @ w #(B,Nin)

          grads_w.append(grad_w)
          grads_b.append(grad_b)

        grads_w.reverse()
        grads_b.reverse()

        # Now grads_w should have shapes (N^1, N^0), ..., (N^last, N^{last-1}).
        # Now grads_b should have shapes (N^1,) ..., (N^last,).
        for grad_b, b in zip(grads_b, self.biases, strict=True):
            assert grad_b.shape == b.shape, f'Shape mismatch: {grad_b.shape=} but {b.shape=}'
        for grad_w, w in zip(grads_w, self.weights, strict=True):
            assert grad_w.shape == w.shape, f'Shape mismatch: {grad_w.shape=} but {w.shape=}'

        return grads_w, grads_b

    def cost_derivative(self, g: FloatNDArray, y: FloatNDArray) -> FloatNDArray:
        '''
        Gradient of loss (MSE) over output activations.

        Args:
        - g: output activations, shape (B, N^last).
        - y: target values (one-hot encoded labels), shape (B, N^last).

        Returns gradients, shape (B, N^last).
        '''
        assert g.shape == y.shape, f'Shape mismatch: {g.shape=} but {y.shape=}'
        B, N_last = g.shape
        return (2 / (B * N_last)) * (g - y.astype(np.float64))

    def evaluate(self, x_test_data: FloatNDArray, y_test_data: FloatNDArray) -> np.float64:
        '''
        Compute accuracy: the ratio of correct answers for test_data.

        Args:
        - x_test_data: shape (B, N^0).
        - y_test_data: shape (B, N^last).
        '''
        predictions = np.argmax(self.feedforward(x_test_data), axis=1)
        targets = np.argmax(y_test_data, axis=1)
        return np.mean(predictions == targets)

    def SGD(
        self,
        training_data: tuple[FloatNDArray, FloatNDArray],
        test_data: tuple[FloatNDArray, FloatNDArray] | None = None,
        epochs: int = 2,
        mini_batch_size: int = 100,
        learning_rate: float = 1.0
    ) -> None:
        x_train, y_train = training_data
        for epoch in tqdm(range(epochs)):
            for i in range(x_train.shape[0] // mini_batch_size):
                i_begin = i * mini_batch_size
                i_end = (i + 1) * mini_batch_size
                self.learning_step(x_train[i_begin:i_end], y_train[i_begin:i_end], learning_rate)
            if test_data:
                x_test, y_test = test_data
                accuracy = self.evaluate(x_test, y_test)
                print(f'Epoch: {epoch}, Accuracy: {accuracy * 100:.2f} %')
            else:
                print(f'Epoch: {epoch}')

In [None]:
%%time
# The vectorized version takes about ~1s per epoch.
network = NetworkVectorized([784, 30, 10])
network.SGD(
    (x_train, y_train),
    test_data=(x_test, y_test),
    epochs=50,
    mini_batch_size=100,
    learning_rate=5.0
)

# Exercise 3 (optional)

The standard backpropagation method requires memorization of all outputs of all layers computed during the forward pass, for use in the backward pass, which can take much of precious GPU memory.
Instead of doing that, one can checkpoint (memorize) only activations of a select few layers and then recompute the rest as they are needed (redoing the forward pass between checkpoints).  
Your task is to complete the code below to implement backpropagation with checkpoints.

In [24]:
class NetworkWithCheckpoints(NetworkVectorized):
    '''Multi-Layer Perceptron with gradient checkpointing.'''

    def __init__(self, sizes: Sequence[int] = (784, 30, 10), checkpoints: Sequence[int] = (1,)):
        '''
        Args:
        - sizes: sequence of layer widths [N^0, ... , N^last]
          These are lengths of activation vectors, where:
          - N^0 is input size: 784.
          - N^last is the number of classes into which we can classify each input: 10.
        - checkpoints: Indices of layers whose activations we want to checkpoint
          (between 1 and last inclusive (last=len(sizes) - 1).
        '''
        super().__init__(sizes)
        last_layer_id = len(sizes) - 1
        # Always store the input and last activations.
        self.checkpoints = sorted(set(checkpoints) | {0, last_layer_id})

    def backprop_generator(
        self, x: FloatNDArray, y: FloatNDArray
    ) -> Iterator[tuple[FloatNDArray, FloatNDArray]]:
        '''
        Backpropagation for a mini-batch (vectorized).

        Args:
        - x: input, shape (B, N^0)
        - y: target label (one-hot encoded), shape (B, N^last)

        Yields (grad_w, grad_b) for each layer from i=last down to i=1, where:
        - grads_w: shape (N^i, N^{i-1})).
        - grads_b: shape (N^i)).
        '''
        B, N0 = x.shape
        assert N0 == self.sizes[0]

        # Go forward, remembering only some activations and no pre-activations.

        # Values after activation function (including inputs to the first layer),
        # shapes (B, N^0), (B, N^1), ..., (B, N^last);
        # layers that are not checkpointed get None instead.
        gs: list[FloatNDArray | None] = [x]

        # TODO forward pass.
        g = x
        for i, (w, b) in enumerate(zip(self.weights, self.biases, strict=True), start=1):
          f = g @ w.T + b
          g = sigmoid(f)
          if i in self.checkpoints:
            gs.append(g)
          else:
            gs.append(None)

        # Now go backward from the final cost applying backpropagation.
        grad_g = self.cost_derivative(g, y)  # shape initially (B, N^last), then layer by layer.

        # TODO backward pass.
        # for each interval between consecutive checkpoints:
            # Forward pass to restore intermediate activations.
            # for all layers in interval:
            # ...
        #print("Checkpoints:", self.checkpoints)

        for start_i, end_i in zip(reversed(self.checkpoints[:-1]), reversed(self.checkpoints[1:])):
            #print(f"Processing segment: start_i={start_i}, end_i={end_i}")
            restored = [gs[start_i]]
            g = gs[start_i]
            for j in range(start_i+1,end_i+1):
              w, b = self.weights[j-1], self.biases[j-1]
              f = g @ w.T + b
              g = sigmoid(f)
              restored.append(g)

            for j in range(end_i, start_i, -1):
              g_prev = restored[j-start_i-1]
              g_curr = restored[j-start_i]
              w = self.weights[j-1]

              grad_f = grad_g*g_curr*(1-g_curr)
              grad_b = np.mean(grad_f, axis =0)
              grad_w =grad_f.T @ g_prev / B

              yield grad_w, grad_b

              grad_g = grad_f @ w

            # Now g and grad_g have shape (B, N^{end_i}).
            #assert g.shape == grad_g.shape == (B, self.sizes[end_i])

            # Backward pass between the checkpoints.
            # for all layers in the interval, backwards:
            #   # Compute grad_w, grad_b.
            #   # Yield them as we go, instead of returning a list.
            #   # https://docs.python.org/3/tutorial/classes.html#generators
            #   yield (grad_w, grad_b)

            # Now grad_g has shape (B, N^{start_i}).
            assert grad_g.shape == (B, self.sizes[start_i])

    @override
    def learning_step(self, x_mini_batch: FloatNDArray, y_mini_batch: FloatNDArray, learning_rate: float) -> None:
        '''
        Update network parameters with a single mini-batch step of backpropagation and gradient descent.

        Args:
        - x_mini_batch: shape (B, N^0) where B is mini_batch_size.
        - y_mini_batch: shape (B, N^last).
        - learning_rate.
        '''
        indices = range(len(self.sizes) - 1, 0, -1)  # From `last` down to 1 inclusive.
        for i, (grad_w, grad_b) in zip(indices, self.backprop_generator(x_mini_batch, y_mini_batch), strict=True):
            self.weights[i - 1] = self.weights[i - 1] - learning_rate * grad_w
            self.biases[i - 1] = self.biases[i - 1] - learning_rate * grad_b


In [27]:
%%time
# Compare on the same seed, results should be exactly the same.

np.random.seed(42)
network = NetworkWithCheckpoints([784, 20, 15, 13, 10], checkpoints=[2])
network.SGD(
    (x_train, y_train),
    test_data=(x_test, y_test),
    epochs=1,
    mini_batch_size=100,
    learning_rate=50.
)

np.random.seed(42)
network = NetworkVectorized([784, 20, 15, 13, 10])
network.SGD(
    (x_train, y_train),
    test_data=(x_test, y_test),
    epochs=50,
    mini_batch_size=100,
    learning_rate=50.
)

100%|██████████| 1/1 [00:01<00:00,  1.09s/it]


Epoch: 0, Accuracy: 10.27 %


  2%|▏         | 1/50 [00:00<00:39,  1.24it/s]

Epoch: 0, Accuracy: 10.27 %


  4%|▍         | 2/50 [00:01<00:38,  1.24it/s]

Epoch: 1, Accuracy: 11.35 %


  6%|▌         | 3/50 [00:02<00:38,  1.23it/s]

Epoch: 2, Accuracy: 11.35 %


  8%|▊         | 4/50 [00:03<00:38,  1.20it/s]

Epoch: 3, Accuracy: 11.35 %


 10%|█         | 5/50 [00:04<00:37,  1.19it/s]

Epoch: 4, Accuracy: 11.35 %


 12%|█▏        | 6/50 [00:04<00:36,  1.21it/s]

Epoch: 5, Accuracy: 11.35 %


 14%|█▍        | 7/50 [00:05<00:35,  1.20it/s]

Epoch: 6, Accuracy: 11.35 %


 16%|█▌        | 8/50 [00:08<00:59,  1.42s/it]

Epoch: 7, Accuracy: 11.35 %


 18%|█▊        | 9/50 [00:10<00:59,  1.45s/it]

Epoch: 8, Accuracy: 11.35 %


 20%|██        | 10/50 [00:10<00:50,  1.27s/it]

Epoch: 9, Accuracy: 11.41 %


 22%|██▏       | 11/50 [00:12<00:50,  1.29s/it]

Epoch: 10, Accuracy: 12.02 %


 24%|██▍       | 12/50 [00:13<00:43,  1.15s/it]

Epoch: 11, Accuracy: 14.09 %


 26%|██▌       | 13/50 [00:13<00:39,  1.06s/it]

Epoch: 12, Accuracy: 16.39 %


 28%|██▊       | 14/50 [00:14<00:35,  1.02it/s]

Epoch: 13, Accuracy: 18.01 %


 30%|███       | 15/50 [00:15<00:32,  1.07it/s]

Epoch: 14, Accuracy: 19.15 %


 32%|███▏      | 16/50 [00:16<00:30,  1.11it/s]

Epoch: 15, Accuracy: 19.92 %


 34%|███▍      | 17/50 [00:17<00:29,  1.13it/s]

Epoch: 16, Accuracy: 20.44 %


 36%|███▌      | 18/50 [00:18<00:27,  1.15it/s]

Epoch: 17, Accuracy: 20.71 %


 38%|███▊      | 19/50 [00:18<00:26,  1.17it/s]

Epoch: 18, Accuracy: 20.76 %


 40%|████      | 20/50 [00:19<00:28,  1.06it/s]

Epoch: 19, Accuracy: 20.83 %


 42%|████▏     | 21/50 [00:23<00:45,  1.59s/it]

Epoch: 20, Accuracy: 22.18 %


 44%|████▍     | 22/50 [00:23<00:38,  1.37s/it]

Epoch: 21, Accuracy: 25.70 %


 46%|████▌     | 23/50 [00:24<00:32,  1.22s/it]

Epoch: 22, Accuracy: 27.56 %


 48%|████▊     | 24/50 [00:25<00:28,  1.11s/it]

Epoch: 23, Accuracy: 27.30 %


 50%|█████     | 25/50 [00:26<00:25,  1.02s/it]

Epoch: 24, Accuracy: 26.57 %


 52%|█████▏    | 26/50 [00:27<00:23,  1.03it/s]

Epoch: 25, Accuracy: 25.83 %


 54%|█████▍    | 27/50 [00:28<00:21,  1.06it/s]

Epoch: 26, Accuracy: 25.57 %


 56%|█████▌    | 28/50 [00:29<00:20,  1.10it/s]

Epoch: 27, Accuracy: 26.38 %


 58%|█████▊    | 29/50 [00:29<00:18,  1.12it/s]

Epoch: 28, Accuracy: 28.21 %


 60%|██████    | 30/50 [00:30<00:17,  1.16it/s]

Epoch: 29, Accuracy: 30.01 %


 62%|██████▏   | 31/50 [00:31<00:16,  1.17it/s]

Epoch: 30, Accuracy: 31.62 %


 64%|██████▍   | 32/50 [00:32<00:15,  1.17it/s]

Epoch: 31, Accuracy: 36.05 %


 66%|██████▌   | 33/50 [00:34<00:20,  1.19s/it]

Epoch: 32, Accuracy: 40.40 %


 68%|██████▊   | 34/50 [00:36<00:23,  1.50s/it]

Epoch: 33, Accuracy: 43.46 %


 70%|███████   | 35/50 [00:37<00:19,  1.30s/it]

Epoch: 34, Accuracy: 46.67 %


 72%|███████▏  | 36/50 [00:38<00:16,  1.16s/it]

Epoch: 35, Accuracy: 49.84 %


 74%|███████▍  | 37/50 [00:39<00:13,  1.06s/it]

Epoch: 36, Accuracy: 52.96 %


 76%|███████▌  | 38/50 [00:39<00:11,  1.00it/s]

Epoch: 37, Accuracy: 55.28 %


 78%|███████▊  | 39/50 [00:40<00:10,  1.05it/s]

Epoch: 38, Accuracy: 57.36 %


 80%|████████  | 40/50 [00:41<00:09,  1.10it/s]

Epoch: 39, Accuracy: 58.99 %


 82%|████████▏ | 41/50 [00:42<00:07,  1.14it/s]

Epoch: 40, Accuracy: 60.23 %


 84%|████████▍ | 42/50 [00:43<00:06,  1.16it/s]

Epoch: 41, Accuracy: 61.45 %


 86%|████████▌ | 43/50 [00:44<00:05,  1.17it/s]

Epoch: 42, Accuracy: 62.76 %


 88%|████████▊ | 44/50 [00:44<00:05,  1.18it/s]

Epoch: 43, Accuracy: 64.17 %


 90%|█████████ | 45/50 [00:45<00:04,  1.18it/s]

Epoch: 44, Accuracy: 65.63 %


 92%|█████████▏| 46/50 [00:46<00:03,  1.05it/s]

Epoch: 45, Accuracy: 67.24 %


 94%|█████████▍| 47/50 [00:49<00:04,  1.56s/it]

Epoch: 46, Accuracy: 68.77 %


 96%|█████████▌| 48/50 [00:50<00:02,  1.34s/it]

Epoch: 47, Accuracy: 70.55 %


 98%|█████████▊| 49/50 [00:51<00:01,  1.19s/it]

Epoch: 48, Accuracy: 72.21 %


100%|██████████| 50/50 [00:52<00:00,  1.05s/it]

Epoch: 49, Accuracy: 73.51 %
CPU times: user 1min 32s, sys: 202 ms, total: 1min 32s
Wall time: 53.5 s





 Memory usage can be checked by installing `memory_profiler` and using `%%memit` (instead of `%%time`),
 but it will be very hard to see a difference from checkpointing for an MLP.

# JAX Playground (Optional)
JAX is a framework that allows the creation of neural networks with numpy-like syntax.  
In this course, we will use Pytorch instead of JAX, but for this lab scenario, JAX can help us test our gradient computation implementation.  
Let's give it a try  

In [28]:
!pip3 install jax



In [32]:
import jax
import jax.numpy as jnp
from textwrap import dedent


def jax_sigmoid(z: jax.Array) -> jax.Array:
    return 1.0 / (1.0 + jnp.exp(-z))


def jax_sigmoid_prime(z: jax.Array) -> jax.Array:
    return sigmoid(z) * (1 - sigmoid(z))


# Define a jax function.
# We emphasize that this is a function, not a jax procedure,
# and in fact there are more requirements for writing good jax code,
# but this is just an example.
# (see https://jax.readthedocs.io/en/latest/tutorials.html)
def jax_forward(x: jax.Array, w: jax.Array, b: jax.Array) -> jax.Array:
    # f = TODO
    f = x @ w.T + b
    g = jax_sigmoid(f)
    loss = g.sum()  # Just a dummy loss for simplicity.
    return loss, g


# This will calculate gradient for first, second, and third argument.
# has_aux tells that in addition to loss our function returns something more.
auto_backward = jax.value_and_grad(fun=jax_forward, argnums=[0, 1, 2], has_aux=True)


def manual_backward(x: jax.Array, w: jax.Array, b: jax.Array) -> jax.Array:
    # f = TODO
    f = x @ w.T + b
    g = jax_sigmoid(f)

    grad_g = jnp.ones_like(f)  # Grad of the dummy loss over activations g.
    # grad_f = TODO
    grad_f =grad_g*g*(1-g)
    # grad_b = TODO
    grad_b = jnp.mean(grad_f, axis=0)
    #grad_b  = grad_f.sum(axis=0)
    # grad_w = TODO
    grad_w = grad_f.T @ x
    # grad_x = TODO
    grad_x = grad_f @ w
    return grad_x, grad_w, grad_b


def example():
    B, N0, N1 = 3, 5, 7

    key = jax.random.key(42)
    key, subkey = jax.random.split(key)
    w = jax.random.normal(subkey, (N1, N0))
    key, subkey = jax.random.split(key)
    b = jax.random.normal(subkey, (N1,))
    x = jnp.arange(N0, dtype=w.dtype).reshape(1, N0) * jnp.ones((B, N0))

    (loss, res), (jax_dx, jax_dw, jax_db) = auto_backward(x, w, b)
    dx, dw, db = manual_backward(x, w, b)

    print(dedent(f'''
        diff dx = {jnp.mean(jnp.abs(jax_dx - dx))}
        diff dw = {jnp.mean(jnp.abs(jax_dw - dw))}
        diff db = {jnp.mean(jnp.abs(jax_db - db))}
        dtype={dx.dtype} (eps={np.finfo(dx.dtype).eps})
    ''').strip())


example()

diff dx = 4.619360183255594e-08
diff dw = 9.451593996345764e-08
diff db = 0.09255658835172653
dtype=float32 (eps=1.1920928955078125e-07)
