<a href="https://colab.research.google.com/github/JinSeokH/2024_Sejong_AI/blob/main/AI%EA%B3%B5%EB%AA%A8%EC%A0%84(%EC%84%B8%EC%A2%85)_short.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
import os
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
import matplotlib.pyplot as plt
from torchvision import transforms, datasets
from google.colab import drive
drive.mount('/content/drive')
import sys
sys.path.append("/content/drive/MyDrive/AI경진대회/segmentation_new")
from unet  import *
from dataprocessing  import *

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [8]:
## 이미지와 레이블맵 파일들의 디렉토리와 파일 확장자 설정
dir_data = '/content/drive/MyDrive/AI경진대회/segmentation_new'
img_dir = '/content/drive/MyDrive/AI경진대회/segmentation_new/images'
labelmap_dir = '/content/drive/MyDrive/AI경진대회/segmentation_new/labels'

## train/test/val 폴더 내 파일 개수
nframe_train = 24
nframe_val = 3
nframe_test = 3

dir_save_train = os.path.join(dir_data, 'train')
dir_save_val = os.path.join(dir_data, 'val')
dir_save_test = os.path.join(dir_data, 'test')

In [9]:
# 훈련 파라미터 설정하기
lr = 1e-3
batch_size = 5
num_epoch = 2
base_dir = '/content/drive/MyDrive/DACrew/unet'
data_dir = dir_data
ckpt_dir = os.path.join(base_dir, "checkpoint")
log_dir = os.path.join(base_dir, "log")

# 훈련을 위한 Transform과 DataLoader
transform = transforms.Compose([Normalization(mean=0.5, std=0.5), RandomFlip(), ToTensor()])

dataset_train = Dataset(data_dir=os.path.join(data_dir, 'train'), transform=transform)
loader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True, num_workers=4)

dataset_val = Dataset(data_dir=os.path.join(data_dir, 'val'), transform=transform)
loader_val = DataLoader(dataset_val, batch_size=batch_size, shuffle=False, num_workers=4)

# 네트워크 생성하기
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
net = UNet().to(device)

# 손실함수 정의하기
fn_loss = torch.nn.CrossEntropyLoss().to(device)

# Optimizer 설정하기
optim = torch.optim.Adam(net.parameters(), lr=lr)

# 그밖에 부수적인 variables 설정하기
num_data_train = len(dataset_train)
num_data_val = len(dataset_val)

num_batch_train = np.ceil(num_data_train / batch_size)
num_batch_val = np.ceil(num_data_val / batch_size)

# 그 밖에 부수적인 functions 설정하기
fn_tonumpy = lambda x: x.to('cpu').detach().numpy().transpose(0,2,3,1)

fn_tonumpy_output = lambda x: x.detach().to('cpu').numpy().transpose(0,2,3,1)

fn_denorm = lambda x, mean, std: (x * std) + mean
fn_class = lambda x: 1.0 * (x > 0.5)

# Tensorboard 를 사용하기 위한 SummaryWriter 설정
writer_train = SummaryWriter(log_dir=os.path.join(log_dir, 'train'))
writer_val = SummaryWriter(log_dir=os.path.join(log_dir, 'val'))

# 네트워크 학습시키기
st_epoch = 0
# 학습한 모델이 있을 경우 모델 로드하기
import re

def extract_number_from_filename(filename):
    numbers = re.findall(r'\d+', filename)
    return int(numbers[0]) if numbers else 0

# ckpt_lst.sort(key=lambda f: extract_number_from_filename(f))
# net, optim, st_epoch = load(ckpt_dir=ckpt_dir, net=net, optim=optim)


In [10]:
for epoch in range(st_epoch + 1, num_epoch + 1):
        net.train()
        loss_arr = []

        for batch, data in enumerate(loader_train, 1):
            # forward pass
            label = data['label'].to(device)
            input = data['input'].to(device)
            output = net(input)

            # backward pass
            optim.zero_grad()
            loss = fn_loss(output, label.argmax(dim=1))
            loss.backward()
            optim.step()
            _, output = torch.max(output, 1)

            # 손실함수 계산
            loss_arr += [loss.item()]
            print("TRAIN: EPOCH %04d / %04d | BATCH %04d / %04d | LOSS %.4f" %
                  (epoch, num_epoch, batch, num_batch_train, np.mean(loss_arr)))

            # Tensorboard 저장하기
            label = fn_tonumpy(label)
            label = np.argmax(label, axis=-1)
            label = np.expand_dims(label, axis=-1)

            input = fn_tonumpy(fn_denorm(input, mean=0.5, std=0.5))
            output = torch.unsqueeze(output, axis = -3 )
            output = fn_tonumpy_output(output) # fn_class 없애줌

            writer_train.add_image('label', label, num_batch_train * (epoch - 1) + batch, dataformats='NHWC')
            writer_train.add_image('input', input, num_batch_train * (epoch - 1) + batch, dataformats='NHWC')
            writer_train.add_image('output', output, num_batch_train * (epoch - 1) + batch, dataformats='NHWC')

        writer_train.add_scalar('loss', np.mean(loss_arr), epoch)

        with torch.no_grad():
            net.eval()
            loss_arr = []

            for batch, data in enumerate(loader_val, 1):
                # forward pass
                label = data['label'].to(device)
                input = data['input'].to(device)

                output = net(input)

                # 손실함수 계산하기
                loss = fn_loss(output, label.argmax(dim=1))
                _, output = torch.max(output, 1)
                loss_arr += [loss.item()]
                print("VALID: EPOCH %04d / %04d | BATCH %04d / %04d | LOSS %.4f" %
                      (epoch, num_epoch, batch, num_batch_val, np.mean(loss_arr)))

                # Tensorboard 저장하기

                label = fn_tonumpy(label)
                label = np.argmax(label, axis=-1)
                label = np.expand_dims(label, axis=-1)

                input = fn_tonumpy(fn_denorm(input, mean=0.5, std=0.5))
                output = torch.unsqueeze(output, axis = -3 )
                output = fn_tonumpy_output(output)

                writer_val.add_image('label', label, num_batch_val * (epoch - 1) + batch, dataformats='NHWC')
                writer_val.add_image('input', input, num_batch_val * (epoch - 1) + batch, dataformats='NHWC')
                writer_val.add_image('output', output, num_batch_val * (epoch - 1) + batch, dataformats='NHWC')

        writer_val.add_scalar('loss', np.mean(loss_arr), epoch)

        # epoch 50마다 모델 저장하기
        if epoch % 71 == 0:
            save(ckpt_dir=ckpt_dir, net=net, optim=optim, epoch=epoch)

        writer_train.close()
        writer_val.close()

TRAIN: EPOCH 0001 / 0002 | BATCH 0001 / 0005 | LOSS 1.3280
TRAIN: EPOCH 0001 / 0002 | BATCH 0002 / 0005 | LOSS 1.3050
TRAIN: EPOCH 0001 / 0002 | BATCH 0003 / 0005 | LOSS 1.2420
TRAIN: EPOCH 0001 / 0002 | BATCH 0004 / 0005 | LOSS 1.1986
TRAIN: EPOCH 0001 / 0002 | BATCH 0005 / 0005 | LOSS 1.1458
VALID: EPOCH 0001 / 0002 | BATCH 0001 / 0001 | LOSS 1.2728
TRAIN: EPOCH 0002 / 0002 | BATCH 0001 / 0005 | LOSS 0.8348
TRAIN: EPOCH 0002 / 0002 | BATCH 0002 / 0005 | LOSS 0.8021
TRAIN: EPOCH 0002 / 0002 | BATCH 0003 / 0005 | LOSS 0.7772
TRAIN: EPOCH 0002 / 0002 | BATCH 0004 / 0005 | LOSS 0.7567
TRAIN: EPOCH 0002 / 0002 | BATCH 0005 / 0005 | LOSS 0.7340
VALID: EPOCH 0002 / 0002 | BATCH 0001 / 0001 | LOSS 1.2287


In [None]:
transform = transforms.Compose([Normalization(mean=0.5, std=0.5), ToTensor()])

dataset_test = Dataset(data_dir=os.path.join(data_dir, 'test'), transform=transform)
loader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=False, num_workers=8)

# 그밖에 부수적인 variables 설정하기
num_data_test = len(dataset_test)
num_batch_test = np.ceil(num_data_test / batch_size)

# 결과 디렉토리 생성하기
result_dir = os.path.join(base_dir, 'result_new_test')
if not os.path.exists(result_dir):
    os.makedirs(os.path.join(result_dir, 'png'))
    os.makedirs(os.path.join(result_dir, 'numpy'))


# net, optim, st_epoch = load(ckpt_dir=ckpt_dir, net=net, optim=optim)

with torch.no_grad():
      net.eval()
      loss_arr = []

      for batch, data in enumerate(loader_test, 1):
          # forward pass
          label = data['label'].to(device)
          input = data['input'].to(device)

          output = net(input)

          # 손실함수 계산하기
          loss = fn_loss(output, label)
          _, output = torch.max(output, 1)

          loss_arr += [loss.item()]

          print("TEST: BATCH %04d / %04d | LOSS %.4f" %
                (batch, num_batch_test, np.mean(loss_arr)))

          # Tensorboard 저장하기

          label = fn_tonumpy(label)
          label = np.argmax(label, axis=-1)
          label = np.expand_dims(label, axis=-1)
          input = fn_tonumpy(fn_denorm(input, mean=0.5, std=0.5))
          output = torch.unsqueeze(output, axis = -3 )
          output = fn_tonumpy_output(output)

          # 테스트 결과 저장하기
          for j in range(label.shape[0]):
              id = num_batch_test * (batch - 1) + j

              plt.imsave(os.path.join(result_dir, 'png', 'label_%04d.png' % id), label[j].squeeze(), cmap='gray')
              plt.imsave(os.path.join(result_dir, 'png', 'input_%04d.png' % id), input[j].squeeze(), cmap='gray')
              plt.imsave(os.path.join(result_dir, 'png', 'output_%04d.png' % id), output[j].squeeze(), cmap='gray')

              np.save(os.path.join(result_dir, 'numpy', 'label_%04d.npy' % id), label[j].squeeze())
              np.save(os.path.join(result_dir, 'numpy', 'input_%04d.npy' % id), input[j].squeeze())
              np.save(os.path.join(result_dir, 'numpy', 'output_%04d.npy' % id), output[j].squeeze())

print("AVERAGE TEST: BATCH %04d / %04d | LOSS %.4f" %
        (batch, num_batch_test, np.mean(loss_arr)))

In [None]:
##
lst_data = os.listdir(os.path.join(result_dir, 'numpy'))

lst_label = [f for f in lst_data if f.startswith('label')]
lst_input = [f for f in lst_data if f.startswith('input')]
lst_output = [f for f in lst_data if f.startswith('output')]

lst_label.sort()
lst_input.sort()
lst_output.sort()

##
id = 2

label = np.load(os.path.join(result_dir,"numpy", lst_label[id]))
input = np.load(os.path.join(result_dir,"numpy", lst_input[id]))
output = np.load(os.path.join(result_dir,"numpy", lst_output[id]))

## 플롯 그리기
plt.figure(figsize=(8,6))
plt.subplot(131)
plt.imshow(input)
plt.title('input')

plt.subplot(132)
plt.imshow(label)
plt.title('label')

plt.subplot(133)
plt.imshow(output)
print(output.min())
plt.title('output')

plt.show()


In [11]:
! pip freeze > requirements.txt