일반적인 validation을 수행할 경우 아래 코드와 같이 수행하면 된다.

In [None]:
class EarlyStopping:
    """Early stops the training if validation loss doesn't improve after a given patience."""
    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt'):
        self.patience = patience                    # 학습이 개선되지 않을 경우 참을 횟수
        self.verbose = verbose                      # 학습 개선 메세지를 출력할지 여부 결정
        self.counter = 0                            # 학습이 개선되지 않은 횟수
        self.best_score = None                      # 지금까지 가장 최적의 loss
        self.early_stop = False                     # early_stop을 해야하는 경우 True로 값 변경
        self.val_loss_min = np.Inf                  
        self.delta = delta                          # 최소한의 loss값 개선 수준
        self.path = path                            # 모델을 저장할 주소

    def __call__(self, val_loss, model):
        score = -val_loss                           # loss값을 음수로 바꾼다.

        if self.best_score is None:                 # best_score에 score가 저장된 적이 없다면 
            self.best_score = score                 # 해당값을 best_score에 저장하고
            self.save_checkpoint(val_loss, model)   # loss값과 model을 저장한다.
        elif score < self.best_score + self.delta:  # best_score에 delta를 더한 값보다 score가 작다면 == 이전 최대 점수에 delta를 뺀 값보다 score가 크다면 (delta값은 어느 정도의 학습을 강제하는 느낌인 것 같다.)
            self.counter += 1                       # counter값을 갱신하고
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:       # counter값이 patience보다 크다면
                self.early_stop = True              # early_stop을 True로 갱신한다.
        else:
            self.best_score = score                 # 만약 loss값이 알맞게 감소했다면
            self.save_checkpoint(val_loss, model)   # loss값과 model을 저장한다.
            self.counter = 0                        # counter값도 다시 0으로 초기화한다.

    def save_checkpoint(self, val_loss, model):
        '''validation loss가 감소하면 모델을 저장한다.'''
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

cross-validation일 경우 아래 코드를 수행한다.

In [17]:
class EarlyStopping:
    """Early stops the training if validation loss doesn't improve after a given patience."""
    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt', model_name = None, fold=None):
        self.fold = fold
        self.model_name = model_name
        
        self.patience = patience                    # 학습이 개선되지 않을 경우 참을 횟수
        self.verbose = verbose                      # 학습 개선 메세지를 출력할지 여부 결정
        self.counter = 0                            # 학습이 개선되지 않은 횟수
        self.best_score = None                      # 지금까지 가장 최적의 loss
        self.early_stop = False                     # early_stop을 해야하는 경우 True로 값 변경
        self.val_loss_min = np.Inf                  
        self.delta = delta                          # 최소한의 loss값 개선 수준
        self.path = path                            # 모델을 저장할 주소
        
    def __call__(self, val_loss, model):
        score = -val_loss                           # loss값을 음수로 바꾼다.

        if self.best_score is None:                 # best_score에 score가 저장된 적이 없다면 
            self.best_score = score                 # 해당값을 best_score에 저장하고
            self.save_checkpoint(val_loss, model)   # loss값과 model을 저장한다.
        elif score < self.best_score + self.delta:  # best_score에 delta를 더한 값보다 score가 작다면 == 이전 최대 점수에 delta를 뺀 값보다 score가 크다면 (delta값은 어느 정도의 학습을 강제하는 느낌인 것 같다.)
            self.counter += 1                       # counter값을 갱신하고
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:       # counter값이 patience보다 크다면
                self.early_stop = True              # early_stop을 True로 갱신한다.
        else:
            self.best_score = score                 # 만약 loss값이 알맞게 감소했다면
            self.save_checkpoint(val_loss, model)   # loss값과 model을 저장한다.
            self.counter = 0                        # counter값도 다시 0으로 초기화한다.

    def save_checkpoint(self, val_loss, model):
        '''Saves model when validation loss decrease.'''
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        if self.fold is not None and self.model_name is not None:
            torch.save(model.state_dict(), self.model_name + '_'+ str(self.fold)+'fold_'+self.path)
        else:
            torch.save(model.state_dict(),self.path)
        self.val_loss_min = val_loss

참고 출처: https://quokkas.tistory.com/37

## <strong> 적용 </strong>

이전에 Cross Validation 코드를 가져와 early stopping을 적용해보자.

In [18]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets
import torchvision.transforms as transforms

MNIST 데이터를 load해준다.

In [19]:
transform = transforms.Compose([
                                transforms.ToTensor()
])
train_dataset = datasets.MNIST('./MNIST', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST('./MNIST', train=False, download=True, transform=transform)

len_train = len(train_dataset)
print(len_train)

60000


학습시킬 모델을 구성한다.

In [20]:
class ConvNet(nn.Module):
    def __init__(self,h1=96):
        super(ConvNet, self).__init__()

        # input: 1*28*28 
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        # ouput: 16*14*14

        self.fc = nn.Sequential(
            nn.Linear(16*14*14, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Linear(512, 10)   
        )

    def forward(self, x):
        x = self.conv1(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x

gpu를 설정한다.

In [21]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
torch.manual_seed(1)
if device == 'cuda':
  torch.cuda.manual_seed(1)
  torch.cuda.manual_seed_all(1)

교차 검증을 위해 KFold 객체를 선언한다.

In [22]:
from sklearn.model_selection import KFold

splits = KFold(n_splits=5, shuffle=True, random_state=1)
fold_dict = {}

train, validation의 각 epoch에 사용할 함수들을 정의한다.

In [23]:
def train_epoch(model, device, dataloader, loss_fn, optimizer):
  train_loss, train_correct = 0.0, 0
  model.train()
  for images, labels in dataloader:
    images = images.to(device)
    labels = labels.to(device)

    optimizer.zero_grad()                                 # optimizer의 모든 파라미터들의 grad를 초기화한다.
    hypothesis = model(images)                            # images를 model에 넣어주고 예측한다.
    loss = loss_fn(hypothesis, labels)                    # loss값을 구한다.
    loss.backward()                                       # loss값에 대해 미분을 수행한다.
    optimizer.step()                                      # 학습을 진행한다. (optimizer를 한 단계 수행한다.)
    train_loss += loss.item() * images.size(0)            # loss값에 image의 개수를 곱하고 저장한다.
    scores, predictions = torch.max(hypothesis.data, 1)   # max로 가장 높게 예측한 값의 인덱스와 값(정수)를 뽑아준다.
    train_correct += (predictions == labels).sum().item() # 인덱스가 label과 맞는지 확인하고 합을 구해서 맞힌 개수를 저장한다.
  
  return train_loss, train_correct

In [24]:
# train 참고
def valid_epoch(model, device, dataloader, loss_fn):
  valid_loss, val_correct = 0.0, 0
  model.eval()
  for images, labels in dataloader:
      images = images.to(device)
      labels = labels.to(device)
      prediction = model(images)
      loss=loss_fn(prediction,labels)
      valid_loss+=loss.item()*images.size(0)
      scores, predictions = torch.max(prediction.data,1)
      val_correct+=(predictions == labels).sum().item()

  return valid_loss,val_correct

학습시킨다.

In [26]:
import numpy as np
from torch.utils.data import SubsetRandomSampler

criterion = nn.CrossEntropyLoss()
batch_size = 128
num_epochs = 10

for fold, (train_idx,val_idx) in enumerate(splits.split(np.arange(len(train_dataset)))):
    print('Fold {}'.format(fold+1))
    early_stopping = EarlyStopping(patience=2, verbose=True, fold=fold+1, model_name = 'ConvNet')

    train_sampler = SubsetRandomSampler(train_idx)
    valid_sampler = SubsetRandomSampler(val_idx)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)
    valid_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=valid_sampler)
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    model = ConvNet()
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    history = {'train_loss': [], 'valid_loss': [],'train_acc':[],'valid_acc':[]}

    for epoch in range(num_epochs):
        train_loss, train_correct=train_epoch(model,device,train_loader,criterion,optimizer)
        valid_loss, val_correct=valid_epoch(model,device,valid_loader,criterion)

        train_loss = train_loss / len(train_loader.sampler)
        train_acc = train_correct / len(train_loader.sampler) * 100
        valid_loss = valid_loss / len(valid_loader.sampler)
        valid_acc = val_correct / len(valid_loader.sampler) * 100

        print(f"[Epoch:{epoch+1}/{num_epochs}] AVG Training Loss/Acc: {train_loss:.3f}/{train_acc:.2f}, AVG Test Loss/Acc: {valid_loss:.3f}/{valid_acc:.2f}")
        history['train_loss'].append(train_loss)
        history['valid_loss'].append(valid_loss)
        history['train_acc'].append(train_acc)
        history['valid_acc'].append(valid_acc)

        early_stopping(valid_loss, model)
        if early_stopping.early_stop:
            print("Early stopping")
            break

    fold_dict['fold{}'.format(fold+1)] = history

Fold 1
[Epoch:1/10] AVG Training Loss/Acc: 0.131/96.24, AVG Test Loss/Acc: 0.065/98.07
Validation loss decreased (inf --> 0.065480).  Saving model ...
[Epoch:2/10] AVG Training Loss/Acc: 0.041/98.72, AVG Test Loss/Acc: 0.058/98.30
Validation loss decreased (0.065480 --> 0.058408).  Saving model ...
[Epoch:3/10] AVG Training Loss/Acc: 0.021/99.42, AVG Test Loss/Acc: 0.053/98.40
Validation loss decreased (0.058408 --> 0.052824).  Saving model ...
[Epoch:4/10] AVG Training Loss/Acc: 0.013/99.65, AVG Test Loss/Acc: 0.062/98.21
EarlyStopping counter: 1 out of 2
[Epoch:5/10] AVG Training Loss/Acc: 0.010/99.71, AVG Test Loss/Acc: 0.058/98.41
EarlyStopping counter: 2 out of 2
Early stopping
Fold 2
[Epoch:1/10] AVG Training Loss/Acc: 0.128/96.28, AVG Test Loss/Acc: 0.065/97.99
Validation loss decreased (inf --> 0.064749).  Saving model ...
[Epoch:2/10] AVG Training Loss/Acc: 0.040/98.82, AVG Test Loss/Acc: 0.054/98.32
Validation loss decreased (0.064749 --> 0.053900).  Saving model ...
[Epoch:3

학습시켰던 모델들을 불러온다.

In [27]:
model1 = ConvNet(); model1.load_state_dict(torch.load('ConvNet_0fold_checkpoint.pt'))
model2 = ConvNet(); model2.load_state_dict(torch.load('ConvNet_1fold_checkpoint.pt'))
model3 = ConvNet(); model3.load_state_dict(torch.load('ConvNet_2fold_checkpoint.pt'))
model4 = ConvNet(); model4.load_state_dict(torch.load('ConvNet_3fold_checkpoint.pt'))
model5 = ConvNet(); model5.load_state_dict(torch.load('ConvNet_4fold_checkpoint.pt'))

<All keys matched successfully>

In [28]:
batch_size = 128
test_loader = DataLoader(test_dataset, batch_size=batch_size)

모델의 성능을 확인한다. <br>
학습시 추론은 각 모델이 추론한 값을 평균하여 구했다.

In [38]:
def test(models):
  for model in models:
    model.eval()

  with torch.no_grad():
    accuracy = 0
    len_models = len(models)       # 모델 개수
    total_data = len(test_dataset) # 전체 test_dataset 개수
    for x, y in test_loader:
      x = x.to(device)
      y = y.to(device)
      prediction = 0
      for model in models:
        prediction +=  model(x) / len_models
      
      _, prediction = torch.max(prediction, 1)
      correct_prediction = (prediction == y) 
      accuracy += torch.sum(correct_prediction) / total_data
    return accuracy

In [39]:
models = [model1, model2, model3, model4, model5]
test(models).item()

0.9899997115135193