In [None]:
import os
import pandas as pd
from PIL import Image

from tqdm import tqdm

data_dir = '../input/data/train/'
test_dir = '../input/data/eval/'
submission_dir = './submission/'
image_data_dir = data_dir + 'images/'

## 데이터 전처리

In [None]:
train_df = pd.read_csv(data_dir + 'train.csv')
submission = pd.read_csv(test_dir + 'info.csv')

In [None]:
'''
신규범님 코드 참고

학습 데이터 구축
'''
def age_group(x):
    if x < 30: return 0
    elif x < 60: return 1
    else: return 2


df = []

for idx, line in tqdm(enumerate(train_df.iloc)):
    for file in list(os.listdir(os.path.join(image_data_dir, line['path']))):
        if file[0] == '.':
            continue
        if file.split('.')[0] == 'normal':
            mask = 2
        elif file.split('.')[0] == 'incorrect_mask':
            mask = 1
        else:
            mask = 0
        gender = 0 if line['gender'] == 'male' else 1
        data = {
            'id' : line['id'],
            'gender' : line['gender'],
            'age_group' : age_group(line['age']),
            'mask' : mask,
            'path': os.path.join(image_data_dir, line['path'], file),
            'label': mask * 6 + gender * 3 + age_group(line['age'])
        }
        df.append(data)

df = pd.DataFrame(df)

In [None]:
'''
데이터셋 분리
'''
from sklearn.model_selection import train_test_split

train_idx, val_idx = train_test_split(df['label'], train_size = 0.8, random_state = 22, stratify = df['label'])
                                      
train_set, val_set = df.iloc[train_idx.index, :], df.iloc[val_idx.index, :]

## 모델 구축

In [None]:
'''
ResNet 모델 구축

'''

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
import torch.optim as optim

from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.transforms import Resize, ToTensor, Normalize
import torchvision.models as models

## 데이터셋 구축

In [None]:
'''
Sample_submission 코드 참고

데이터 셋 구축
'''

class CustomDataset(Dataset):
    def __init__(self, df, transform, train = True):
        
        image_dir = '../input/data/eval/images'
        
        self.train = train
        self.df = df
        if self.train:
            self.img_paths = self.df['path'].tolist()
            self.labels = self.df['label'].tolist()
        else:
            self.img_paths = [os.path.join(image_dir, img_id) for img_id in self.df.ImageID]
        self.transform = transform

    def __getitem__(self, index):
        image = Image.open(self.img_paths[index])
        if self.transform:
            image = self.transform(image)
        
        if self.train: return image, torch.tensor(self.labels[index])
        else: return image

    def __len__(self):
        return len(self.img_paths)

## 학습 설정

In [None]:
'''
학습 함수 설정
'''

def train(model, data_loader, optimizer, criterion):
    model.train()
    train_loss = 0
    correct = 0
    total = 0
    for batch_idx, (images, targets) in enumerate(data_loader):
        images, targets = images.to(device), targets.to(device)
        optimizer.zero_grad()

        benign_outputs = model(images)
        loss = criterion(benign_outputs, targets)
        loss.backward()

        optimizer.step()
        train_loss += loss.item()
        _, predicted = benign_outputs.max(1)

        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
    
    train_loss /= len(data_loader)
    acc = correct / total
    
    return train_loss, acc


def val(model, data_loader, criterion):
    model.eval()
    val_loss = 0
    correct = 0
    total = 0
    
    for batch_idx, (images, targets) in enumerate(data_loader):
        with torch.no_grad():
            images, targets = images.to(device), targets.to(device)
            benign_outputs = model(images)
            loss = criterion(benign_outputs, targets)
            val_loss += loss.item()
            _, predicted = benign_outputs.max(1)

            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()
    
    val_loss /= len(data_loader)
    acc = correct / total
    
    return val_loss, acc

def pred(model, data_loader):
    all_predictions = []
    for images in data_loader:
        with torch.no_grad():
            images = images.to(device)
            pred = model(images)
            pred = pred.argmax(dim=-1)
            all_predictions.extend(pred.cpu().numpy())
    
    return all_predictions

In [None]:
'''
학습 설정
'''

device = 'cuda' if torch.cuda.is_available() else 'cpu'
lr = 0.01
epochs = 10
batch_size = 128

In [None]:
'''
데이터 로더 생성
'''

transform = transforms.Compose([
    Resize((128, 128), Image.BILINEAR),
    ToTensor(),
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.2, 0.2, 0.2)),
])

train_customset = CustomDataset(df = train_set, transform = transform, train = True)
val_customset = CustomDataset(df = val_set, transform = transform, train = True)
test_customset = CustomDataset(df = submission, transform = transform, train = False)

train_loader = DataLoader(
    train_customset,
    batch_size = batch_size,
    shuffle=True,
)

val_loader = DataLoader(
    val_customset,
    batch_size = batch_size,
    shuffle=True,
)

test_loader = DataLoader(
    test_customset,
    batch_size = batch_size,
    shuffle=False,
)

In [None]:
'''
모델 설정
'''
model = models.resnet18(pretrained=True).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

In [None]:
len(train_loader)

In [None]:
model.fc = torch.nn.Linear(in_features=512, out_features=18, bias=True).to(device)

## 학습

In [None]:
for epoch in tqdm(range(1, epochs + 1)):
    train_loss, train_acc = train(model = model, data_loader = train_loader, optimizer = optimizer, criterion = criterion)
    val_loss, val_acc = val(model = model, data_loader = val_loader, criterion = criterion)
    
    print(f'epoch : {epoch}, train_loss : {train_loss}, train_acc : {train_acc}, val_loss : {val_loss}, val_acc : {val_acc}')

## 예측

In [None]:
# 모델이 테스트 데이터셋을 예측하고 결과를 저장합니다.
all_predictions = pred(model = model, data_loader = test_loader)
submission['ans'] = all_predictions

# 제출할 파일을 저장합니다.
submission.to_csv(os.path.join(submission_dir, 'resnet18_pretrained.csv'), index=False)
print('test inference is done!')

In [None]:
submission.head()

In [None]:
model