In [16]:
import os
import pandas as pd
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from torchvision import transforms
from torchvision.transforms import Resize, ToTensor, Normalize, RandomHorizontalFlip, RandomApply
import torchvision.transforms as T

In [17]:
test_dir = '/opt/ml/input/data/eval'
train_dir = '/opt/ml/input/data/train'

In [18]:
class BottleNeck(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(BottleNeck, self).__init__()
        #print(in_channels, out_channels)

        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

        self.conv2 = nn.Sequential(
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

        self.conv3 = nn.Sequential(
            nn.Conv2d(out_channels, out_channels * 4, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(out_channels * 4)
        )

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        return x

In [19]:
class ResBlock(nn.Module):
    def __init__(self, in_channels, out_channels, init_block, init_stride=1):
        super(ResBlock, self).__init__()

        self.conv1 = nn.Sequential(
            BottleNeck(in_channels, out_channels, stride=init_stride)
        )

        self.init_stride = init_stride
        self.init_block = init_block

        if init_block:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * 4, kernel_size=1, stride=init_stride),
                nn.BatchNorm2d(out_channels * 4)
            )
            #print("here0", in_channels, out_channels * 4, init_stride)
        else:
            self.shortcut = nn.Identity()

        self.relu = nn.ReLU(inplace=True)
        

    def forward(self, x):
        #print("here1", x.shape, self.init_stride, self.init_block)
        x = self.shortcut(x) + self.conv1(x)
        x = self.relu(x)
        #print("here2", x.shape)
        return x

In [20]:
class ResNet(nn.Module):
    def __init__(self, nblk=[3,4,6,3], nker=64, num_classes=8):
        super(ResNet, self).__init__()

        self.conv1 = nn.Sequential(
            nn.Conv2d(3, nker, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm2d(nker),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )

        layers=[]

        layers.append(ResBlock(nker, 64, init_block=True))
        nker = nker * 4
        for i in range(nblk[0]-1):
            layers.append(ResBlock(nker, 64, init_block=False))

        layers.append(ResBlock(nker, 128, init_block=True, init_stride=2))
        nker = nker * 2
        for i in range(nblk[1]-1):
            layers.append(ResBlock(nker, 128, init_block=False))

        layers.append(ResBlock(nker, 256, init_block=True, init_stride=2))
        nker = nker * 2
        for i in range(nblk[2]-1):
            layers.append(ResBlock(nker, 256, init_block=False))

        layers.append(ResBlock(nker, 512, init_block=True, init_stride=2))
        nker = nker * 2
        for i in range(nblk[3]-1):
            layers.append(ResBlock(nker, 512, init_block=False))

        layers.append(nn.AdaptiveAvgPool2d((1, 1)))
        self.rnt = nn.Sequential(*layers)

        self.fc = nn.Linear(nker, num_classes)
        
        #self.dropout = nn.Dropout(0.5)
        #self.softmax = nn.Softmax(dim=1)
        #print(nker)

    def forward(self, x):
        x = self.conv1(x)
        x = self.rnt(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

In [21]:
def init_weights(m):
    if type(m) == nn.Conv2d or type(m) == nn.Linear:
        nn.init.kaiming_uniform_(m.weight, mode='fan_out', nonlinearity='relu')
        if m.bias is not None:
            nn.init.constant_(m.bias, 0)

In [22]:
# Network
model_test = ResNet()
model_test.apply(init_weights)

# Random input
x = torch.randn((1, 3, 150, 150))

# Forward
out = model_test(x)

# Check the output shape
print("Output tensor shape is :", out.shape)

Output tensor shape is : torch.Size([1, 8])


In [23]:
class MaskBaseDataset(Dataset):
    def __init__(self, csv, transform):
        self.csv = csv
        self.transform = transform

    def __len__(self):
        return len(self.csv)
    
    def read_image(self, index):
        img_path = self.csv.iloc[index]['path']
        image = Image.open(img_path)
        return image
        
    def get_mask_label(self, index):
        mask_label = self.csv.iloc[index]['mask']
        if mask_label == 'mask':
            return 0
        elif mask_label == 'incorrect':
            return 1
        else:
            return 2
    
    def get_gender_label(self, index):
        gender_label = self.csv.iloc[index]['gender']

        if gender_label == 'male':
            return 0
        else:
            return 1

    def get_age_label(self, index):
        age_label = self.csv.iloc[index]['age']

        if age_label < 30:
            return 0
        elif age_label < 60:
            return 1
        else:
            return 2
        
    def __get__item(self, index):
        raise NotImplementedError
        

class MaskMultiLabelDataset(MaskBaseDataset):
    num_classes = 3 + 2 + 3

    def __getitem__(self, index):
        assert self.transform is not None, ".set_tranform 메소드를 이용하여 transform 을 주입해주세요"

        image = self.read_image(index)
        mask_label = self.get_mask_label(index)
        gender_label = self.get_gender_label(index)
        age_label = self.get_age_label(index)
        # multi_class_label = self.encode_multi_class(mask_label, gender_label, age_label)

        image_transform = self.transform(image)
        return image_transform, (mask_label, gender_label, age_label)
    

In [24]:
class AddGaussianNoise(object):
    def __init__(self, mean=0., std=1.):
        self.std = std
        self.mean = mean
        
    def __call__(self, tensor):
        return tensor + torch.randn(tensor.size()) * self.std + self.mean
    
    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)

In [25]:
import timm
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, accuracy_score

In [26]:
# from torchvision.models import resnet50#, ResNet50_Weights

# def load_pretrained_weights(model, pretrained_model):
#     pretrained_dict = pretrained_model.state_dict()
#     model_dict = model.state_dict()
    
#     print(pretrained_dict)
#     print(model_dict)
    
# model = ResNet(num_classes=8)#.to(device)
# pretrained_model = resnet50(pretrained=True)
# pretrained_dict = pretrained_model.state_dict()
# model_dict = model.state_dict()
    
# print(pretrained_dict)
# print(model_dict)

In [27]:
submission = pd.read_csv(os.path.join(test_dir, 'info.csv'))
image_dir = os.path.join(test_dir, 'images')
all_csv = pd.read_csv(os.path.join(train_dir, 'train_data.csv'))

train_csv, val_csv = train_test_split(all_csv, test_size=0.15, random_state=42)

transform = transforms.Compose([
    # Resize((512, 384), Image.BILINEAR),
    ToTensor(),
    RandomHorizontalFlip(p=0.5),
    RandomApply([AddGaussianNoise(0.5, 0.2)]),
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.2, 0.2, 0.2)),
])

train_dataset = MaskMultiLabelDataset(train_csv, transform)
val_dataset = MaskMultiLabelDataset(val_csv, transform)

num_classes = MaskMultiLabelDataset.num_classes

train_loader = DataLoader(
    train_dataset,
    batch_size=64,
    shuffle=False
)

val_loader = DataLoader(
    train_dataset,
    batch_size=64,
    shuffle=False
)

def multi_label_to_class(mask_label, gender_label, age_label):
    return mask_label * 6 + gender_label * 3 + age_label

# 모델을 정의합니다. (학습한 모델이 있다면 torch.load로 모델을 불러주세요!)
device = torch.device('cuda')
torch.cuda.empty_cache()

#model = ResNet(num_classes=8)#.to(device)
#model.apply(init_weights)
model_PATH = '/opt/ml/test/model.pt'

# pretrained 모델을 load하여 사용
model = timm.create_model('resnet18', num_classes=MaskMultiLabelDataset.num_classes, pretrained=True)

# pretrained된 모델의 가중치를 사용하도록 하는 코드
# pretrained_model = resnet50(pretrained=True)
# load_pretrained_weights = (model, pretrained_model)

# 기존 모델 가중치를 load하여 사용하는 코드
# if os.path.isfile(model_PATH):
#     model.load_state_dict(torch.load(model_PATH))

model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()   

epochs = 50

for epoch in range(epochs):
    model.train()
    lslist = []
    lossv = 0
    for idx, train_batch in enumerate(train_loader):
        images, (mask_labels, gender_labels, age_labels) = train_batch
        images = images.to(device)
        mask_labels = mask_labels.to(device)
        gender_labels = gender_labels.to(device)
        age_labels = age_labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        (mask_outputs, gender_outputs, age_outputs) = torch.split(outputs, [3,2,3], dim=1)

        mask_loss = criterion(mask_outputs, mask_labels)
        gender_loss = criterion(gender_outputs, gender_labels)
        age_loss = criterion(age_outputs, age_labels)

        loss = mask_loss + gender_loss + (1.3 * age_loss)

        lslist.append(loss.item())
        
        loss.backward()
        optimizer.step()

        lossv += loss.item()
        if idx % 100 == 99:
            print(f'Loss in {epoch+1}-{idx+1}', lossv / 100)
            lossv = 0
            
    if epoch % 5 == 0 and epoch != 0:
        torch.save(model.state_dict(), model_PATH)

    avg_loss = sum(lslist)/len(lslist)
    
    model.eval()
    
    y_true = []
    y_pred = []
    
    with torch.no_grad():
        for images, (mask_labels, gender_labels, age_labels) in val_loader:
            images = images.to(device)
            mask_labels = mask_labels.to(device)
            gender_labels = gender_labels.to(device)
            age_labels = age_labels.to(device)

            outputs = model(images)
            (mask_outputs, gender_outputs, age_outputs) = torch.split(outputs, [3, 2, 3], dim=1)

            mask_preds = torch.argmax(mask_outputs, dim=1)
            gender_preds = torch.argmax(gender_outputs, dim=1)
            age_preds = torch.argmax(age_outputs, dim=1)

            for i in range(len(images)):
                y_true.append(multi_label_to_class(mask_labels[i].item(), gender_labels[i].item(), age_labels[i].item()))
                y_pred.append(multi_label_to_class(mask_preds[i].item(), gender_preds[i].item(), age_preds[i].item()))

    f1 = f1_score(y_true, y_pred, average='weighted')
    accuracy = accuracy_score(y_true, y_pred)
    
    print('Epoch [{}/{}], Loss: {:.4f}, {:.4f}'.format(epoch+1, epochs, loss.item(), avg_loss))
    print('Epoch [{}/{}], accuracy : {:.4f}, f1_score : {:.4f}'.format(epoch+1, epochs, accuracy, f1))

Loss in 1-100 0.9021756497025489
Loss in 1-200 0.5408713603019715
Epoch [1/50], Loss: 0.9655, 0.6660
Epoch [1/50], accuracy : 0.7386, f1_score : 0.7315
Loss in 2-100 0.41467378988862036
Loss in 2-200 0.37361507415771483
Epoch [2/50], Loss: 0.3738, 0.3778
Epoch [2/50], accuracy : 0.8380, f1_score : 0.8368
Loss in 3-100 0.3049144271016121
Loss in 3-200 0.28636482790112494
Epoch [3/50], Loss: 0.1674, 0.2867
Epoch [3/50], accuracy : 0.8979, f1_score : 0.8961
Loss in 4-100 0.23585030268877744
Loss in 4-200 0.22025424502789975
Epoch [4/50], Loss: 0.4991, 0.2250
Epoch [4/50], accuracy : 0.9362, f1_score : 0.9346
Loss in 5-100 0.1872155051678419
Loss in 5-200 0.15228873454034328
Epoch [5/50], Loss: 0.0984, 0.1652
Epoch [5/50], accuracy : 0.9267, f1_score : 0.9269
Loss in 6-100 0.16184934839606285
Loss in 6-200 0.13323659390211107
Epoch [6/50], Loss: 0.1053, 0.1475
Epoch [6/50], accuracy : 0.9537, f1_score : 0.9530
Loss in 7-100 0.12087287686765194
Loss in 7-200 0.10799279652535915
Epoch [7/50]

In [None]:

torch.save(model.state_dict(), model_PATH)

In [None]:
class MaskTestDataset(MaskBaseDataset):
    def __init__(self, csv, image_dir, transform):
        super().__init__(csv, transform)
        self.image_dir = image_dir

    def read_image(self, index):
        img_path = os.path.join(self.image_dir, self.csv.iloc[index]['ImageID'])
        image = Image.open(img_path)
        return image

    def __getitem__(self, index):
        assert self.transform is not None, ".set_tranform 메소드를 이용하여 transform 을 주입해주세요"

        image = self.read_image(index)
        image_transform = self.transform(image)
        return image_transform

In [None]:
test_image_dir = os.path.join(test_dir, 'images')
test_dataset = MaskTestDataset(submission, test_image_dir, transform)

test_loader = DataLoader(
    test_dataset,
    batch_size=64,
    shuffle=False
)

model.eval()
predictions = []

with torch.no_grad():
    for images in test_loader:
        images = images.to(device)
        outputs = model(images)
        (mask_outputs, gender_outputs, age_outputs) = torch.split(outputs, [3, 2, 3], dim=1)

        mask_preds = torch.argmax(mask_outputs, dim=1)
        gender_preds = torch.argmax(gender_outputs, dim=1)
        age_preds = torch.argmax(age_outputs, dim=1)

        for i in range(len(images)):
            predictions.append((mask_preds[i].item(), gender_preds[i].item(), age_preds[i].item()))

for i, (mask_pred, gender_pred, age_pred) in enumerate(predictions):
    submission.loc[i, 'ans'] = multi_label_to_class(mask_pred, gender_pred, age_pred)

submission.to_csv('submission.csv', index=False)


In [None]:

# model.eval()
# # 모델이 테스트 데이터셋을 예측하고 결과를 저장합니다.
# all_predictions = []
# for images, labels in train_loader:
#     with torch.no_grad():
#         images = images.to(device)
#         pred = model(images)
#         print(pred)
#         pred = pred.argmax(dim=-1)
#         all_predictions.extend(pred.cpu().numpy())
# submission['ans'] = all_predictions

# # 제출할 파일을 저장합니다.
# submission.to_csv(os.path.join(test_dir, 'submission.csv'), index=False)
# print('test inference is done!')