In [1]:
import numpy as np
from torchvision.datasets import CIFAR100
from torchvision import transforms
from torch.utils.data import DataLoader
import torch

# 1. CIFAR-100 train/test 데이터셋 불러오기
train_dataset = CIFAR100(root='./data', train=True, download=True)
test_dataset = CIFAR100(root='./data', train=False, download=True)


# 2. numpy로 변환 (float32로 스케일링)
X_train = train_dataset.data.astype(np.float32) / 255.0  # shape: (50000, 32, 32, 3)
y_train = np.array(train_dataset.targets)                # shape: (50000,)

X_test = test_dataset.data.astype(np.float32) / 255.0
y_test = np.array(test_dataset.targets)

# 3. 정규화 (채널 별 평균/표준편차)
mean = np.array([0.5071, 0.4867, 0.4408])
std = np.array([0.2675, 0.2565, 0.2761])

X_train = (X_train - mean) / std
X_test = (X_test - mean) / std

# 4. One-hot encoding (100-class)
y_train_oh = np.eye(100)[y_train]  # shape: (50000, 100)
y_test_oh = np.eye(100)[y_test]    # shape: (10000, 100)

# 확인용 출력
print("X_train:", X_train.shape)
print("y_train (one-hot):", y_train_oh.shape)
print("X_test:", X_test.shape)
print("y_test (one-hot):", y_test_oh.shape)

Downloading https://www.cs.toronto.edu/~kriz/cifar-100-python.tar.gz to ./data\cifar-100-python.tar.gz


100%|███████████████████████████████████████████████████████████████| 169001437/169001437 [01:00<00:00, 2795616.85it/s]


Extracting ./data\cifar-100-python.tar.gz to ./data
Files already downloaded and verified
X_train: (50000, 32, 32, 3)
y_train (one-hot): (50000, 100)
X_test: (10000, 32, 32, 3)
y_test (one-hot): (10000, 100)


In [2]:
def save_cifar100(X_train, y_train_oh, X_test, y_test_oh, path='./'):
    np.save(path + 'X_train.npy', X_train)
    np.save(path + 'y_train_oh.npy', y_train_oh)
    np.save(path + 'X_test.npy', X_test)
    np.save(path + 'y_test_oh.npy', y_test_oh)

def load_cifar100(path='./'):
    X_train = np.load(path + 'X_train.npy')
    y_train_oh = np.load(path + 'y_train_oh.npy')
    X_test = np.load(path + 'X_test.npy')
    y_test_oh = np.load(path + 'y_test_oh.npy')
    return X_train, y_train_oh, X_test, y_test_oh

In [3]:
# 저장
save_cifar100(X_train, y_train_oh, X_test, y_test_oh)

In [4]:
# 불러오기
X_train, y_train_oh, X_test, y_test_oh = load_cifar100()

In [5]:
class Conv2D:
    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0):
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size if isinstance(kernel_size, tuple) else (kernel_size, kernel_size)
        self.stride = stride
        self.padding = padding

        kh, kw = self.kernel_size
        fan_in = in_channels * kh * kw
        fan_out = out_channels * kh * kw
        limit = np.sqrt(6 / (fan_in + fan_out))
        self.weights = np.random.uniform(-limit, limit, size=(out_channels, in_channels, kh, kw))
        self.bias = np.zeros(out_channels)

    def forward(self, x):
        self.x = x  # 입력 저장 (for backward)
        N, H, W, C = x.shape
        kh, kw = self.kernel_size
        sh, sw = self.stride, self.stride

        x_padded = np.pad(x, ((0, 0), (self.padding, self.padding), (self.padding, self.padding), (0, 0)), mode='constant')
        self.x_padded = x_padded  # backward에서 사용

        H_out = (H + 2 * self.padding - kh) // sh + 1
        W_out = (W + 2 * self.padding - kw) // sw + 1
        out = np.zeros((N, H_out, W_out, self.out_channels))

        for n in range(N):
            for h in range(H_out):
                for w in range(W_out):
                    for c_out in range(self.out_channels):
                        h_start = h * sh
                        h_end = h_start + kh
                        w_start = w * sw
                        w_end = w_start + kw

                        region = x_padded[n, h_start:h_end, w_start:w_end, :]  # shape: (kh, kw, in_channels)
                        kernel = self.weights[c_out].transpose(1, 2, 0)  # (in_channels, kh, kw) → (kh, kw, in_channels)
                        out[n, h, w, c_out] = np.sum(region * kernel) + self.bias[c_out]
        return out

    def backward(self, grad_output):
        x_padded = self.x_padded
        N, H_out, W_out, C_out = grad_output.shape
        kh, kw = self.kernel_size
        sh, sw = self.stride, self.stride

        self.grad_weights = np.zeros_like(self.weights)
        self.grad_bias = np.zeros_like(self.bias)
        dx_padded = np.zeros_like(x_padded)

        for n in range(N):
            for h in range(H_out):
                for w in range(W_out):
                    for c_out in range(self.out_channels):
                        h_start = h * sh
                        h_end = h_start + kh
                        w_start = w * sw
                        w_end = w_start + kw

                        region = x_padded[n, h_start:h_end, w_start:w_end, :]  # (kh, kw, in_channels)

                        self.grad_weights[c_out] += region.transpose(2, 0, 1) * grad_output[n, h, w, c_out]
                        self.grad_bias[c_out] += grad_output[n, h, w, c_out]
                        dx_padded[n, h_start:h_end, w_start:w_end, :] += self.weights[c_out].transpose(1, 2, 0) * grad_output[n, h, w, c_out]

        # 패딩 제거
        if self.padding > 0:
            dx = dx_padded[:, self.padding:-self.padding, self.padding:-self.padding, :]
        else:
            dx = dx_padded
        return dx

    def update(self, lr):
        self.weights -= lr * self.grad_weights
        self.bias -= lr * self.grad_bias

In [6]:
class ReLU:
    def forward(self, x):
        self.mask = (x > 0)
        return x * self.mask

    def backward(self, grad_output):
        return grad_output * self.mask

In [7]:
class MaxPool2D:
    def __init__(self, kernel_size=2, stride=2):
        self.kernel_size = kernel_size
        self.stride = stride

    def forward(self, x):
        self.x = x
        N, H, W, C = x.shape
        kh, kw = self.kernel_size, self.kernel_size
        sh, sw = self.stride, self.stride

        H_out = (H - kh) // sh + 1
        W_out = (W - kw) // sw + 1
        out = np.zeros((N, H_out, W_out, C))
        self.max_mask = np.zeros_like(x)

        for n in range(N):
            for h in range(H_out):
                for w in range(W_out):
                    for c in range(C):
                        h_start = h * sh
                        w_start = w * sw
                        h_end = h_start + kh
                        w_end = w_start + kw

                        window = x[n, h_start:h_end, w_start:w_end, c]
                        max_val = np.max(window)
                        out[n, h, w, c] = max_val

                        # mask에 해당 위치만 1로 설정
                        for i in range(kh):
                            for j in range(kw):
                                if window[i, j] == max_val:
                                    self.max_mask[n, h_start + i, w_start + j, c] = 1
                                    break  # 첫 max만 선택
                            else:
                                continue
                            break
        return out

    def backward(self, grad_output):
        N, H_out, W_out, C = grad_output.shape
        kh, kw = self.kernel_size, self.kernel_size
        sh, sw = self.stride, self.stride

        grad_input = np.zeros_like(self.x)

        for n in range(N):
            for h in range(H_out):
                for w in range(W_out):
                    for c in range(C):
                        h_start = h * sh
                        w_start = w * sw
                        h_end = h_start + kh
                        w_end = w_start + kw

                        for i in range(kh):
                            for j in range(kw):
                                if self.max_mask[n, h_start + i, w_start + j, c] == 1:
                                    grad_input[n, h_start + i, w_start + j, c] = grad_output[n, h, w, c]
                                    break
                            else:
                                continue
                            break
        return grad_input

In [8]:
class Flatten:
    def forward(self, x):
        self.input_shape = x.shape
        return x.reshape(x.shape[0], -1)

    def backward(self, grad_output):
        return grad_output.reshape(self.input_shape)

In [9]:
class FullyConnected:
    def __init__(self, in_features, out_features):
        self.weights = np.random.randn(in_features, out_features) * np.sqrt(2. / in_features)
        self.bias = np.zeros(out_features)

    def forward(self, x):
        self.input = x
        return x @ self.weights + self.bias

    def backward(self, grad_output):
        self.grad_weights = self.input.T @ grad_output
        self.grad_bias = np.sum(grad_output, axis=0)
        return grad_output @ self.weights.T

    def update(self, lr):
        self.weights -= lr * self.grad_weights
        self.bias -= lr * self.grad_bias

In [10]:
class Softmax:
    def forward(self, x):
        exps = np.exp(x - np.max(x, axis=1, keepdims=True))
        self.out = exps / np.sum(exps, axis=1, keepdims=True)
        return self.out

    def backward(self, grad_output):
        return grad_output  # CrossEntropy랑 같이 쓰면 별도 처리 불필요

In [11]:
class CrossEntropyLoss:
    def forward(self, y_pred, y_true):  # y_true는 one-hot
        self.y_pred = y_pred
        self.y_true = y_true
        loss = -np.sum(y_true * np.log(y_pred + 1e-9)) / y_pred.shape[0]
        return loss

    def backward(self):
        return (self.y_pred - self.y_true) / self.y_pred.shape[0]

In [12]:
class SGD:
    def __init__(self, parameters, lr=0.01):
        self.parameters = parameters
        self.lr = lr

    def step(self):
        for p in self.parameters:
            p.update(self.lr)

In [13]:
class BaselineCNN:
    def __init__(self):
        self.conv1 = Conv2D(3, 16, 3, stride=1, padding=1)
        self.relu1 = ReLU()
        self.pool1 = MaxPool2D()

        self.conv2 = Conv2D(16, 32, 3, stride=1, padding=1)
        self.relu2 = ReLU()
        self.pool2 = MaxPool2D()

        self.flatten = Flatten()
        self.fc1 = FullyConnected(8*8*32, 512)
        self.relu3 = ReLU()
        self.fc2 = FullyConnected(512, 100)
        self.softmax = Softmax()

    def forward(self, x):
        x = self.conv1.forward(x)
        x = self.relu1.forward(x)
        x = self.pool1.forward(x)

        x = self.conv2.forward(x)
        x = self.relu2.forward(x)
        x = self.pool2.forward(x)

        x = self.flatten.forward(x)
        x = self.fc1.forward(x)
        x = self.relu3.forward(x)
        x = self.fc2.forward(x)
        x = self.softmax.forward(x)
        return x

    def backward(self, grad_output):
        grad = self.fc2.backward(grad_output)
        grad = self.relu3.backward(grad)
        grad = self.fc1.backward(grad)
        grad = self.flatten.backward(grad)
        # Pool, Conv backward 생략 가능 (baseline에서는)

    def parameters(self):
        return [self.fc1, self.fc2]  # Conv2D도 파라미터 있으면 여기에 추가

In [14]:
def train(model, X_train, y_train, epochs=5, batch_size=64, lr=0.01):
    loss_fn = CrossEntropyLoss()
    optimizer = SGD(parameters=model.parameters(), lr=lr)

    N = X_train.shape[0]
    for epoch in range(epochs):
        perm = np.random.permutation(N)
        X_train = X_train[perm]
        y_train = y_train[perm]

        total_loss = 0
        correct = 0

        for i in range(0, N, batch_size):
            x_batch = X_train[i:i+batch_size]
            y_batch = y_train[i:i+batch_size]

            y_pred = model.forward(x_batch)
            loss = loss_fn.forward(y_pred, y_batch)
            total_loss += loss

            # Accuracy
            pred_labels = np.argmax(y_pred, axis=1)
            true_labels = np.argmax(y_batch, axis=1)
            correct += np.sum(pred_labels == true_labels)

            # Backward
            grad_loss = loss_fn.backward()
            model.backward(grad_loss)

            optimizer.step()

        acc = correct / N * 100
        print(f"Epoch {epoch+1}: Loss = {total_loss:.4f}, Accuracy = {acc:.2f}%")

In [15]:
# 모델 선언
model = BaselineCNN()

In [16]:
# BaselineCNN 저장 함수
def save_model_baseline(model, filename='baseline_model.npz'):
    np.savez(filename,
             fc1_w=model.fc1.weights,
             fc1_b=model.fc1.bias,
             fc2_w=model.fc2.weights,
             fc2_b=model.fc2.bias)

# BaselineCNN 불러오기 함수
def load_model_baseline(model, filename='baseline_model.npz'):
    data = np.load(filename)
    model.fc1.weights = data['fc1_w']
    model.fc1.bias = data['fc1_b']
    model.fc2.weights = data['fc2_w']
    model.fc2.bias = data['fc2_b']

In [17]:
# 학습 시작
train(model, X_train, y_train_oh, epochs=3, batch_size=64, lr=0.01)

Epoch 1: Loss = 3467.5375, Accuracy = 4.52%
Epoch 2: Loss = 3231.1973, Accuracy = 9.90%
Epoch 3: Loss = 3076.9595, Accuracy = 13.01%


In [18]:
# 모델 학습 후 저장
save_model_baseline(model, 'baseline_epoch3.npz')

In [19]:
# 불러오기
model = BaselineCNN()
load_model_baseline(model, 'baseline_epoch3.npz')

In [20]:
def evaluate(model, X_test, y_test):
    loss_fn = CrossEntropyLoss()
    total_loss = 0
    correct = 0
    N = X_test.shape[0]
    batch_size = 64

    for i in range(0, N, batch_size):
        x_batch = X_test[i:i+batch_size]
        y_batch = y_test[i:i+batch_size]

        y_pred = model.forward(x_batch)
        loss = loss_fn.forward(y_pred, y_batch)
        total_loss += loss

        pred_labels = np.argmax(y_pred, axis=1)
        true_labels = np.argmax(y_batch, axis=1)
        correct += np.sum(pred_labels == true_labels)

    acc = correct / N * 100
    avg_loss = total_loss / (N // batch_size)
    return avg_loss, acc

In [21]:
test_loss, test_acc = evaluate(model, X_test, y_test_oh)
print(f"[Baseline Test] Loss = {test_loss:.4f}, Accuracy = {test_acc:.2f}%")

[Baseline Test] Loss = 4.7937, Accuracy = 1.36%


In [26]:
import numpy as np

class Conv2D_vec:
    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0):
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size if isinstance(kernel_size, tuple) else (kernel_size, kernel_size)
        self.stride = stride
        self.padding = padding

        kh, kw = self.kernel_size
        fan_in = in_channels * kh * kw
        fan_out = out_channels * kh * kw
        limit = np.sqrt(6 / (fan_in + fan_out))
        self.weights = np.random.uniform(-limit, limit, (out_channels, in_channels, kh, kw))
        self.bias = np.zeros(out_channels)

    def im2col(self, x, kh, kw, sh, sw):
        N, H, W, C = x.shape
        H_out = (H + 2 * self.padding - kh) // sh + 1
        W_out = (W + 2 * self.padding - kw) // sw + 1

        x_padded = np.pad(x, ((0, 0), (self.padding, self.padding), (self.padding, self.padding), (0, 0)), mode='constant')
        col = np.zeros((N, H_out, W_out, kh, kw, C))

        for y in range(H_out):
            for x in range(W_out):
                y_start = y * sh
                x_start = x * sw
                col[:, y, x, :, :, :] = x_padded[:, y_start:y_start+kh, x_start:x_start+kw, :]

        col = col.reshape(N * H_out * W_out, kh * kw * C)
        return col, H_out, W_out

    def forward(self, x):
        self.x = x
        N, H, W, C = x.shape
        kh, kw = self.kernel_size
        sh, sw = self.stride, self.stride

        col, H_out, W_out = self.im2col(x, kh, kw, sh, sw)
        self.col = col

        W_col = self.weights.reshape(self.out_channels, -1)
        out = col @ W_col.T + self.bias
        out = out.reshape(N, H_out, W_out, self.out_channels)
        return out

    def backward(self, grad_output):
        N, H, W, C = self.x.shape
        kh, kw = self.kernel_size
        sh, sw = self.stride, self.stride
        col = self.col
        H_out, W_out = grad_output.shape[1], grad_output.shape[2]

        grad_output_reshaped = grad_output.reshape(-1, self.out_channels)

        self.dW = grad_output_reshaped.T @ col
        self.dW = self.dW.reshape(self.out_channels, kh, kw, C).transpose(0, 3, 1, 2)
        self.db = np.sum(grad_output_reshaped, axis=0)

        W_col = self.weights.reshape(self.out_channels, -1)
        dcol = grad_output_reshaped @ W_col
        dcol = dcol.reshape(N, H_out, W_out, kh, kw, C)

        dx_padded = np.zeros((N, H + 2 * self.padding, W + 2 * self.padding, C))
        for y in range(H_out):
            for x in range(W_out):
                y_start = y * sh
                x_start = x * sw
                dx_padded[:, y_start:y_start+kh, x_start:x_start+kw, :] += dcol[:, y, x, :, :, :]

        if self.padding == 0:
            return dx_padded
        return dx_padded[:, self.padding:-self.padding, self.padding:-self.padding, :]

    def update(self, lr):
        self.weights -= lr * self.dW
        self.bias -= lr * self.db

In [28]:
class FusionBlock:
    def __init__(self, in_channels, out_channels):
        self.conv_1x1 = Conv2D_vec(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.conv_3x3 = Conv2D_vec(in_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.relu = ReLU()
        self.alpha = np.array([0.5])
        self.grad_alpha = 0.0

    def forward(self, x):
        self.x = x
        self.out1 = self.conv_1x1.forward(x)
        self.out2 = self.conv_3x3.forward(x)
        self.output = self.alpha * self.out1 + (1 - self.alpha) * self.out2
        return self.relu.forward(self.output)

    def backward(self, grad_output):
        grad_output_relu = self.relu.backward(grad_output)
        self.grad_alpha = np.sum((self.out1 - self.out2) * grad_output_relu)
        grad1 = grad_output_relu * self.alpha
        grad2 = grad_output_relu * (1 - self.alpha)
        dx1 = self.conv_1x1.backward(grad1)
        dx2 = self.conv_3x3.backward(grad2)
        return dx1 + dx2

    def update(self, lr):
        self.alpha -= lr * self.grad_alpha
        self.alpha = np.clip(self.alpha, 0, 1)
        self.conv_1x1.update(lr)
        self.conv_3x3.update(lr)

class ResidualBlock:
    def __init__(self, channels):
        self.conv1 = Conv2D_vec(channels, channels, kernel_size=3, stride=1, padding=1)
        self.relu1 = ReLU()
        self.conv2 = Conv2D_vec(channels, channels, kernel_size=3, stride=1, padding=1)
        self.relu2 = ReLU()

    def forward(self, x):
        self.x = x
        self.out = self.conv1.forward(x)
        self.out = self.relu1.forward(self.out)
        self.out = self.conv2.forward(self.out)
        return self.relu2.forward(self.out + x)

    def backward(self, grad_output):
        grad = self.relu2.backward(grad_output)
        grad_skip = grad
        grad = self.conv2.backward(grad)
        grad = self.relu1.backward(grad)
        grad = self.conv1.backward(grad)
        return grad + grad_skip

    def update(self, lr):
        self.conv1.update(lr)
        self.conv2.update(lr)

class DFFRCNN:
    def __init__(self):
        self.conv1 = Conv2D_vec(3, 16, 3, stride=1, padding=1)
        self.relu1 = ReLU()
        self.res_block = ResidualBlock(16)
        self.fusion = FusionBlock(16, 32)
        self.pool = MaxPool2D()
        self.flatten = Flatten()
        self.fc1 = FullyConnected(16*16*32, 512)
        self.relu2 = ReLU()
        self.fc2 = FullyConnected(512, 100)
        self.softmax = Softmax()

    def forward(self, x):
        x = self.conv1.forward(x)
        x = self.relu1.forward(x)
        x = self.res_block.forward(x)
        x = self.fusion.forward(x)
        x = self.pool.forward(x)
        x = self.flatten.forward(x)
        x = self.fc1.forward(x)
        x = self.relu2.forward(x)
        x = self.fc2.forward(x)
        x = self.softmax.forward(x)
        return x

    def backward(self, grad_output):
        grad = self.fc2.backward(grad_output)
        grad = self.relu2.backward(grad)
        grad = self.fc1.backward(grad)
        grad = self.flatten.backward(grad)
        grad = self.pool.backward(grad)
        grad = self.fusion.backward(grad)
        grad = self.res_block.backward(grad)
        grad = self.relu1.backward(grad)
        grad = self.conv1.backward(grad)

    def parameters(self):
        return [self.fc1, self.fc2, self.fusion, self.res_block, self.conv1]

In [30]:
model = DFFRCNN()

In [32]:
# DFFRCNN 저장 및 불러오기 함수
def save_model_dff(model, filename='dffrcnn_model.npz'):
    np.savez(filename,
             conv1_w=model.conv1.weights,
             conv1_b=model.conv1.bias,

             res_conv1_w=model.res_block.conv1.weights,
             res_conv1_b=model.res_block.conv1.bias,
             res_conv2_w=model.res_block.conv2.weights,
             res_conv2_b=model.res_block.conv2.bias,

             fusion_conv1x1_w=model.fusion.conv_1x1.weights,
             fusion_conv1x1_b=model.fusion.conv_1x1.bias,
             fusion_conv3x3_w=model.fusion.conv_3x3.weights,
             fusion_conv3x3_b=model.fusion.conv_3x3.bias,
             fusion_alpha=model.fusion.alpha,

             fc1_w=model.fc1.weights,
             fc1_b=model.fc1.bias,
             fc2_w=model.fc2.weights,
             fc2_b=model.fc2.bias)

def load_model_dff(model, filename='dffrcnn_model.npz'):
    data = np.load(filename)
    model.conv1.weights = data['conv1_w']
    model.conv1.bias = data['conv1_b']

    model.res_block.conv1.weights = data['res_conv1_w']
    model.res_block.conv1.bias = data['res_conv1_b']
    model.res_block.conv2.weights = data['res_conv2_w']
    model.res_block.conv2.bias = data['res_conv2_b']

    model.fusion.conv_1x1.weights = data['fusion_conv1x1_w']
    model.fusion.conv_1x1.bias = data['fusion_conv1x1_b']
    model.fusion.conv_3x3.weights = data['fusion_conv3x3_w']
    model.fusion.conv_3x3.bias = data['fusion_conv3x3_b']
    model.fusion.alpha = data['fusion_alpha']

    model.fc1.weights = data['fc1_w']
    model.fc1.bias = data['fc1_b']
    model.fc2.weights = data['fc2_w']
    model.fc2.bias = data['fc2_b']

In [34]:
train(model, X_train, y_train_oh, epochs=3, batch_size=64, lr=0.01)

Epoch 1: Loss = 3350.7378, Accuracy = 6.71%
Epoch 2: Loss = 2915.8623, Accuracy = 15.18%
Epoch 3: Loss = 2694.9769, Accuracy = 19.88%


In [35]:
# 모델 저장
save_model_dff(model, 'dffrcnn_model_epoch3.npz')

In [36]:
# 모델 불러오기
model = DFFRCNN()
load_model_dff(model, 'dffrcnn_model_epoch3.npz')

In [37]:
test_loss, test_acc = evaluate(model, X_test, y_test_oh)
print(f"[Test] Loss: {test_loss:.4f}, Accuracy: {test_acc:.2f}%")

[Test] Loss: 3.6379, Accuracy: 17.33%
