In [1]:
# Pytorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
# Helper libraries
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

# Seed PyTorch's default random generator so that random draws made through
# torch (for example the torch.rand inputs used to smoke-test the models)
# repeat from run to run.
torch.manual_seed(237)  # for reproducibility

<torch._C.Generator at 0x19b7f5f7650>

In [2]:
class CatDogDataset(Dataset):
    """Image dataset whose cat/dog label is derived from each file name.

    Args:
        root_dir: Directory that contains every file named in ``file_list``.
        file_list: Indexable collection of image file names inside ``root_dir``.
        transform: Optional callable applied to the loaded RGB PIL image.

    Every item is an ``(image, label)`` pair. ``label`` is 0 when the
    lower-cased file name contains ``"cat"`` and 1 for any other name.
    """

    def __init__(self, root_dir, file_list, transform=None):
        self.root_dir, self.files, self.transform = root_dir, file_list, transform

    def __len__(self):
        """Return the number of files the dataset indexes."""
        return len(self.files)

    def __getitem__(self, idx):
        """Load the ``idx``-th image and return it together with its label."""
        name = self.files[idx]

        # Convert to RGB so every sample has three channels.
        picture = Image.open(os.path.join(self.root_dir, name)).convert("RGB")

        # Anything that is not recognisably a cat is labelled 1.
        target = 0 if "cat" in name.lower() else 1

        if self.transform:
            picture = self.transform(picture)

        return picture, target

In [3]:
import os
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split


    
# Preprocessing shared by both splits: resize every image to 224x224 and
# convert it to a tensor.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

root = "data/catdog/train"

# os.listdir returns entries in arbitrary order. Sorting makes the seeded
# split below pick the same files on every machine and every run.
files = sorted(
    f for f in os.listdir(root) if f.lower().endswith(("jpg", "jpeg", "png"))
)
if not files:
    raise FileNotFoundError(f"No jpg/jpeg/png images found in {root!r}")

# File name -> label: 0 when the name contains "cat", 1 otherwise.
labels = np.array([0 if "cat" in f.lower() else 1 for f in files])
files = np.array(files)

# ---------------------------------------
# Stratified 70 / 30 hold-out split
# ---------------------------------------
# Only one split is made here: 70% of the files go to training and the
# remaining 30% (temp_*) are held out. The held-out part is exposed as
# `testset` / `testloader`.
train_files, temp_files, train_labels, temp_labels = train_test_split(
    files, labels, test_size=0.3, stratify=labels, random_state=42
)

trainset = CatDogDataset(root, train_files, transform)
testset = CatDogDataset(root, temp_files, transform)

# Only the training batches are shuffled; the held-out loader keeps a fixed order.
trainloader = DataLoader(trainset, batch_size=64, shuffle=True)
testloader = DataLoader(testset, batch_size=64, shuffle=False)

In [4]:
# Look at the first training sample to confirm the tensor layout and label.
image, label = trainset[0]
print(f"Image shape: {image.shape}")
print(f"Label: {label}")

# The sizes are wrapped in 0-d tensors so that their (empty) shape can be
# printed next to the value.
train_size = torch.tensor(len(trainset))
test_size = torch.tensor(len(testset))

for split_name, split_size in (("Train", train_size), ("Test", test_size)):
    print(f"{split_name} dataset size: {split_size} (Shape: {split_size.shape})")

Image shape: torch.Size([3, 224, 224])
Label: 1
Train dataset size: 17500 (Shape: torch.Size([]))
Test dataset size: 7500 (Shape: torch.Size([]))


![image.png](attachment:image.png)

In [5]:
class BasicBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers with an optional shortcut connection.

    Args:
        in_channel: Number of channels of the incoming feature map.
        out_channel: Number of channels produced by both convolutions.
        stride: Stride of the first convolution and of the projection shortcut.
        skip: If True the block input is added to the output before the final
            ReLU; if False the block is a plain conv stack without a shortcut.
    """

    def __init__(self, in_channel, out_channel, stride=1, skip=True):
        super().__init__()
        self.skip = skip

        self.conv1 = nn.Conv2d(
            in_channel, out_channel, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(out_channel)

        self.conv2 = nn.Conv2d(
            out_channel, out_channel, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(out_channel)

        # A 1x1 conv + BN projection is built only when the shortcut is in use
        # and the main path changes the spatial size or the channel count.
        needs_projection = skip and (stride != 1 or in_channel != out_channel)
        self.downsample = (
            nn.Sequential(
                nn.Conv2d(in_channel, out_channel, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channel),
            )
            if needs_projection
            else None
        )

        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Run the block on ``x`` and return the activated output."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        if self.skip:
            shortcut = x if self.downsample is None else self.downsample(x)
            out += shortcut

        return self.relu(out)

class BottleneckBlock(nn.Module):
    """1x1 -> 3x3 -> 1x1 convolution block with an optional shortcut connection.

    The last 1x1 convolution widens the feature map to
    ``mid_channel * expansion`` channels.

    Args:
        in_channel: Number of channels of the incoming feature map.
        mid_channel: Channel count used by the first two convolutions.
        stride: Stride of the first 1x1 convolution and of the projection shortcut.
        skip: If True the block input is added to the output before the final
            ReLU; if False the block is a plain conv stack without a shortcut.
    """

    expansion = 4

    def __init__(self, in_channel, mid_channel, stride=1, skip=True):
        super().__init__()
        self.skip = skip
        out_channel = mid_channel * self.expansion

        # 1x1 convolution that also carries the block's stride.
        self.conv1 = nn.Conv2d(in_channel, mid_channel, kernel_size=1, stride=stride, bias=False)
        self.bn1 = nn.BatchNorm2d(mid_channel)

        # 3x3 convolution at the reduced width.
        self.conv2 = nn.Conv2d(
            mid_channel, mid_channel, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(mid_channel)

        # 1x1 convolution that expands to the output width.
        self.conv3 = nn.Conv2d(mid_channel, out_channel, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_channel)

        # A 1x1 conv + BN projection is built only when the shortcut is in use
        # and the main path changes the spatial size or the channel count.
        needs_projection = skip and (stride != 1 or in_channel != out_channel)
        self.downsample = (
            nn.Sequential(
                nn.Conv2d(in_channel, out_channel, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channel),
            )
            if needs_projection
            else None
        )

        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Run the block on ``x`` and return the activated output."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        if self.skip:
            shortcut = x if self.downsample is None else self.downsample(x)
            out += shortcut

        return self.relu(out)

In [6]:
# Class that assembles the complete ResNet-style model.
class RESnet(nn.Module):
    """ResNet-style image classifier built from BasicBlock or BottleneckBlock.

    Args:
        num_list: Number of blocks in each stage. Entries are paired in order
            with the stage widths 64, 128, 256 and 512, so at most four
            entries are allowed; fewer entries build a shallower network.
        skip_connection: Forwarded to every block. False gives the "plain"
            network without shortcut connections.
        num_classes: Number of outputs of the final linear layer.
        bottleneck: Use BottleneckBlock instead of BasicBlock.

    Raises:
        ValueError: If ``num_list`` has more entries than there are stage widths.
    """

    def __init__(self, num_list=(3, 4, 6, 3), skip_connection=True, num_classes=2, bottleneck=False):
        super().__init__()

        channel_list = [64, 128, 256, 512]
        if len(num_list) > len(channel_list):
            # zip() below would silently drop the extra stages.
            raise ValueError(
                f"num_list has {len(num_list)} stages but only "
                f"{len(channel_list)} stage widths are defined"
            )

        block = BottleneckBlock if bottleneck else BasicBlock
        # Ratio between a block's output channels and its nominal width.
        expansion = BottleneckBlock.expansion if bottleneck else 1

        # Stem: 7x7 convolution (stride 1, with bias, no batch norm) followed
        # by ReLU and a 2x2 max-pool that halves the resolution.
        layers = [
            nn.Conv2d(3, 64, kernel_size=7, padding=3),
            nn.ReLU(True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        ]

        # Track the channel count actually leaving the previous layer instead
        # of re-deriving it from channel_list; this stays correct for any
        # number of stages and any number of blocks per stage.
        in_channel = 64
        for stage, (num_blocks, channel) in enumerate(zip(num_list, channel_list)):
            for j in range(num_blocks):
                # The first block of every stage except the first halves the resolution.
                stride = 2 if (j == 0 and stage != 0) else 1
                layers.append(block(in_channel, channel, stride, skip_connection))
                in_channel = channel * expansion

        self.feature_extractor = nn.Sequential(*layers)
        # The classifier width follows the last block's output (512 or 2048
        # for the default four stages).
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(1),
            nn.Linear(in_channel, num_classes),
        )

    def forward(self, x):
        """Return the class logits for a batch of images ``x``."""
        x = self.feature_extractor(x)
        x = self.classifier(x)
        return x

In [7]:
# Build the default residual network and its plain (no-shortcut) twin.
resnet34 = RESnet()
plain34 = RESnet(skip_connection=False)

# Smoke test: push one random 224x224 RGB image through each model to make
# sure the layer shapes line up. The outputs are discarded.
for smoke_model in (resnet34, plain34):
    smoke_model(torch.rand((1, 3, 224, 224)))

print(resnet34)

RESnet(
  (feature_extractor): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(1, 1), padding=(3, 3))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
    )
    (4): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05,

In [8]:
# Bottleneck variants: residual and plain (no-shortcut).
resnet50 = RESnet(num_list=[3, 4, 6, 3], bottleneck=True)
plain50 = RESnet(bottleneck=True, skip_connection=False)

# Smoke test: push one random 224x224 RGB image through each model to make
# sure the layer shapes line up. The outputs are discarded.
for smoke_model in (resnet50, plain50):
    smoke_model(torch.rand((1, 3, 224, 224)))

print(resnet50)

RESnet(
  (feature_extractor): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(1, 1), padding=(3, 3))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): BottleneckBlock(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (

In [9]:
# Upper bound on the number of training epochs.
EPOCH = 50

# Train on the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

cuda


In [None]:
import time

current_time = time.time()

resnet34 = RESnet().to(device)

# Make sure every parameter is trainable (nothing is frozen).
for param in resnet34.parameters():
    param.requires_grad = True

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(resnet34.parameters(), lr=0.001)

resnet34_train_losses = []  # mean training loss of each epoch
resnet34_val_accuracy = []  # held-out accuracy (%) of each epoch

# === Early stopping settings ===
patience = 5              # epochs without improvement tolerated before stopping
best_val_acc = 0.0        # best validation accuracy seen so far
patience_counter = 0      # consecutive epochs without improvement
best_model_path = "best_resnet34.pth"  # where the best weights are saved

for epoch in range(EPOCH):
    resnet34.train()
    running_loss = 0.0  # loss summed over the current 100-batch logging window
    epoch_loss = 0.0    # sample-weighted loss summed over the whole epoch
    correct = 0
    total = 0

    for i, (inputs, labels) in enumerate(trainloader, 0):
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()

        outputs = resnet34(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        n_samples = labels.size(0)
        running_loss += batch_loss
        # The criterion returns the batch mean, so weight it by the batch
        # size; the last batch may be smaller than the others.
        epoch_loss += batch_loss * n_samples
        _, predicted = torch.max(outputs, 1)
        total += n_samples
        correct += (predicted == labels).sum().item()

        if i % 100 == 99:
            print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 100:.3f}")
            running_loss = 0.0

    # running_loss is reset every 100 batches and therefore only covers the
    # tail of the epoch; the per-epoch mean comes from epoch_loss instead.
    train_loss = epoch_loss / max(1, total)
    train_acc = 100 * correct / max(1, total)
    resnet34_train_losses.append(train_loss)

    print(f"Epoch {epoch + 1}: Train Accuracy: {train_acc:.2f}%")

    # ====== Validation ======
    # testloader is used as the validation set that drives early stopping.
    resnet34.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in testloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = resnet34(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_acc = 100 * correct / max(1, total)
    resnet34_val_accuracy.append(val_acc)

    print(f"Epoch {epoch + 1}: Validation Accuracy: {val_acc:.2f}%")

    # ====== Early stopping check ======
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        patience_counter = 0
        torch.save(resnet34.state_dict(), best_model_path)
        print(f">>> Best model updated. val_acc = {best_val_acc:.2f}% 저장됨")
    else:
        patience_counter += 1
        print(f">>> EarlyStopping counter: {patience_counter} / {patience}")
        if patience_counter >= patience:
            print(">>> 검증 정확도 개선이 없어 학습을 조기 종료합니다.")
            break

print("Finished Training")
print("총 학습 시간:", time.time() - current_time, "초")

# NOTE: after the loop `resnet34` holds the weights of the last epoch, not the
# best one. To restore the best checkpoint later:
# best_model = RESnet().to(device)
# best_model.load_state_dict(torch.load(best_model_path, map_location=device))
# best_model.eval()

[1,   100] loss: 0.704
[1,   200] loss: 0.662
Epoch 1: Train Accuracy: 59.76%
Epoch 1: Validation Accuracy: 50.08%
>>> Best model updated. val_acc = 50.08% 저장됨
[2,   100] loss: 0.642
[2,   200] loss: 0.625
Epoch 2: Train Accuracy: 63.48%
Epoch 2: Validation Accuracy: 58.68%
>>> Best model updated. val_acc = 58.68% 저장됨
[3,   100] loss: 0.635
[3,   200] loss: 0.606
Epoch 3: Train Accuracy: 65.73%
Epoch 3: Validation Accuracy: 50.07%
>>> EarlyStopping counter: 1 / 5
[4,   100] loss: 0.608
[4,   200] loss: 0.596
Epoch 4: Train Accuracy: 68.00%
Epoch 4: Validation Accuracy: 67.40%
>>> Best model updated. val_acc = 67.40% 저장됨
[5,   100] loss: 0.586
[5,   200] loss: 0.571
Epoch 5: Train Accuracy: 70.23%
Epoch 5: Validation Accuracy: 53.93%
>>> EarlyStopping counter: 1 / 5
[6,   100] loss: 0.549
[6,   200] loss: 0.552
Epoch 6: Train Accuracy: 72.30%
Epoch 6: Validation Accuracy: 66.20%
>>> EarlyStopping counter: 2 / 5
[7,   100] loss: 0.558
[7,   200] loss: 0.524
Epoch 7: Train Accuracy: 73.27

In [None]:
import time

current_time = time.time()

plain34 = RESnet(skip_connection=False).to(device)

# Make sure every parameter is trainable (nothing is frozen).
for param in plain34.parameters():
    param.requires_grad = True

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(plain34.parameters(), lr=0.001)

plain34_train_losses = []  # mean training loss of each epoch
plain34_val_accuracy = []  # held-out accuracy (%) of each epoch

# === Early stopping settings ===
patience = 5              # epochs without improvement tolerated before stopping
best_val_acc = 0.0        # best validation accuracy seen so far
patience_counter = 0      # consecutive epochs without improvement
best_model_path = "best_plain34.pth"  # where the best weights are saved

for epoch in range(EPOCH):
    plain34.train()
    running_loss = 0.0  # loss summed over the current 100-batch logging window
    epoch_loss = 0.0    # sample-weighted loss summed over the whole epoch
    correct = 0
    total = 0

    for i, (inputs, labels) in enumerate(trainloader, 0):
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()

        outputs = plain34(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        n_samples = labels.size(0)
        running_loss += batch_loss
        # The criterion returns the batch mean, so weight it by the batch
        # size; the last batch may be smaller than the others.
        epoch_loss += batch_loss * n_samples
        _, predicted = torch.max(outputs, 1)
        total += n_samples
        correct += (predicted == labels).sum().item()

        if i % 100 == 99:
            print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 100:.3f}")
            running_loss = 0.0

    # running_loss is reset every 100 batches and therefore only covers the
    # tail of the epoch; the per-epoch mean comes from epoch_loss instead.
    train_loss = epoch_loss / max(1, total)
    train_acc = 100 * correct / max(1, total)
    plain34_train_losses.append(train_loss)

    print(f"Epoch {epoch + 1}: Train Accuracy: {train_acc:.2f}%")

    # ====== Validation ======
    # testloader is used as the validation set that drives early stopping.
    plain34.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in testloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = plain34(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_acc = 100 * correct / max(1, total)
    plain34_val_accuracy.append(val_acc)

    print(f"Epoch {epoch + 1}: Validation Accuracy: {val_acc:.2f}%")

    # ====== Early stopping check ======
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        patience_counter = 0
        torch.save(plain34.state_dict(), best_model_path)
        print(f">>> Best model updated. val_acc = {best_val_acc:.2f}% 저장됨")
    else:
        patience_counter += 1
        print(f">>> EarlyStopping counter: {patience_counter} / {patience}")
        if patience_counter >= patience:
            print(">>> 검증 정확도 개선이 없어 학습을 조기 종료합니다.")
            break

print("Finished Training")
print("총 학습 시간:", time.time() - current_time, "초")

# NOTE: after the loop `plain34` holds the weights of the last epoch, not the
# best one. To restore the best checkpoint later, rebuild the model without
# skip connections so that its parameters match the saved state dict:
# best_model = RESnet(skip_connection=False).to(device)
# best_model.load_state_dict(torch.load(best_model_path, map_location=device))
# best_model.eval()



Downloading: "https://download.pytorch.org/models/vgg19-dcbb9e9d.pth" to C:\Users\tkdwl/.cache\torch\hub\checkpoints\vgg19-dcbb9e9d.pth


 14%|█▍        | 77.0M/548M [00:06<00:41, 12.0MB/s]

In [None]:
import matplotlib.pyplot as plt

# Overlay the per-epoch training loss of the residual and the plain network.
for loss_history, line_style, legend_text in (
    (resnet34_train_losses, 'r', "Resnet34 Training Loss"),
    (plain34_train_losses, 'b', "Plain34 Training Loss"),
):
    plt.plot(loss_history, line_style, label=legend_text)

plt.title('Model Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(loc='upper left')
plt.show()

In [None]:
# Overlay the per-epoch validation accuracy of the residual and the plain network.
for accuracy_history, line_style, legend_text in (
    (resnet34_val_accuracy, 'r', "Resnet34 Validation Accuracy"),
    (plain34_val_accuracy, 'b', "Plain34 Validation Accuracy"),
):
    plt.plot(accuracy_history, line_style, label=legend_text)

plt.title('Model Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend(loc='upper left')
plt.show()