## Multi Label(Head, Branch) Classifier
- 하나의 Convolution 모델에서 3개의 FC Layer 브랜치를 만들어보자

In [1]:
import os
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt 

import torch
import torch.nn as nn
import torchvision
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)


cuda


- 기존 train data 1가지
- mask, gender, age 각각 labeling을 한 데이터프레임 3가지
- 총 4가지의 데이터프레임을 Dataset모듈에 넣을 것임

In [2]:
# # 기본 train 데이터 셋
# df = pd.read_csv('train_label.csv')
# df.head(5)

Unnamed: 0.1,Unnamed: 0,path,label
0,0,../input/data/train/images/000001_female_Asian...,4
1,1,../input/data/train/images/000001_female_Asian...,4
2,2,../input/data/train/images/000001_female_Asian...,4
3,3,../input/data/train/images/000001_female_Asian...,10
4,4,../input/data/train/images/000001_female_Asian...,4


In [19]:
# # wear, incorrect, normal 3가지 클래스로 변경
# # 0~5, 6~11, 12~17끼리 묶는다
# df_mask = df.copy()

# def mask_label(x):
#     if x in [0,1,2,3,4,5]:
#         return 0    # wear
#     elif x in [6,7,8,9,10,11]:
#         return 1    # incorrect
#     else:
#         return 2    # not wear

# df_mask['label'] = df_mask['label'].apply(mask_label)

# # 남성, 여성 2가지 클래스로 변경
# # [0,1,2,6,7,8,12,13,14], [3,4,5,9,10,11,15,16,17]
# df_gender = df.copy()

# def gender_label(x):
#     if x in [0,1,2,6,7,8,12,13,14]:
#         return 0    # male
#     else:
#         return 1    # female

# df_gender['label'] = df_gender['label'].apply(gender_label)

# # 청년, 중년, 장년 3가지 클래스로 변경
# # [0,3,6,9,12,15], [1,4,7,10,13,16], [2,5,8,11,14,17]
# df_age = df.copy()

# def age_label(x):
#     if x in [0,3,6,9,12,15]:
#         return 0    # young
#     elif x in [1,4,7,10,13,16]:
#         return 1    # middle
#     else:
#         return 2    # old

# df_age['label'] = df_age['label'].apply(age_label)


### Dataset  정의
- mask, gender, age 별로 label 나눔

In [60]:
class MultiBranchDataset(Dataset):
    def __init__(self, df, df_mask, df_gender, df_age, transforms):
        self.df = df
        self.image_data = self.df['path']   # x data, 이미지
        self.image_label = self.df['label'] # y data, 레이블

        self.mask_label = df_mask['label']
        self.gender_label = df_gender['label']
        self.age_label = df_age['label']

        self.transform = transforms

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):

        img_path = self.df['path'].iloc[idx]
        img = Image.open(img_path)

        if self.transform:
            img = self.transform(img)
        
        dict_label = {
            'class' : self.image_label[idx],
            'mask' : self.mask_label[idx],
            'gender' : self.gender_label[idx],
            'age' : self.age_label[idx]
        }

        return img, dict_label

In [61]:
# 데이터셋 준비
df = pd.read_csv('train_label.csv')
df_mask = pd.read_csv('df_mask.csv')
df_gender = pd.read_csv('df_gender.csv')
df_age = pd.read_csv('df_age.csv')

(14175, 3) (4725, 3)


#### Transform & Dataset
- 모든 데이터를 한방에 학습할 때와
- train과 valid를 split할 때의 코드가 다름 (train, valid를 쪼갠거와 동일한 mask, gender, age 레이블링이 필요함)

In [62]:
# Transform Compose
data_transform = torchvision.transforms.Compose([
    # transforms.CenterCrop(320),
    torchvision.transforms.Resize((350,350),Image.BILINEAR),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(mean=(0.5,0.5,0.5), std=(0.2,0.2,0.2)),
])

# train data 한방에 학습시키는 경우
train_dataset = MultiBranchDataset(df, df_mask, df_gender, df_age, data_transform)



In [None]:
# 데이터를 split할 경우, 기존 데이터(train, valid)와 이에 해당하는 mask, age, gender 레이블을 모두 맞춰야함
from sklearn.model_selection import train_test_split
train, valid = train_test_split(df, test_size = 0.25, shuffle=True, stratify=df['label'], random_state=1234)
print(train.shape, valid.shape)

### train에 대해서 mask, age, gender 데이터프레임 생성
df_mask = train.copy()

def mask_label(x):
    if x in [0,1,2,3,4,5]:
        return 0    # wear
    elif x in [6,7,8,9,10,11]:
        return 1    # incorrect
    else:
        return 2    # not wear

df_mask['label'] = df_mask['label'].apply(mask_label)

# 남성, 여성 2가지 클래스로 변경
# [0,1,2,6,7,8,12,13,14], [3,4,5,9,10,11,15,16,17]
df_gender = train.copy()

def gender_label(x):
    if x in [0,1,2,6,7,8,12,13,14]:
        return 0    # male
    else:
        return 1    # female

df_gender['label'] = df_gender['label'].apply(gender_label)

# 청년, 중년, 장년 3가지 클래스로 변경
# [0,3,6,9,12,15], [1,4,7,10,13,16], [2,5,8,11,14,17]
df_age = train.copy()

def age_label(x):
    if x in [0,3,6,9,12,15]:
        return 0    # young
    elif x in [1,4,7,10,13,16]:
        return 1    # middle
    else:
        return 2    # old

df_age['label'] = df_age['label'].apply(age_label)


### DataLoader
- 여기서 3개로 나눌 필요가 없음
- 모델 train에서 loss를 따로 나눌 것임

In [63]:
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)


데이터로더의 label 부분을 보면 mask, gender, age별로 배치사이즈에 맞게 레이블 정보가 담아짐

In [64]:
next(iter(train_dataloader))[1]

{'class': tensor([15,  3,  0,  5, 15, 15,  9,  3,  0, 16,  4,  3,  3,  4,  0,  0,  3, 14,
          3,  0,  7,  4,  9,  4,  0, 15,  4,  4, 17, 16,  9, 17, 16,  0,  3,  9,
          5,  7, 10, 15,  2, 15,  3,  3,  0,  1,  0, 16,  3,  4, 16,  1,  2,  7,
          5,  4, 16,  1,  8,  2,  3,  2,  0,  1]),
 'mask': tensor([2, 0, 0, 0, 2, 2, 1, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 1, 0, 1, 0,
         0, 2, 0, 0, 2, 2, 1, 2, 2, 0, 0, 1, 0, 1, 1, 2, 0, 2, 0, 0, 0, 0, 0, 2,
         0, 0, 2, 0, 0, 1, 0, 0, 2, 0, 1, 0, 0, 0, 0, 0]),
 'gender': tensor([1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1, 0, 1, 0, 0, 1, 1, 1,
         0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 1,
         1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 0]),
 'age': tensor([0, 0, 0, 2, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 2, 0, 0, 1, 1, 0, 1,
         0, 0, 1, 1, 2, 1, 0, 2, 1, 0, 0, 0, 2, 1, 1, 0, 2, 0, 0, 0, 0, 1, 0, 1,
         0, 1, 1, 1, 2, 1, 2, 1, 1, 1, 2, 2, 0, 2, 0, 1])}

#### Conv layer 빠져나오고 브랜치 만들기

In [65]:
class MultiBranchModel(nn.Module):
    def __init__(self):
        super().__init__()
        
        self.base_model = torchvision.models.resnet18(pretrained=True)

        self.base_model = nn.Sequential(*list(self.base_model.children())[:-1])

        self.mask = nn.Sequential(
            # nn.Dropout(p=0.2),
            nn.Linear(512, 3, bias=True)
        )
        self.gender = nn.Sequential(
            # nn.Dropout(p=0.2),
            nn.Linear(512, 2, bias=True)
        )
        self.age = nn.Sequential(
            # nn.Dropout(p=0.2),
            nn.Linear(512, 3, bias=True)
        )
        

    def forward(self, x):
        x = self.base_model(x)
        # print(x.shape)

        x = torch.flatten(x, start_dim=1)
        # print('22: ', x.shape)

        return {
            'mask': self.mask(x),
            'gender': self.gender(x),
            'age': self.age(x)
        }

    # Loss 함수 구현 부분
    def get_loss(self, net_output, ground_truth):
        mask_loss = F.cross_entropy(net_output['mask'], ground_truth['mask'].to(device))
        gender_loss = F.cross_entropy(net_output['gender'], ground_truth['gender'].to(device))
        age_loss = F.cross_entropy(net_output['age'], ground_truth['age'].to(device))

        loss = mask_loss + gender_loss + age_loss

        return loss, {'mask' : mask_loss, 'gender' : gender_loss, 'age' : age_loss}

In [39]:
model = MultiBranchModel()
x = torch.ones((1,3,224,224))
out = model.forward(x)

### Loss 함수 정의
- 3가지 task 각각 loss 구하고 합침

In [None]:
# def get_loss(net_output, ground_truth):
#     mask_loss = F.cross_entropy(net_output['mask'], ground_truth['mask'])
#     gender_loss = F.cross_entropy(net_output['gender'], ground_truth['gender'])
#     age_loss = F.cross_entropy(net_output['age'], ground_truth['age'])

#     loss = mask_loss + gender_loss + age_loss

#     return loss, {'mask' : mask_loss, 'gender' : gender_loss, 'age' : age_loss}

## Train

In [71]:
model = MultiBranchModel().to(device)

LEARNING_RATE = 0.001
NUM_EPOCH = 3
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)


In [74]:

for epoch in range(NUM_EPOCH):
    running_acc = 0
    total_loss = 0
    for i, (images, labels) in enumerate(train_dataloader):

        images = images.to(device)
        labels = labels # dict타입이라서 get_loss 함수 안에서 따로 cuda로 올림

        optimizer.zero_grad()

        output = model(images)


        pred_mask = torch.argmax(output['mask'], dim=-1)
        pred_gender = torch.argmax(output['gender'], dim=-1)
        pred_age = torch.argmax(output['age'], dim=-1)
        pred_class = (pred_mask * 6) + (pred_gender * 3) + (pred_age)

        loss_train, _ = model.get_loss(output, labels)
        total_loss += loss_train.item()

        loss_train.backward()       # gradient를 계산
        optimizer.step()      # gradient descent
    

        running_acc += torch.sum(pred_class == labels['class'].to(device))

    
        if i % 50 == 0:
            print('Epoch: {}, i: {},Loss: {:.6f}'.format(epoch, i, loss_train.item()))
            print(f'{i}번 배치: {running_acc}/{(i+1)*64}, 정확도: {running_acc/((i+1)*64)}')
    
    epoch_acc = running_acc / len(train_dataloader.dataset)
    print('Final Accuracy :', epoch_acc)


Epoch: 0, i: 0,Loss: 0.299706
0번 배치: 54/64, 정확도: 0.84375
Epoch: 0, i: 50,Loss: 0.086221
50번 배치: 3110/3264, 정확도: 0.9528186321258545
Epoch: 0, i: 100,Loss: 0.314374
100번 배치: 6128/6464, 정확도: 0.948019802570343
Epoch: 0, i: 150,Loss: 0.120939
150번 배치: 9155/9664, 정확도: 0.9473302960395813
Epoch: 0, i: 200,Loss: 0.134061
200번 배치: 12235/12864, 정확도: 0.9511038064956665
Epoch: 0, i: 250,Loss: 0.044403
250번 배치: 15287/16064, 정확도: 0.9516310095787048
Final Accuracy : tensor(0.9522, device='cuda:0')
Epoch: 1, i: 0,Loss: 0.203645
0번 배치: 58/64, 정확도: 0.90625
Epoch: 1, i: 50,Loss: 0.041827
50번 배치: 3153/3264, 정확도: 0.9659926891326904
Epoch: 1, i: 100,Loss: 0.036559
100번 배치: 6276/6464, 정확도: 0.9709158539772034
Epoch: 1, i: 150,Loss: 0.140769
150번 배치: 9398/9664, 정확도: 0.9724751710891724
Epoch: 1, i: 200,Loss: 0.075183
200번 배치: 12522/12864, 정확도: 0.9734141826629639
Epoch: 1, i: 250,Loss: 0.074180
250번 배치: 15622/16064, 정확도: 0.9724850654602051
Final Accuracy : tensor(0.9729, device='cuda:0')
Epoch: 2, i: 0,Loss: 0.12

### 테스트(Evaluation) 중, mask, gender, age별 output 뽑아내기
- 최종 클래스 예측(18개)으로 변환
- mask : 0, 1, 2 (wear, incorrect, not wear)
- gender : 0, 1  (male, female)
- age : 0, 1, 2  (young, middle, old)
- class : (mask * 6) + (gender * 3) + (age)

In [None]:
output = model(image)
pred_mask = torch.argmax(output['mask'], dim=-1)
pred_gender = torch.argmax(output['gender'], dim=-1)
pred_age = torch.argmax(output['age'], dim=-1)

pred_class = (pred_mask * 6) + (pred_gender * 3) + (pred_age)

## Testing

In [75]:
class TestDataset(Dataset):
    def __init__(self, img_paths, transform):
        self.img_paths = img_paths
        self.transform = transform

    def __getitem__(self, index):
        image = Image.open(self.img_paths[index])

        if self.transform:
            image = self.transform(image)


        return image

    def __len__(self):
        return len(self.img_paths)

In [77]:
# meta 데이터와 이미지 경로를 불러옵니다.
test_dir = '/opt/ml/input/data/eval'
submission = pd.read_csv(os.path.join(test_dir, 'info.csv'))
image_dir = os.path.join(test_dir, 'images')

# Test Dataset 클래스 객체를 생성하고 DataLoader를 만듭니다.
image_paths = [os.path.join(image_dir, img_id) for img_id in submission.ImageID]

dataset = TestDataset(image_paths, data_transform)

loader = DataLoader(
    dataset,
    shuffle=False
)

# 모델을 정의합니다. (학습한 모델이 있다면 torch.load로 모델을 불러주세요!)
device = torch.device('cuda')
test_model = model.to(device)
test_model.eval()

# 모델이 테스트 데이터셋을 예측하고 결과를 저장합니다.
all_predictions = []
for images in loader:
    with torch.no_grad():
        images = images.to(device)
        output = test_model(images)

        pred_mask = torch.argmax(output['mask'], dim=-1)
        pred_gender = torch.argmax(output['gender'], dim=-1)
        pred_age = torch.argmax(output['age'], dim=-1)
        pred_class = (pred_mask * 6) + (pred_gender * 3) + (pred_age)

        all_predictions.extend(pred_class.cpu().numpy())
submission['ans'] = all_predictions

# 제출할 파일을 저장합니다.
submission.to_csv(os.path.join(test_dir, 'submission6.csv'), index=False)
print('test inference is done!')

test inference is done!
