# <font color=Red># Prologue : CNNs implementation without Torch </font>

해당 프로젝트에서는 PyTorch 없이 CNNs를 구현하고, </br>
간단한 이미지(MNIST 데이터세트)를 분류하는 Classification Task를 수행합니다. </br>
해당 프로젝트의 Experiment Setting은 CNNs Implementation with Torch를 참고합니다. </br> 
</br>
### Preliminary 
모든 Neural Networks는 세 개의 요소로 구성되어 있습니다. </br>
- ```Architecture``` : Neuron과 Layer를 Initialization하고, Forward Propagation을 수행합니다.</br>
- ```Cost function``` : Model과 Label간의 차이를 계산합니다. </br>
- ```Optimization``` : Backward Propagation을 수행하고, weight와 bias로 구성된 Parameter를 Update합니다. </br>

이 세 가지의 구성요소를 코드로 구현하는 것은 매우 복잡하고 지루한 과정이기 때문에, </br>
자주 사용되는 method는 ```Tensorflow```나 ```PyTorch```와 같은 Library에서 제공되는 것을 사용하는 것이 일반적입니다. </br>
</br>
그러나 만약 Library를 사용하지 않고 CNNs를 구현한다면, 어떤 부분이 기존 Library에서 제공되었는지를 파악해야 합니다, </br>

In [1]:
import numpy as np
import cupy as cp
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

  from .autonotebook import tqdm as notebook_tqdm


# <font color=Red># Modulization </font>

먼저, 신경망에 사용되는 여러 가지 함수들을 ```Module```화 하는 것이 중요합니다. </br>
여기서 Module이란 여러 학문 분야에서 사용하는 용어인데, Computer Science에서 사용되는 경우 </br>
특정 프로그램을 구성하는 기본 단위를 의미합니다. </br>
</br>
신경망을 구현하는 데 필요한 각종 Module들은 앞서 설명되었듯이, 여러 Library에서 제공되지만 </br>
해당 프로젝트에서는 Torch 없이 Module들을 구현하는 것을 목표로 하므로, 어떤 Module이 사용되는지 먼저 살펴봅니다. </br>
신경망에서 자주 사용되는 Module들에는 다음과 같은 것들이 있습니다. </br>

### Convolution
가장 먼저, 이미지의 각종 특징(Feature)들을 추출하는 ```Convolution``` 연산을 수행합니다.</br>
Convolution은, ```Filter``` 혹은 ```Kernel```이라 불리는 Matrix를 원본 이미지에 곱함으로써 수행됩니다.</br>
이때, Filter Size는 ```VGGNet```의 설정을 참고하여, 모든 레이어에서 3x3으로 고정합니다.

**Ref:Very Deep Convolutional Networks for Large-Scale Image Recognition**

In [2]:
def conv2d(input, filters, bias, stride=1, padding=1):
    batch_size, in_channel, in_height, in_width = input.shape  # 입력 텐서의 형태 (배치 크기, 채널 수, 높이, 너비) 구함
    out_channel, _, filter_height, filter_width = filters.shape  # 필터(커널)의 형태 (출력 채널 수, 입력 채널 수, 필터 높이, 필터 너비) 구함
    
    # 입력 텐서에 패딩 추가
    padded_input = cp.pad(input, ((0, 0), (0, 0), (padding, padding), (padding, padding)), mode='constant')
    
    # 출력 텐서의 높이와 너비 계산
    out_height = (in_height - filter_height + 2 * padding) // stride + 1
    out_width = (in_width - filter_width + 2 * padding) // stride + 1

    # 출력 텐서 초기화 (배치 크기, 출력 채널 수, 출력 높이, 출력 너비)
    output = cp.zeros((batch_size, out_channel, out_height, out_width))
    
    # 출력 텐서의 각 위치에 대해 계산
    for i in range(out_height):
        for j in range(out_width):
            h_start = i * stride  # 입력 텐서에서 현재 위치의 시작 높이
            h_end = h_start + filter_height  # 입력 텐서에서 현재 위치의 끝 높이
            w_start = j * stride  # 입력 텐서에서 현재 위치의 시작 너비
            w_end = w_start + filter_width  # 입력 텐서에서 현재 위치의 끝 너비
            region = padded_input[:, :, h_start:h_end, w_start:w_end]  # 입력 텐서에서 현재 위치의 영역 추출

            # 각 출력 채널에 대해 필터를 적용하여 값 계산
            for k in range(out_channel):
                output[:, k, i, j] = cp.sum(region * filters[k, :, :, :], axis=(1, 2, 3))
    
    # 바이어스를 출력에 더해줌
    output += bias[None, :, None, None]
    return output  # 최종 출력 텐서 반환


def conv2d_backward(dout, input, filters, stride=1, padding=1):
    batch_size, in_channel, in_height, in_width = input.shape  # 입력 텐서의 형태 구함
    out_channel, _, filter_height, filter_width = filters.shape  # 필터(커널)의 형태 구함
    _, _, out_height, out_width = dout.shape  # dout의 형태 (배치 크기, 출력 채널 수, 출력 높이, 출력 너비) 구함

    # 입력 텐서에 패딩 추가
    padded_input = cp.pad(input, ((0, 0), (0, 0), (padding, padding), (padding, padding)), mode='constant')
    
    # 역전파를 위한 초기화
    dpadded_input = cp.zeros_like(padded_input)  # 패딩된 입력에 대한 변화도
    dfilters = cp.zeros_like(filters)  # 필터에 대한 변화도
    dbias = cp.sum(dout, axis=(0, 2, 3))  # 바이어스에 대한 변화도

    # 각 위치에서 역전파 수행
    for i in range(out_height):
        for j in range(out_width):
            h_start = i * stride  # 입력 텐서에서 현재 위치의 시작 높이
            h_end = h_start + filter_height  # 입력 텐서에서 현재 위치의 끝 높이
            w_start = j * stride  # 입력 텐서에서 현재 위치의 시작 너비
            w_end = w_start + filter_width  # 입력 텐서에서 현재 위치의 끝 너비
            region = padded_input[:, :, h_start:h_end, w_start:w_end]  # 입력 텐서에서 현재 위치의 영역 추출

            # 각 출력 채널에 대해 역전파 수행
            for k in range(out_channel):
                dfilters[k, :, :, :] += cp.sum(region * dout[:, k:k+1, i:i+1, j:j+1], axis=0)  # 필터의 변화도 누적
                dpadded_input[:, :, h_start:h_end, w_start:w_end] += filters[k, :, :, :] * dout[:, k:k+1, i:i+1, j:j+1]  # 패딩된 입력의 변화도 누적

    # 패딩을 제거하여 입력에 대한 변화도 계산
    dinput = dpadded_input[:, :, padding:-padding, padding:-padding] if padding else dpadded_input

    return dinput, dfilters, dbias  # 입력, 필터, 바이어스에 대한 변화도 반환


### Pooling
해당 이미지의 특정 영역의 값을 압축하는 ```Pooling```연산을 수행합니다.</br>
Classification Task에서는 다른 Pooling 방법에 비해 특정 영역을 해당 영역의 최댓값으로 압축하는 ```Max Polling```이 일반적으로 더 좋은 성능을 보여주므로,</br>
해당 코드에서도 Max Pooling을 기본적으로 구현합니다.</br>
Max Pooling 역시 Convolution과 동일하게 Pooling Kernel을 설정하지만 </br>
Convoltuion과는 다르게 Parameter를 저장할 필요가 없습니다.

In [None]:
def max_pool2d(input, pool_size=2, stride=2):
    batch_size, channel, height, width = input.shape  # 입력 텐서의 형태 (배치 크기, 채널 수, 높이, 너비) 구함
    out_height = (height - pool_size) // stride + 1  # 출력 텐서의 높이 계산
    out_width = (width - pool_size) // stride + 1  # 출력 텐서의 너비 계산
    output = cp.zeros((batch_size, channel, out_height, out_width))  # 출력 텐서 초기화
    max_indices = cp.zeros_like(input)  # 최대값 인덱스 초기화

    # 출력 텐서의 각 위치에 대해 최대 풀링 수행
    for i in range(out_height):
        for j in range(out_width):
            h_start = i * stride  # 입력 텐서에서 현재 위치의 시작 높이
            h_end = h_start + pool_size  # 입력 텐서에서 현재 위치의 끝 높이
            w_start = j * stride  # 입력 텐서에서 현재 위치의 시작 너비
            w_end = w_start + pool_size  # 입력 텐서에서 현재 위치의 끝 너비
            region = input[:, :, h_start:h_end, w_start:w_end]  # 입력 텐서에서 현재 위치의 영역 추출
            output[:, :, i, j] = cp.max(region, axis=(2, 3))  # 영역 내의 최대값 계산하여 출력 텐서에 저장
            max_indices[:, :, h_start:h_end, w_start:w_end] += (region == output[:, :, i:i+1, j:j+1])  # 최대값 위치 저장

    return output, max_indices  # 최대 풀링 결과와 최대값 인덱스 반환

def max_pool2d_backward(dout, max_indices, pool_size=2, stride=2):
    batch_size, channel, height, width = max_indices.shape  # 최대값 인덱스의 형태 (배치 크기, 채널 수, 높이, 너비) 구함
    dinput = cp.zeros_like(max_indices)  # 입력에 대한 변화도 초기화

    out_height = dout.shape[2]  # dout의 높이 구함
    out_width = dout.shape[3]  # dout의 너비 구함

    # 역전파 수행
    for i in range(out_height):
        for j in range(out_width):
            h_start = i * stride  # 입력 텐서에서 현재 위치의 시작 높이
            h_end = h_start + pool_size  # 입력 텐서에서 현재 위치의 끝 높이
            w_start = j * stride  # 입력 텐서에서 현재 위치의 시작 너비
            w_end = w_start + pool_size  # 입력 텐서에서 현재 위치의 끝 너비
            dinput[:, :, h_start:h_end, w_start:w_end] += (max_indices[:, :, h_start:h_end, w_start:w_end] * dout[:, :, i:i+1, j:j+1])  # 최대값 위치에 dout 적용

    return dinput  # 입력에 대한 변화도 반환

### Activation Function
- Sigmoid, ReLU, Softmax
기본적으로 Activation Function으로는 ```ReLU```를 사용하고, </br>
Classification Task를 수행하기 때문에</br>
Classifier의 마지막 Layer에서는 ```Softmax```를 사용합니다. 


In [4]:
def relu(x):
    return cp.maximum(0, x)

def relu_backward(dout, x):
    return dout * (x > 0)

**Convolution과 Pooling으로 구성된 Feature Extractor의 설정은 다음과 같습니다.**

|Layer|in_channel|out_chnnel|kernel_size|stride|padding|
|---|---|---|---|---|---|
|Conv1|1|32|3|1|1|
|Conv2|32|32|3|1|1|
|Pool1|2|2|0|
|Conv3|32|128|1|1|1|
|Conv4|128|128|3|1|1|
|Pool2|2|2|0|
|Conv5|128|256|3|1|1|
|Conv6|256|256|3|1|1|
|Pool3|2|2|

### Linear Combination(FC layer)
Feature extractor에서 생성한 Feature의 정보를 모아서 분류를 수행하는 Classifier를 정의합니다. </br>
Classifier에서는 Feature의 정보를 모으기 위해 Convolutional Layer를 Flatten(Vectorization)하기 때문에 </br>
각 Neuron들을 ```Linear Combination```으로 연결하는 ```Fully Connected(FC) Layer```를 구현해야 합니다. </br>

**Classifier의 설정은 다음과 같습니다.**

|Layer|in_features|out_features|
|---|---|---|
|FC1|3x3x256|4096|
|Dropout(0.5)|
|FC2|4096|10|

In [5]:
def flatten(input):
    return input.reshape(input.shape[0], -1)  # 입력 텐서를 평탄화하여 2차원 텐서로 변환 (배치 크기, 나머지 차원)

def fully_connected(input, weights, bias):
    return cp.dot(input, weights) + bias  # 입력 텐서와 가중치를 행렬 곱셈하고 바이어스를 더하여 선형 변환 수행

def fully_connected_backward(dout, input, weights):
    dinput = cp.dot(dout, weights.T)  # dout과 가중치의 전치를 곱하여 입력에 대한 변화도 계산
    dweights = cp.dot(input.T, dout)  # 입력의 전치와 dout을 곱하여 가중치에 대한 변화도 계산
    dbias = cp.sum(dout, axis=0)  # dout의 모든 배치에 대해 합을 구하여 바이어스에 대한 변화도 계산
    return dinput, dweights, dbias  # 입력, 가중치, 바이어스에 대한 변화도 반환

### Dropout
더 이상의 자세한 설명은 생략한다.

# <font color=Red># Class </font>

Python에서는 Class를 이용하여 Modulization을 수행할 수 있습니다. </br>
이때, 중요한 점은 해당 Class의 method는, 각 module의 ```Forward Propagation``` 부분과</br> 
```Backpropagation```이 모두 구현되어 있어야 한다는 점입니다.</br>

따라서, 모든 클래스에는 Forward Propagation method에서 해당 함수의 연산을 수행하는 부분을 정의하고, </br>
Backpropagation method에서 해당 함수의 ```gradient```를 계산하는 연산을 수행하는 부분을 정의합니다. 

### Dropout Module

#### Forward Propagation
define p : dropout probability </br>

$$\text{mask} = \frac{\mathbf{r}}{1-p}$$
r = uniformly distributed matrix for range [0, 1]

- **During Training**
$$\mathbf{y} = \mathbf{x} \odot \text{mask}$$
여기서, $\mathbf{y}$는 Dropout이 적용된 출력, $\mathbf{x}$는 입력, $\odot$는 element-wise multiplication을 나타냄.

- **During Test**
$$\mathbf{y} = \mathbf{x}$$
Test 시에는 Dropout이 적용되지 않음.

#### Backward Propagation
$$\mathbf{dout} = \mathbf{dout} \odot \text{mask}$$

In [6]:
class Dropout:
    def __init__(self, p=0.5):
        self.p = p  # 드롭아웃 확률 설정
        self.mask = None  # 마스크 초기화

    def forward(self, input, train=True):
        if train:  # 학습 중일 때
            self.mask = (cp.random.rand(*input.shape) > self.p) / (1 - self.p)  # 드롭아웃 마스크 생성
            return input * self.mask  # 입력에 마스크 적용하여 드롭아웃 수행
        else:  # 평가 중일 때
            return input  # 입력 그대로 반환

    def backward(self, dout):
        return dout * self.mask  # 역전파 시 마스크 적용하여 그래디언트 계산


### Convolution Module

In [7]:
class Conv2D:
    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=1):
        self.stride = stride  # 스트라이드 값 설정
        self.padding = padding  # 패딩 값 설정
        # He 초기화를 사용하여 필터 가중치 초기화
        self.weights = cp.random.randn(out_channels, in_channels, kernel_size, kernel_size) * cp.sqrt(2 / (in_channels * kernel_size * kernel_size))
        self.bias = cp.random.randn(out_channels)  # 바이어스 초기화
        self.dweights = cp.zeros_like(self.weights)  # 필터 가중치 변화도 초기화
        self.dbias = cp.zeros_like(self.bias)  # 바이어스 변화도 초기화

    def forward(self, input):
        self.input = input  # 입력값 저장 (역전파를 위해)
        # 컨벌루션 연산 수행 후 ReLU 활성화 함수 적용하여 출력 계산
        self.output = relu(conv2d(input, self.weights, self.bias, self.stride, self.padding))
        return self.output  # 최종 출력 반환

    def backward(self, dout):
        # ReLU의 역전파 수행
        dout = relu_backward(dout, self.output)
        # 컨벌루션 연산의 역전파 수행하여 입력, 필터 가중치, 바이어스에 대한 변화도 계산
        dinput, self.dweights, self.dbias = conv2d_backward(dout, self.input, self.weights, self.stride, self.padding)
        return dinput  # 입력에 대한 변화도 반환


### Maxpool Module

In [8]:
class MaxPool2D:
    def __init__(self, pool_size=2, stride=2):
        self.pool_size = pool_size  # 풀링 크기 설정
        self.stride = stride  # 스트라이드 값 설정

    def forward(self, input):
        self.input = input  # 입력값 저장 (역전파를 위해)
        # 최대 풀링 연산 수행, 출력값과 최대값 인덱스 저장
        self.output, self.max_indices = max_pool2d(input, self.pool_size, self.stride)
        return self.output  # 최종 출력 반환

    def backward(self, dout):
        # 최대 풀링의 역전파 수행하여 입력에 대한 변화도 계산
        return max_pool2d_backward(dout, self.max_indices, self.pool_size, self.stride)

### ReLU Module

In [9]:
class ReLU:
    @staticmethod
    def forward(x):
        return relu(x)

    @staticmethod
    def backward(dout, x):
        return relu_backward(dout, x)

### Vectorization Module

In [10]:
class Flatten:
    def forward(self, input):
        self.input_shape = input.shape
        output = flatten(input)
        return output

    def backward(self, dout):
        return dout.reshape(self.input_shape)


### Linear Module

In [11]:
class FullyConnected:
    def __init__(self, in_features, out_features):
        # He initialization을 사용하여 가중치 초기화
        self.weights = cp.random.randn(in_features, out_features) * cp.sqrt(2 / in_features)
        self.bias = cp.random.randn(out_features)  # 바이어스 초기화
        self.dweights = cp.zeros_like(self.weights)  # 가중치 변화도 초기화
        self.dbias = cp.zeros_like(self.bias)  # 바이어스 변화도 초기화

    def forward(self, input):
        self.input = input  # 입력값 저장 (역전파를 위해)
        self.output = fully_connected(input, self.weights, self.bias)  # 선형 변환 수행
        return relu(self.output)  # ReLU 활성화 함수 적용 후 반환

    def backward(self, dout):
        dout = relu_backward(dout, self.output)  # ReLU의 역전파 수행
        # 선형 변환의 역전파 수행하여 입력, 가중치, 바이어스에 대한 변화도 계산
        dinput, self.dweights, self.dbias = fully_connected_backward(dout, self.input, self.weights)
        return dinput  # 입력에 대한 변화도 반환


### MC Dropout for ESC

In [12]:
def mc_dropout_predict(test_batches, model, num_samples=10):
    predictions = []  # 예측값을 저장할 리스트 초기화

    for _ in range(num_samples):  # 지정된 샘플 수만큼 반복
        batch_predictions = []  # 각 배치에 대한 예측값을 저장할 리스트 초기화
        for input, _ in test_batches:  # 테스트 배치 반복
            output = model.forward(input, train=True)  # Dropout을 활성화된 상태로 추론
            batch_predictions.append(cp.expand_dims(cp.argmax(output, axis=1), axis=0))  # 예측값을 차원 확장하여 리스트에 추가
        predictions.append(cp.concatenate(batch_predictions, axis=0))  # 배치별 예측값을 연결하여 리스트에 추가

    predictions = cp.stack(predictions, axis=0)  # 예측값을 스택하여 [num_samples, num_batches, batch_size] 형태로 만듦
    mean_predictions = cp.mean(predictions, axis=0)  # 각 배치에 대해 예측값의 평균 계산
    final_predictions = cp.argmax(mean_predictions, axis=1)  # 평균 예측값에서 가장 높은 확률을 가진 클래스 선택
    
    return final_predictions  # 최종 예측값 반환


# <font color=Red># Architecture </font>

In [13]:
class SimpleCNN:
    def __init__(self):
        self.layers = [
            Conv2D(1, 32, 3, stride=1, padding=1), # 28 x 28
            Conv2D(32, 32, 3, stride=1, padding=1), # 28 x 28
            MaxPool2D(2, 2), # 14 x 14
            Conv2D(32, 128, 3, stride=1, padding=1), # 14 x 14
            Conv2D(128, 128, 3, stride=1, padding=1), # 14 x 14
            MaxPool2D(2, 2), # 7 x 7
            Conv2D(128, 256, 3, stride=1, padding=1), # 7 x 7
            Conv2D(256, 256, 3, stride=1, padding=1), # 7 x 7
            MaxPool2D(2, 2), # 3 x 3
            Flatten(),  
            FullyConnected(256 * 3 * 3, 4096),  
            Dropout(0.5),  
            FullyConnected(4096, 128),  
            Dropout(0.5),  
            FullyConnected(128, 10)  
        ]

    def forward(self, x):
        for layer in self.layers:
            x = layer.forward(x)
#             print(f"Layer {layer.__class__.__name__} output shape: {x.shape}")
        return x

    def backward(self, dout):
        for layer in reversed(self.layers):
            dout = layer.backward(dout)
        return dout
    def get_params(self):
        params = []
        for layer in self.layers:
            if isinstance(layer, Conv2D) or isinstance(layer, FullyConnected):
                params.append({'value': layer.weights, 'grad': layer.dweights})
                params.append({'value': layer.bias, 'grad': layer.dbias})
        return params

# <font color=Red># Cost Function </font>

In [14]:
def cross_entropy_loss(output, target):
    batch_size = target.shape[0]
    output = cp.clip(output, 1e-12, 1. - 1e-12)
    loss = -cp.sum(target * cp.log(output)) / batch_size
    return loss

def cross_entropy_derivative(output, target):
    return output - target
def softmax(x):
    exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
    return exp_x / np.sum(exp_x, axis=1, keepdims=True)

# <font color=Red># Data Loader </font>

In [15]:
def load_data(batch_size=32, train_samples=2000, test_samples=500):
    transform = transforms.Compose([
        transforms.ToTensor()
    ])

    train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
    test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

    train_data = train_dataset.data.numpy()
    train_labels = train_dataset.targets.numpy()
    test_data = test_dataset.data.numpy()
    test_labels = test_dataset.targets.numpy()

    train_data = train_data / 255.0
    test_data = test_data / 255.0

    train_data = np.expand_dims(train_data, axis=1)
    test_data = np.expand_dims(test_data, axis=1)

    train_data = (train_data - 0.5) / 0.5
    test_data = (test_data - 0.5) / 0.5

    
    train_indices = np.random.choice(len(train_data), train_samples, replace=False)
    test_indices = np.random.choice(len(test_data), test_samples, replace=False)

    sampled_train_data = train_data[train_indices]
    sampled_train_labels = train_labels[train_indices]
    sampled_test_data = test_data[test_indices]
    sampled_test_labels = test_labels[test_indices]

    def create_batches(data, labels, batch_size):
        for start in range(0, len(data), batch_size):
            end = start + batch_size
            batch_data = cp.asarray(data[start:end])
            batch_labels = cp.asarray(labels[start:end])
            
            yield batch_data, batch_labels

    train_batches = list(create_batches(sampled_train_data, sampled_train_labels, batch_size))
    test_batches = list(create_batches(sampled_test_data, sampled_test_labels, batch_size))

    return train_batches, test_batches

# <font color=Red># Optimization </font>

### ADAM(Adaptive Moment Estimation)

**0. Initialize 1st momentum and 2nd momentum**
$$m_0 = 0 $$ 
$$v_0 = 0 $$ </br>

**1. Update Time step** </br>
$$t = t + 1$$


**2. Update 1st Momentum and 2nd Momentum(using the gradient g)**

$$m_t = \beta_1 \cdot m_{t-1} + (1 - \beta_1) \cdot g_t$$
$$v_t = \beta_2 \cdot v_{t-1} + (1 - \beta_2) \cdot g_t^2$$

**3. Update estimation**
$$\hat{m}_t = \frac{m_t}{1 - \beta_1^t}$$
$$\hat{v}_t = \frac{v_t}{1 - \beta_2^t}$$

**4. Update parameters(w, b)**

$$theta_t = \theta_{t-1} - \alpha \cdot \frac{\hat{m}_t}{\sqrt{\hat{v}_t} + \epsilon}$$

- Momentum coefficient(Friction)$(\beta_1,\,\,\beta_2)$ , Generally choosen 0.9 and 0.999, respectively.


In [16]:
class Adam:
    def __init__(self, params, lr=1e-5, beta1=0.9, beta2=0.999, epsilon=1e-8):
        self.params = params  # w, b (parameters to optimize)
        self.lr = lr  # Learning rate
        self.beta1 = beta1  # coefficient of 1st momentum
        self.beta2 = beta2  # coefficient of 2nd momentum
        self.epsilon = epsilon  # avoid to devided by 0
        
        self.m = [cp.zeros_like(p['value']) for p in params]  # initialize 1st momentum
        self.v = [cp.zeros_like(p['value']) for p in params]  # initialize 2nd momentum
        self.t = 0  # initialize time step

    def step(self):
        self.t += 1  # 타임스텝 증가
        for i, param in enumerate(self.params):  # For each parameters
            grad = param['grad']  # Gradient of the parameters(w, b)
            # Update 1st momentum
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * grad
            # Update 2nd momentum
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * (grad ** 2)
            # update 1st momentum estimation
            m_hat = self.m[i] / (1 - self.beta1 ** self.t)
            # update 2nd momentum estimation 
            v_hat = self.v[i] / (1 - self.beta2 ** self.t)
            # update parameters
            param['value'] -= self.lr * m_hat / (cp.sqrt(v_hat) + self.epsilon)


In [17]:

def train(train_batches, model, optimizer, epochs=5):
    for epoch in range(epochs):
        total_loss = 0
        for i, (input, target) in enumerate(train_batches):
            # Forward pass
            output = model.forward(input)
            # Compute loss
            output = softmax(output)
            loss = cross_entropy_loss(output, target)
            total_loss += loss
            # Backward pass
            dout = cross_entropy_derivative(output, target)
            model.backward(dout)
            # Update weights
            optimizer.step()
#             print(f'Epoch {epoch + 1}, Batch {i + 1}, Loss: {loss}')
            print(f'Epoch {epoch + 1}, Batch {i + 1}, Loss: {loss}, Batch Size: {input.shape[0]}')
        print(f'Epoch {epoch + 1}, Average Loss: {total_loss / len(train_batches)}')

def test(test_batches, model):
    correct = 0
    total = 0
    for input, target in test_batches:
        output = model.forward(input)
        output = softmax(output)
        predictions = cp.argmax(output, axis=1)
        labels = cp.argmax(target, axis=1)
        correct += cp.sum(predictions == labels)
        total += labels.size
    accuracy = correct / total
    print(f'Test Accuracy: {accuracy * 100:.2f}%')

In [18]:
def test_mc(test_batches, model, num_samples=10):
    correct = 0
    total = 0
    for input, target in test_batches:
        output = mc_dropout_predict([(input, target)], model, num_samples=num_samples)
        labels = cp.argmax(target, axis=1)
        correct += cp.sum(output == labels)
        total += labels.size
    accuracy = correct / total
    print(f'Test Accuracy: {accuracy * 100:.2f}%')

In [19]:
def to_one_hot(labels, num_classes=10):
    return cp.eye(num_classes)[labels]

In [20]:
# Initialize model
model = SimpleCNN()

# Load data
train_batches, test_batches = load_data()

# Convert labels to one-hot encoding
train_batches = [(data, to_one_hot(labels)) for data, labels in train_batches]
test_batches = [(data, to_one_hot(labels)) for data, labels in test_batches]

In [None]:
params = model.get_params()
optimizer = Adam(params)

# Train the model
train(train_batches, model, optimizer)

Epoch 1, Batch 1, Loss: 8.945937896198348, Batch Size: 32
Epoch 1, Batch 2, Loss: 9.372462881177787, Batch Size: 32
Epoch 1, Batch 3, Loss: 7.8541191926773575, Batch Size: 32
Epoch 1, Batch 4, Loss: 8.993198310464228, Batch Size: 32
Epoch 1, Batch 5, Loss: 8.773692264104056, Batch Size: 32
Epoch 1, Batch 6, Loss: 8.43431199048783, Batch Size: 32
Epoch 1, Batch 7, Loss: 6.84116654431034, Batch Size: 32
Epoch 1, Batch 8, Loss: 9.11850859569022, Batch Size: 32
Epoch 1, Batch 9, Loss: 8.583800705422357, Batch Size: 32
Epoch 1, Batch 10, Loss: 8.294407962411455, Batch Size: 32
Epoch 1, Batch 11, Loss: 9.384853586680766, Batch Size: 32
Epoch 1, Batch 12, Loss: 8.707110608327126, Batch Size: 32
Epoch 1, Batch 13, Loss: 8.999608019940176, Batch Size: 32
Epoch 1, Batch 14, Loss: 9.044055866616286, Batch Size: 32
Epoch 1, Batch 15, Loss: 8.469025550108455, Batch Size: 32
Epoch 1, Batch 16, Loss: 7.9052636195561075, Batch Size: 32
Epoch 1, Batch 17, Loss: 8.793357694355118, Batch Size: 32
Epoch 1

In [None]:
# test(test_batches, model)
test_mc(test_batches, model)