In [2]:
from model.UNet import UNet
from model.DeepLabV3Plus import DeepLabV3Plus
from model.HRNetV2 import HRNetV2

import os

from tqdm import tqdm
from IPython.display import clear_output

import numpy as np
import torch
import torch.nn as nn
from torchvision import transforms
import torchvision.transforms.functional as TF
from torch.utils.data import Dataset, DataLoader

import ssl
ssl._create_default_https_context = ssl._create_unverified_context

In [4]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(device)

cuda


In [5]:
import cv2

import random

class SegmentationDataset(Dataset):
  def __init__(self, data_type, mode):    
    self.mode = mode
    self.input, self.label = [], []

    # 작업할 폴더의 경로
    path = './dataset/{0}s'.format(data_type) 

    self.input_path = '{0}/{1}/{2}'.format(path, self.mode, "data")
    self.label_path = '{0}/{1}/{2}'.format(path, self.mode, "label")

    # 폴더 안에 있는 데이터 파일들의 이름을 추출
    data_names = [name.split('.')[0] for name in os.listdir(self.input_path)]

    # 데이터 전처리
    for data_name in data_names:
        input_path = '{0}/{1}.png'.format(self.input_path, data_name)
        label_path = '{0}/{1}.png'.format(self.label_path, data_name)

        input = cv2.cvtColor(cv2.imread(input_path), cv2.COLOR_BGR2RGB)
        label = cv2.imread(label_path, cv2.IMREAD_GRAYSCALE)

        input = torch.FloatTensor(input).permute(2, 0, 1)
        label = torch.LongTensor(label)

        self.input.append(input)
        self.label.append(label)

  def __len__(self):
    return len(self.input)

  def __getitem__(self, index):
    input = self.input[index]
    label = self.label[index]

    i, j, h, w = transforms.RandomCrop.get_params(input, output_size=(512, 512))

    input = TF.crop(input, i, j, h, w)
    label = TF.crop(label, i, j, h, w)

    # Random horizontal flipping
    if random.random() > 0.5:
        input = TF.hflip(input)
        label = TF.hflip(label)

    # Random vertical flipping
    if random.random() > 0.5:
        input = TF.vflip(input)
        label = TF.vflip(label)

    return input, label

In [5]:
train_dataset   = SegmentationDataset('building', 'train')
test_dataset    = SegmentationDataset('building', 'test')

#train_dataset   = SegmentationDataset('building', 'train')
#test_dataset    = SegmentationDataset('building', 'test')

In [6]:
from sklearn.metrics import accuracy_score, f1_score

def evaluate(model, dataloader):
  
  model.eval()

  predvs, labels = [], []

  with torch.no_grad():
    for batch in tqdm(dataloader, desc="Test Processed"):
        input, label = batch

        input = input.to(device)

        hypothesis = model(input)

        predv = torch.argmax(hypothesis, dim=1).cpu()

        predvs.extend(predv.numpy().ravel())
        labels.extend(label.numpy().ravel())

    print("Accuracy - {0:f}\n".format(accuracy_score(labels, predvs)))
    print("F1 Score - {0:f}\n".format(f1_score(labels, predvs)))

In [14]:
trained_model_path = "./trained"

def train(model, dataset, num_epoch = 10, lr=0.001):
    dataloader = DataLoader(dataset, batch_size=4, shuffle=True) 

    loss_func = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    for epoch in range(num_epoch):
        print("# | Epoch - {0:03d} / {1:03d} | #".format(epoch + 1, num_epoch))
        print("=========================")
        
        model.train()
        
        costs = []

        for batch in tqdm(dataloader, desc="Batch Processed"):
            input, label = batch

            input = input.to(device)
            label = label.to(device)

            optimizer.zero_grad()

            hypothesis = model(input)
            

            cost = loss_func(hypothesis, label)

            cost.backward()
            optimizer.step()

            costs.append(cost.item())

        print("Current Average Loss - {0:f}".format(np.mean(costs)))

        if (epoch + 1) % 1 == 0:
            path = "{0}/{1}/epoch_{2:05d}.pt".format(trained_model_path, model.__class__.__name__, epoch + 1)
            
            torch.save(model.state_dict(), path)

            print("모델 파일({0}) 저장됨\n".format(path))
        
        #evaluate(model, dataloader)

In [11]:
def load_trained_model(model : nn.Module, epoch):
    model.load_state_dict(torch.load('{0}/{1}/epoch_{2:05d}.pt'.format(trained_model_path, model.__class__.__name__, epoch)))

In [None]:
num_epoch = 100

#model1 = UNet().to(device)
#train(model1, train_dataset, num_epoch)

#model2 = DeepLabV3Plus(num_classes=2).to(device)
#load_trained_model(model2, 20)

#train(model2, train_dataset, num_epoch)

model3 = HRNetV2(num_classes=2).to(device)
train(model3, train_dataset, num_epoch)

In [None]:
#cv2.imshow("predicted", torch.argmax(hypothesis, dim=1).cpu().detach().numpy()[0,:,:].astype(np.uint8) * 255)
#cv2.imshow("label", label.cpu().detach().numpy()[0,:,:].astype(np.uint8) * 255)

#cv2.waitKey(0)
#cv2.destroyAllWindows()