<a href="https://colab.research.google.com/github/chasubeen/5th-Advanced/blob/1%EC%A3%BC%EC%B0%A8/MTAN_Multi_Task_Attention_Network.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **1. 이미지 간 회귀 작업**
- 일대다 예측(One-to-many)

## **1-1. 사용자 정의 모듈**

### **a) 데이터 생성**
- 데이터 생성 과정

In [None]:
## import Libraries

import os
import fnmatch
import numpy as np
import random

import torch
import torch.nn.functional as F
from torch.utils.data.dataset import Dataset

In [None]:
# 입력 이미지 데이터를 랜덤한 크기로 조절하고 보간하여 반환하는 클래스

class RandomScaleCrop(object):
  ## 초기화를 위한 함수
  def __init__(self, scale=[1.0, 1.2, 1.5]):
    self.scale = scale # 이미지 크기를 랜덤으로 조절하기 위해 스케일 리스트를 초기화

  ## 콜백 함수
  # 객체를 호출할 때 실행될 코드를 정의
  def __call__(self, img, label, depth, normal):
    height, width = img.shape[-2:] # 입력 이미지의 높이와 너비
    sc = self.scale[random.randint(0, len(self.scale) - 1)] # 랜덤한 스케일 선택
    h, w = int(height / sc), int(width / sc) # 높이와 너비를 새로 계산

    # 이미지를 잘라낼 위치를 랜덤하게 선택
    i = random.randint(0, height - h)
    j = random.randint(0, width - w)

    # 이미지 크기를 조절하고 보간(interpolation)
    img_ = F.interpolate(img[None, :, i:i + h, j:j + w], size = (height, width),
                         mode = 'bilinear', align_corners = True).squeeze(0)
    # 라벨 데이터도 동일한 크기로 조절
    label_ = F.interpolate(label[None, None, i:i + h, j:j + w], size = (height, width),
                           mode = 'nearest').squeeze(0).squeeze(0)
    # 깊이(depth) 데이터를 스케일에 맞게 조절
    depth_ = F.interpolate(depth[None, :, i:i + h, j:j + w], size = (height, width), \
                           mode = 'nearest').squeeze(0) / sc

    # 노멀(normal) 데이터도 크기를 조절하고 보간
    normal_ = F.interpolate(normal[None, :, i:i + h, j:j + w], size = (height, width),
                            mode = 'bilinear', align_corners = True).squeeze(0)

    # 조절된 이미지, 라벨, 깊이, 노멀 데이터를 반환
    return img_, label_, depth_, normal_

In [None]:
### NYUv2 데이터셋 가공

class NYUv2(Dataset):
  """
  원본 논문에서는 데이터 증강(data augmentation)을 적용하지 않았다고 하지만, 우리는 NYUv2 데이터셋의 성능을 더 향상시키기 위해 다음과 같은 데이터 증강을 사용할 수 있다.
  이 데이터 증강 기법은 아래 논문들에서 정의되어 있습니다:

  1. 랜덤 스케일(Random Scale): 선택된 비율(1.0, 1.2, 1.5 중에서 랜덤하게 선택)로 이미지의 크기를 조절합니다.
  2. 랜덤 수평 뒤집기(Random Horizontal Flip): 이미지를 수평 방향으로 랜덤하게 뒤집습니다.

  이러한 데이터 증강 기법을 적용하면 모델의 성능을 향상시킬 수 있을 것으로 예상됩니다.
  """

  def __init__(self, root, train = True, augmentation = False):
    self.train = train
    self.root = os.path.expanduser(root)
    self.augmentation = augmentation

    # 데이터 파일 읽기
    if train:
        self.data_path = root + '/train'
    else:
        self.data_path = root + '/val'

    # 데이터 길이 계산
    self.data_len = len(fnmatch.filter(os.listdir(self.data_path + '/image'), '*.npy'))

  def __getitem__(self, index):
    ## 미리 처리된 npy 파일에서 데이터 로드
    image = torch.from_numpy(np.moveaxis(np.load(self.data_path + '/image/{:d}.npy'.format(index)), -1, 0)) # 이미지
    semantic = torch.from_numpy(np.load(self.data_path + '/label/{:d}.npy'.format(index))) # 시맨틱(객체 식별 정보)
    depth = torch.from_numpy(np.moveaxis(np.load(self.data_path + '/depth/{:d}.npy'.format(index)), -1, 0)) # 깊이
    normal = torch.from_numpy(np.moveaxis(np.load(self.data_path + '/normal/{:d}.npy'.format(index)), -1, 0)) # 표면의 방향을 나타내는 벡터

    ## 데이터 증강 적용 여부 확인
    if self.augmentation:
      # 랜덤 스케일과 크롭을 적용
      image, semantic, depth, normal = RandomScaleCrop()(image, semantic, depth, normal)

      # 50% 확률로 이미지를 수평으로 뒤집기
      if torch.rand(1) < 0.5:
        image = torch.flip(image, dims = [2])
        semantic = torch.flip(semantic, dims = [1])
        depth = torch.flip(depth, dims = [2])
        normal = torch.flip(normal, dims = [2])
        normal[0, :, :] = -normal[0, :, :]  # 노멀 벡터의 방향을 역전

    return image.float(), semantic.float(), depth.float(), normal.float()

  def __len__(self):
    return self.data_len

### **b) utils**
- task metrics, 손실함수, trainer 정의

In [None]:
import torch
import torch.nn.functional as F
import numpy as np

In [None]:
### 모델 파라미터 개수를 세는 함수
def count_parameters(model):
  return sum(p.numel() for p in model.parameters() if p.requires_grad)

In [None]:
### 모델의 예측과 정답 간의 손실을 계산하는 함수
def model_fit(x_pred, x_output, task_type):
  device = x_pred.device # 장치

  ## 픽셀 공간을 마스킹하는 이진 mask
  binary_mask = (torch.sum(x_output, dim = 1) != 0).float().unsqueeze(1).to(device)

  ## 각 작업의 특징에 따라 다른 손실 함수 적용
  if task_type == 'semantic':
    # semantic loss: depth-wise cross entropy
    loss = F.nll_loss(x_pred, x_output, ignore_index = -1) # Negative Log Likelihood
  if task_type == 'depth':
    # depth loss: L1 norm
    loss = torch.sum(torch.abs(x_pred - x_output) * binary_mask) / torch.nonzero(binary_mask, as_tuple = False).size(0)
  if task_type == 'normal':
    # normal loss: dot product
    loss = 1 - torch.sum((x_pred * x_output) * binary_mask) / torch.nonzero(binary_mask, as_tuple = False).size(0)
  return loss

In [None]:
### 모델의 분류 결과와 실제 레이블 간의 혼동 행렬(Confusion Matrix)을 계산하고 관련된 분류 메트릭을 계산
class ConfMatrix(object):
  ## 클래스를 초기화하는 생성자
  def __init__(self, num_classes):
    self.num_classes = num_classes
    self.mat = None

  ## 모델의 예측 (pred)과 실제 레이블 (target)을 입력으로 받아 혼동 행렬을 업데이트
  def update(self, pred, target):
    n = self.num_classes
    if self.mat is None:
      self.mat = torch.zeros((n, n), dtype=torch.int64, device = pred.device)
    with torch.no_grad():
      # 정상 범위 내의 레이블과 예측을 선택
      k = (target >= 0) & (target < n)
      # 인덱스 계산
      inds = n * target[k].to(torch.int64) + pred[k]
      # 혼동 행렬 업데이트
      self.mat += torch.bincount(inds, minlength = n ** 2).reshape(n, n)

  ## 업데이트된 혼동 행렬을 기반으로 분류 메트릭을 계산
  def get_metrics(self):
    h = self.mat.float()
    acc = torch.diag(h).sum() / h.sum() # 정확도
    iu = torch.diag(h) / (h.sum(1) + h.sum(0) - torch.diag(h)) # IoU (Intersection over Union)
    return torch.mean(iu).item(), acc.item()

In [None]:
### 깊이(depth) 예측 모델의 오차를 계산하는 함수
def depth_error(x_pred, x_output):
  device = x_pred.device
  ## 정의되지 않은 영역을 이진 마스크로 설정
  binary_mask = (torch.sum(x_output, dim=1) != 0).unsqueeze(1).to(device)

  ## 이진 마스크를 사용하여 정의된 영역에서 예측 값과 실제 값 추출
  x_pred_true = x_pred.masked_select(binary_mask)
  x_output_true = x_output.masked_select(binary_mask)

  ## 절대 오차 및 상대 오차 계산
  abs_err = torch.abs(x_pred_true - x_output_true)
  rel_err = torch.abs(x_pred_true - x_output_true) / x_output_true

  ## 평균 오차 반환
  return (torch.sum(abs_err) / torch.nonzero(binary_mask, as_tuple=False).size(0)).item(), \
          (torch.sum(rel_err) / torch.nonzero(binary_mask, as_tuple=False).size(0)).item()

In [None]:
### 표면 normal 예측 모델의 오차를 계산하는 함수
def normal_error(x_pred, x_output):
  binary_mask = (torch.sum(x_output, dim = 1) != 0)
  error = torch.acos(torch.clamp(torch.sum(x_pred * x_output, 1).masked_select(binary_mask), -1, 1)).detach().cpu().numpy()
  error = np.degrees(error)
  return np.mean(error), np.median(error), np.mean(error < 11.25), np.mean(error < 22.5), np.mean(error < 30)

In [None]:
### Multi-task모델을 훈련하는 함수
def multi_task_trainer(train_loader, test_loader, multi_task_model, device, optimizer, scheduler, opt, total_epoch=200):
  train_batch = len(train_loader)
  test_batch = len(test_loader)
  avg_cost = np.zeros([total_epoch, 24], dtype = np.float32)
  lambda_weight = np.ones([3, total_epoch])
  for index in range(total_epoch):
    cost = np.zeros(24, dtype=np.float32) # 비용

    ## Dynamic Weight Average(DWA) 적용
    # 각 작업의 손실 변화율을 고려하여 시간에 따라 작업 가중치를 평균화하는 방법
    if opt.weight == 'dwa':
      if index == 0 or index == 1:
        lambda_weight[:, index] = 1.0
      else:
        w_1 = avg_cost[index - 1, 0] / avg_cost[index - 2, 0]
        w_2 = avg_cost[index - 1, 3] / avg_cost[index - 2, 3]
        w_3 = avg_cost[index - 1, 6] / avg_cost[index - 2, 6]
        lambda_weight[0, index] = 3 * np.exp(w_1 / T) / (np.exp(w_1 / T) + np.exp(w_2 / T) + np.exp(w_3 / T))
        lambda_weight[1, index] = 3 * np.exp(w_2 / T) / (np.exp(w_1 / T) + np.exp(w_2 / T) + np.exp(w_3 / T))
        lambda_weight[2, index] = 3 * np.exp(w_3 / T) / (np.exp(w_1 / T) + np.exp(w_2 / T) + np.exp(w_3 / T))

    ## 모든 batch에 대해 반복
    # 학습
    multi_task_model.train()
    train_dataset = iter(train_loader)
    # 오차 행렬
    conf_mat = ConfMatrix(multi_task_model.class_nb)

    for k in range(train_batch):
      train_data, train_label, train_depth, train_normal = next(train_dataset)
      train_data, train_label = train_data.to(device), train_label.long().to(device)
      train_depth, train_normal = train_depth.to(device), train_normal.to(device)

      train_pred, logsigma = multi_task_model(train_data)

      optimizer.zero_grad()

      train_loss = [model_fit(train_pred[0], train_label, 'semantic'),
                    model_fit(train_pred[1], train_depth, 'depth'),
                    model_fit(train_pred[2], train_normal, 'normal')]

      # 손실 계산
      if opt.weight == 'equal' or opt.weight == 'dwa':
        loss = sum([lambda_weight[i, index] * train_loss[i] for i in range(3)])
      else:
        loss = sum(1 / (2 * torch.exp(logsigma[i])) * train_loss[i] + logsigma[i] / 2 for i in range(3))

      loss.backward()
      optimizer.step()

      # 훈련 이미지의 각 픽셀에서 레이블 예측을 누적
      conf_mat.update(train_pred[0].argmax(1).flatten(), train_label.flatten())

      # 비용 계산
      cost[0] = train_loss[0].item()
      cost[3] = train_loss[1].item()
      cost[4], cost[5] = depth_error(train_pred[1], train_depth)
      cost[6] = train_loss[2].item()
      cost[7], cost[8], cost[9], cost[10], cost[11] = normal_error(train_pred[2], train_normal)
      avg_cost[index, :12] += cost[:12] / train_batch

    # mIoU와 Acc 계산
    avg_cost[index, 1:3] = np.array(conf_mat.get_metrics())

    ## test 데이터에 대한 평가
    multi_task_model.eval()
    conf_mat = ConfMatrix(multi_task_model.class_nb) # 오차 행렬
    with torch.no_grad():  # operations inside don't track history
      test_dataset = iter(test_loader)
      for k in range(test_batch):
        test_data, test_label, test_depth, test_normal = next(test_dataset)
        test_data, test_label = test_data.to(device), test_label.long().to(device)
        test_depth, test_normal = test_depth.to(device), test_normal.to(device)

        test_pred, _ = multi_task_model(test_data)
        test_loss = [model_fit(test_pred[0], test_label, 'semantic'),
                      model_fit(test_pred[1], test_depth, 'depth'),
                      model_fit(test_pred[2], test_normal, 'normal')]

        conf_mat.update(test_pred[0].argmax(1).flatten(), test_label.flatten())

        cost[12] = test_loss[0].item()
        cost[15] = test_loss[1].item()
        cost[16], cost[17] = depth_error(test_pred[1], test_depth)
        cost[18] = test_loss[2].item()
        cost[19], cost[20], cost[21], cost[22], cost[23] = normal_error(test_pred[2], test_normal)
        avg_cost[index, 12:] += cost[12:] / test_batch

      # mIoU와 Acc 계산
      avg_cost[index, 13:15] = np.array(conf_mat.get_metrics())

    scheduler.step()
    print('Epoch: {:04d} | TRAIN: {:.4f} {:.4f} {:.4f} | {:.4f} {:.4f} {:.4f} | {:.4f} {:.4f} {:.4f} {:.4f} {:.4f} {:.4f} ||'
        'TEST: {:.4f} {:.4f} {:.4f} | {:.4f} {:.4f} {:.4f} | {:.4f} {:.4f} {:.4f} {:.4f} {:.4f} {:.4f} '
        .format(index, avg_cost[index, 0], avg_cost[index, 1], avg_cost[index, 2], avg_cost[index, 3],
                avg_cost[index, 4], avg_cost[index, 5], avg_cost[index, 6], avg_cost[index, 7], avg_cost[index, 8],
                avg_cost[index, 9], avg_cost[index, 10], avg_cost[index, 11], avg_cost[index, 12], avg_cost[index, 13],
                avg_cost[index, 14], avg_cost[index, 15], avg_cost[index, 16], avg_cost[index, 17], avg_cost[index, 18],
                avg_cost[index, 19], avg_cost[index, 20], avg_cost[index, 21], avg_cost[index, 22], avg_cost[index, 23]))

In [None]:
### Single-task 모델을 훈련하는 함수
## multi_task_trainer와 비슷하지만 single task에 맞게 설정

def single_task_trainer(train_loader, test_loader, single_task_model, device, optimizer, scheduler, opt, total_epoch=200):
    train_batch = len(train_loader)
    test_batch = len(test_loader)
    avg_cost = np.zeros([total_epoch, 24], dtype=np.float32)
    for index in range(total_epoch):
        cost = np.zeros(24, dtype=np.float32)

        # iteration for all batches
        single_task_model.train()
        train_dataset = iter(train_loader)
        conf_mat = ConfMatrix(single_task_model.class_nb)
        for k in range(train_batch):
            train_data, train_label, train_depth, train_normal = next(train_dataset)
            train_data, train_label = train_data.to(device), train_label.long().to(device)
            train_depth, train_normal = train_depth.to(device), train_normal.to(device)

            train_pred = single_task_model(train_data)
            optimizer.zero_grad()

            if opt.task == 'semantic':
                train_loss = model_fit(train_pred, train_label, opt.task)
                train_loss.backward()
                optimizer.step()

                conf_mat.update(train_pred.argmax(1).flatten(), train_label.flatten())
                cost[0] = train_loss.item()

            if opt.task == 'depth':
                train_loss = model_fit(train_pred, train_depth, opt.task)
                train_loss.backward()
                optimizer.step()
                cost[3] = train_loss.item()
                cost[4], cost[5] = depth_error(train_pred, train_depth)

            if opt.task == 'normal':
                train_loss = model_fit(train_pred, train_normal, opt.task)
                train_loss.backward()
                optimizer.step()
                cost[6] = train_loss.item()
                cost[7], cost[8], cost[9], cost[10], cost[11] = normal_error(train_pred, train_normal)

            avg_cost[index, :12] += cost[:12] / train_batch

        if opt.task == 'semantic':
            avg_cost[index, 1:3] = np.array(conf_mat.get_metrics())

        # evaluating test data
        single_task_model.eval()
        conf_mat = ConfMatrix(single_task_model.class_nb)
        with torch.no_grad():  # operations inside don't track history
            test_dataset = iter(test_loader)
            for k in range(test_batch):
                test_data, test_label, test_depth, test_normal = next(test_dataset)
                test_data, test_label = test_data.to(device),  test_label.long().to(device)
                test_depth, test_normal = test_depth.to(device), test_normal.to(device)

                test_pred = single_task_model(test_data)

                if opt.task == 'semantic':
                    test_loss = model_fit(test_pred, test_label, opt.task)

                    conf_mat.update(test_pred.argmax(1).flatten(), test_label.flatten())
                    cost[12] = test_loss.item()

                if opt.task == 'depth':
                    test_loss = model_fit(test_pred, test_depth, opt.task)
                    cost[15] = test_loss.item()
                    cost[16], cost[17] = depth_error(test_pred, test_depth)

                if opt.task == 'normal':
                    test_loss = model_fit(test_pred, test_normal, opt.task)
                    cost[18] = test_loss.item()
                    cost[19], cost[20], cost[21], cost[22], cost[23] = normal_error(test_pred, test_normal)

                avg_cost[index, 12:] += cost[12:] / test_batch
            if opt.task == 'semantic':
                avg_cost[index, 13:15] = np.array(conf_mat.get_metrics())

        scheduler.step()
        if opt.task == 'semantic':
            print('Epoch: {:04d} | TRAIN: {:.4f} {:.4f} {:.4f} TEST: {:.4f} {:.4f} {:.4f}'
              .format(index, avg_cost[index, 0], avg_cost[index, 1], avg_cost[index, 2], avg_cost[index, 12], avg_cost[index, 13], avg_cost[index, 14]))
        if opt.task == 'depth':
            print('Epoch: {:04d} | TRAIN: {:.4f} {:.4f} {:.4f} TEST: {:.4f} {:.4f} {:.4f}'
              .format(index, avg_cost[index, 3], avg_cost[index, 4], avg_cost[index, 5], avg_cost[index, 15], avg_cost[index, 16], avg_cost[index, 17]))
        if opt.task == 'normal':
            print('Epoch: {:04d} | TRAIN: {:.4f} {:.4f} {:.4f} {:.4f} {:.4f} {:.4f} TEST: {:.4f} {:.4f} {:.4f} {:.4f} {:.4f} {:.4f}'
              .format(index, avg_cost[index, 6], avg_cost[index, 7], avg_cost[index, 8], avg_cost[index, 9], avg_cost[index, 10], avg_cost[index, 11],
                      avg_cost[index, 18], avg_cost[index, 19], avg_cost[index, 20], avg_cost[index, 21], avg_cost[index, 22], avg_cost[index, 23]))

# **1-2. 실험**

In [None]:
## 사용자 정의 모듈 불러오기
# 매번 실행해 주어야 함

from google.colab import files
src = list(files.upload().values())[0]
open('create_dataset.py','wb').write(src)

In [None]:
from google.colab import files
src = list(files.upload().values())[0]
open('utils.py','wb').write(src)

### **a) 단일 작업, 하나의 작업(Single-Task, One Task)**
- 단일 작업 학습을 위한 기본 SegNet

In [None]:
## Import Libraries

import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import argparse

# 사용자 정의 모듈
from create_dataset import *
from utils import *

In [None]:
## 명령행 인수파싱

import argparse
import sys

# 명령행 인수를 리스트로 지정
sys.argv = ['program_name', '--task', 'depth', '--dataroot', 'nyuv2', '--apply_augmentation']

# ArgumentParser 생성
parser = argparse.ArgumentParser(description='Single-task: One Task')

# task 옵션 설정 (기본값은 'semantic')
parser.add_argument('--task', default='semantic', type=str, help='choose task: semantic, depth, normal')

# dataroot 옵션 설정 (기본값은 'nyuv2')
parser.add_argument('--dataroot', default='nyuv2', type=str, help='dataset root')

# apply_augmentation 플래그 설정 (True로 설정하면 데이터 증강이 적용됨)
parser.add_argument('--apply_augmentation', action='store_true', help='toggle to apply data augmentation on NYUv2')

# 명령행 인수 파싱
opt = parser.parse_args()

In [None]:
class SegNet(nn.Module):
    ### 초기화 함수
    def __init__(self):
        super(SegNet, self).__init__()
        ## initialise network parameters: 네트워크 파라미터 초기화
        filter = [64, 128, 256, 512, 512]  # 필터 크기 정의
        self.class_nb = 13  # 클래스 수

        ## define encoder decoder layers: 인코더 및 디코더 레이어 정의
        self.encoder_block = nn.ModuleList([self.conv_layer([3, filter[0]])])  # 인코더 블록 초기화
        self.decoder_block = nn.ModuleList([self.conv_layer([filter[0], filter[0]])])  # 디코더 블록 초기화
        for i in range(4):
            self.encoder_block.append(self.conv_layer([filter[i], filter[i + 1]]))  # 인코더 블록 추가
            self.decoder_block.append(self.conv_layer([filter[i + 1], filter[i]]))  # 디코더 블록 추가

        ## define convolution layer: 컨볼루션 레이어 정의
        self.conv_block_enc = nn.ModuleList([self.conv_layer([filter[0], filter[0]])])  # 인코더 컨볼루션 레이어 초기화
        self.conv_block_dec = nn.ModuleList([self.conv_layer([filter[0], filter[0]])])  # 디코더 컨볼루션 레이어 초기화
        for i in range(4):
            if i == 0:
                self.conv_block_enc.append(self.conv_layer([filter[i + 1], filter[i + 1]]))  # 인코더 컨볼루션 레이어 추가
                self.conv_block_dec.append(self.conv_layer([filter[i], filter[i]]))  # 디코더 컨볼루션 레이어 추가
            else:
                # 두 개의 컨볼루션 레이어를 가진 시퀀셜 블록 추가
                self.conv_block_enc.append(nn.Sequential(self.conv_layer([filter[i + 1], filter[i + 1]]),
                                                         self.conv_layer([filter[i + 1], filter[i + 1]])))
                self.conv_block_dec.append(nn.Sequential(self.conv_layer([filter[i], filter[i]]),
                                                         self.conv_layer([filter[i], filter[i]])))

        ## 예측 레이어 정의
        if opt.task == 'semantic':
            self.pred_task = self.conv_layer([filter[0], self.class_nb], pred=True)  # 클래스별 예측 레이어
        if opt.task == 'depth':
            self.pred_task = self.conv_layer([filter[0], 1], pred=True)  # 깊이 예측 레이어
        if opt.task == 'normal':
            self.pred_task = self.conv_layer([filter[0], 3], pred=True)  # 법선(normal) 벡터 예측 레이어

        ## define pooling and unpooling functions: 풀링 및 언풀링 함수 정의
        self.down_sampling = nn.MaxPool2d(kernel_size=2, stride=2, return_indices=True)  # 다운 샘플링 함수
        self.up_sampling = nn.MaxUnpool2d(kernel_size=2, stride=2)  # 업 샘플링 함수

        ## 네트워크 모듈 초기화
        for m in self.modules():
            if isinstance(m, nn.Conv2d): # 합성곱 층
                nn.init.xavier_normal_(m.weight)  # 컨볼루션 레이어 가중치 초기화
                nn.init.constant_(m.bias, 0)  # 컨볼루션 레이어 편향 초기화
            elif isinstance(m, nn.BatchNorm2d): # 배치 정규화 층
                nn.init.constant_(m.weight, 1)  # 배치 정규화 가중치 초기화
                nn.init.constant_(m.bias, 0)  # 배치 정규화 편향 초기화
            elif isinstance(m, nn.Linear): # 선형 층
                nn.init.xavier_normal_(m.weight)  # 선형 레이어 가중치 초기화
                nn.init.constant_(m.bias, 0)  # 선형 레이어 편향 초기화

    ## 컨볼루션 레이어 생성 함수
    def conv_layer(self, channel, pred=False):
        if not pred:
            conv_block = nn.Sequential(
                nn.Conv2d(in_channels=channel[0], out_channels=channel[1], kernel_size=3, padding=1),  # 컨볼루션 레이어
                nn.BatchNorm2d(num_features=channel[1]),  # 배치 정규화
                nn.ReLU(inplace=True),  # ReLU 활성화 함수
            )
        else:
            conv_block = nn.Sequential(
                nn.Conv2d(in_channels=channel[0], out_channels=channel[0], kernel_size=3, padding=1),
                nn.Conv2d(in_channels=channel[0], out_channels=channel[1], kernel_size=1, padding=0),
            )
        return conv_block

    ### 순방향 전달 함수
    def forward(self, x):
        g_encoder, g_decoder, g_maxpool, g_upsampl, indices = ([0] * 5 for _ in range(5))
        for i in range(5):
            g_encoder[i], g_decoder[-i - 1] = ([0] * 2 for _ in range(2))

        ## 공유 네트워크 정의
        for i in range(5):
            if i == 0:
                g_encoder[i][0] = self.encoder_block[i](x)
                g_encoder[i][1] = self.conv_block_enc[i](g_encoder[i][0])
                g_maxpool[i], indices[i] = self.down_sampling(g_encoder[i][1])
            else:
                g_encoder[i][0] = self.encoder_block[i](g_maxpool[i - 1])
                g_encoder[i][1] = self.conv_block_enc[i](g_encoder[i][0])
                g_maxpool[i], indices[i] = self.down_sampling(g_encoder[i][1])

        for i in range(5):
            if i == 0:
                g_upsampl[i] = self.up_sampling(g_maxpool[-1], indices[-i - 1])
                g_decoder[i][0] = self.decoder_block[-i - 1](g_upsampl[i])
                g_decoder[i][1] = self.conv_block_dec[-i - 1](g_decoder[i][0])
            else:
                g_upsampl[i] = self.up_sampling(g_decoder[i - 1][-1], indices[-i - 1])
                g_decoder[i][0] = self.decoder_block[-i - 1](g_upsampl[i])
                g_decoder[i][1] = self.conv_block_dec[-i - 1](g_decoder[i][0])

        ## 작업 예측 레이어 정의 (semantic, depth, normal)
        if opt.task == 'semantic':
            pred = F.log_softmax(self.pred_task(g_decoder[-1][-1]), dim=1)  # semantic segmentation 예측
        if opt.task == 'depth':
            pred = self.pred_task(g_decoder[-1][-1])  # 깊이 예측
        if opt.task == 'normal':
            pred = self.pred_task(g_decoder[-1][-1])  # 법선 벡터 예측
            pred = pred / torch.norm(pred, p=2, dim=1, keepdim=True)  # 법선 벡터 정규화
        return pred

In [None]:
## 모델, 옵티마이저 및 스케줄러 설정
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")  # GPU 또는 CPU 장치 선택
SegNet = SegNet().to(device)  # SegNet 모델 초기화 및 장치 설정
optimizer = optim.Adam(SegNet.parameters(), lr=1e-4)  # Adam 옵티마이저 초기화
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.5)  # 학습률 스케줄러 설정

# 네트워크의 파라미터 공간 크기 출력
print('Parameter Space: ABS: {:.1f}, REL: {:.4f}'.format(count_parameters(SegNet),
                                                         count_parameters(SegNet) / 24981069))

# 출력 형식 정의
print('LOSS FORMAT: SEMANTIC_LOSS MEAN_IOU PIX_ACC | DEPTH_LOSS ABS_ERR REL_ERR | NORMAL_LOSS MEAN MED <11.25 <22.5 <30')

In [None]:
## 데이터셋 및 데이터로더 설정
dataset_path = os.path.join('/content/drive/MyDrive/Colab Notebooks/Euron 5기/Week 1/im2im_pred', opt.dataroot)  # 데이터셋 경로
if opt.apply_augmentation:
    nyuv2_train_set = NYUv2(root=dataset_path, train=True, augmentation=True)  # 데이터 증강 적용
    print('Applying data augmentation on NYUv2.')
else:
    nyuv2_train_set = NYUv2(root=dataset_path, train=True)  # 표준 훈련 전략
    print('Standard training strategy without data augmentation.')

nyuv2_test_set = NYUv2(root=dataset_path, train=False)  # 테스트 데이터셋

In [None]:
batch_size = 2  # 배치 크기 설정

## 훈련 데이터로더 설정
nyuv2_train_loader = torch.utils.data.DataLoader(
    dataset = nyuv2_train_set,
    batch_size = batch_size,
    shuffle = True)

## 테스트 데이터로더 설정
nyuv2_test_loader = torch.utils.data.DataLoader(
    dataset = nyuv2_test_set,
    batch_size = batch_size,
    shuffle = False)

## Single-task 네트워크 훈련 및 평가
# 200 에폭 동안 훈련
single_task_trainer(nyuv2_train_loader,
                    nyuv2_test_loader,
                    SegNet,
                    device,
                    optimizer,
                    scheduler,
                    opt,
                    200)

- 시간이 오래 걸려서 실행해보지는 x

### **b) 단일 작업, STAN**
- Single-Task Attention Network
- 단일 작업만 수행하면서 제안된 MTAN을 직접 적용한 **단일** 작업 주의 네트워크

In [None]:
### Import Libraries

import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import argparse

from create_dataset import *
from utils import *

In [None]:
### 명령행 인수 파싱
import argparse
import sys

## 명령행 인수를 리스트로 지정
sys.argv = ['program_name', '--task', 'depth', '--dataroot', 'nyuv2', '--apply_augmentation']

## 명령행 인수를 파싱할 파서(parser) 설정
parser = argparse.ArgumentParser(description = 'Single-task: Attention Network')

## add_argument 메서드를 사용하여 프로그램에 전달될 명령행 인수를 정의
# 기본값: 'semantic'
parser.add_argument('--task', default='semantic', type=str, help='choose task: semantic, depth, normal')

## '--dataroot': 기본값은 'nyuv2'
parser.add_argument('--dataroot', default='nyuv2', type=str, help='dataset root')

## '--apply_augmentation': 지정되면 True, 그렇지 않으면 False로 설정
# action='store_true': 인수를 받지 않고 해당 옵션을 사용하면 True로 설정
parser.add_argument('--apply_augmentation', action='store_true', help='toggle to apply data augmentation on NYUv2')

## 명령행 인수를 파싱하고, 그 결과를 opt 객체에 저장
opt = parser.parse_args()

In [None]:
### SegNet 모델 정의

class SegNet(nn.Module):
  ## 신경망 레이어 및 파라미터 초기화
  def __init__(self):
    super(SegNet, self).__init__() # nn.Module 클래스를 상속하여 신경망 아키텍처를 정의
    # 네트워크 매개변수 초기화
    filter = [64, 128, 256, 512, 512] # 인코더와 디코더 레이어의 출력 채널 수
    self.class_nb = 13  # 세그멘테이션 작업에서 구분할 클래스 수

    # 인코더/디코더 레이어 정의
    self.encoder_block = nn.ModuleList([self.conv_layer([3, filter[0]])])
    self.decoder_block = nn.ModuleList([self.conv_layer([filter[0], filter[0]])])
    for i in range(4):
      self.encoder_block.append(self.conv_layer([filter[i], filter[i + 1]]))
      self.decoder_block.append(self.conv_layer([filter[i + 1], filter[i]]))

    self.conv_block_enc = nn.ModuleList([self.conv_layer([filter[0], filter[0]])])
    self.conv_block_dec = nn.ModuleList([self.conv_layer([filter[0], filter[0]])])
    for i in range(4):
      if i == 0:
        self.conv_block_enc.append(self.conv_layer([filter[i + 1], filter[i + 1]]))
        self.conv_block_dec.append(self.conv_layer([filter[i], filter[i]]))
      else:
        self.conv_block_enc.append(nn.Sequential(self.conv_layer([filter[i + 1], filter[i + 1]]),
                                                self.conv_layer([filter[i + 1], filter[i + 1]])))
        self.conv_block_dec.append(nn.Sequential(self.conv_layer([filter[i], filter[i]]),
                                                self.conv_layer([filter[i], filter[i]])))
    # 어텐션(attention) 레이어
    # 입력 데이터의 중요한 부분에 가중치를 부여하여 어텐션 메커니즘을 구현
    self.encoder_att = nn.ModuleList([nn.ModuleList([self.att_layer([filter[0], filter[0], filter[0]])])])
    self.decoder_att = nn.ModuleList([nn.ModuleList([self.att_layer([2 * filter[0], filter[0], filter[0]])])])
    self.encoder_block_att = nn.ModuleList([self.conv_layer([filter[0], filter[1]])])
    self.decoder_block_att = nn.ModuleList([self.conv_layer([filter[0], filter[0]])])

    for j in range(1):
      for i in range(4):
        self.encoder_att[j].append(self.att_layer([2 * filter[i + 1], filter[i + 1], filter[i + 1]]))
        self.decoder_att[j].append(self.att_layer([filter[i + 1] + filter[i], filter[i], filter[i]]))
    for i in range(4):
      if i < 3:
        self.encoder_block_att.append(self.conv_layer([filter[i + 1], filter[i + 2]]))
        self.decoder_block_att.append(self.conv_layer([filter[i + 1], filter[i]]))
      else:
        self.encoder_block_att.append(self.conv_layer([filter[i + 1], filter[i + 1]]))
        self.decoder_block_att.append(self.conv_layer([filter[i + 1], filter[i + 1]]))

    # 최종 출력 레이어
    # 작업 종류(opt.task)에 따라 예측 레이어를 정의
    if opt.task == 'semantic':
      self.pred_task = self.conv_layer([filter[0], self.class_nb], pred=True)
    if opt.task == 'depth':
      self.pred_task = self.conv_layer([filter[0], 1], pred=True)
    if opt.task == 'normal':
      self.pred_task = self.conv_layer([filter[0], 3], pred=True)

    # 풀링 및 언풀링 함수 정의
    self.down_sampling = nn.MaxPool2d(kernel_size=2, stride=2, return_indices=True)
    self.up_sampling = nn.MaxUnpool2d(kernel_size=2, stride=2)

    # 가중치 초기화
    for m in self.modules():
      if isinstance(m, nn.Conv2d):
        nn.init.xavier_normal_(m.weight)
        nn.init.constant_(m.bias, 0)
      elif isinstance(m, nn.BatchNorm2d):
        nn.init.constant_(m.weight, 1)
        nn.init.constant_(m.bias, 0)
      elif isinstance(m, nn.Linear):
        nn.init.xavier_normal_(m.weight)
        nn.init.constant_(m.bias, 0)

    # 컨볼루션 레이어와 배치 정규화 레이어를 포함하는 Sequential 모듈을 정의
    def conv_layer(self, channel, pred=False):
      if not pred:
        conv_block = nn.Sequential(
          # 2D 컨볼루션 레이어를 생성하며, 입력 채널에서 출력 채널로 변환
          nn.Conv2d(in_channels=channel[0], out_channels=channel[1], kernel_size=3, padding=1),
          # 배치 정규화 적용
          nn.BatchNorm2d(num_features=channel[1]),
          # ReLU 활성화 함수를 적용
          nn.ReLU(inplace=True),
        )
      else:
        conv_block = nn.Sequential(
          # 예측 모드인 경우 입력 채널과 출력 채널이 동일한 2D 컨볼루션 레이어 생성
          nn.Conv2d(in_channels=channel[0], out_channels=channel[0], kernel_size=3, padding=1),
          # 추가적인 1x1 컨볼루션 레이어를 사용하여 출력 채널을 변경
          nn.Conv2d(in_channels=channel[0], out_channels=channel[1], kernel_size=1, padding=0),
        )
      return conv_block

      def att_layer(self, channel):
        # 어텐션 레이어를 포함하는 Sequential 모듈을 정의
        att_block = nn.Sequential(
          # 1x1 컨볼루션 레이어를 사용하여 입력 채널에서 출력 채널로 변환
          nn.Conv2d(in_channels=channel[0], out_channels=channel[1], kernel_size=1, padding=0),
          # 배치 정규화를 적용
          nn.BatchNorm2d(channel[1]),
          # ReLU 활성화 함수를 적용
          nn.ReLU(inplace=True),
          # 1x1 컨볼루션 레이어를 사용하여 출력 채널을 다시 변경
          nn.Conv2d(in_channels=channel[1], out_channels=channel[2], kernel_size=1, padding=0),
          # 시그모이드 활성화 함수를 적용
          nn.BatchNorm2d(channel[2]),
          nn.Sigmoid(),
        )
        return att_block

  ## 순전파
  def forward(self, x):
    # 초기 변수 정의
    g_encoder, g_decoder, g_maxpool, g_upsampl, indices = ([0] * 5 for _ in range(5))
    for i in range(5):
      g_encoder[i], g_decoder[-i - 1] = ([0] * 2 for _ in range(2))
    # 두 가지 작업에 대한 주의 목록 정의
    atten_encoder, atten_decoder = ([0] * 3 for _ in range(2))
    for i in range(3):
      atten_encoder[i], atten_decoder[i] = ([0] * 5 for _ in range(2))
    for i in range(3):
      for j in range(5):
        atten_encoder[i][j], atten_decoder[i][j] = ([0] * 3 for _ in range(2))

    # 전역 공유 네트워크 정의
    # 전역 공유 네트워크의 인코더 부분을 정의하고 풀링 연산을 수행
    for i in range(5):
      if i == 0:
        g_encoder[i][0] = self.encoder_block[i](x)
        g_encoder[i][1] = self.conv_block_enc[i](g_encoder[i][0])
        g_maxpool[i], indices[i] = self.down_sampling(g_encoder[i][1])
      else:
        g_encoder[i][0] = self.encoder_block[i](g_maxpool[i - 1])
        g_encoder[i][1] = self.conv_block_enc[i](g_encoder[i][0])
        g_maxpool[i], indices[i] = self.down_sampling(g_encoder[i][1])
    for i in range(5):
      if i == 0:
        g_upsampl[i] = self.up_sampling(g_maxpool[-1], indices[-i - 1])
        g_decoder[i][0] = self.decoder_block[-i - 1](g_upsampl[i])
        g_decoder[i][1] = self.conv_block_dec[-i - 1](g_decoder[i][0])
      else:
        g_upsampl[i] = self.up_sampling(g_decoder[i - 1][-1], indices[-i - 1])
        g_decoder[i][0] = self.decoder_block[-i - 1](g_upsampl[i])
        g_decoder[i][1] = self.conv_block_dec[-i - 1](g_decoder[i][0])

    # 작업 종속 주의 모듈 정의
    # 주의 메커니즘을 사용하여 인코더 및 디코더의 출력을 가중치화하고 주의 적용 결과를 계산
    for i in range(1):
      for j in range(5):
        if j == 0:
          atten_encoder[i][j][0] = self.encoder_att[i][j](g_encoder[j][0])
          atten_encoder[i][j][1] = (atten_encoder[i][j][0]) * g_encoder[j][1]
          atten_encoder[i][j][2] = self.encoder_block_att[j](atten_encoder[i][j][1])
          atten_encoder[i][j][2] = F.max_pool2d(atten_encoder[i][j][2], kernel_size=2, stride=2)
        else:
          atten_encoder[i][j][0] = self.encoder_att[i][j](torch.cat((g_encoder[j][0], atten_encoder[i][j - 1][2]), dim=1))
          atten_encoder[i][j][1] = (atten_encoder[i][j][0]) * g_encoder[j][1]
          atten_encoder[i][j][2] = self.encoder_block_att[j](atten_encoder[i][j][1])
          atten_encoder[i][j][2] = F.max_pool2d(atten_encoder[i][j][2], kernel_size=2, stride=2)

      for j in range(5):
        if j == 0:
          atten_decoder[i][j][0] = F.interpolate(atten_encoder[i][-1][-1], scale_factor=2, mode='bilinear', align_corners=True)
          atten_decoder[i][j][0] = self.decoder_block_att[-j - 1](atten_decoder[i][j][0])
          atten_decoder[i][j][1] = self.decoder_att[i][-j - 1](torch.cat((g_upsampl[j], atten_decoder[i][j][0]), dim=1))
          atten_decoder[i][j][2] = (atten_decoder[i][j][1]) * g_decoder[j][-1]
        else:
          atten_decoder[i][j][0] = F.interpolate(atten_decoder[i][j - 1][2], scale_factor=2, mode='bilinear', align_corners=True)
          atten_decoder[i][j][0] = self.decoder_block_att[-j - 1](atten_decoder[i][j][0])
          atten_decoder[i][j][1] = self.decoder_att[i][-j - 1](torch.cat((g_upsampl[j], atten_decoder[i][j][0]), dim=1))
          atten_decoder[i][j][2] = (atten_decoder[i][j][1]) * g_decoder[j][-1]

    # 작업 예측 레이어 정의
    if opt.task == 'semantic':
      pred = F.log_softmax(self.pred_task(atten_decoder[0][-1][-1]), dim=1)
    if opt.task == 'depth':
      pred = self.pred_task(atten_decoder[0][-1][-1])
    if opt.task == 'normal':
      pred = self.pred_task(atten_decoder[0][-1][-1])
      pred = pred / torch.norm(pred, p=2, dim=1, keepdim=True)
    return pred

- 레이어 넘어갈 때 인덱스 붙이는 게 이해되지 않는다..하하..

In [None]:
### 모델, 옵티마이저 및 스케줄러 정의

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
SegNet_STAN = SegNet().to(device)
optimizer = optim.Adam(SegNet_STAN.parameters(), lr=1e-4)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.5)

# 모델 파라미터 개수 출력
print('Parameter Space: ABS: {:.1f}, REL: {:.4f}'.format(count_parameters(SegNet_STAN),
                                                         count_parameters(SegNet_STAN) / 24981069))
print('LOSS FORMAT: SEMANTIC_LOSS MEAN_IOU PIX_ACC | DEPTH_LOSS ABS_ERR REL_ERR | NORMAL_LOSS MEAN MED <11.25 <22.5 <30')

In [None]:
### 데이터셋 정의
dataset_path = os.path.join('/content/drive/MyDrive/Colab Notebooks/Euron 5기/Week 1/im2im_pred', opt.dataroot)  # 데이터셋 경로

## 데이터 증강 여부에 따라 데이터셋 설정
if opt.apply_augmentation:
    nyuv2_train_set = NYUv2(root=dataset_path, train=True, augmentation=True)
    print('Applying data augmentation on NYUv2.')
else:
    nyuv2_train_set = NYUv2(root=dataset_path, train=True)
    print('Standard training strategy without data augmentation.')

nyuv2_test_set = NYUv2(root=dataset_path, train=False)

In [None]:
batch_size = 2

nyuv2_train_loader = torch.utils.data.DataLoader(
    dataset=nyuv2_train_set,
    batch_size=batch_size,
    shuffle=True)

nyuv2_test_loader = torch.utils.data.DataLoader(
    dataset=nyuv2_test_set,
    batch_size=batch_size,
    shuffle=False)

In [None]:
## Single-task 네트워크 훈련 및 평가
single_task_trainer(nyuv2_train_loader,
                    nyuv2_test_loader,
                    SegNet_STAN,
                    device,
                    optimizer,
                    scheduler,
                    opt,
                    200)

### **c) 다중 작업, 분할(Multi-Task, Split-Wide, Deep)**
- 각 특정 작업에 대한 최종 예측을 위해 마지막 레이어에서 분할하는 표준 다중 작업 학습


In [None]:
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import argparse
import torch.utils.data.sampler as sampler

from create_dataset import *
from utils import *

parser = argparse.ArgumentParser(description='Multi-task: Split')
parser.add_argument('--type', default='standard', type=str, help='split type: standard, wide, deep')
parser.add_argument('--weight', default='equal', type=str, help='multi-task weighting: equal, uncert, dwa')
parser.add_argument('--dataroot', default='nyuv2', type=str, help='dataset root')
parser.add_argument('--temp', default=2.0, type=float, help='temperature for DWA (must be positive)')
parser.add_argument('--apply_augmentation', action='store_true', help='toggle to apply data augmentation on NYUv2')
opt = parser.parse_args()


class SegNet(nn.Module):
    def __init__(self):
        super(SegNet, self).__init__()
        # initialise network parameters
        if opt.type == 'wide':
            filter = [64, 128, 256, 512, 1024]
        else:
            filter = [64, 128, 256, 512, 512]

        self.class_nb = 13

        # define encoder decoder layers
        self.encoder_block = nn.ModuleList([self.conv_layer([3, filter[0]])])
        self.decoder_block = nn.ModuleList([self.conv_layer([filter[0], filter[0]])])
        for i in range(4):
            self.encoder_block.append(self.conv_layer([filter[i], filter[i + 1]]))
            self.decoder_block.append(self.conv_layer([filter[i + 1], filter[i]]))

        # define convolution layer
        self.conv_block_enc = nn.ModuleList([self.conv_layer([filter[0], filter[0]])])
        self.conv_block_dec = nn.ModuleList([self.conv_layer([filter[0], filter[0]])])
        for i in range(4):
            if i == 0:
                self.conv_block_enc.append(self.conv_layer([filter[i + 1], filter[i + 1]]))
                self.conv_block_dec.append(self.conv_layer([filter[i], filter[i]]))
            else:
                self.conv_block_enc.append(nn.Sequential(self.conv_layer([filter[i + 1], filter[i + 1]]),
                                                         self.conv_layer([filter[i + 1], filter[i + 1]])))
                self.conv_block_dec.append(nn.Sequential(self.conv_layer([filter[i], filter[i]]),
                                                         self.conv_layer([filter[i], filter[i]])))

        # define task specific layers
        self.pred_task1 = nn.Sequential(nn.Conv2d(in_channels=filter[0], out_channels=filter[0], kernel_size=3, padding=1),
                                        nn.Conv2d(in_channels=filter[0], out_channels=self.class_nb, kernel_size=1, padding=0))
        self.pred_task2 = nn.Sequential(nn.Conv2d(in_channels=filter[0], out_channels=filter[0], kernel_size=3, padding=1),
                                        nn.Conv2d(in_channels=filter[0], out_channels=1, kernel_size=1, padding=0))
        self.pred_task3 = nn.Sequential(nn.Conv2d(in_channels=filter[0], out_channels=filter[0], kernel_size=3, padding=1),
                                        nn.Conv2d(in_channels=filter[0], out_channels=3, kernel_size=1, padding=0))

        # define pooling and unpooling functions
        self.down_sampling = nn.MaxPool2d(kernel_size=2, stride=2, return_indices=True)
        self.up_sampling = nn.MaxUnpool2d(kernel_size=2, stride=2)

        self.logsigma = nn.Parameter(torch.FloatTensor([-0.5, -0.5, -0.5]))

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.xavier_normal_(m.weight)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_normal_(m.weight)
                nn.init.constant_(m.bias, 0)

    # define convolutional block
    def conv_layer(self, channel):
        if opt.type == 'deep':
            conv_block = nn.Sequential(
                nn.Conv2d(in_channels=channel[0], out_channels=channel[1], kernel_size=3, padding=1),
                nn.BatchNorm2d(num_features=channel[1]),
                nn.ReLU(inplace=True),
                nn.Conv2d(in_channels=channel[1], out_channels=channel[1], kernel_size=3, padding=1),
                nn.BatchNorm2d(num_features=channel[1]),
                nn.ReLU(inplace=True),
            )
        else:
            conv_block = nn.Sequential(
                nn.Conv2d(in_channels=channel[0], out_channels=channel[1], kernel_size=3, padding=1),
                nn.BatchNorm2d(num_features=channel[1]),
                nn.ReLU(inplace=True)
            )
        return conv_block

    def forward(self, x):
        g_encoder, g_decoder, g_maxpool, g_upsampl, indices = ([0] * 5 for _ in range(5))
        for i in range(5):
            g_encoder[i], g_decoder[-i - 1] = ([0] * 2 for _ in range(2))

        # global shared encoder-decoder network
        for i in range(5):
            if i == 0:
                g_encoder[i][0] = self.encoder_block[i](x)
                g_encoder[i][1] = self.conv_block_enc[i](g_encoder[i][0])
                g_maxpool[i], indices[i] = self.down_sampling(g_encoder[i][1])
            else:
                g_encoder[i][0] = self.encoder_block[i](g_maxpool[i - 1])
                g_encoder[i][1] = self.conv_block_enc[i](g_encoder[i][0])
                g_maxpool[i], indices[i] = self.down_sampling(g_encoder[i][1])

        for i in range(5):
            if i == 0:
                g_upsampl[i] = self.up_sampling(g_maxpool[-1], indices[-i - 1])
                g_decoder[i][0] = self.decoder_block[-i - 1](g_upsampl[i])
                g_decoder[i][1] = self.conv_block_dec[-i - 1](g_decoder[i][0])
            else:
                g_upsampl[i] = self.up_sampling(g_decoder[i - 1][-1], indices[-i - 1])
                g_decoder[i][0] = self.decoder_block[-i - 1](g_upsampl[i])
                g_decoder[i][1] = self.conv_block_dec[-i - 1](g_decoder[i][0])

        # define task prediction layers
        t1_pred = F.log_softmax(self.pred_task1(g_decoder[i][1]), dim=1)
        t2_pred = self.pred_task2(g_decoder[i][1])
        t3_pred = self.pred_task3(g_decoder[i][1])
        t3_pred = t3_pred / torch.norm(t3_pred, p=2, dim=1, keepdim=True)

        return [t1_pred, t2_pred, t3_pred], self.logsigma


# define model, optimiser and scheduler
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
SegNet_SPLIT = SegNet().to(device)
optimizer = optim.Adam(SegNet_SPLIT.parameters(), lr=1e-4)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.5)

print('Parameter Space: ABS: {:.1f}, REL: {:.4f}'.format(count_parameters(SegNet_SPLIT),
                                                         count_parameters(SegNet_SPLIT) / 24981069))
print('LOSS FORMAT: SEMANTIC_LOSS MEAN_IOU PIX_ACC | DEPTH_LOSS ABS_ERR REL_ERR | NORMAL_LOSS MEAN MED <11.25 <22.5 <30')

# define dataset
dataset_path = opt.dataroot
if opt.apply_augmentation:
    nyuv2_train_set = NYUv2(root=dataset_path, train=True, augmentation=True)
    print('Applying data augmentation on NYUv2.')
else:
    nyuv2_train_set = NYUv2(root=dataset_path, train=True)
    print('Standard training strategy without data augmentation.')

nyuv2_test_set = NYUv2(root=dataset_path, train=False)

batch_size = 2
nyuv2_train_loader = torch.utils.data.DataLoader(
    dataset=nyuv2_train_set,
    batch_size=batch_size,
    shuffle=True)

nyuv2_test_loader = torch.utils.data.DataLoader(
    dataset=nyuv2_test_set,
    batch_size=batch_size,
    shuffle=False)

# Train and evaluate multi-task network
multi_task_trainer(nyuv2_train_loader,
                   nyuv2_test_loader,
                   SegNet_SPLIT,
                   device,
                   optimizer,
                   scheduler,
                   opt,
                   200)

- 정리 x

### **d) 다중 작업, 밀집(Multi-Task, Dense)**

In [None]:
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import argparse
import torch.utils.data.sampler as sampler

from create_dataset import *
from utils import *

parser = argparse.ArgumentParser(description='Multi-task: Dense')
parser.add_argument('--weight', default='equal', type=str, help='multi-task weighting: equal, uncert, dwa')
parser.add_argument('--dataroot', default='nyuv2', type=str, help='dataset root')
parser.add_argument('--temp', default=2.0, type=float, help='temperature for DWA (must be positive)')
parser.add_argument('--apply_augmentation', action='store_true', help='toggle to apply data augmentation on NYUv2')
opt = parser.parse_args()


class SegNet(nn.Module):
    def __init__(self):
        super(SegNet, self).__init__()
        # initialise network parameters
        filter = [64, 128, 256, 512, 512]
        self.class_nb = 13

        # define encoder decoder layers
        self.encoder_block = nn.ModuleList([self.conv_layer([3, filter[0], filter[0]], bottle_neck=True)])
        self.decoder_block = nn.ModuleList([self.conv_layer([filter[0], filter[0], self.class_nb], bottle_neck=True)])

        self.encoder_block_t = nn.ModuleList([nn.ModuleList([self.conv_layer([3, filter[0], filter[0]], bottle_neck=True)])])
        self.decoder_block_t = nn.ModuleList([nn.ModuleList([self.conv_layer([2 * filter[0], 2 * filter[0], filter[0]], bottle_neck=True)])])

        for i in range(4):
            if i == 0:
                self.encoder_block.append(self.conv_layer([filter[i], filter[i + 1], filter[i + 1]], bottle_neck=True))
                self.decoder_block.append(self.conv_layer([filter[i + 1], filter[i], filter[i]], bottle_neck=True))
            else:
                self.encoder_block.append(self.conv_layer([filter[i], filter[i + 1], filter[i + 1]], bottle_neck=False))
                self.decoder_block.append(self.conv_layer([filter[i + 1], filter[i], filter[i]], bottle_neck=False))

        for j in range(3):
            if j < 2:
                self.encoder_block_t.append(nn.ModuleList([self.conv_layer([3, filter[0], filter[0]], bottle_neck=True)]))
                self.decoder_block_t.append(nn.ModuleList([self.conv_layer([2 * filter[0], 2 * filter[0], filter[0]], bottle_neck=True)]))
            for i in range(4):
                if i == 0:
                    self.encoder_block_t[j].append(self.conv_layer([2 * filter[i], filter[i + 1], filter[i + 1]], bottle_neck=True))
                    self.decoder_block_t[j].append(self.conv_layer([2 * filter[i + 1], filter[i], filter[i]], bottle_neck=True))
                else:
                    self.encoder_block_t[j].append(self.conv_layer([2 * filter[i], filter[i + 1], filter[i + 1]], bottle_neck=False))
                    self.decoder_block_t[j].append(self.conv_layer([2 * filter[i + 1], filter[i], filter[i]], bottle_neck=False))

        # define pooling and unpooling functions
        self.down_sampling = nn.MaxPool2d(kernel_size=2, stride=2, return_indices=True)
        self.up_sampling = nn.MaxUnpool2d(kernel_size=2, stride=2)

        self.pred_task1 = self.conv_layer([filter[0], self.class_nb], bottle_neck=True, pred_layer=True)
        self.pred_task2 = self.conv_layer([filter[0], 1], bottle_neck=True, pred_layer=True)
        self.pred_task3 = self.conv_layer([filter[0], 3], bottle_neck=True, pred_layer=True)

        self.logsigma = nn.Parameter(torch.FloatTensor([-0.5, -0.5, -0.5]))

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.xavier_uniform_(m.weight)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                nn.init.constant_(m.bias, 0)

    def conv_layer(self, channel, bottle_neck, pred_layer=False):
        if bottle_neck:
            if not pred_layer:
                conv_block = nn.Sequential(
                    nn.Conv2d(in_channels=channel[0], out_channels=channel[1], kernel_size=3, padding=1),
                    nn.BatchNorm2d(channel[1]),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(in_channels=channel[1], out_channels=channel[2], kernel_size=3, padding=1),
                    nn.BatchNorm2d(channel[2]),
                    nn.ReLU(inplace=True),
                )
            else:
                conv_block = nn.Sequential(
                    nn.Conv2d(in_channels=channel[0], out_channels=channel[0], kernel_size=3, padding=1),
                    nn.Conv2d(in_channels=channel[0], out_channels=channel[1], kernel_size=1, padding=0),
                )

        else:
            conv_block = nn.Sequential(
                nn.Conv2d(in_channels=channel[0], out_channels=channel[1], kernel_size=3, padding=1),
                nn.BatchNorm2d(channel[1]),
                nn.ReLU(inplace=True),
                nn.Conv2d(in_channels=channel[1], out_channels=channel[1], kernel_size=3, padding=1),
                nn.BatchNorm2d(channel[1]),
                nn.ReLU(inplace=True),
                nn.Conv2d(in_channels=channel[1], out_channels=channel[2], kernel_size=3, padding=1),
                nn.BatchNorm2d(channel[2]),
                nn.ReLU(inplace=True),
            )

        return conv_block

    def forward(self, x):
        encoder_conv, decoder_conv, encoder_samp, decoder_samp, indices = ([0] * 5 for _ in range(5))
        encoder_conv_t, decoder_conv_t, encoder_samp_t, decoder_samp_t, indices_t = ([0] * 3 for _ in range(5))
        for i in range(3):
            encoder_conv_t[i], decoder_conv_t[i], encoder_samp_t[i], decoder_samp_t[i], indices_t[i] = ([0] * 5 for _ in range(5))

        # global shared encoder-decoder network
        for i in range(5):
            if i == 0:
                encoder_conv[i] = self.encoder_block[i](x)
                encoder_samp[i], indices[i] = self.down_sampling(encoder_conv[i])
            else:
                encoder_conv[i] = self.encoder_block[i](encoder_samp[i - 1])
                encoder_samp[i], indices[i] = self.down_sampling(encoder_conv[i])

        for i in range(5):
            if i == 0:
                decoder_samp[i] = self.up_sampling(encoder_samp[-1], indices[-1])
                decoder_conv[i] = self.decoder_block[-i - 1](decoder_samp[i])
            else:
                decoder_samp[i] = self.up_sampling(decoder_conv[i - 1], indices[-i - 1])
                decoder_conv[i] = self.decoder_block[-i - 1](decoder_samp[i])

        # define task prediction layers
        for j in range(3):
            for i in range(5):
                if i == 0:
                    encoder_conv_t[j][i] = self.encoder_block_t[j][i](x)
                    encoder_samp_t[j][i], indices_t[j][i] = self.down_sampling(encoder_conv_t[j][i])
                else:
                    encoder_conv_t[j][i] = self.encoder_block_t[j][i](torch.cat((encoder_samp_t[j][i - 1], encoder_samp[i - 1]), dim=1))
                    encoder_samp_t[j][i], indices_t[j][i] = self.down_sampling(encoder_conv_t[j][i])

            for i in range(5):
                if i == 0:
                    decoder_samp_t[j][i] = self.up_sampling(encoder_samp_t[j][-1], indices_t[j][-1])
                    decoder_conv_t[j][i] = self.decoder_block_t[j][-i - 1](torch.cat((decoder_samp_t[j][i], decoder_samp[i]), dim=1))
                else:
                    decoder_samp_t[j][i] = self.up_sampling(decoder_conv_t[j][i - 1], indices_t[j][-i - 1])
                    decoder_conv_t[j][i] = self.decoder_block_t[j][-i - 1](torch.cat((decoder_samp_t[j][i], decoder_samp[i]), dim=1))

        t1_pred = F.log_softmax(self.pred_task1(decoder_conv_t[0][-1]), dim=1)
        t2_pred = self.pred_task2(decoder_conv_t[1][-1])
        t3_pred = self.pred_task3(decoder_conv_t[2][-1])
        t3_pred = t3_pred / torch.norm(t3_pred, p=2, dim=1, keepdim=True)

        return [t1_pred, t2_pred, t3_pred], self.logsigma


# define model, optimiser and scheduler
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
SegNet_DENSE = SegNet().to(device)
optimizer = optim.Adam(SegNet_DENSE.parameters(), lr=1e-4)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.5)

print('Parameter Space: ABS: {:.1f}, REL: {:.4f}'.format(count_parameters(SegNet_DENSE),
                                                         count_parameters(SegNet_DENSE) / 24981069))
print('LOSS FORMAT: SEMANTIC_LOSS MEAN_IOU PIX_ACC | DEPTH_LOSS ABS_ERR REL_ERR | NORMAL_LOSS MEAN MED <11.25 <22.5 <30')

# define dataset
dataset_path = opt.dataroot
if opt.apply_augmentation:
    nyuv2_train_set = NYUv2(root=dataset_path, train=True, augmentation=True)
    print('Applying data augmentation on NYUv2.')
else:
    nyuv2_train_set = NYUv2(root=dataset_path, train=True)
    print('Standard training strategy without data augmentation.')

nyuv2_test_set = NYUv2(root=dataset_path, train=False)

batch_size = 2
nyuv2_train_loader = torch.utils.data.DataLoader(
    dataset=nyuv2_train_set,
    batch_size=batch_size,
    shuffle=True)

nyuv2_test_loader = torch.utils.data.DataLoader(
    dataset=nyuv2_test_set,
    batch_size=batch_size,
    shuffle=False)

# Train and evaluate multi-task network
multi_task_trainer(nyuv2_train_loader,
                   nyuv2_test_loader,
                   SegNet_DENSE,
                   device,
                   optimizer,
                   scheduler,
                   opt,
                   200)

- 정리 x

### **e) 다중 작업, 크로스 스티치(Multi-Task, Cross-stitch)**

In [None]:
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import argparse
import torch.utils.data.sampler as sampler

from create_dataset import *
from utils import *

parser = argparse.ArgumentParser(description='Multi-task: Cross')
parser.add_argument('--weight', default='equal', type=str, help='multi-task weighting: equal, uncert, dwa')
parser.add_argument('--dataroot', default='nyuv2', type=str, help='dataset root')
parser.add_argument('--temp', default=2.0, type=float, help='temperature for DWA (must be positive)')
parser.add_argument('--apply_augmentation', action='store_true', help='toggle to apply data augmentation on NYUv2')
opt = parser.parse_args()


class SegNet(nn.Module):
    def __init__(self):
        super(SegNet, self).__init__()
        # initialise network parameters
        filter = [64, 128, 256, 512, 512]
        self.class_nb = 13

        # define encoder decoder layers
        self.encoder_block_t = nn.ModuleList([nn.ModuleList([self.conv_layer([3, filter[0], filter[0]], bottle_neck=True)])])
        self.decoder_block_t = nn.ModuleList([nn.ModuleList([self.conv_layer([filter[0], filter[0], filter[0]], bottle_neck=True)])])

        for j in range(3):
            if j < 2:
                self.encoder_block_t.append(nn.ModuleList([self.conv_layer([3, filter[0], filter[0]], bottle_neck=True)]))
                self.decoder_block_t.append(nn.ModuleList([self.conv_layer([filter[0], filter[0], filter[0]], bottle_neck=True)]))
            for i in range(4):
                if i == 0:
                    self.encoder_block_t[j].append(self.conv_layer([filter[i], filter[i + 1], filter[i + 1]], bottle_neck=True))
                    self.decoder_block_t[j].append(self.conv_layer([filter[i + 1], filter[i], filter[i]], bottle_neck=True))
                else:
                    self.encoder_block_t[j].append(self.conv_layer([filter[i], filter[i + 1], filter[i + 1]], bottle_neck=False))
                    self.decoder_block_t[j].append(self.conv_layer([filter[i + 1], filter[i], filter[i]], bottle_neck=False))

        # define cross-stitch units
        self.cs_unit_encoder = nn.Parameter(data=torch.ones(4, 3))
        self.cs_unit_decoder = nn.Parameter(data=torch.ones(5, 3))

        # define task specific layers
        self.pred_task1 = self.conv_layer([filter[0], self.class_nb], bottle_neck=True, pred_layer=True)
        self.pred_task2 = self.conv_layer([filter[0], 1], bottle_neck=True, pred_layer=True)
        self.pred_task3 = self.conv_layer([filter[0], 3], bottle_neck=True, pred_layer=True)

        # define pooling and unpooling functions
        self.down_sampling = nn.MaxPool2d(kernel_size=2, stride=2, return_indices=True)
        self.up_sampling = nn.MaxUnpool2d(kernel_size=2, stride=2)

        self.logsigma = nn.Parameter(torch.FloatTensor([-0.5, -0.5, -0.5]))

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.xavier_uniform_(m.weight)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Parameter):
                nn.init.constant(m.weight, 1)

    def conv_layer(self, channel, bottle_neck, pred_layer=False):
        if bottle_neck:
            if not pred_layer:
                conv_block = nn.Sequential(
                    nn.Conv2d(in_channels=channel[0], out_channels=channel[1], kernel_size=3, padding=1),
                    nn.BatchNorm2d(channel[1]),
                    nn.ReLU(inplace=True),
                    nn.Conv2d(in_channels=channel[1], out_channels=channel[2], kernel_size=3, padding=1),
                    nn.BatchNorm2d(channel[2]),
                    nn.ReLU(inplace=True),
                )
            else:
                conv_block = nn.Sequential(
                    nn.Conv2d(in_channels=channel[0], out_channels=channel[0], kernel_size=3, padding=1),
                    nn.Conv2d(in_channels=channel[0], out_channels=channel[1], kernel_size=1, padding=0),
                )

        else:
            conv_block = nn.Sequential(
                nn.Conv2d(in_channels=channel[0], out_channels=channel[1], kernel_size=3, padding=1),
                nn.BatchNorm2d(channel[1]),
                nn.ReLU(inplace=True),
                nn.Conv2d(in_channels=channel[1], out_channels=channel[1], kernel_size=3, padding=1),
                nn.BatchNorm2d(channel[1]),
                nn.ReLU(inplace=True),
                nn.Conv2d(in_channels=channel[1], out_channels=channel[2], kernel_size=3, padding=1),
                nn.BatchNorm2d(channel[2]),
                nn.ReLU(inplace=True),
            )
        return conv_block

    def forward(self, x):
        encoder_conv_t, decoder_conv_t, encoder_samp_t, decoder_samp_t, indices_t = ([0] * 3 for _ in range(5))
        for i in range(3):
            encoder_conv_t[i], decoder_conv_t[i], encoder_samp_t[i], decoder_samp_t[i], indices_t[i] = ([0] * 5 for _ in range(5))

        # task branch 1
        for i in range(5):
            for j in range(3):
                if i == 0:
                    encoder_conv_t[j][i] = self.encoder_block_t[j][i](x)
                    encoder_samp_t[j][i], indices_t[j][i] = self.down_sampling(encoder_conv_t[j][i])
                else:
                    encoder_cross_stitch = self.cs_unit_encoder[i - 1][0] * encoder_samp_t[0][i - 1] + \
                                           self.cs_unit_encoder[i - 1][1] * encoder_samp_t[1][i - 1] + \
                                           self.cs_unit_encoder[i - 1][2] * encoder_samp_t[2][i - 1]
                    encoder_conv_t[j][i] = self.encoder_block_t[j][i](encoder_cross_stitch)
                    encoder_samp_t[j][i], indices_t[j][i] = self.down_sampling(encoder_conv_t[j][i])

        for i in range(5):
            for j in range(3):
                if i == 0:
                    decoder_cross_stitch = self.cs_unit_decoder[i][0] * encoder_samp_t[0][-1] + \
                                           self.cs_unit_decoder[i][1] * encoder_samp_t[1][-1] + \
                                           self.cs_unit_decoder[i][2] * encoder_samp_t[2][-1]
                    decoder_samp_t[j][i] = self.up_sampling(decoder_cross_stitch, indices_t[j][-i - 1])
                    decoder_conv_t[j][i] = self.decoder_block_t[j][-i - 1](decoder_samp_t[j][i])
                else:
                    decoder_cross_stitch = self.cs_unit_decoder[i][0] * decoder_conv_t[0][i - 1] + \
                                           self.cs_unit_decoder[i][1] * decoder_conv_t[1][i - 1] + \
                                           self.cs_unit_decoder[i][2] * decoder_conv_t[2][i - 1]
                    decoder_samp_t[j][i] = self.up_sampling(decoder_cross_stitch, indices_t[j][-i - 1])
                    decoder_conv_t[j][i] = self.decoder_block_t[j][-i - 1](decoder_samp_t[j][i])

        # define task prediction layers
        t1_pred = F.log_softmax(self.pred_task1(decoder_conv_t[0][-1]), dim=1)
        t2_pred = self.pred_task2(decoder_conv_t[1][-1])
        t3_pred = self.pred_task3(decoder_conv_t[2][-1])
        t3_pred = t3_pred / torch.norm(t3_pred, p=2, dim=1, keepdim=True)

        return [t1_pred, t2_pred, t3_pred], self.logsigma


# define model, optimiser and scheduler
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
SegNet_CROSS = SegNet().to(device)
optimizer = optim.Adam(SegNet_CROSS.parameters(), lr=1e-4)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.5)

print('Parameter Space: ABS: {:.1f}, REL: {:.4f}'.format(count_parameters(SegNet_CROSS),
                                                         count_parameters(SegNet_CROSS) / 24981069))
print('LOSS FORMAT: SEMANTIC_LOSS MEAN_IOU PIX_ACC | DEPTH_LOSS ABS_ERR REL_ERR | NORMAL_LOSS MEAN MED <11.25 <22.5 <30')

# define dataset
dataset_path = opt.dataroot
if opt.apply_augmentation:
    nyuv2_train_set = NYUv2(root=dataset_path, train=True, augmentation=True)
    print('Applying data augmentation on NYUv2.')
else:
    nyuv2_train_set = NYUv2(root=dataset_path, train=True)
    print('Standard training strategy without data augmentation.')

nyuv2_test_set = NYUv2(root=dataset_path, train=False)

batch_size = 2
nyuv2_train_loader = torch.utils.data.DataLoader(
    dataset=nyuv2_train_set,
    batch_size=batch_size,
    shuffle=True)

nyuv2_test_loader = torch.utils.data.DataLoader(
    dataset=nyuv2_test_set,
    batch_size=batch_size,
    shuffle=False)

# Train and evaluate multi-task network
multi_task_trainer(nyuv2_train_loader,
                   nyuv2_test_loader,
                   SegNet_CROSS,
                   device,
                   optimizer,
                   scheduler,
                   opt,
                   200)

- 정리 x

## **1-3. 최종 목표**
- MTAN(Multi-task Attention Network)

#### **📌 전체 Process**
1. **네트워크 파라미터 초기화**
- 모델 내부에서 사용되는 ```필터 개수``` 및 ```클래스 수```와 같은 네트워크 파라미터를 초기화

2. **인코더와 디코더 레이어 정의**
- SegNet은 인코더와 디코더로 구성되며, 이 레이어들은 이미지의 특징을 추출하고 다시 복원
- 각 레이어는 ```컨볼루션 레이어```, ```배치 정규화``` 및 ```ReLU``` 활성화 함수로 구성
- Encoder는 이미지를 down-sampling하고 Decoder는 up-sampling 함

3. **컨볼루션 레이어 정의**
- 인코더와 디코더에서 사용되는 컨볼루션 레이어를 정의
- 이미지 특징 추출, 세분화된 예측을 생성하는 데 사용

4. **어텐션 레이어 정의**
- 어텐션 레이어는 다양한 작업 간에 특징을 공유하도록 함
- 각 어텐션 레이어는 ```컨볼루션 레이어```와 ```시그모이드``` 활성화 함수로 구성되며, 작업 간의 상호 작용을 조절하는 데 사용

5. **풀링 및 언풀링 함수 정의**
- pooling 레이어는 이미지를 down-sampling하고, unpooling 레이어는 up-sampling
- 이미지 크기를 조절하고 공간적인 정보를 보존하는 데 사용

6. **가중치 초기화**
- 모든 레이어의 가중치와 편향을 초기화
  - 모델이 효과적으로 학습될 수 있도록 함

7. **순전파 함수 정의**
- 네트워크의 입력부터 출력까지의 데이터 흐름을 정의
- 해당 코드는 ```MTAN(Multi-Task Attention Network)``` 아키텍처를 구현
  - 다중 작업을 수행하도록 네트워크를 구성
  - Encoder와 Decoder를 통해 특징을 추출하고 어텐션 메커니즘을 사용하여 작업 간에 정보를 공유
  - 각 작업에 대한 예측 레이어를 정의하고, 마지막으로 클래스 확률 예측을 수행

In [None]:
### Import Libraries

import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import argparse
import torch.utils.data.sampler as sampler

from create_dataset import *
from utils import *

In [None]:
### argparse를 사용하여 명령행 인수를 파싱

import argparse

# argparse.ArgumentParser를 사용하여 명령행 인수 파서를 생성
parser = argparse.ArgumentParser(description='Multi-task: Attention Network')
# 명령행 인수를 정의
parser.add_argument('--weight', default='equal', type=str, help='multi-task weighting: equal, uncert, dwa')
# '--dataroot' 설정
parser.add_argument('--dataroot', default='nyuv2', type=str, help='dataset root')
parser.add_argument('--temp', default=2.0, type=float, help='temperature for DWA (must be positive)')
parser.add_argument('--apply_augmentation', action='store_true', help='toggle to apply data augmentation on NYUv2')
# 명령행 인수를 파싱하고 결과를 'opt' 객체에 저장합니다.
opt = parser.parse_args()

In [None]:
### SegNet 클래스 정의

class SegNet(nn.Module):
    def __init__(self):
        super(SegNet, self).__init__()

        # 1. 네트워크 파라미터 초기화
        filter = [64, 128, 256, 512, 512]  # 각 레이어의 필터 개수
        self.class_nb = 13  # 클래스 개수

        # 2. 인코더 및 디코더 레이어 정의
        # 이미지의 특징을 추출하고 다시 복원
        self.encoder_block = nn.ModuleList([self.conv_layer([3, filter[0]])]) # 이미지를 down-sampling
        self.decoder_block = nn.ModuleList([self.conv_layer([filter[0], filter[0]])]) # 이미지를 up-sampling
        for i in range(4):
            self.encoder_block.append(self.conv_layer([filter[i], filter[i + 1]]))
            self.decoder_block.append(self.conv_layer([filter[i + 1], filter[i]]))

        # 3. 컨볼루션 레이어 정의
        # Encoder와 Decoder에서 사용되는 합성곱 layer를 정의
        self.conv_block_enc = nn.ModuleList([self.conv_layer([filter[0], filter[0]])])
        self.conv_block_dec = nn.ModuleList([self.conv_layer([filter[0], filter[0]])])
        for i in range(4):
            if i == 0:
                self.conv_block_enc.append(self.conv_layer([filter[i + 1], filter[i + 1]]))
                self.conv_block_dec.append(self.conv_layer([filter[i], filter[i]]))
            else:
                self.conv_block_enc.append(nn.Sequential(self.conv_layer([filter[i + 1], filter[i + 1]]),
                                                         self.conv_layer([filter[i + 1], filter[i + 1]])))
                self.conv_block_dec.append(nn.Sequential(self.conv_layer([filter[i], filter[i]]),
                                                         self.conv_layer([filter[i], filter[i]])))

        # 4. task attention 레이어 정의
        # 다양한 작업 간에 특징을 공유할 수 있도록 함
        self.encoder_att = nn.ModuleList([nn.ModuleList([self.att_layer([filter[0], filter[0], filter[0]])])])
        self.decoder_att = nn.ModuleList([nn.ModuleList([self.att_layer([2 * filter[0], filter[0], filter[0]])])])

        self.encoder_block_att = nn.ModuleList([self.conv_layer([filter[0], filter[1]])])
        self.decoder_block_att = nn.ModuleList([self.conv_layer([filter[0], filter[0]])])

        for j in range(3):
            if j < 2:
                self.encoder_att.append(nn.ModuleList([self.att_layer([filter[0], filter[0], filter[0]])]))
                self.decoder_att.append(nn.ModuleList([self.att_layer([2 * filter[0], filter[0], filter[0]])]))
            for i in range(4):
                self.encoder_att[j].append(self.att_layer([2 * filter[i + 1], filter[i + 1], filter[i + 1]]))
                self.decoder_att[j].append(self.att_layer([filter[i + 1] + filter[i], filter[i], filter[i]]))

        for i in range(4):
            if i < 3:
                self.encoder_block_att.append(self.conv_layer([filter[i + 1], filter[i + 2]]))
                self.decoder_block_att.append(self.conv_layer([filter[i + 1], filter[i]]))
            else:
                self.encoder_block_att.append(self.conv_layer([filter[i + 1], filter[i + 1]]))
                self.decoder_block_att.append(self.conv_layer([filter[i + 1], filter[i + 1]]))

        # 태스크 예측 레이어 정의
        # 마지막 레이어인 경우 1*1 convolution을 통해 클래스에 대한 예측 수행
        self.pred_task1 = self.conv_layer([filter[0], self.class_nb], pred=True)
        self.pred_task2 = self.conv_layer([filter[0], 1], pred=True)
        self.pred_task3 = self.conv_layer([filter[0], 3], pred=True)

        # 5. 풀링 및 언풀링 함수 정의
        # 이미지 크기 조절, 공간 정보 보존
        self.down_sampling = nn.MaxPool2d(kernel_size=2, stride=2, return_indices=True) # 풀링 -> down-sampling
        self.up_sampling = nn.MaxUnpool2d(kernel_size=2, stride=2) # 언풀링 -> up-sampling
        self.logsigma = nn.Parameter(torch.FloatTensor([-0.5, -0.5, -0.5]))

        # 6. 가중치 초기화
        # 모든 모듈의 가중치 및 편향을 적절하게 초기화
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.xavier_normal_(m.weight)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_normal_(m.weight)
                nn.init.constant_(m.bias, 0)

    ## 컨볼루션 레이어 정의
    # channel: 입력 및 출력 채널의 수
    # pred: 마지막 레이어인지의 여부
    def conv_layer(self, channel, pred=False):
        if not pred:
            conv_block = nn.Sequential(
                nn.Conv2d(in_channels=channel[0], out_channels=channel[1], kernel_size=3, padding=1),
                nn.BatchNorm2d(num_features=channel[1]),
                nn.ReLU(inplace=True),
            )
        else:
            conv_block = nn.Sequential(
                nn.Conv2d(in_channels=channel[0], out_channels=channel[0], kernel_size=3, padding=1),
                nn.Conv2d(in_channels=channel[0], out_channels=channel[1], kernel_size=1, padding=0),
            )
        return conv_block

    ## 어텐션 레이어 정의
    # 입력 채널에서 attention 가중치 생성
    def att_layer(self, channel):
        att_block = nn.Sequential(
            nn.Conv2d(in_channels=channel[0], out_channels=channel[1], kernel_size=1, padding=0),
            nn.BatchNorm2d(channel[1]),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=channel[1], out_channels=channel[2], kernel_size=1, padding=0),
            nn.BatchNorm2d(channel[2]),
            nn.Sigmoid(),
        )
        return att_block

    ## 7. 순전파 함수
    # 네트워크의 모든 구성 요소를 초기화하고 데이터가 앞으로 흐르게 됨
    def forward(self, x):
        # 네트워크의 각 부분을 초기화
        g_encoder, g_decoder, g_maxpool, g_upsampl, indices = ([0] * 5 for _ in range(5))
        for i in range(5):
            g_encoder[i], g_decoder[-i - 1] = ([0] * 2 for _ in range(2))

        # 태스크를 위한 어텐션 리스트 정의
        atten_encoder, atten_decoder = ([0] * 3 for _ in range(2))
        for i in range(3):
            atten_encoder[i], atten_decoder[i] = ([0] * 5 for _ in range(2))
        for i in range(3):
            for j in range(5):
                atten_encoder[i][j], atten_decoder[i][j] = ([0] * 3 for _ in range(2))

        # 전역 공유 네트워크 정의
        for i in range(5):
            if i == 0:
                g_encoder[i][0] = self.encoder_block[i](x)
                g_encoder[i][1] = self.conv_block_enc[i](g_encoder[i][0])
                g_maxpool[i], indices[i] = self.down_sampling(g_encoder[i][1])
            else:
                g_encoder[i][0] = self.encoder_block[i](g_maxpool[i - 1])
                g_encoder[i][1] = self.conv_block_enc[i](g_encoder[i][0])
                g_maxpool[i], indices[i] = self.down_sampling(g_encoder[i][1])

        for i in range(5):
            if i == 0:
                g_upsampl[i] = self.up_sampling(g_maxpool[-1], indices[-i - 1])
                g_decoder[i][0] = self.decoder_block[-i - 1](g_upsampl[i])
                g_decoder[i][1] = self.conv_block_dec[-i - 1](g_decoder[i][0])
            else:
                g_upsampl[i] = self.up_sampling(g_decoder[i - 1][-1], indices[-i - 1])
                g_decoder[i][0] = self.decoder_block[-i - 1](g_upsampl[i])
                g_decoder[i][1] = self.conv_block_dec[-i - 1](g_decoder[i][0])

        # 태스크 종속 어텐션 모듈 정의
        # 각 작업 관련 특징 학습
        for i in range(3):
            for j in range(5):
                if j == 0:
                    atten_encoder[i][j][0] = self.encoder_att[i][j](g_encoder[j][0])
                    atten_encoder[i][j][1] = (atten_encoder[i][j][0]) * g_encoder[j][1]
                    atten_encoder[i][j][2] = self.encoder_block_att[j](atten_encoder[i][j][1])
                    atten_encoder[i][j][2] = F.max_pool2d(atten_encoder[i][j][2], kernel_size=2, stride=2)
                else:
                    atten_encoder[i][j][0] = self.encoder_att[i][j](torch.cat((g_encoder[j][0], atten_encoder[i][j - 1][2]), dim=1))
                    atten_encoder[i][j][1] = (atten_encoder[i][j][0]) * g_encoder[j][1]
                    atten_encoder[i][j][2] = self.encoder_block_att[j](atten_encoder[i][j][1])
                    atten_encoder[i][j][2] = F.max_pool2d(atten_encoder[i][j][2], kernel_size=2, stride=2)

            for j in range(5):
                if j == 0:
                    atten_decoder[i][j][0] = F.interpolate(atten_encoder[i][-1][-1], scale_factor=2, mode='bilinear', align_corners=True)
                    atten_decoder[i][j][0] = self.decoder_block_att[-j - 1](atten_decoder[i][j][0])
                    atten_decoder[i][j][1] = self.decoder_att[i][-j - 1](torch.cat((g_upsampl[j], atten_decoder[i][j][0]), dim=1))
                    atten_decoder[i][j][2] = (atten_decoder[i][j][1]) * g_decoder[j][-1]
                else:
                    atten_decoder[i][j][0] = F.interpolate(atten_decoder[i][j - 1][2], scale_factor=2, mode='bilinear', align_corners=True)
                    atten_decoder[i][j][0] = self.decoder_block_att[-j - 1](atten_decoder[i][j][0])
                    atten_decoder[i][j][1] = self.decoder_att[i][-j - 1](torch.cat((g_upsampl[j], atten_decoder[i][j][0]), dim=1))
                    atten_decoder[i][j][2] = (atten_decoder[i][j][1]) * g_decoder[j][-1]

        # 태스크 예측 레이어 정의
        t1_pred = F.log_softmax(self.pred_task1(atten_decoder[0][-1][-1]), dim=1)
        t2_pred = self.pred_task2(atten_decoder[1][-1][-1])
        t3_pred = self.pred_task3(atten_decoder[2][-1][-1])
        t3_pred = t3_pred / torch.norm(t3_pred, p=2, dim=1, keepdim=True)

        return [t1_pred, t2_pred, t3_pred], self.logsigma

In [None]:
### 모델, optimizer 및 scheduler 정의

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
SegNet_MTAN = SegNet().to(device)
optimizer = optim.Adam(SegNet_MTAN.parameters(), lr=1e-4)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.5)

# 네트워크의 파라미터 수 출력
print('Parameter Space: ABS: {:.1f}, REL: {:.4f}'.format(count_parameters(SegNet_MTAN),
                                                         count_parameters(SegNet_MTAN) / 24981069))
print('LOSS FORMAT: SEMANTIC_LOSS MEAN_IOU PIX_ACC | DEPTH_LOSS ABS_ERR REL_ERR | NORMAL_LOSS MEAN MED <11.25 <22.5 <30')

In [None]:
### 데이터셋 정의
dataset_path = opt.dataroot
if opt.apply_augmentation:
    nyuv2_train_set = NYUv2(root=dataset_path, train=True, augmentation=True)
    print('Applying data augmentation on NYUv2.')
else:
    nyuv2_train_set = NYUv2(root=dataset_path, train=True)
    print('Standard training strategy without data augmentation.')

nyuv2_test_set = NYUv2(root=dataset_path, train=False)

In [None]:
batch_size = 2

### 데이터로더 정의
nyuv2_train_loader = torch.utils.data.DataLoader(
    dataset=nyuv2_train_set,
    batch_size=batch_size,
    shuffle=True)

nyuv2_test_loader = torch.utils.data.DataLoader(
    dataset=nyuv2_test_set,
    batch_size=batch_size,
    shuffle=False)

In [None]:
# Multi-task 네트워크를 훈련하고 평가
multi_task_trainer(nyuv2_train_loader,
                   nyuv2_test_loader,
                   SegNet_MTAN,
                   device,
                   optimizer,
                   scheduler,
                   opt,
                   200)