In [1]:
import torch
import torch.nn as nn

'''

ML 기말 프로젝트 base model 자리에 있질 않네 아주주

* 모델 구조 변경으로 인한 성능 향상은 평가에서 제외.
  구조 변경 예시) Conv & Linear layer 추가

'''

class BaseModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True))

        self.conv1_M = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv2 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True))

        self.conv2_M = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv3 = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True))

        self.conv3_M = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv4 = nn.Sequential(
            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True))

        self.conv4_M = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv5 = nn.Sequential(
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True))

        self.GAP = nn.AdaptiveAvgPool2d(1)

        self.classifier = nn.Linear(512, 10)

    def forward(self, x):
        out = self.conv1(x)
        out = self.conv1_M(out)

        out = self.conv2(out)
        out = self.conv2_M(out)

        out = self.conv3(out)
        out = self.conv3_M(out)

        out = self.conv4(out)
        out = self.conv4_M(out)

        out = self.conv5(out)
        out = self.GAP(out)
        # 해당 위치 out : classifier 직전 layer의 feature --

        out = out.view(out.size(0), -1)
        out = self.classifier(out)
        return out


In [2]:
# import torch
# from torchvision import datasets, transforms
# from torch.utils.data import DataLoader

# # # train과 test 데이터 디렉토리 경로 설정
# train_dir = '/home/gidaseul/Documents/GitHub/ML_2/datas/MNIST/train'
# test_dir = '/home/gidaseul/Documents/GitHub/ML_2/datas/MNIST/test'

# # 데이터 전처리: MNIST 이미지를 RGB 채널로 처리
# transform = transforms.Compose([
#     transforms.Resize((224, 224)),  # 입력 크기를 모델에 맞게 조정 (BaseModel은 32x32 크기 가정)
#     transforms.ToTensor(),        # 이미지를 텐서로 변환
#     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # RGB 정규화
# ])

# # 데이터셋 로드
# train_dataset = datasets.ImageFolder(root=train_dir, transform=transform)
# test_dataset = datasets.ImageFolder(root=test_dir, transform=transform)

# # DataLoader 생성
# train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=2)
# test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=2)

# # 데이터셋 크기 확인
# print(f"Number of training samples: {len(train_dataset)}")
# print(f"Number of test samples: {len(test_dataset)}")

# # 데이터 클래스 확인
# print(f"Classes: {train_dataset.classes}")


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import numpy as np
import torch.nn.functional as F

class PadToSquare(object):
    """이미지 크기를 최대 크기로 패딩하여 정사각형으로 만듭니다."""
    def __init__(self, padding_value=0):
        self.padding_value = padding_value
        self.to_tensor = transforms.ToTensor()  # PIL 이미지 -> Tensor
        self.to_pil = transforms.ToPILImage()  # Tensor -> PIL 이미지

    def __call__(self, image):
        # PIL 이미지를 텐서로 변환
        image_tensor = self.to_tensor(image)

        # 현재 이미지 크기 가져오기
        _, height, width = image_tensor.shape
        max_size = max(height, width)

        # 패딩 계산 (좌, 상, 우, 하)
        padding = (0, 0, max_size - width, max_size - height)  # (left, top, right, bottom)

        # 패딩 추가 (텐서에 패딩)
        padded_tensor = F.pad(image_tensor, padding, value=self.padding_value)

        # 텐서를 다시 PIL 이미지로 변환
        padded_image = self.to_pil(padded_tensor)
        return padded_image

# 데이터 디렉토리 설정
train_dir = '/home/gidaseul/Documents/GitHub/ML_2/datas/SportsBall/train'
test_dir = '/home/gidaseul/Documents/GitHub/ML_2/datas/SportsBall/test'

# 데이터 전처리: MNIST 이미지를 RGB 채널로 처리
transform = transforms.Compose([
    PadToSquare(),  # 이미지 크기를 가장 큰 크기로 맞추기 위해 패딩
    transforms.Resize((228, 228)),
    transforms.ToTensor(),        # 이미지를 텐서로 변환
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # RGB 정규화
])

# 데이터셋 로드
train_dataset = datasets.ImageFolder(root=train_dir, transform=transform)
test_dataset = datasets.ImageFolder(root=test_dir, transform=transform)

# DataLoader 생성
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=0)

# 데이터셋 크기 확인
print(f"Number of training samples: {len(train_dataset)}")
print(f"Number of test samples: {len(test_dataset)}")

# 데이터 클래스 확인
print(f"Classes: {train_dataset.classes}")


Number of training samples: 1000
Number of test samples: 100
Classes: ['american_football', 'baseball', 'basketball', 'billiard_ball', 'bowling_ball', 'football', 'golf_ball', 'shuttlecock', 'tennis_ball', 'volleyball']


In [4]:

# 모델, 손실 함수, 옵티마이저 정의
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = BaseModel().to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=0.0001)

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.5)

# 조기 종료 파라미터 설정
early_stopping_patience = 10  # 개선되지 않으면 5 에폭 후 종료
best_test_loss = float('inf')  # 최적의 검증 손실
patience_counter = 0  # 개선되지 않은 에폭 수 카운트

# Feature map 추출을 위한 forward hook 등록
features = []

def hook_fn(module, input, output):
    features.append(output.cpu().detach().numpy())

hook = model.GAP.register_forward_hook(hook_fn)

# 수정된 Feature 추출 함수 (불필요한 글로벌 변수 제거)
def extract_features(model, loader, device):
    features, labels_list = [], []
    model.eval()
    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            # GAP 레이어를 통해 Latent Feature 추출
            latent = model.GAP(model.conv5(model.conv4(model.conv3(model.conv2(model.conv1(images))))))
            latent = latent.view(latent.size(0), -1)
            features.append(latent.cpu().numpy())
            labels_list.extend(labels.cpu().numpy())
    features = np.concatenate(features, axis=0)
    return features, np.array(labels_list)

def plot_tsne(features, labels, num_classes=10):
    tsne = TSNE(n_components=2, random_state=42)
    tsne_results = tsne.fit_transform(features)

    plt.figure(figsize=(10, 8))
    for class_idx in range(num_classes):
        idx = labels == class_idx
        plt.scatter(tsne_results[idx, 0], tsne_results[idx, 1], label=f'Class {class_idx}', alpha=0.6)
    plt.legend()
    plt.title("t-SNE Visualization of Features")
    plt.show()


In [5]:
# 학습 루프
for epoch in range(100):
    # Training Loop
    model.train()
    running_loss, correct, total = 0.0, 0, 0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    train_acc = 100 * correct / total
    train_loss = running_loss / len(train_loader)

    # Evaluation Loop
    model.eval()
    test_loss, correct, total = 0.0, 0, 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            test_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    test_acc = 100 * correct / total
    test_loss = test_loss / len(test_loader)

    print(f"Epoch [{epoch+1}/100]")
    print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
    print(f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%")

    # 학습률 조정 (ReduceLROnPlateau 사용)
    scheduler.step(test_loss)  # test_loss를 전달하여 학습률을 조정

    # Early Stopping 체크
    if test_loss < best_test_loss:
        best_test_loss = test_loss
        patience_counter = 0  # 개선이 있었으므로 카운터 초기화
    else:
        patience_counter += 1
        print(f"Patience Counter: {patience_counter}/{early_stopping_patience}")

    # 만약 patience_counter가 early_stopping_patience 이상이면 조기 종료
    if patience_counter >= early_stopping_patience:
        print(f"Early stopping at epoch {epoch+1}")
        break

    # t-SNE Visualization (5 epoch마다 실행)
    if (epoch + 1) % 5 == 0:
        test_features, test_labels = extract_features(model, test_loader, device)
        plot_tsne(test_features, test_labels, num_classes=10)

OutOfMemoryError: CUDA out of memory. Tried to allocate 204.00 MiB. GPU 0 has a total capacity of 23.64 GiB of which 224.12 MiB is free. Process 20650 has 20.82 GiB memory in use. Including non-PyTorch memory, this process has 2.16 GiB memory in use. Of the allocated memory 1.96 GiB is allocated by PyTorch, and 12.40 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

# base 기반으로 latent space의 중요성을 확인하는 방법에 대한 검증

- 새로운 분류기를 정의하고 이를 Latent Space 기반으로 학습 및 평가

In [None]:
# Latent Space 기반 분류기 정의
class LatentClassifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        return self.fc(x)


In [None]:
# Latent Feature 추출
train_features, train_labels = extract_features(model, train_loader, device)
test_features, test_labels = extract_features(model, test_loader, device)

# TensorDataset으로 변환
train_features = torch.tensor(train_features, dtype=torch.float32)
train_labels = torch.tensor(train_labels, dtype=torch.long)
test_features = torch.tensor(test_features, dtype=torch.float32)
test_labels = torch.tensor(test_labels, dtype=torch.long)

train_dataset_latent = torch.utils.data.TensorDataset(train_features, train_labels)
test_dataset_latent = torch.utils.data.TensorDataset(test_features, test_labels)

# DataLoader 생성
train_loader_latent = torch.utils.data.DataLoader(train_dataset_latent, batch_size=64, shuffle=True)
test_loader_latent = torch.utils.data.DataLoader(test_dataset_latent, batch_size=64, shuffle=False)


- 추가 부분: Latent Feature 추출 및 DataLoader 구성
- 기존 학습 완료된 모델에서 Latent Space의 Feature를 추출하고, 새로운 DataLoader를 구성.

In [None]:
# Latent Feature 추출
train_features, train_labels = extract_features(model, train_loader, device)
test_features, test_labels = extract_features(model, test_loader, device)

# TensorDataset으로 변환
train_features = torch.tensor(train_features, dtype=torch.float32)
train_labels = torch.tensor(train_labels, dtype=torch.long)
test_features = torch.tensor(test_features, dtype=torch.float32)
test_labels = torch.tensor(test_labels, dtype=torch.long)

train_dataset_latent = torch.utils.data.TensorDataset(train_features, train_labels)
test_dataset_latent = torch.utils.data.TensorDataset(test_features, test_labels)

# DataLoader 생성
train_loader_latent = torch.utils.data.DataLoader(train_dataset_latent, batch_size=64, shuffle=True)
test_loader_latent = torch.utils.data.DataLoader(test_dataset_latent, batch_size=64, shuffle=False)


- 추가 부분: Latent Classifier 학습
- Latent Space에서 추출한 Feature를 기반으로 새로운 분류기를 학습 및 평가.


In [None]:
# Latent Classifier 학습
latent_classifier = LatentClassifier(input_dim=train_features.shape[1], num_classes=10).to(device)
criterion_latent = nn.CrossEntropyLoss()
optimizer_latent = optim.Adam(latent_classifier.parameters(), lr=0.001)

# 학습 Loop
num_epochs = 50
for epoch in range(num_epochs):
    # Training Loop
    latent_classifier.train()
    running_loss, correct, total = 0.0, 0, 0
    for features, labels in train_loader_latent:
        features, labels = features.to(device), labels.to(device)

        optimizer_latent.zero_grad()
        outputs = latent_classifier(features)
        loss = criterion_latent(outputs, labels)
        loss.backward()
        optimizer_latent.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    train_acc = 100 * correct / total
    train_loss = running_loss / len(train_loader_latent)

    # Evaluation Loop
    latent_classifier.eval()
    test_loss, correct, total = 0.0, 0, 0
    with torch.no_grad():
        for features, labels in test_loader_latent:
            features, labels = features.to(device), labels.to(device)
            outputs = latent_classifier(features)
            loss = criterion_latent(outputs, labels)
            test_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    test_acc = 100 * correct / total
    test_loss = test_loss / len(test_loader_latent)

    print(f"Epoch [{epoch+1}/{num_epochs}]")
    print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
    print(f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%")


Epoch [1/50]
Train Loss: 2.2172, Train Acc: 20.60%
Test Loss: 2.1584, Test Acc: 17.00%
Epoch [2/50]
Train Loss: 2.0711, Train Acc: 29.20%
Test Loss: 2.0976, Test Acc: 28.00%
Epoch [3/50]
Train Loss: 1.9807, Train Acc: 30.80%
Test Loss: 2.0509, Test Acc: 31.00%
Epoch [4/50]
Train Loss: 1.9097, Train Acc: 35.00%
Test Loss: 1.9880, Test Acc: 26.00%
Epoch [5/50]
Train Loss: 1.8473, Train Acc: 36.30%
Test Loss: 1.9561, Test Acc: 29.00%
Epoch [6/50]
Train Loss: 1.8218, Train Acc: 38.60%
Test Loss: 1.9734, Test Acc: 32.00%
Epoch [7/50]
Train Loss: 1.7946, Train Acc: 38.80%
Test Loss: 1.9190, Test Acc: 38.00%
Epoch [8/50]
Train Loss: 1.7483, Train Acc: 39.00%
Test Loss: 1.9286, Test Acc: 32.00%
Epoch [9/50]
Train Loss: 1.7387, Train Acc: 40.40%
Test Loss: 1.9213, Test Acc: 36.00%
Epoch [10/50]
Train Loss: 1.7094, Train Acc: 41.60%
Test Loss: 1.9034, Test Acc: 37.00%
Epoch [11/50]
Train Loss: 1.6966, Train Acc: 42.10%
Test Loss: 1.8768, Test Acc: 39.00%
Epoch [12/50]
Train Loss: 1.6867, Train A