# 마스크 착용 여부 이미지 분류

이미지를 시각화하는 게 필요해서 노트북으로 옮긴다. (220224 10:49)

## Import

In [106]:
import collections
import copy
import os
import random
import time

import albumentations
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.models as models
from albumentations.pytorch.transforms import ToTensorV2
from PIL import Image
from sklearn.model_selection import StratifiedShuffleSplit
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import Subset

In [79]:
# Use the first CUDA GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
device

device(type='cuda', index=0)

## Seed 설정

In [80]:
# Seed every random number generator the notebook touches so runs are repeatable.
SEED = 3086
random.seed(SEED)
np.random.seed(SEED)
os.environ["PYTHONHASHSEED"] = str(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)  # type: ignore
torch.cuda.manual_seed_all(SEED)  # type: ignore  # also covers multi-GPU runs
torch.backends.cudnn.deterministic = True  # type: ignore
# benchmark mode lets cuDNN auto-tune and pick different kernels from run to
# run, which undoes the determinism requested above, so it must stay off.
torch.backends.cudnn.benchmark = False  # type: ignore

## Dataset

In [53]:
class MaskedFaceDataset(Dataset):
    """Image dataset backed by a CSV file of image locations and labels.

    With ``train=True`` each row supplies the image location in its ``path``
    column and the label in the column named by ``kind``. With ``train=False``
    the image location is read from ``ImageID`` and the label from ``ans``.
    The optional ``transform`` is applied in both modes.

    Args:
        csv_path: CSV file to load; read once with ``pandas.read_csv``.
        kind: Name of the label column used when ``train=True``
            (mask, gender or age).
        transform: Optional callable invoked as ``transform(image=array)``
            whose result is indexed with ``'image'``.
        train: Selects which CSV columns are read, as described above.
    """

    def __init__(self, csv_path, kind=None, transform=None, train=True):
        self.kind = kind  # label column: mask, gender, age
        self.csv_path = csv_path
        self.transform = transform
        self.train = train
        self.df = pd.read_csv(self.csv_path)

    def _load_image(self, path):
        """Open the image at *path* and pass it through ``self.transform`` if one is set."""
        img = Image.open(path)
        if self.transform:
            # The transform is called with a NumPy array under the ``image`` keyword.
            img = self.transform(image=np.array(img))['image']
        return img

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair for row *idx* of the CSV."""
        row = self.df.iloc[idx]

        if self.train:
            return self._load_image(row['path']), row[self.kind]
        # Non-training rows previously skipped the transform entirely, so the
        # caller received an untransformed PIL image; both modes now share
        # the same loading path.
        return self._load_image(row['ImageID']), row['ans']

    def __len__(self):
        """Return the number of rows in the CSV."""
        return len(self.df)

In [23]:
# ToTensorV2 only reorders HWC -> CHW and keeps the input dtype, so on its own
# it yields uint8 tensors that a float-weight network cannot consume.
# Normalize() (albumentations defaults: ImageNet mean/std, max_pixel_value=255)
# converts the pixels to float32 first, matching the ImageNet-pretrained backbone.
train_transform = albumentations.Compose([
    albumentations.Normalize(),
    ToTensorV2(),
])

In [71]:
# CSV file handed to MaskedFaceDataset below as ``csv_path``. The path is
# relative, so it is resolved against the notebook's working directory.
train_csv_path = '../input/data/train/full_path_three_label.csv'

In [74]:
# Full training dataset labelled by the 'mask' column of the CSV.
train_mask_data_set = MaskedFaceDataset(
    csv_path=train_csv_path,
    kind='mask',
    transform=train_transform,
    train=True,
)

## train, validation 나누기

In [75]:
# Read the labels straight from the dataset's DataFrame. Iterating the dataset
# itself would open and transform every image only to throw the pixels away.
y_train_mask = train_mask_data_set.df[train_mask_data_set.kind].tolist()
counter_mask_train = collections.Counter(y_train_mask)
print(counter_mask_train)

Counter({0: 13500, 1: 2700, 2: 2700})


In [84]:
# One stratified shuffle split that holds out 20% of the samples, seeded with SEED.
sss = StratifiedShuffleSplit(
    n_splits=1,
    test_size=0.2,
    random_state=SEED,
)

In [88]:
# Only the labels drive stratification; `indices` is a positional placeholder for X.
indices = list(range(len(y_train_mask)))
# `sss` is configured for a single split, so take that one pair directly.
train_index, val_index = next(sss.split(indices, y_train_mask))
print('train:', train_index, 'val:', val_index)
print(len(train_index), len(val_index))

train: [11632 10335 12271 ...   607   462  3491] val: [18425  6054 16566 ...  1993 12262 18655]
15120 3780


In [91]:
# Wrap the full dataset in two index-restricted views: training and validation.
train_mask_ds, val_mask_ds = (
    Subset(train_mask_data_set, split_indices)
    for split_indices in (train_index, val_index)
)

In [94]:
# Look each label up through the Subset's index list instead of iterating the
# Subsets, which would decode and transform every image again just to read
# its label.
y_train = [y_train_mask[i] for i in train_mask_ds.indices]
y_val = [y_train_mask[i] for i in val_mask_ds.indices]

# Class distribution of each split, to confirm the stratification held.
counter_train = collections.Counter(y_train)
counter_val = collections.Counter(y_val)
print(counter_train)
print(counter_val)

Counter({0: 10800, 1: 2160, 2: 2160})
Counter({0: 2700, 1: 540, 2: 540})


## Train

In [98]:
# Training hyperparameters.
# EPOCH and LEARNING_RATE are not referenced in the visible part of the
# notebook; presumably they feed the train_model call and the optimizer
# later on -- TODO confirm at the call site.
EPOCH = 5
# Batch size passed to both the training and validation DataLoader.
BATCH_SIZE = 16
LEARNING_RATE = 1e-3
# Cross-entropy loss over the class logits; presumably the `criterion`
# argument of train_model -- TODO confirm.
criterion = nn.CrossEntropyLoss()

In [101]:
# Shuffle only the training batches; validation metrics do not depend on the
# sample order, so the validation loader iterates in a fixed order.
train_dl = DataLoader(train_mask_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
val_dl = DataLoader(val_mask_ds, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)

# train_model looks these two mappings up by phase name ('train' / 'val').
dataloaders = {'train': train_dl, 'val': val_dl}
dataset_sizes = {'train': len(train_mask_ds), 'val': len(val_mask_ds)}

모델 불러오기 -> resnet34

In [104]:
# Start from an ImageNet-pretrained ResNet-34 and swap its classification
# head for a fresh 3-way linear layer (the mask labels are 0, 1 and 2).
resnet34 = models.resnet34(pretrained=True)
num_ftrs = resnet34.fc.in_features
resnet34.fc = nn.Linear(num_ftrs, 3)
# train_model moves every batch to `device`, so the model has to live there too.
resnet34 = resnet34.to(device)

In [107]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    """Train *model*, validate after every epoch, and keep the best weights.

    Every epoch runs a ``'train'`` phase followed by a ``'val'`` phase. Batches
    are read from the module-level ``dataloaders`` mapping (keyed by phase
    name) and moved to the module-level ``device``. The weights that reach the
    highest validation accuracy are loaded back into *model* before returning.

    Args:
        model: Network to train; called as ``model(inputs)``.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer; zeroed every batch and stepped once per
            training batch.
        scheduler: Learning-rate scheduler, stepped once per epoch after the
            training phase.
        num_epochs: Number of epochs to run.

    Returns:
        The same *model* object, holding the best validation weights.
    """
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training phase and a validation phase.
        for phase in ['train', 'val']:
            is_training = phase == 'train'
            if is_training:
                model.train()  # put the model in training mode
            else:
                model.eval()   # put the model in evaluation mode

            running_loss = 0.0
            running_corrects = 0
            # Count the samples actually processed so the epoch averages do
            # not depend on a separately maintained dataset-size table.
            samples_seen = 0

            # Iterate over the batches of this phase.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Reset the parameter gradients.
                optimizer.zero_grad()

                # Forward pass; track operations for autograd only while training.
                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # Backward pass and optimizer step only in the training phase.
                    if is_training:
                        loss.backward()
                        optimizer.step()

                # Statistics: the loss is weighted by the batch size before
                # being accumulated, so a smaller final batch is averaged
                # correctly.
                batch_size = inputs.size(0)
                running_loss += loss.item() * batch_size
                running_corrects += torch.sum(preds == labels).item()
                samples_seen += batch_size
            if is_training:
                scheduler.step()

            epoch_loss = running_loss / samples_seen
            epoch_acc = running_corrects / samples_seen

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # Snapshot (deep copy) the weights whenever validation accuracy improves.
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:.4f}'.format(best_acc))

    # Restore the best-performing weights.
    model.load_state_dict(best_model_wts)
    return model