In [1]:
import os
import pandas as pd
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import numpy as np
import random
import cv2

from torchvision import transforms
from torchvision.transforms import Resize, ToTensor, Normalize
import torchvision.models as models

In [None]:
# Dataset locations: each split has a root directory holding its CSV metadata
# and an images/ directory holding the pictures.
train_dir = '/opt/ml/input/data/train/'
train_image_path = '/opt/ml/input/data/train/images/'

test_dir = '/opt/ml/input/data/eval/'
test_image_path = '/opt/ml/input/data/eval/images/'

# Training metadata; its 'path' column is later used as the per-sample image folder name.
train_csv_file = os.path.join(train_dir, 'train.csv')
dt_train = pd.read_csv(train_csv_file)
dt_train

In [None]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [None]:
# Single seed shared by every random number generator used in this notebook.
random_seed = 12

# Python and NumPy generators.
random.seed(random_seed)
np.random.seed(random_seed)

# PyTorch generators: CPU, current GPU, and all GPUs (multi-GPU setups).
torch.manual_seed(random_seed)
torch.cuda.manual_seed(random_seed)
torch.cuda.manual_seed_all(random_seed)

# Trade cuDNN speed for repeatability: no auto-tuned kernel selection,
# deterministic algorithms only.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

In [None]:
# For now the image paths and raw labels are gathered into plain lists
# (this could also be done inside the Dataset).
whole_image_path = []
whole_target_label = []

for path in dt_train['path']:
    # The folder name is split on '_'; field 1 is used as gender and field 3 as age.
    folder_fields = path.split('_')
    gender_field, age_field = folder_fields[1], folder_fields[3]
    person_dir = os.path.join(train_image_path, path)

    # os.listdir returns entries in arbitrary order. Sorting keeps the sample
    # order stable, so the seeded random split selects the same images on every run.
    for file_name in sorted(os.listdir(person_dir)):
        # Skip entries containing '._' (presumably macOS resource-fork files — confirm).
        if '._' in file_name:
            continue
        whole_image_path.append(os.path.join(person_dir, file_name))
        # The file stem (text before the first '.') is kept as the mask-state label.
        whole_target_label.append((gender_field, age_field, file_name.split('.')[0]))
        
        
# Maps a raw label triple onto a single class id in 0..17.
def onehot_enc(x):
    """Encode a ``(gender, age, mask)`` label as one integer class id.

    The id is the sum of three offsets:

    * gender: ``'male'`` -> 0, ``'female'`` -> 3
    * age (converted with ``int``): below 30 -> 0, 30..59 -> 1, 60 and above -> 2
    * mask: ``'normal'`` -> 12, any string containing ``'incorrect'`` -> 6,
      anything else -> 0

    Args:
        x: indexable holding (gender, age, mask) at positions 0, 1 and 2.

    Returns:
        The combined class id, an int between 0 and 17.

    Note:
        A gender other than ``'male'``/``'female'`` produces no offset (None),
        so the addition below raises ``TypeError``.
    """
    sex = x[0]
    gender_offset = 0 if sex == 'male' else (3 if sex == 'female' else None)

    years = int(x[1])
    age_offset = 0 if years < 30 else (1 if years < 60 else 2)

    partial = gender_offset + age_offset

    mask_state = x[2]
    if mask_state == 'normal':
        mask_offset = 12
    elif 'incorrect' in mask_state:
        mask_offset = 6
    else:
        mask_offset = 0

    return partial + mask_offset

In [None]:
# sr_data  : file path of every training image
# sr_label : label information of every training image (not yet encoded)
sr_data, sr_label = pd.Series(whole_image_path), pd.Series(whole_target_label)
# NOTE(review): the original note here read "64, 447"; presumably the first and last
# pixel rows kept by the dataset's mid-crop (X[64:448]) — confirm.

In [None]:
class Dataset_Mask(Dataset):
    """Training dataset that reads images from disk and yields ``(image, label)``.

    Image paths and labels come from the notebook-level Series ``sr_data`` and
    ``sr_label``.

    Args:
        encoding: when True, every label is converted to an integer class id with
            ``onehot_enc``; when False the raw label entries are returned as stored.
        midcrop: when True, only rows 64..447 of each image are kept.
        transform: optional callable applied to the RGB image array before it
            is returned.
    """

    def __init__(self, encoding=True, midcrop=True, transform=None):
        self.encoding = encoding
        self.midcrop = midcrop
        self.data = sr_data
        self.label = sr_label
        self.transform = transform

        if encoding:
            self.label = self.label.apply(onehot_enc)

    def __len__(self):
        # Measure the instance's own data, not the global Series, so the length
        # always matches what __getitem__ indexes.
        return len(self.data)

    def __getitem__(self, idx):
        image_path = self.data[idx]
        bgr_image = cv2.imread(image_path)
        # cv2.imread signals an unreadable/missing file by returning None;
        # fail with the offending path instead of an opaque cvtColor error.
        if bgr_image is None:
            raise FileNotFoundError(f"Could not read image: {image_path}")

        # OpenCV loads images as BGR; convert to RGB.
        X = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)
        y = self.label[idx]

        if self.midcrop:
            # Keep the central band of rows only.
            X = X[64:448]

        if self.transform:
            return self.transform(X), y
        return X, y

In [None]:
# Training dataset with the default label encoding and mid-crop; the only
# preprocessing step is the conversion of each image array to a tensor.
# NOTE(review): no Normalize step is applied although the network is created with
# pretrained weights — if one is added, add the same step to the test transform.
dataset_mask = Dataset_Mask(transform=transforms.Compose([ToTensor()]))

In [None]:
# 80/20 train/validation split.
# The validation size is derived by subtraction: random_split requires the lengths
# to add up to len(dataset) exactly, and truncating 0.8*n and 0.2*n independently
# can leave the sum one short.
train_size = int(len(dataset_mask) * 0.8)
val_size = len(dataset_mask) - train_size

train_set, val_set = torch.utils.data.random_split(dataset_mask, [train_size, val_size])
print(f'training data size : {len(train_set)}')
print(f'validation data size : {len(val_set)}')

In [None]:
batch_size = 64

# Training batches are reshuffled every epoch.
train_dataLoader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True, num_workers=0)
# Validation metrics do not depend on sample order, so no shuffling is needed;
# this also avoids drawing from the global RNG during evaluation.
val_dataLoader = DataLoader(dataset=val_set, batch_size=batch_size, shuffle=False, num_workers=0)

In [None]:
# Number of target classes: the label encoder produces ids 0..17.
num_classes = 18
# VGG-19 from torchvision, loaded with pretrained weights.
model_vgg19 = models.vgg19(pretrained=True)

# Show the layer structure of the loaded network.
print(model_vgg19)

In [None]:
# Replace the classification head so the network outputs `num_classes` logits.
# torchvision's VGG runs features -> avgpool -> classifier in its forward pass;
# it has no `fc` attribute, so a head assigned to `fc` would never be used and
# the model would keep emitting the original 1000 logits.
model_vgg19.classifier = nn.Sequential(
    nn.Linear(512 * 7 * 7, 4096),
    nn.ReLU(True),
    nn.Dropout(),
    nn.Linear(4096, 4096),
    nn.ReLU(True),
    nn.Dropout(),
    nn.Linear(4096, num_classes),
)

print(model_vgg19)

In [None]:
import torch.nn.init as init

def initialize_weights(model):
    """Re-initialise, in place, the Conv2d, BatchNorm2d and Linear layers of ``model``.

    * Conv2d: Xavier-uniform weights; bias set to zero when the layer has one.
    * BatchNorm2d: weight set to 1, bias set to 0.
    * Linear: weights drawn from a normal distribution (mean 0, std 0.01),
      bias set to zero.

    Layers of any other type are left untouched.

    Args:
        model: an ``nn.Module``; all of its sub-modules are visited.
    """
    for layer in model.modules():
        if isinstance(layer, nn.Conv2d):
            init.xavier_uniform_(layer.weight.data)
            if layer.bias is None:
                continue
            layer.bias.data.zero_()
            continue

        if isinstance(layer, nn.BatchNorm2d):
            layer.weight.data.fill_(1)
            layer.bias.data.zero_()
            continue

        if isinstance(layer, nn.Linear):
            layer.weight.data.normal_(0, 0.01)
            layer.bias.data.zero_()

# Re-initialise only the classifier sub-module; the rest of the network is not passed in.
initialize_weights(model=model_vgg19.classifier)

In [None]:
# Training hyper-parameters.
epochs = 3
learning_rate = 1e-4

# Cross-entropy over the class logits.
criterion = nn.CrossEntropyLoss()
# Adam over every model parameter.
optimizer = torch.optim.Adam(
    params=model_vgg19.parameters(),
    lr=learning_rate,
    betas=(0.9, 0.99),
)

In [None]:
# Move the model to the selected device. As the last expression of the cell,
# the returned module is also displayed.
model_vgg19.to(device)

In [None]:
# Print name, shape and the first five values of every trainable parameter,
# followed by the total number of trainable scalars.
np.set_printoptions(precision=3)
n_param = 0
for p_idx, (param_name, param) in enumerate(model_vgg19.named_parameters()):
    if not param.requires_grad:
        continue
    flat_values = param.detach().cpu().numpy().reshape(-1)
    n_param += flat_values.size
    print ("[%d] name:[%s] shape:[%s]."%(p_idx,param_name,tuple(param.shape)))
    print ("    val:%s"%(flat_values[:5]))
print ("Total number of parameters:[%s]."%(format(n_param,',d')))

In [None]:
# Train for `epochs` epochs, validating after each one and tracking the best
# validation accuracy / loss seen so far.
best_val_acc_so_far = 0
best_val_loss_so_far = np.inf

for epoch in range(epochs):
    print("[INFO] Training Epoch : {}".format(epoch))
    model_vgg19.train()

    # Running totals over the whole epoch (sample-weighted), so that the epoch
    # summary reflects every batch rather than only the last one.
    epoch_loss_sum = 0.0
    epoch_correct = 0
    epoch_samples = 0

    for i, (X, y) in enumerate(train_dataLoader):
        X = X.to(device)
        y = y.to(device)

        _y = model_vgg19(X)
        _pred = torch.argmax(_y, dim=-1)

        loss = criterion(_y, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # The criterion already averages over the batch, so loss.item() is a
        # per-sample mean; the actual batch length is used because the last
        # batch can be shorter than batch_size.
        n_in_batch = y.size(0)
        batch_loss = loss.item()
        batch_correct = (y == _pred).sum().item()

        epoch_loss_sum += batch_loss * n_in_batch
        epoch_correct += batch_correct
        epoch_samples += n_in_batch

        if i % 10 == 0:
            print(f"STEP[{i}/{len(train_dataLoader)}] Training loss: {batch_loss:.3f} / Training accuracy: {batch_correct / n_in_batch:.3f}")

    # Epoch-level training metrics (guard against an empty loader).
    train_loss = epoch_loss_sum / max(epoch_samples, 1)
    train_acc = epoch_correct / max(epoch_samples, 1)

    model_vgg19.eval()
    val_loss_sum = 0.0
    val_correct = 0
    val_samples = 0

    with torch.no_grad():
        for X, y in val_dataLoader:
            X = X.to(device)
            y = y.to(device)

            _y = model_vgg19(X)
            _pred = torch.argmax(_y, dim=-1)

            n_in_batch = y.size(0)
            val_loss_sum += criterion(_y, y).item() * n_in_batch
            val_correct += (y == _pred).sum().item()
            val_samples += n_in_batch

    # Accuracy is correct predictions divided by the number of samples (not the
    # number of batches), and the loss is the per-sample mean.
    cur_val_loss = val_loss_sum / max(val_samples, 1)
    cur_val_acc = val_correct / max(val_samples, 1)

    if cur_val_loss < best_val_loss_so_far:
        best_val_loss_so_far = cur_val_loss
    if cur_val_acc > best_val_acc_so_far:
        best_val_acc_so_far = cur_val_acc

    print(f"[Val] Current acc : {cur_val_acc:.3f} / Current loss : {cur_val_loss:.3f}")
    print(f"[Val] Best acc : {best_val_acc_so_far:.3f} / Best loss : {best_val_loss_so_far:.3f}")

    print(f"EPOCH[{epoch}/{epochs}] Training loss: {train_loss:.3f} / Training accuracy: {train_acc:.3f}")

print("[INFO] Training ALL DONE!!!")

In [None]:
# Submission template for the evaluation split; its ImageID column names the test images.
info_csv_file = os.path.join(test_dir, 'info.csv')
submission = pd.read_csv(info_csv_file)
submission.head()

In [None]:
# Full path of every test image, in the same order as the submission rows.
image_paths = [
    os.path.join(test_image_path, image_name)
    for image_name in submission['ImageID']
]
test_image = pd.Series(data=image_paths)

In [None]:
class TestDataset(Dataset):
    """Inference dataset that reads test images from disk (no labels).

    Image paths come from the notebook-level Series ``test_image``.

    Args:
        midcrop: when True, only rows 64..447 of each image are kept.
        transform: optional callable applied to the RGB image array before it
            is returned.
    """

    def __init__(self, midcrop=True, transform=None):
        self.midcrop = midcrop
        self.data = test_image
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image_path = self.data[idx]
        bgr_image = cv2.imread(image_path)
        # cv2.imread signals an unreadable/missing file by returning None;
        # fail with the offending path instead of an opaque cvtColor error.
        if bgr_image is None:
            raise FileNotFoundError(f"Could not read image: {image_path}")

        # OpenCV loads images as BGR; convert to RGB.
        image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)

        if self.midcrop:
            # Keep the central band of rows only.
            image = image[64:448]

        if self.transform:
            return self.transform(image)

        return image

In [None]:
# Test-time preprocessing: conversion of the image array to a tensor only.
test_transform = transforms.Compose([ToTensor()])

test_set = TestDataset(transform=test_transform)
# No shuffling, so predictions stay aligned with the submission rows.
# The name `test_dataLodaer` (sic) is kept because the inference cell refers to it.
test_dataLodaer = DataLoader(
    dataset=test_set,
    batch_size=batch_size,
    shuffle=False,
    num_workers=2,
)

In [None]:
# Reuse the trained network for inference. `model_vgg19` is already a module
# instance, so calling it with `num_classes=...` would run its forward pass with
# an unexpected keyword argument and raise TypeError instead of building a model.
model = model_vgg19.to(device)
model.eval()

all_predictions = []
# Gradients are not needed for inference; one no_grad context covers the whole loop.
with torch.no_grad():
    for images in test_dataLodaer:
        images = images.to(device)
        pred = model(images)
        # Predicted class = index of the largest logit.
        pred = pred.argmax(dim=-1)
        all_predictions.extend(pred.cpu().numpy())
submission['ans'] = all_predictions

submission.to_csv(os.path.join(test_dir, 'submission.csv'), index=False)
print('test inference is done!')