In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from tqdm.auto import tqdm
import torch.nn.functional as F
import os
import imblearn
import numpy as np
from torch.utils.data import TensorDataset

In [2]:
class DoubleConv(nn.Module):
    """Two consecutive 3x3 convolution -> BatchNorm -> ReLU stages.

    Both convolutions use ``padding=1`` with a 3x3 kernel, so the spatial
    size of the input is preserved, and ``bias=False`` (each convolution is
    directly followed by a ``BatchNorm2d`` layer).

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of channels produced by the second stage.
        mid_channels: Number of channels between the two stages. Any falsy
            value (``None`` or ``0``) falls back to ``out_channels``.
    """

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        hidden = mid_channels or out_channels

        stages = []
        for c_in, c_out in ((in_channels, hidden), (hidden, out_channels)):
            stages += [
                nn.Conv2d(c_in, c_out, kernel_size=3, padding=1, bias=False),
                nn.BatchNorm2d(c_out),
                nn.ReLU(inplace=True),
            ]

        # The attribute name and the layer order determine the state_dict
        # keys ("double_conv.0.weight", ...), so both must stay as they are
        # for saved checkpoints to keep loading.
        self.double_conv = nn.Sequential(*stages)

    def forward(self, x):
        """Run ``x`` through both stages and return the resulting feature map."""
        return self.double_conv(x)


class Down(nn.Module):
    """Encoder step: 2x2 max-pooling followed by a ``DoubleConv``.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of channels produced by the ``DoubleConv``.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        pool = nn.MaxPool2d(2)
        convs = DoubleConv(in_channels, out_channels)
        # Index 0 is the pooling layer, index 1 the convolutions; the
        # state_dict keys ("maxpool_conv.1....") depend on this order.
        self.maxpool_conv = nn.Sequential(pool, convs)

    def forward(self, x):
        """Pool ``x`` and apply the double convolution."""
        downsampled = self.maxpool_conv(x)
        return downsampled

class Up(nn.Module):
    """Decoder step: upsample, align with the skip connection, concat, DoubleConv.

    Args:
        in_channels: Number of channels the ``DoubleConv`` receives, i.e. the
            channel count of the concatenated (skip + upsampled) tensor.
        out_channels: Number of channels produced by the ``DoubleConv``.
        bilinear: When true, upsample with ``nn.Upsample`` (bilinear,
            ``scale_factor=2``) and use ``in_channels // 2`` as the
            intermediate width of the ``DoubleConv``. Otherwise upsample with
            a ``kernel_size=2, stride=2`` transposed convolution that outputs
            ``in_channels // 2`` channels.
    """

    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()
        half = in_channels // 2

        if not bilinear:
            self.up = nn.ConvTranspose2d(in_channels, half, kernel_size=2, stride=2)
            self.conv = DoubleConv(in_channels, out_channels)
        else:
            # Interpolation does not change the channel count, so the channel
            # reduction happens inside the convolutions instead.
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
            self.conv = DoubleConv(in_channels, out_channels, half)

    def forward(self, x1, x2):
        """Upsample ``x1``, pad it to the size of ``x2`` and fuse both.

        Args:
            x1: Feature map coming from the deeper (coarser) level.
            x2: Skip-connection feature map from the encoder.

        Returns:
            Output of the ``DoubleConv`` applied to ``cat([x2, x1], dim=1)``.
        """
        upsampled = self.up(x1)

        # Tensors are indexed as (N, C, H, W): dim 2 is height, dim 3 is width.
        gap_h = x2.size(2) - upsampled.size(2)
        gap_w = x2.size(3) - upsampled.size(3)
        top, left = gap_h // 2, gap_w // 2

        # F.pad expects (left, right, top, bottom) for the last two dims.
        upsampled = F.pad(upsampled, [left, gap_w - left, top, gap_h - top])
        # if you have padding issues, see
        # https://github.com/HaiyongJiang/U-Net-Pytorch-Unstructured-Buggy/commit/0e854509c2cea854e247a9c615f175f76fbb2e3a
        # https://github.com/xiaopeng-liao/Pytorch-UNet/commit/8ebac70e633bac59fc22bb5195e513d5832fb3bd

        # Skip connection first, upsampled features second.
        fused = torch.cat([x2, upsampled], dim=1)
        return self.conv(fused)


class OutConv(nn.Module):
    """Final 1x1 convolution mapping ``in_channels`` to ``out_channels``.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of output channels.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        """Apply the 1x1 convolution; height and width are unchanged."""
        projected = self.conv(x)
        return projected

class unet(nn.Module):
    """U-Net encoder/decoder whose output map is flattened per sample.

    The encoder widens the features 64 -> 128 -> 256 -> 512 -> 1024 over four
    ``Down`` steps; the decoder mirrors it with four ``Up`` steps that use
    transposed convolutions (``bilinear=False``). A 1x1 ``OutConv`` maps the
    64 decoder channels to ``n_classes`` channels and ``nn.Flatten`` collapses
    everything after the batch dimension.

    Args:
        n_channels: Number of channels of the input image.
        n_classes: Number of channels produced by the final 1x1 convolution.

    NOTE(review): because the whole ``n_classes x H x W`` map is flattened,
    the returned tensor has ``n_classes * H * W`` columns rather than
    ``n_classes``. The training code in this notebook feeds it straight into
    ``nn.CrossEntropyLoss`` with class-index labels - confirm this is intended.
    """

    def __init__(self, n_channels, n_classes):
        super().__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes

        # Encoder (attribute names and order define the state_dict layout).
        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        self.down4 = Down(512, 1024)

        # Decoder.
        self.up1 = Up(1024, 512, bilinear=False)
        self.up2 = Up(512, 256, bilinear=False)
        self.up3 = Up(256, 128, bilinear=False)
        self.up4 = Up(128, 64, bilinear=False)

        # Output head.
        self.outc = OutConv(64, n_classes)
        self.flatten = nn.Flatten()

    def forward(self, x):
        """Return the flattened output map for the input batch ``x``."""
        skip1 = self.inc(x)
        skip2 = self.down1(skip1)
        skip3 = self.down2(skip2)
        skip4 = self.down3(skip3)
        features = self.down4(skip4)

        # Each decoder step consumes the encoder output of the matching depth.
        decoder_path = (
            (self.up1, skip4),
            (self.up2, skip3),
            (self.up3, skip2),
            (self.up4, skip1),
        )
        for up_step, skip in decoder_path:
            features = up_step(features, skip)

        return self.flatten(self.outc(features))

In [3]:
# --- Filesystem locations ---
train_dataset_path = "./input/data/train"  # training split
val_dataset_path = "./input/data/val"  # validation split
test_dataset_path = "./input/data/test"  # test split
log_dir = './log'  # directory handed to the TensorBoard SummaryWriter

# --- Hyperparameters ---
num_classes = 7  # number of classification classes
batch_size = 128  # samples per DataLoader batch
lr = 1e-3  # learning rate passed to the Adam optimizer
epochs = 720  # number of passes over the training set in train()

모델 직접 생성

In [None]:

# Build a fresh, randomly initialised model.
# One input channel because the data pipeline converts images to grayscale;
# the class count comes from the shared `num_classes` setting instead of a
# repeated literal so the model and the configuration cannot drift apart.
model = unet(n_channels=1, n_classes=num_classes)

# Move the model to the GPU when one is available, otherwise stay on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# Optimizer and loss function (the learning-rate scheduler is disabled).
optimizer = optim.Adam(model.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()
#scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10, verbose=True)

# TensorBoard writer that train() uses to record per-epoch metrics.
writer = SummaryWriter(log_dir)

완성된 모델 불러오기

In [5]:
# Pick the device first so the checkpoint can be mapped onto it while loading.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Rebuild the architecture and restore the trained weights.
# `map_location` remaps tensors that were saved from a GPU, so the checkpoint
# also loads on a CPU-only machine instead of raising a CUDA deserialization
# error.
model = unet(n_channels=1, n_classes=num_classes)
model.load_state_dict(torch.load('./model/unetmodel.h5', map_location=device))
model = model.to(device)

# Optimizer and loss function (the learning-rate scheduler is disabled).
optimizer = optim.Adam(model.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()
#scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10, verbose=True)

# NOTE(review): this cell does not create the TensorBoard `writer` that
# train() uses; it only exists if the "create model from scratch" cell was
# run. Evaluation via test() does not need it.

In [6]:
def _build_transforms(augment=False):
    """Return the image preprocessing pipeline used for every split.

    Images are converted to a single grayscale channel, turned into tensors
    and normalized with mean 0.5 / std 0.5.

    Args:
        augment: When true, insert a random horizontal flip before the tensor
            conversion (used for the training split only).

    Returns:
        A ``transforms.Compose`` instance.
    """
    steps = [transforms.Grayscale(num_output_channels=1)]
    if augment:
        steps.append(transforms.RandomHorizontalFlip())
    steps.extend([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ])
    return transforms.Compose(steps)


# Training split: augmented and reshuffled every epoch.
train_transforms = _build_transforms(augment=True)
train_dataset = datasets.ImageFolder(train_dataset_path, transform=train_transforms)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# NOTE: a SMOTE oversampling experiment (imblearn.over_sampling.SMOTE applied
# to the flattened 48*48 training images, wrapped in a TensorDataset) was
# prototyped here earlier and is disabled; train() uses `train_loader` above.

# Validation split: no augmentation. Shuffling is turned off so that the
# evaluation order, and therefore the reported per-batch loss average, is
# reproducible from run to run.
val_transforms = _build_transforms()
val_dataset = datasets.ImageFolder(val_dataset_path, transform=val_transforms)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Test split: same preprocessing and deterministic order as validation.
test_transforms = _build_transforms()
test_dataset = datasets.ImageFolder(test_dataset_path, transform=test_transforms)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [7]:
def train():
    """Train the notebook-level ``model`` and validate it after every epoch.

    Relies on these names defined in earlier cells: ``model``, ``optimizer``,
    ``criterion``, ``device``, ``train_loader``, ``val_loader``, ``writer``
    and ``epochs``.

    For each epoch the function:
      * runs one optimization pass over ``train_loader``;
      * evaluates loss and accuracy on ``val_loader`` without gradients;
      * logs ``Loss/train``, ``Loss/val`` and ``Accuracy/val`` to TensorBoard;
      * saves ``model.state_dict()`` to ``./besmodel.h5`` whenever the
        validation accuracy exceeds the best value seen so far;
      * prints a one-line summary.

    Returns:
        None.
    """
    # Best validation accuracy so far; only used to decide when to overwrite
    # the checkpoint (training is never stopped early).
    best_acc = 0

    for epoch in range(epochs):
        # ---- training pass ----
        model.train()
        train_loss = 0.0
        for images, labels in tqdm(train_loader):
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            # Call the module itself rather than .forward() so that any
            # registered forward hooks are executed.
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        # ---- validation pass ----
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for images, labels in tqdm(val_loader):
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                val_loss += criterion(outputs, labels).item()
                # Index of the largest logit along dim 1 is the prediction.
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += predicted.eq(labels).sum().item()

        # Per-batch averages of the loss and accuracy in percent, computed
        # once and reused for logging, checkpointing and printing.
        avg_train_loss = train_loss / len(train_loader)
        avg_val_loss = val_loss / len(val_loader)
        val_acc = 100 * correct / total

        writer.add_scalar('Loss/train', avg_train_loss, epoch)
        writer.add_scalar('Loss/val', avg_val_loss, epoch)
        writer.add_scalar('Accuracy/val', val_acc, epoch)
        # Push pending events to disk so an interrupted run keeps its logs.
        writer.flush()

        if best_acc < val_acc:
            best_acc = val_acc
            # NOTE(review): file name kept as in the original; possibly a
            # typo for "bestmodel" - confirm before renaming.
            torch.save(model.state_dict(),'./besmodel.h5')

        print(f'epoch [{epoch + 1}/{epochs}], '
              f'train_loss: {avg_train_loss:.4f}, '
              f'val_loss: {avg_val_loss:.4f}, '
              f'accuracy: {val_acc:.2f}%')



In [8]:
# Evaluation on the test split
def test():
    """Evaluate the notebook-level ``model`` on ``test_loader`` and print metrics.

    Relies on these names defined in earlier cells: ``model``, ``criterion``,
    ``device`` and ``test_loader``.

    Prints the loss averaged over batches and the accuracy in percent.

    Returns:
        None.
    """
    model.eval()
    test_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in tqdm(test_loader):
            images, labels = images.to(device), labels.to(device)
            # Call the module itself rather than .forward() so that any
            # registered forward hooks are executed.
            outputs = model(images)
            test_loss += criterion(outputs, labels).item()
            # Index of the largest logit along dim 1 is the prediction.
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    print(f'test_loss: {test_loss / len(test_loader):.4f}')
    print(f'test_accuracy: {100 * correct / total:.2f}%')

In [None]:
# Run the training loop defined in train(): iterates over `epochs` epochs,
# logging to TensorBoard and saving the best checkpoint along the way.
train()

학습은 코랩으로 진행했습니다

In [9]:
# Evaluate the current `model` on the test split and print loss and accuracy.
test()

  0%|          | 0/28 [00:00<?, ?it/s]

test_loss: 1.9461
test_accuracy: 68.87%


- ![Accuracy curve](result/unet_result/acc.JPG)


- ![Training loss curve](result/unet_result/train_loss.JPG)


- ![Validation loss curve](result/unet_result/val_loss.JPG)