## 0. Libarary 불러오기 및 경로설정

In [3]:
import os
import pandas as pd
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import torchvision.models as models
from sklearn.model_selection import train_test_split

import wandb
from tqdm import tqdm, tqdm_notebook
import torchvision.transforms as transforms
import torch.optim as optim
from torchvision import transforms
from PIL import Image

In [4]:
# 테스트 데이터셋 폴더 경로를 지정해주세요.
train_dir = '/opt/ml/input/data/train'
test_dir = '/opt/ml/input/data/eval'

## 1. Model 정의

In [5]:
import torchvision.models as models
import torch.nn as nn

class MyNet(nn.Module):
    def __init__(self ,num_classes: int = 1000):
        super(MyNet, self).__init__()
        self.model = models.resnet18(pretrained=True)
        self.fc2 = nn.Linear(1000,num_classes)
    def forward(self,x):
        x = self.model(x)
        return self.fc2(x)
    

## 2. Test Dataset 정의 (Gender, Mask, Age)

In [6]:
class GenderDataset(Dataset):
    def __init__(self, img_paths, transform, train):
        self.img_paths = img_paths
        self.transform = transform
        self.train = train

    def __getitem__(self, index):
        image = Image.open(self.img_paths[index])
        label = self.process_labels(self.img_paths[index])

        if self.transform:
            image = self.transform(image)
            
        if self.train:
            return image,label
        else:
            return image

    def __len__(self):
        return len(self.img_paths)
    
    def process_labels(self,path):
        info,mask = path.split('/')[-2:]
        _,gender,race,age = info.split('_')
        mask = mask.split('.')[0]
        label = {'female':0,'male':1}
        
        return label[gender]
    
class MaskDataset(Dataset):
    def __init__(self, img_paths, transform, train):
        self.img_paths = img_paths
        self.transform = transform
        self.train = train

    def __getitem__(self, index):
        image = Image.open(self.img_paths[index])
        label = self.process_labels(self.img_paths[index])

        if self.transform:
            image = self.transform(image)
            
        if self.train:
            return image,label
        else:
            return image

    def __len__(self):
        return len(self.img_paths)
    
    def process_labels(self,path):
        info,mask = path.split('/')[-2:]
        _,gender,race,age = info.split('_')
        mask = mask.split('.')[0]
        if mask[:-1]=='mask':
            mask='mask'
        label = {'incorrect_mask':0,'mask':1,'normal':2}
        
        return label[mask]
    
class AgeDataset(Dataset):
    def __init__(self, img_paths, transform, train):
        self.img_paths = img_paths
        self.transform = transform
        self.train = train

    def __getitem__(self, index):
        image = Image.open(self.img_paths[index])
        label = self.process_labels(self.img_paths[index])

        if self.transform:
            image = self.transform(image)
            
        if self.train:
            return image,label
        else:
            return image

    def __len__(self):
        return len(self.img_paths)
    
    def process_labels(self,path):
        info,mask = path.split('/')[-2:]
        _,gender,race,age = info.split('_')
        age=int(age)
        if age<30:
            return 0
        elif 30<=age<60:
            return 1
        else:
            return 2


## 3. data 불러오기

In [7]:
# meta 데이터와 이미지 경로를 불러옵니다.
train_df = pd.read_csv(os.path.join(train_dir, 'train.csv'))
image_dir = os.path.join(train_dir, 'images')

# Test Dataset 클래스 객체를 생성하고 DataLoader를 만듭니다.
image_paths = [os.path.join(image_dir, img_id) for img_id in train_df.path]

img_list=[]
for i in image_paths:
    for j in os.listdir(i):
        if j[0]!='.':
            img_list.append(i+'/'+j)

train, test = train_test_split(img_list, test_size=0.1, shuffle=True,random_state=1004)



transform_train = transforms.Compose([
                                # transforms.CenterCrop(32),
                                # transforms.RandomCrop(32, padding=4),
                                # transforms.RandomHorizontalFlip(),
                                transforms.Resize((224,224)),
                                transforms.ToTensor(),
                                transforms.Normalize((0.5070751592371323, 0.48654887331495095, 0.4409178433670343), (0.2673342858792401, 0.2564384629170883, 0.27615047132568404))])

transform_test = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize((0.5070751592371323, 0.48654887331495095, 0.4409178433670343), (0.2673342858792401, 0.2564384629170883, 0.27615047132568404)),
])


print('==> Preparing data..')
train, test = train_test_split(img_list, test_size=0.1, shuffle=False)
print("test 개수:",len(test))
print("train 개수:",len(train))

train_dataset = GenderDataset(train, transform_train,train=True)
train_set = DataLoader(train_dataset,shuffle=True)

test_dataset = GenderDataset(test, transform_test,train=True)
test_set =  DataLoader(test_dataset,shuffle=True)


==> Preparing data..
test 개수: 1890
train 개수: 17010


In [None]:
import wandb
from tqdm import tqdm, tqdm_notebook
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

EPOCHS = 30
BATCH_SIZE = 64
LEARNING_RATE = 0.0001

# config={"epochs": EPOCHS, "batch_size": BATCH_SIZE, "learning_rate" : LEARNING_RATE}

# wandb.init(project="mask_classification", entity='hyeyoon')

net = MyNet(num_classes=3)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
net = net.to(device)
print(net)

optimizer = optim.Adam(net.parameters(),lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()



def adjust_learning_rate(optimizer, epoch, LR):
    if epoch <= 10:
        lr = LR     
    elif epoch <= 15:
        lr = LR * 0.5           
    else:
        lr = LR * 0.1   
            
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr
      

# Training
def train(epoch):
    print('\nEpoch: %d' % epoch)
    net.train()
    train_loss = 0
    correct = 0
    total = 0
    print('LR: {:0.6f}'.format(optimizer.param_groups[0]['lr']))
    for batch_idx, (inputs, targets) in enumerate(tqdm(train_set)):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
        
    print('Loss: %.3f | Acc: %.3f%% (%d/%d)'
        % (train_loss/(batch_idx+1), 100.*correct/total, correct, total))
    
    # model save
    if epoch%5==0:
      torch.save({
          'epoch': epoch,
          'model_state_dict': net.state_dict(),
          'optimizer_state_dict': optimizer.state_dict(),
          'loss': train_loss/(batch_idx+1),
          "acc": 100*correct/total
          }, f"/result/checkpoint/resnet18_e_{epoch}_{loss}.pt")
      
#     wandb.log({'train_accuracy': 100*correct/total, 'train_loss': train_loss/(batch_idx+1)})

        
def test(epoch):
    global best_acc
    net.eval()
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(tqdm(test_set)):
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = net(inputs)
            loss = criterion(outputs, targets)

            test_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

        print('Loss: %.3f | Acc: %.3f%% (%d/%d)'
            % (test_loss/(batch_idx+1), 100.*correct/total, correct, total))
        
#         wandb.log({'test_accuracy': 100*correct/total, 'test_loss': test_loss/(batch_idx+1)})
   
start_epoch = 1
for epoch in range(start_epoch, start_epoch+EPOCHS):
    adjust_learning_rate(optimizer, epoch, LR=LEARNING_RATE)
    
    train(epoch)
    test(epoch)

  0%|          | 0/17010 [00:00<?, ?it/s]

MyNet(
  (model): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running

  0%|          | 41/17010 [00:06<31:28,  8.98it/s]  