In [1]:
import numpy as np
import matplotlib.pyplot as plt
import math
import random
import time
from dataset_PA1.dataloader import Dataloader
from dataset_PA1.dataloader import datasetIterator
from CNN_classes import Conv2DLayer
from CNN_classes import MaxPoolingLayer
from CNN_classes import LinearLayer
from CNN_classes import ReLULayer
from CNN_classes import softmax
from CNN_classes import cross_entropy_loss
from CNN_classes import softmax_cross_entropy_backward

# Seed every RNG this notebook can touch. NumPy keeps its own global generator,
# independent of Python's `random`, so seeding only `random` leaves any
# np.random-based code unseeded (NOTE(review): presumably the weight init in
# CNN_classes draws from np.random -- confirm).
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

In [2]:
class ThreeLayerCNN:
    """Two conv -> ReLU -> max-pool stages followed by a linear layer and softmax.

    The layer objects come from ``CNN_classes``; this class only wires them
    together and drives their ``forward`` / ``backward`` / ``update_params``
    methods in the right order.
    """

    def __init__(self, batch_size, input_channels, H_in, W_in, conv1_F, conv2_F, kernel_size, pool_size, learning_rate):
        """Build the layer stack.

        Args:
            batch_size: Batch size handed to the conv and linear layers.
            input_channels: Channel count of the input images.
            H_in: Input image height.
            W_in: Input image width.
            conv1_F: Number of output channels of the first conv layer.
            conv2_F: Number of output channels of the second conv layer.
            kernel_size: Kernel size used by both conv layers.
            pool_size: Window size (and stride) of both pooling layers.
            learning_rate: Learning rate passed to the conv and linear layers.
        """
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        # Kept so forward() can restore the (N, C, H, W) layout of batches that
        # arrive flattened or without a channel axis.
        self.input_channels = input_channels
        self.H_in = H_in
        self.W_in = W_in

        input_shape = (batch_size, input_channels, H_in, W_in)
        self.conv1 = Conv2DLayer(input_shape=input_shape, out_channels=conv1_F, kernel_size=kernel_size, stride=1, padding=1, learning_rate=learning_rate)
        self.relu1 = ReLULayer()
        self.pool1 = MaxPoolingLayer(pool_size=pool_size, stride=pool_size)
        # NOTE(review): this shape assumes conv1 keeps H and W unchanged, which
        # should hold for kernel_size=3 with padding=1/stride=1 -- confirm
        # against Conv2DLayer before using other kernel sizes.
        mid_shape = (batch_size, conv1_F, H_in // pool_size, W_in // pool_size)
        self.conv2 = Conv2DLayer(input_shape=mid_shape, out_channels=conv2_F, kernel_size=kernel_size, stride=1, padding=1, learning_rate=learning_rate)
        self.relu2 = ReLULayer()
        self.pool2 = MaxPoolingLayer(pool_size=pool_size, stride=pool_size)

        # Two pooling stages shrink each spatial dimension by pool_size twice.
        final_H = H_in // (pool_size * pool_size)
        final_W = W_in // (pool_size * pool_size)
        linear_input_size = conv2_F * final_H * final_W
        output_size = 10
        self.linear3 = LinearLayer(batch_size=batch_size, input_size=linear_input_size, output_size=output_size, learning_rate=learning_rate)

        # Only these layers are asked to update parameters in update_params().
        self.layers = [self.conv1, self.conv2, self.linear3]

    def forward(self, x):
        """Run a batch through the network and return the softmax output.

        Args:
            x: Batch of images. A 4-D input is used as is; anything else is
                reshaped to ``(N, input_channels, H_in, W_in)`` with ``N`` taken
                from the first axis, so callers that forget to reshape (as
                ``evaluate`` and ``predict_all`` do) still feed the conv layer
                the same layout the training loop uses.

        Returns:
            The result of ``softmax`` applied to the linear layer's output.
        """
        if np.ndim(x) != 4:
            x = np.reshape(x, (np.shape(x)[0], self.input_channels, self.H_in, self.W_in))

        L1 = self.conv1.forward(x)
        A1 = self.relu1.forward(L1)
        P1 = self.pool1.forward(A1)

        L2 = self.conv2.forward(P1)
        A2 = self.relu2.forward(L2)
        P2 = self.pool2.forward(A2)
        # Remembered so backward() can undo the flattening below.
        self.pool2_output_shape = P2.shape
        P2_flat = P2.reshape(P2.shape[0], -1)

        L3 = self.linear3.forward(P2_flat)
        pred = softmax(L3)
        return pred

    def backward(self, pred, ans):
        """Back-propagate through every layer in reverse order.

        Must be called after ``forward`` because it relies on the pooled shape
        recorded there.

        Args:
            pred: Output of ``forward`` for the current batch.
            ans: Targets, in the form ``softmax_cross_entropy_backward``
                expects (``train_cnn`` passes integer class indices).
        """
        dL3 = softmax_cross_entropy_backward(pred, ans)
        dP2_flat = self.linear3.backward(dL3)
        dP2 = dP2_flat.reshape(self.pool2_output_shape)
        dA2 = self.pool2.backward(dP2)
        dL2 = self.relu2.backward(dA2)
        dP1 = self.conv2.backward(dL2)
        dA1 = self.pool1.backward(dP1)
        dL1 = self.relu1.backward(dA1)
        # The gradient w.r.t. the input image is not needed, so it is discarded.
        self.conv1.backward(dL1)

    def update_params(self):
        """Ask each layer in ``self.layers`` to apply its parameter update."""
        for layer in self.layers:
            layer.update_params()

In [3]:
def evaluate(model, data_loader):
    """Compute the sample-weighted mean loss and the accuracy over a loader.

    Args:
        model: Object whose ``forward(image)`` returns per-class scores.
        data_loader: Iterable of ``(image, label)`` batches; ``label`` is
            reduced with ``argmax`` over axis 1 to get class indices.

    Returns:
        Tuple ``(avg_loss, accuracy)``, both averaged over all samples seen.
    """
    loss_sum = 0
    hits = 0
    seen = 0

    for image, label in data_loader:
        n_batch = image.shape[0]
        target = np.argmax(label, axis=1)
        pred = model.forward(image)

        # The loss is weighted by batch size before summing so the final
        # division yields a per-sample average.
        loss_sum += cross_entropy_loss(pred, target) * n_batch
        hits += np.sum(np.argmax(pred, axis=1) == target)
        seen += n_batch

    return loss_sum / seen, hits / seen

def predict_all(model, data_loader):
    """Run the model over every batch and stack the results.

    Args:
        model: Object whose ``forward(image)`` returns an array per batch.
        data_loader: Iterable of ``(image, label)`` batches.

    Returns:
        Tuple ``(predictions, labels)``: the per-batch model outputs and the
        loader's labels (unchanged), each concatenated along axis 0.
    """
    batch_preds, batch_labels = [], []

    for image, label in data_loader:
        batch_preds.append(model.forward(image))
        batch_labels.append(label)

    return np.concatenate(batch_preds, axis=0), np.concatenate(batch_labels, axis=0)

In [4]:
def train_cnn(epochs, learning_rate, batch_size, train_loader, test_loader):
    """Train a ThreeLayerCNN and report the test metrics after every epoch.

    Args:
        epochs: Number of passes over ``train_loader``.
        learning_rate: Learning rate forwarded to the model.
        batch_size: Batch size forwarded to the model.
        train_loader: Iterable of ``(image, label)`` batches that also
            supports ``len()``; labels are reduced with ``argmax`` over axis 1.
        test_loader: Loader handed to ``evaluate`` after each epoch.

    Returns:
        Tuple ``(model, train_losses, test_losses)`` with one loss entry per
        epoch. Test accuracies are collected and printed but not returned.
    """
    # Fixed input geometry and architecture used by this notebook.
    input_channels, H_in, W_in = 1, 28, 28
    conv1_F, conv2_F = 16, 32
    kernel_size, pool_size = 3, 2

    model = ThreeLayerCNN(
        batch_size=batch_size,
        input_channels=input_channels,
        H_in=H_in,
        W_in=W_in,
        conv1_F=conv1_F,
        conv2_F=conv2_F,
        kernel_size=kernel_size,
        pool_size=pool_size,
        learning_rate=learning_rate,
    )

    train_losses, test_losses, test_accuracies = [], [], []

    print("Training start")

    for epoch in range(1, epochs + 1):
        start_time = time.time()
        epoch_loss = 0
        total_batches = len(train_loader)

        for image, label in train_loader:
            # Give the batch the (N, C, H, W) layout the first conv layer was built for.
            image = image.reshape(image.shape[0], input_channels, H_in, W_in)
            label = np.argmax(label, axis=1)

            pred = model.forward(image)
            epoch_loss += cross_entropy_loss(pred, label)

            # One gradient step per batch.
            model.backward(pred, label)
            model.update_params()

        # Mean of the per-batch losses (not weighted by batch size).
        avg_epoch_loss = epoch_loss / total_batches
        train_losses.append(avg_epoch_loss)

        test_loss, test_acc = evaluate(model, test_loader)
        test_accuracies.append(test_acc)
        test_losses.append(test_loss)
        print(f"Epoch {epoch} completed in {time.time() - start_time:.2f}s - Train Loss: {avg_epoch_loss:.4f}, Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

    print("Training finish")

    return model, train_losses, test_losses

In [5]:
def plot_loss(train_losses, test_losses):
    """Plot the training and test loss curves on one set of axes.

    Args:
        train_losses: Sequence of per-epoch training losses.
        test_losses: Sequence of per-epoch test losses.
    """
    fig, ax = plt.subplots(figsize=(10, 5))

    for series, name in ((train_losses, 'Training Loss'), (test_losses, 'Test Loss')):
        ax.plot(series, label=name)

    ax.set_xlabel('Epochs')
    ax.set_ylabel('Loss')
    ax.set_title('Training and Test Loss over Epochs')
    ax.legend()
    ax.grid()
    plt.show()

def confusion_matrix(pred, label, num_class=10):
    """Build, print and return the confusion matrix of a set of predictions.

    Args:
        pred: 2-D array of per-class scores; the predicted class is the
            argmax over axis 1.
        label: 2-D array of targets; the true class is the argmax over axis 1.
        num_class: Number of classes, i.e. the side length of the matrix.

    Returns:
        Integer array of shape ``(num_class, num_class)`` where entry
        ``[t, p]`` counts the samples of true class ``t`` predicted as ``p``.
    """
    predicted = np.argmax(pred, axis=1)
    actual = np.argmax(label, axis=1)

    counts = np.zeros((num_class, num_class), dtype=int)
    # np.add.at accumulates repeated (row, col) pairs, unlike `counts[...] += 1`
    # with fancy indexing, which would count each distinct pair only once.
    np.add.at(counts, (actual, predicted[:len(actual)]), 1)

    print("Confusion Matrix:")
    print(counts)
    return counts

def get_top3(pred, image, label, num_class=10):
    """Show, for each class, the three images the model scores highest for it.

    The figure has one column per class and one row per rank. Every cell
    shows the image, its score as the title, and the true class underneath
    (blue when it matches the column's class, red otherwise).

    Args:
        pred: 2-D array of scores indexed ``[sample, class]``.
        image: Per-sample images; ``image[k]`` must be reshapeable to 28x28.
        label: Per-sample true class ids, indexed like ``image``.
        num_class: Number of classes (columns) to display.
    """
    # squeeze=False keeps `axes` two-dimensional even when num_class == 1.
    fig, axes = plt.subplots(3, num_class, figsize=(20, 6), squeeze=False)
    fig.suptitle("Top 3 Predicted Images for Each Class (3-Layer NN)")

    for i in range(num_class):
        scores = pred[:, i]
        # Indices of the highest-scoring samples for class i, best first.
        top3_i = np.argsort(scores)[::-1][:3]

        for j, img_idx in enumerate(top3_i):
            score = scores[img_idx]

            pred_class = i
            real_class = label[img_idx]
            label_color = 'blue' if pred_class == real_class else 'red'

            ax = axes[j, i]
            ax.imshow(image[img_idx].reshape(28, 28), cmap='gray')

            # Hide ticks and frame by hand. ax.axis('off') would also suppress
            # the axis labels, so the Pred/True/Rank text set below would
            # never be drawn.
            ax.set_xticks([])
            ax.set_yticks([])
            for spine in ax.spines.values():
                spine.set_visible(False)

            ax.set_title(f"{score:.2f}", fontsize=8)

            if j == 0:
                ax.set_xlabel(f"Pred {pred_class}\nTrue {real_class}", fontsize=9, color=label_color)
            else:
                ax.set_xlabel(f"True {real_class}", fontsize=9, color=label_color)

            if i == 0:
                ax.set_ylabel(f"Rank {j+1}", rotation=0, labelpad=15, fontsize=10)

        # With fewer than three samples some rows stay unused; blank them.
        for j in range(len(top3_i), 3):
            axes[j, i].axis('off')

    plt.tight_layout(rect=[0, 0, 1, 0.98])
    plt.show()


In [6]:
# Run configuration: data location and training hyperparameters.
data_path = './dataset_PA1'
batch_size = 64
epochs = 5
learning_rate = 0.01

# One loader per split, both using the same batch size.
train_loader = Dataloader(path=data_path, batch_size=batch_size, is_train=True)
test_loader = Dataloader(path=data_path, batch_size=batch_size, is_train=False)

# Train the model; the per-epoch losses are kept for plotting later.
cnn_model, train_losses, test_losses = train_cnn(
    epochs=epochs,
    learning_rate=learning_rate,
    batch_size=batch_size,
    train_loader=train_loader,
    test_loader=test_loader,
)


Training start


KeyboardInterrupt: 

In [None]:
# Model outputs and the loader's labels for the whole test split; needs
# `cnn_model` from a completed training cell.
pred, gt = predict_all(model=cnn_model, data_loader=test_loader)

In [None]:
plot_loss(train_losses, test_losses)
conf_matrix = confusion_matrix(pred, gt, num_class=10)

# Load test data again to show predictions
image_batches = []
label_batches = []
for image, label in test_loader:
    # Append whole batches. Extending the list with a batch splits it into
    # per-sample arrays, which np.concatenate would then fuse along the wrong
    # axis unless every sample happened to carry a leading singleton dimension.
    image_batches.append(image)
    label_batches.append(np.argmax(label, axis=1))

test_images = np.concatenate(image_batches, axis=0)
test_labels = np.concatenate(label_batches, axis=0)

# `pred` was produced in an earlier pass over the loader; the images only line
# up with it if the loader yields samples in the same order every time.
assert np.array_equal(test_labels, np.argmax(gt, axis=1)), \
    "test_loader order differs between passes; images would not match pred"

print(test_labels.shape)
print("\n--- 클래스별 상위 3개 예측 --- (모델이 각 숫자로 가장 확신하는 이미지들)")
get_top3(pred, test_images, test_labels, num_class=10)