## Pytorch 기반 앙상블 구현

In [None]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from torchvision.models import resnet50
import numpy as np
from tqdm import tqdm

In [None]:
# CIFAR-10 데이터셋 로드
transform = transforms.Compose([
   transforms.ToTensor(),
   transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
test_dataset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=128, shuffle=False)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170M/170M [00:04<00:00, 39.7MB/s]


Extracting ./data/cifar-10-python.tar.gz to ./data
Files already downloaded and verified


In [None]:
# CNN 모델 정의
class CNNModel(nn.Module):
   def __init__(self, n_hidden_node, dropout_prob):
       super(CNNModel, self).__init__()

       # ResNet50 backbone
       self.resnet = resnet50(pretrained=True)
       for param in self.resnet.parameters():
           param.requires_grad = False

       # ResNet의 마지막 FC 층 제거
       self.resnet = nn.Sequential(*list(self.resnet.children())[:-1])

       # 새로운 분류기 추가
       self.flatten = nn.Flatten()
       self.fc1 = nn.Linear(2048, n_hidden_node)
       self.relu = nn.ReLU()
       self.dropout = nn.Dropout(dropout_prob)
       self.fc2 = nn.Linear(n_hidden_node, 10)

   def forward(self, x):
       x = self.resnet(x)
       x = self.flatten(x)
       x = self.fc1(x)
       x = self.relu(x)
       x = self.dropout(x)
       x = self.fc2(x)
       return x

# 모델 학습 함수
def train_model(model, train_loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0

    num_epochs = 1
    for epoch in range(num_epochs):
        progress_bar = tqdm(train_loader, desc='Training')
        for inputs, labels in progress_bar:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            progress_bar.set_postfix({'loss': running_loss/len(train_loader)})

    return running_loss / len(train_loader)

# 모델 평가 함수
def evaluate_model(model, test_loader, device):
   model.eval()
   predictions = []
   with torch.no_grad():
       for inputs, _ in test_loader:
           inputs = inputs.to(device)
           outputs = model(inputs)
           predictions.extend(outputs.cpu().numpy())
   return np.array(predictions)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 5개의 기본 모델 생성
models = [
    CNNModel(1024, 0.5),
    CNNModel(1024, 0.6),
    CNNModel(1024, 0.7),
    CNNModel(1024, 0.8),
    CNNModel(1024, 0.9)
]

# 각 모델 학습
criterion = nn.CrossEntropyLoss()
for i, model in enumerate(models, 1):
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    print(f"\nTraining model {i}")
    train_model(model, train_loader, criterion, optimizer, device)

# 각 모델의 예측값 계산
predictions = []
for model in models:
    pred = evaluate_model(model, test_loader, device)
    predictions.append(pred)

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 134MB/s]



Training model 1


Training: 100%|██████████| 391/391 [00:23<00:00, 16.51it/s, loss=1.66]



Training model 2


Training: 100%|██████████| 391/391 [00:22<00:00, 17.27it/s, loss=1.7]



Training model 3


Training: 100%|██████████| 391/391 [00:24<00:00, 16.09it/s, loss=1.76]



Training model 4


Training: 100%|██████████| 391/391 [00:22<00:00, 17.02it/s, loss=1.85]



Training model 5


Training: 100%|██████████| 391/391 [00:21<00:00, 17.80it/s, loss=2.05]


In [None]:
# 앙상블 예측
ensemble_pred = np.mean(predictions, axis=0)
ensemble_classes = np.argmax(ensemble_pred, axis=1)

# 정확도 계산
test_labels = np.array([label for _, label in test_loader.dataset])
accuracy = np.mean(ensemble_classes == test_labels)
print(f"\nEnsemble Accuracy: {accuracy:.4f}")


Ensemble Accuracy: 0.4917


In [None]:
# 추가 2개 모델 생성
additional_models = [
    CNNModel(512, 0.5),
    CNNModel(256, 0.5)
]

# 추가 모델 학습
for i, model in enumerate(additional_models, 6):
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    print(f"\nTraining additional model {i}")
    train_model(model, train_loader, criterion, optimizer, device)

# 추가 모델의 예측값 계산
additional_predictions = []
for model in additional_models:
    pred = evaluate_model(model, test_loader, device)
    additional_predictions.append(pred)

# 모든 모델을 포함한 최종 앙상블
final_predictions = predictions + additional_predictions
final_ensemble_pred = np.mean(final_predictions, axis=0)
final_ensemble_classes = np.argmax(final_ensemble_pred, axis=1)

# 최종 정확도 계산
final_accuracy = np.mean(final_ensemble_classes == test_labels)
print(f"\nFinal Ensemble Accuracy: {final_accuracy:.4f}")


Training additional model 6


Training: 100%|██████████| 391/391 [00:22<00:00, 17.13it/s, loss=1.66]



Training additional model 7


Training: 100%|██████████| 391/391 [00:22<00:00, 17.34it/s, loss=1.7]



Final Ensemble Accuracy: 0.4952
