강의_6기_AI응용_5차시_01_Lenet.ipynb

In [1]:
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms

import torch.optim as optim
from torch.cuda.amp import autocast, GradScaler
from torch.utils.data import Dataset, TensorDataset, DataLoader, Subset
from torchvision.models import resnet18

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.metrics import (
    confusion_matrix,
    classification_report,
    precision_score,
    recall_score,
    f1_score
)

import os
import zipfile
from PIL import Image
from tqdm import tqdm
from glob import glob
import warnings

print(f"PyTorch 버전: {torch.__version__}")
print(f"NumPy 버전: {np.__version__}")

device = 'cuda' if torch.cuda.is_available() else 'cpu'

torch.manual_seed(55)

PyTorch 버전: 2.9.1+cu130
NumPy 버전: 2.2.6


<torch._C.Generator at 0x23bbbf1bfd0>

In [2]:
# 폴더
from pathlib import Path
import os

folder = Path('C:', 'Dev', 'rokey', 'AI_basic', 'trial_class', 'project1_251111', 'data')

train_dir = folder.joinpath('dat', 'train')
val_dir = folder.joinpath('dat', 'test')

print(folder)
for _ in folder.iterdir():
    print(_.name)

C:Dev\rokey\AI_basic\trial_class\project1_251111\data
test_data
train_data
train_data.csv


In [13]:
class MyDataset(Dataset):
    def __init__(self, data_path, transform_img=None, train=True):
        self.name2label = pd.read_csv(data_path.joinpath("train_data.csv"))
        self.train = train
        if self.train:
            tr_path = data_path.joinpath("train_data")
            self.imgs = list(tr_path.rglob("*.png"))
            self.labels = [self.name2label[p.parent.name] for p in self.imgs]
        else:
            vl_path = data_path.joinpath("val_data")
            self.imgs = list(vl_path.rglob("*.png"))

    def __len__(self):
        return len(self.imgs)

    def __getitem__(self, index):
        img = Image.open(self.imgs[index])
        if img.mode != 'RGB':
            img = img.convert('RGB')

        if self.transform_img:
            img = self.transform_img(img)

        if self.train:
            if self.transform_la:
                label = self.labels[index]
            return img, label
        else:
            return img, self.imgs[index].name


In [11]:
mean = (0.4377, 0.4438, 0.4728)    # 채널별 평균
std  = (0.1980, 0.2010, 0.1970)    # 채널별 표준편차

train_ten = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std),
])
val_ten = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std)
])

In [12]:
tr_dataset = MyDataset(folder, train_ten, train=True)
vl_dataset = MyDataset(folder, val_ten, train=False)

UnboundLocalError: cannot access local variable 'tr_path' where it is not associated with a value

In [None]:
tr_dataset = MyDataset(folder, train_ten, train=True)
vl_dataset = MyDataset(folder, val_ten, train=False)

tr = DataLoader(tr_dataset, batch_size=128, shuffle=True)
vl = DataLoader(vl_dataset, batch_size=128, shuffle=False)

In [None]:
import random
import matplotlib.pyplot as plt

# 데이터셋의 전체 길이 확인 (선택사항)
print(f"전체 데이터 개수: {len(tr_dataset)}")

# 무작위 인덱스 5개 추출
indices = random.sample(range(len(tr_dataset)), 5)

# 시각화 또는 데이터 확인
plt.figure(figsize=(15, 3))
for i, idx in enumerate(indices):
    data = tr_dataset[idx]
    
    # 데이터가 (이미지, 라벨) 튜플 형태라고 가정
    image, label = data 
    
    # 이미지 텐서를 numpy로 변환 (필요한 경우)
    if hasattr(image, 'permute'):
        image = image.permute(1, 2, 0).numpy() # (C, H, W) -> (H, W, C)
        
    plt.subplot(1, 5, i + 1)
    plt.imshow(image)
    plt.title(f"Label: {label}")
    plt.axis('off')

plt.show()

In [None]:
# LeNet 클래스를 정의함. nn.Module을 상속받아 PyTorch 신경망 모듈로 만듦.
class LeNet(nn.Module):
    # 클래스의 인스턴스를 초기화함. 레이어들을 정의함.
    def __init__(self):
        # 부모 클래스(nn.Module)의 생성자를 호출함.
        super(LeNet, self).__init__()
        # 첫 번째 합성곱 레이어(cn1)를 정의함.
        # 입력 채널 3개(RGB 이미지), 출력 특징 맵 6개, 커널 크기 5x5를 사용함.
        self.cn1 = nn.Conv2d(3, 6, 5)
        # 두 번째 합성곱 레이어(cn2)를 정의함.
        # 입력 채널 6개, 출력 특징 맵 16개, 커널 크기 5x5를 사용함.
        self.cn2 = nn.Conv2d(6, 16, 5)
        # 첫 번째 완전 연결 레이어(fc1)를 정의함.
        # 입력 크기는 16 * 5 * 5 (이전 레이어의 출력 특징 맵 수 * 공간 크기), 출력 크기는 120임.
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        # 두 번째 완전 연결 레이어(fc2)를 정의함.
        # 입력 120, 출력 84를 사용함.
        self.fc2 = nn.Linear(120, 84)
        # 세 번째 완전 연결 레이어(fc3)를 정의함.
        # 입력 84, 최종 출력 클래스 수 10개를 사용함.
        self.fc3 = nn.Linear(84, 10)

    # 데이터가 신경망을 통과하는 순서를 정의함 (순전파).
    def forward(self, x):
        # cn1을 적용하고 ReLU 활성화 함수를 통과시킴.
        x = F.relu(self.cn1(x))
        # 2x2 크기의 맥스 풀링을 적용함. 공간 크기를 절반으로 줄임.
        x = F.max_pool2d(x, (2, 2))
        # cn2를 적용하고 ReLU 활성화 함수를 통과시킴.
        x = F.relu(self.cn2(x))
        # 2x2 크기의 맥스 풀링을 다시 적용함.
        x = F.max_pool2d(x, (2, 2))
        # 데이터를 평탄화(flatten)함. 배치 차원(-1)을 제외한 모든 차원을 하나의 벡터로 만듦.
        x = x.view(-1, self.flattened_features(x))
        # 첫 번째 완전 연결 레이어와 ReLU를 통과시킴.
        x = F.relu(self.fc1(x))
        # 두 번째 완전 연결 레이어와 ReLU를 통과시킴.
        x = F.relu(self.fc2(x))
        # 최종 출력 레이어를 통과시킴.
        x = self.fc3(x)
        # 최종 결과를 반환함.
        return x

    # 데이터를 평탄화하기 위해 특징들의 총 개수를 계산하는 헬퍼 함수임.
    def flattened_features(self, x): # (batch, C, H, W) -> C*H*W
        # 배치 차원(첫 번째 차원)을 제외한 나머지 차원들의 크기를 가져옴.
        # -> 이미지 사이즈 확인작업
        size = x.size()[1:]
        num_feats = 1
        # 모든 차원 크기를 곱하여 총 특징 개수를 계산함.
        for s in size:
            num_feats *= s
        # 총 특징 개수를 반환함.
        return num_feats

# LeNet 클래스의 인스턴스를 생성함.
model = LeNet()
# 생성된 모델의 구조를 출력함.
print(model)

In [None]:
# 학습함수
import torch.nn.functional as F
from torch.optim.lr_scheduler import CosineAnnealingLR

checkpoints = folder.joinpath("checkpoints")

scaler = GradScaler()

criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
scheduler = CosineAnnealingLR(optimizer, T_max=100, eta_min=0.001)

In [None]:
# train, eval 함수
def train_one_epoch(model, loader, optimizer, criterion):
    model.train()
    tot_loss, tot_acc, tot_cnt = 0.0, 0.0, 0

    pbar = tqdm(loader, desc='훈련')
    for i, (x, y) in enumerate(pbar):
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        with autocast(device_type=device):
            out = model(x)
            loss = criterion(out, y)
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        current_lr = optimizer.param_groups[0]['lr']
        scheduler.step()

        tot_loss += loss.item() * y.size(0)
        tot_acc  += (out.argmax(1) == y).float().sum().item() # 정답수 누적
        tot_cnt  += y.size(0)

        pbar.set_postfix({'LR': current_lr})

    return tot_loss/tot_cnt, tot_acc/tot_cnt, current_lr

def evaluate(model, loader, criterion):
    model.eval()
    tot_loss, tot_acc, tot_cnt = 0.0, 0.0, 0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)

            with autocast(device_type=device):
                out = model(x)
                loss = criterion(out, y)

            tot_loss += loss.item() * y.size(0)
            tot_acc  += (out.argmax(1) == y).float().sum().item() # 정답수 누적
            tot_cnt  += y.size(0)

    return tot_loss/tot_cnt, tot_acc/tot_cnt

In [None]:
tr_hist, te_hist, lr_hist = [], [], []
te_loss, best_te_loss, patience_counter = 0, 0, 0
for ep in range(1, 5):
    tr_loss, tr_acc, current_lr = train_one_epoch(model, train_loader, optimizer, criterion) # tr
    te_loss, te_acc = evaluate(model, val_loader, criterion) # va

    tr_hist.append((tr_loss, tr_acc))
    te_hist.append((te_loss, te_acc))
    lr_hist.append(current_lr)
    print(f"Epoch {ep}/{EPOCHS} | train /////{tr_acc:.3f}/////{tr_loss:.3f} | test /////{te_acc:.3f}/////{te_loss:.3f}")
    if ep % 5 == 0:
        print(f"Epoch {ep}/{EPOCHS} | train {tr_acc:.3f}/{tr_loss:.3f} | test {te_acc:.3f}/{te_loss:.3f}")

        torch.save(checkpoints.joinpath(f"ep{ep}.pth"))
        print(f"Checkpoint saved: {checkpoints.joinpath(f"ep{ep}.pth")}")

        if te_loss < best_te_loss:
            best_te_loss = te_loss
            patience_counter = 0
        else:
            patience_counter += 1

        if patience_counter >= 5:
            print(f"Early stopping at epoch {ep}")
            break

print('학습완료')