# Assignment 1 - Code Example - Part B

This achieves an accuracy of 93.26% on test set.

In [None]:
# provide some basic operators like matrix multiplication
import cupy as cp

## Some Useful Classes

In [None]:
# base class
class Module:
    @property
    def params(self): # trainable parameters
        return []

    def __call__(self, *args, **kwargs):
        return self.forward( *args, **kwargs)

# sequential module
class Sequential(Module, list):
    def __init__(self, *module_lst):
        super().__init__(module_lst)

    @property
    def params(self):
        return sum([m.params for m in self], []) # concat all params

    def forward(self, x):
        y = x
        for module in self:
            y = module(y)
        return y

    def backward(self, dy):
        dx = dy
        for module in self[::-1]:
            dx = module.backward(dx)
        return dx

## Softplus

This implements the [Softplus](https://pytorch.org/docs/stable/generated/torch.nn.Softplus.html) function.

$y = \frac{1}{\beta} \ln(1+e^{\beta x})$

$y' = \frac{1}{1+e^{-\beta x}}$

Default: $\beta=1$

$e^{\beta x}$ might be too large and unstable; so we use linear function to approximate it when $\beta x$ is above the threshold $20$.

In [None]:
class Softplus(Module):
    def __init__(self, beta=1.0, threshold=20.0):
        assert beta > 0 and threshold > 0
        self.beta = beta
        self.threshold = threshold

    def forward(self, x):
        self.beta_x = self.beta * x # save the input for backward use
        y = cp.log(1 + cp.exp(self.beta_x)) / self.beta
        y_relu = cp.where(x > 0, x, 0)
        return cp.where(x < self.threshold, y, y_relu)

    def backward(self, dy):
        grad = 1 / (1 + cp.exp(-self.beta_x))
        grad_relu = cp.where(self.beta_x > 0, 1, 0)
        return dy * cp.where(self.beta_x < self.threshold, grad, grad_relu)

## LinearNoBias

This implements the [Linear](https://pytorch.org/docs/stable/generated/torch.nn.Linear.html) layer but without the bias term.

$y = x W^T$

$dy/dx = W$

$dy/dW = x$

In [None]:
class LinearNoBias(Module):
    def __init__(self, in_features, out_features):
        self.weight = (cp.random.rand(out_features, in_features) * 2 - 1) / in_features ** 0.5
        self.weight_grad = cp.zeros_like(self.weight)

    @property
    def params(self):
        return [dict(val=self.weight, grad=self.weight_grad)]

    def forward(self, x):
        self.x = x
        return x @ self.weight.T

    def backward(self, dy):
        self.weight_grad[:] = dy.T @ self.x
        return dy @ self.weight


## CrossEntropyLoss

This implements the [CrossEntropyLoss](https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html) loss.


In [None]:
def onehot(x, num_classes=10):
    y = cp.zeros([len(x), num_classes])
    y[cp.arange(len(x)), x] = 1
    return y


class CrossEntropyLoss(Module):
    def forward(self, x_logit, x_target):
        self.x_logit = x_logit
        self.x_target = x_target

        # softmax with numerical stability
        x_logit_sub = cp.exp(x_logit - cp.max(x_logit, axis=1, keepdims=True))
        x_softmax = x_logit_sub / cp.sum(x_logit_sub, axis=1, keepdims=True)
        x_softmax = cp.clip(x_softmax, a_min=1e-15, a_max=None)  # Corrected line
        self.x_softmax = x_softmax

        # loss calculation
        loss_x = -cp.log(x_softmax)[cp.arange(len(x_target)), x_target]
        return loss_x.mean()

    def backward(self, dy):
        return dy * (self.x_softmax - onehot(self.x_target)) / len(self.x_logit)

## Optimizer

In [None]:
class Optim:
    def __init__(self, params, lr=0.01):
        self.params = params
        self.lr = lr

    def zero_grad(self):
        for idx in range(len(self.params)):
            self.params[idx]["grad"][:] = 0.0

In [None]:
class SGD(Optim):
    def step(self):
        for idx in range(len(self.params)):
            self.params[idx]["val"] -= self.lr * self.params[idx]["grad"]

##Sigmoid

In [None]:
class Sigmoid(Module):
    def forward(self, x):
        self.sigmoid = 1 / (1 + cp.exp(-x))
        return self.sigmoid

    def backward(self, dy):
        return dy * self.sigmoid * (1 - self.sigmoid)

##LeakyReLU Activation

In [None]:
class LeakyReLU(Module):
    def __init__(self, negative_slope=0.01):
        self.negative_slope = negative_slope

    def forward(self, x):
        self.x = x
        return cp.where(x > 0, x, self.negative_slope * x)

    def backward(self, dy):
        return dy * cp.where(self.x > 0, 1, self.negative_slope)

##SELU Activation

In [None]:
class SELU(Module):
    def __init__(self):
        self.alpha = 1.6732632423543772848170429916717
        self.scale = 1.0507009873554804934193349852946

    def forward(self, x):
        self.x = x
        mask = x > 0
        return self.scale * (x * mask + self.alpha * (cp.exp(x * ~mask) - 1) * ~mask)

    def backward(self, dy):
        mask = self.x > 0
        grad = cp.where(mask, self.scale, self.scale * self.alpha * cp.exp(self.x))
        return dy * grad

##Linear Layer

In [None]:
class Linear(Module):
    def __init__(self, in_features, out_features, bias=True):
        self.in_features = in_features
        self.out_features = out_features
        self.weight = cp.random.randn(out_features, in_features) * cp.sqrt(2. / in_features)
        self.weight_grad = cp.zeros_like(self.weight)
        if bias:
            self.bias = cp.zeros(out_features)
            self.bias_grad = cp.zeros_like(self.bias)
        else:
            self.bias = None

    @property
    def params(self):
        params = [{'val': self.weight, 'grad': self.weight_grad}]
        if self.bias is not None:
            params.append({'val': self.bias, 'grad': self.bias_grad})
        return params

    def forward(self, x):
        self.x = x
        output = x @ self.weight.T
        if self.bias is not None:
            output += self.bias
        return output

    def backward(self, dy):
        self.weight_grad[:] = dy.T @ self.x
        if self.bias is not None:
            self.bias_grad[:] = dy.sum(axis=0)
        return dy @ self.weight

##Conv2d Layer

In [None]:
from cupy.lib.stride_tricks import as_strided

class Conv2d(Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride=1, bias=True):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = (kernel_size, kernel_size) if isinstance(kernel_size, int) else kernel_size
        self.stride = (stride, stride) if isinstance(stride, int) else stride
        self.bias = bias

        kernel_h, kernel_w = self.kernel_size
        scale = cp.sqrt(2. / (in_channels * kernel_h * kernel_w))  # He initialization
        self.weights = cp.random.normal(0, scale, size=(out_channels, in_channels, kernel_h, kernel_w))
        self.weights_grad = cp.zeros_like(self.weights)

        if self.bias:
            self.bias_weights = cp.zeros(out_channels)
            self.bias_grad = cp.zeros_like(self.bias_weights)
        else:
            self.bias_weights = None
            self.bias_grad = None

        self.x = None
        self.H_out = None
        self.W_out = None

    def forward(self, x):
        self.x = x
        N, C_in, H_in, W_in = x.shape
        kernel_h, kernel_w = self.kernel_size
        stride_h, stride_w = self.stride

        H_out = (H_in - kernel_h) // stride_h + 1
        W_out = (W_in - kernel_w) // stride_w + 1
        self.H_out = H_out
        self.W_out = W_out

        x_col = self.im2col(x, kernel_h, kernel_w, stride_h, stride_w, H_out, W_out)

        weight_matrix = self.weights.reshape(self.out_channels, -1)
        output = weight_matrix @ x_col.T
        output = output.reshape(self.out_channels, N, H_out, W_out).transpose(1, 0, 2, 3)

        if self.bias:
            output += self.bias_weights.reshape(1, -1, 1, 1)

        return output

    def backward(self, grad_output):
        x = self.x
        N, C_in, H_in, W_in = x.shape
        C_out = self.out_channels
        kernel_h, kernel_w = self.kernel_size
        stride_h, stride_w = self.stride
        H_out = self.H_out
        W_out = self.W_out

        x_col = self.im2col(x, kernel_h, kernel_w, stride_h, stride_w, H_out, W_out)

        grad_output_reshaped = grad_output.transpose(1, 0, 2, 3).reshape(C_out, -1)

        self.weights_grad[...] = (grad_output_reshaped @ x_col).reshape(self.weights.shape)

        if self.bias:
            self.bias_grad[...] = grad_output.sum(axis=(0, 2, 3))

        weight_matrix = self.weights.reshape(C_out, C_in * kernel_h * kernel_w)
        dx_col = weight_matrix.T @ grad_output_reshaped  # (C_in * K * K, N * H_out * W_out)

        dx_col = dx_col.T.reshape(N, H_out, W_out, C_in, kernel_h, kernel_w)
        dx_col = dx_col.transpose(0, 3, 4, 5, 1, 2)

        dx = cp.zeros_like(x)
        dx_view = as_strided(dx,
            shape=(N, C_in, kernel_h, kernel_w, H_out, W_out),
            strides=(
                dx.strides[0],
                dx.strides[1],
                dx.strides[2],
                dx.strides[3],
                stride_h * dx.strides[2],
                stride_w * dx.strides[3],
            )
        )
        dx_view += dx_col

        return dx

    def im2col(self, x, kernel_h, kernel_w, stride_h, stride_w, H_out, W_out):
        N, C_in, H_in, W_in = x.shape
        shape = (N, C_in, kernel_h, kernel_w, H_out, W_out)
        strides = (
            x.strides[0],
            x.strides[1],
            x.strides[2],
            x.strides[3],
            stride_h * x.strides[2],
            stride_w * x.strides[3],
        )
        x_col = as_strided(x, shape=shape, strides=strides)
        x_col = x_col.transpose(0, 4, 5, 1, 2, 3).reshape(N * H_out * W_out, -1)
        return x_col

    @property
    def params(self):
        if self.bias:
            return [
                dict(val=self.weights, grad=self.weights_grad),
                dict(val=self.bias_weights, grad=self.bias_grad)
            ]
        else:
            return [dict(val=self.weights, grad=self.weights_grad)]

##Dropout Layer

In [None]:
class Dropout(Module):
    def __init__(self, p=0.5):
        self.p = p
        self.mask = None

    def forward(self, x, training=True):
        if training:
            self.mask = (cp.random.rand(*x.shape) > self.p) / (1 - self.p)
            return x * self.mask
        return x

    def backward(self, dy):
        return dy * self.mask

##BatchNorm2d Layer

In [None]:
class BatchNorm2d(Module):
    def __init__(self, num_features, eps=1e-5, momentum=0.1):
        self.eps = eps
        self.momentum = momentum
        self.gamma = cp.ones(num_features)
        self.beta = cp.zeros(num_features)
        self.running_mean = cp.zeros(num_features)
        self.running_var = cp.ones(num_features)
        self.gamma_grad = cp.zeros_like(self.gamma)
        self.beta_grad = cp.zeros_like(self.beta)

    @property
    def params(self):
        return [{'val': self.gamma, 'grad': self.gamma_grad},
                {'val': self.beta, 'grad': self.beta_grad}]

    def forward(self, x, training=True):
        if training:
            mean = x.mean(axis=(0, 2, 3), keepdims=True)
            var = x.var(axis=(0, 2, 3), keepdims=True)
            self.x_hat = (x - mean) / cp.sqrt(var + self.eps)
            self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * mean.squeeze()
            self.running_var = (1 - self.momentum) * self.running_var + self.momentum * var.squeeze()
        else:
            self.x_hat = (x - self.running_mean[None, :, None, None]) / cp.sqrt(self.running_var[None, :, None, None] + self.eps)
        return self.gamma[None, :, None, None] * self.x_hat + self.beta[None, :, None, None]

    def backward(self, dy):
        N, C, H, W = dy.shape
        dx_hat = dy * self.gamma[None, :, None, None]
        dvar = (dx_hat * self.x_hat).sum(axis=(0, 2, 3), keepdims=True) * (-0.5) * (self.x_hat ** 2 + self.eps) ** (-1.5)
        dmean = (dx_hat * (-1 / cp.sqrt(self.x_hat ** 2 + self.eps))).sum(axis=(0, 2, 3), keepdims=True) + dvar * (-2 * self.x_hat).mean(axis=(0, 2, 3), keepdims=True)
        dx = (dx_hat / cp.sqrt(self.x_hat ** 2 + self.eps)) + dvar * 2 * self.x_hat / (N * H * W) + dmean / (N * H * W)
        self.gamma_grad[:] = (dy * self.x_hat).sum(axis=(0, 2, 3))
        self.beta_grad[:] = dy.sum(axis=(0, 2, 3))
        return dx

##Focal Loss

In [None]:
class FocalLoss(Module):
    def __init__(self, alpha=0.25, gamma=2):
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, x, target):
        x_exp = cp.exp(x - cp.max(x, axis=1, keepdims=True))
        softmax = x_exp / x_exp.sum(axis=1, keepdims=True)
        self.pt = softmax[cp.arange(len(target)), target]
        loss = -self.alpha * (1 - self.pt) ** self.gamma * cp.log(self.pt)
        return loss.mean()

    def backward(self, dy):
        batch_size = len(self.pt)
        pt = self.pt
        grad = cp.zeros_like(self.pt)
        grad_term = self.alpha * (1 - pt) ** self.gamma * (self.gamma * pt * cp.log(pt) + pt - 1)
        grad = grad_term * (1 - self.pt) / batch_size
        return grad

##SGD with Momentum

In [None]:
class SGD(Optim):
    def __init__(self, params, lr=0.01, momentum=0.9):
        super().__init__(params, lr)
        self.momentum = momentum
        self.velocities = [cp.zeros_like(p['val']) for p in params]

    def step(self):
        for i, param in enumerate(self.params):
            self.velocities[i] = self.momentum * self.velocities[i] + param['grad']
            param['val'] -= self.lr * self.velocities[i]

##Adam Optimizer

In [None]:
class Adam(Optim):
    def __init__(self, params, lr=0.001, beta1=0.9, beta2=0.999, eps=1e-8):
        super().__init__(params, lr)
        self.beta1 = beta1
        self.beta2 = beta2
        self.eps = eps
        self.m = [cp.zeros_like(p['val']) for p in params]
        self.v = [cp.zeros_like(p['val']) for p in params]
        self.t = 0

    def step(self):
        self.t += 1
        for i, param in enumerate(self.params):
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * param['grad']
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * (param['grad'] ** 2)
            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)
            param['val'] -= self.lr * m_hat / (cp.sqrt(v_hat) + self.eps)

In [None]:
class Flatten(Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        self.input_shape = x.shape
        return x.reshape(x.shape[0], -1)  # Flatten all dimensions except the batch dimension

    def backward(self, dy):
        return dy.reshape(self.input_shape)  # Reshape back to the original shape

## Your Network

In [None]:
net = Sequential(
    # Feature extractor
    Conv2d(1, 32, kernel_size=3), # 28 -> 26
    BatchNorm2d(32),
    LeakyReLU(),

    Conv2d(32, 64, kernel_size=3),
    BatchNorm2d(64),
    LeakyReLU(),

    Conv2d(64, 128, kernel_size=3),
    BatchNorm2d(128),
    LeakyReLU(),

    # Classifier
    Flatten(),
    Linear(22*22*128, 128),
    LeakyReLU(),
    Dropout(0.3),
    Linear(128, 10)
)

loss_fn = CrossEntropyLoss()

## Training

In [None]:
# torch and torchvision provide some very handy utilities for dataset loading
from torch.utils.data import DataLoader
import torchvision.datasets as tv_datasets
import torchvision.transforms as tv_transforms

In [None]:
# some experimental setup
num_epochs = 5
batch_size = 128
num_workers = 2
print_every = 100

In [None]:
# prepare datasets
dataset, loader = {}, {}
for data_type in ("train", "test"):
    is_train = data_type=="train"
    dataset[data_type] = tv_datasets.MNIST(
        root="./data", train=is_train, download=True,
        transform=tv_transforms.Compose([ # preprocessing pipeline for input images
            tv_transforms.ToTensor(),
            tv_transforms.Normalize((0.1307,), (0.3081,)),
    ]))
    loader[data_type] = DataLoader(
        dataset[data_type], batch_size=batch_size, shuffle=is_train, num_workers=num_workers,
    )


In [None]:
optimizer = Adam(
    params=net.params,
    lr=1e-4,
    beta1=0.9,
    beta2=0.999,
    eps=1e-8
)

for epoch in range(num_epochs):

    running_loss = 0.0
    for i, (img, target) in enumerate(loader["train"]):
        img, target = cp.asarray(img.numpy()), cp.asarray(target.numpy())
        # img = img.reshape(-1, 784)

        loss = loss_fn(net(img), target)

        net.backward(loss_fn.backward(loss))
        optimizer.step()
        optimizer.zero_grad()

        # print statistics
        running_loss += loss.item()
        if i % print_every == print_every - 1:
            print(f"[epoch={epoch + 1:3d}, iter={i + 1:5d}] loss: {running_loss / print_every:.3f}")
            running_loss = 0.0

print("Finished Training")

[epoch=  1, iter=  100] loss: 0.428
[epoch=  1, iter=  200] loss: 0.233
[epoch=  1, iter=  300] loss: 0.194
[epoch=  1, iter=  400] loss: 0.154
[epoch=  2, iter=  100] loss: 0.110
[epoch=  2, iter=  200] loss: 0.105
[epoch=  2, iter=  300] loss: 0.094
[epoch=  2, iter=  400] loss: 0.094
[epoch=  3, iter=  100] loss: 0.072
[epoch=  3, iter=  200] loss: 0.070
[epoch=  3, iter=  300] loss: 0.073
[epoch=  3, iter=  400] loss: 0.065
[epoch=  4, iter=  100] loss: 0.050
[epoch=  4, iter=  200] loss: 0.052
[epoch=  4, iter=  300] loss: 0.049
[epoch=  4, iter=  400] loss: 0.050
[epoch=  5, iter=  100] loss: 0.040
[epoch=  5, iter=  200] loss: 0.038
[epoch=  5, iter=  300] loss: 0.039
[epoch=  5, iter=  400] loss: 0.041
[epoch=  6, iter=  100] loss: 0.034


KeyboardInterrupt: 

Need to be stopped because the GPU quota is running out

## Evaluate

In [None]:
# for each test image
correct, total = 0, 0
for img, target in loader["test"]:
    img, target = cp.asarray(img.numpy()), cp.asarray(target.numpy())
    # img = img.reshape(-1, 784)

    # make prediction
    pred = net(img)

    # accumulate
    total += len(target)
    correct += (cp.argmax(pred, axis=1) == target).sum()

print(f"Accuracy of the network on the {total} test images: {100 * correct / total:.2f}%")

Accuracy of the network on the 10000 test images: 97.59%
