In [1]:
import random
import pandas as pd
import numpy as np
import os
import re
import glob
import cv2

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import torchvision.models as models
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from sklearn.metrics import f1_score
from sklearn.metrics import classification_report
from tqdm import tqdm
import warnings
import matplotlib.pyplot as plt
from PIL import Image,ImageOps
warnings.filterwarnings(action='ignore') 
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [2]:
CFG={
    'IMG_SIZE':256,
    'BATCH_SIZE':16,
    'LEARNING_RATE':1e-5,
    'SEED':42,
    'EPOCHS':5
}

In [3]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) 

In [4]:
label_path='/kaggle/input/labels/label'
train_label_path=os.path.join(label_path,'train')
val_label_path=os.path.join(label_path,'val')
label_json_list=['anger.json','happy.json','panic.json','sadness.json']

In [14]:
train_df=pd.DataFrame(columns=['img_path','gender','age','maxX','maxY','minX','minY','label'])
val_df=pd.DataFrame(columns=['img_path','gender','age','maxX','maxY','minX','minY','label'])

In [15]:
train_image_path_dict=dict()
val_image_path_dict=dict()
exp=['anger','happy','panic','sadness']

In [16]:
import shutil
extension=set()
for i in range(1,8):
    path='/kaggle/input/images/img-00'+str(i)+'/img/'
    if os.path.isdir(os.path.join(path,'train')):
        train_path=os.path.join(path,'train')
    if os.path.isdir(os.path.join(path,'val')):
        val_path=os.path.join(path,'val')
    for j in exp:
        if os.path.isdir(os.path.join(train_path,j)):
            train_exp_path=os.path.join(train_path,j)
        if os.path.isdir(os.path.join(val_path,j)):
            val_exp_path=os.path.join(val_path,j)
        for j in [train_exp_path,val_exp_path]:
            cate=j.split('/')[-2]
            for f in os.listdir(j):
                src=os.path.join(j,f)
                extension.add(f.split('.')[-1])
                if cate=='train':
                    train_image_path_dict[f]=src
                else:
                    val_image_path_dict[f]=src

In [20]:
import json
def make_df(dir_path,df,image_path_dict):
    i=0
    data_name=dir_path.split('/')[-1]
    for label_name in label_json_list:
        with open(os.path.join(dir_path,data_name+'_'+label_name),'r',encoding='cp949') as f:
            file=json.load(f)
        for v in file:
            if v['filename'].split('.')[-1]=='jpeg':
                continue
            if v['gender']=='남':
                gender=0
            else:
                gender=1
            if v['faceExp_uploader']=='분노':
                label='anger'
            elif v['faceExp_uploader']=='기쁨':
                label='happy'
            elif v['faceExp_uploader']=='당황':
                label='panic'
            elif v['faceExp_uploader']=='슬픔':
                label='sadness'
            minX=(v['annot_A']['boxes']['minX']+v['annot_B']['boxes']['minX']+v['annot_C']['boxes']['minX'])/3
            minY=(v['annot_A']['boxes']['minY']+v['annot_B']['boxes']['minY']+v['annot_C']['boxes']['minY'])/3
            maxX=(v['annot_A']['boxes']['maxX']+v['annot_B']['boxes']['maxX']+v['annot_C']['boxes']['maxX'])/3
            maxY=(v['annot_A']['boxes']['maxY']+v['annot_B']['boxes']['maxY']+v['annot_C']['boxes']['maxY'])/3
            
            df.loc[i]=[image_path_dict[v['filename']],gender,v['age'],int(maxX),int(maxY),int(minX),int(minY),label]
            i+=1
    return df.sample(frac=1).reset_index(drop=True)

In [21]:
train_df=make_df(train_label_path,train_df,train_image_path_dict)
val_df=make_df(val_label_path,val_df,val_image_path_dict)

In [None]:
from sklearn.preprocessing import LabelEncoder
le=LabelEncoder()
train_df['label']=le.fit_transform(train_df['label'])
val_df['label']=le.transform(val_df['label'])

In [None]:
import matplotlib.pyplot as plt
fig, axs = plt.subplots(nrows=1, ncols=5, figsize=(20, 20))
for i in range(5):
    image_file = train_df.loc[i,'img_path']
    minY, minX, maxX, maxY = map(int, [train_df.loc[i,'minY'], train_df.loc[i,'minX'], train_df.loc[i,'maxX'], train_df.loc[i,'maxY']])
    img=cv2.imread(image_file)
    img=cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
    img=img[minY:maxY,minX:maxX]
    axs[i].imshow(img)
    axs[i].axis('off')
    axs[i].set_title(le.classes_[train_df.loc[i,'label']], size='large')

In [42]:
import timm
model = timm.create_model('timm/swinv2_large_window12to16_192to256.ms_in22k_ft_in1k', pretrained=True,num_classes=1)
model = torch.nn.DataParallel(model)


In [32]:
from torchvision.transforms import Compose, Resize, Normalize, ToTensor,RandomHorizontalFlip,RandomRotation,ColorJitter
class CustomDataset(Dataset):
    def __init__(self, img_path, gender, maxX, maxY, minX, minY, transforms=None):
        self.img_path = img_path
        self.transforms = transforms
        self.maxX = maxX
        self.maxY = maxY
        self.minX = minX
        self.minY = minY
        self.gender = gender
        
    def __getitem__(self, index):
        image_path = self.img_path[index]
        image = Image.open(image_path).convert("RGB")
        image = ImageOps.exif_transpose(image)
        minY, minX, maxX, maxY = int(self.minY[index]), int(self.minX[index]), int(self.maxX[index]), int(self.maxY[index])
        image = image.crop((minX, minY, maxX, maxY))
        
        if self.transforms is not None:
            image = self.transforms(image)
        if self.gender is not None:
            gender=self.gender[index]
            return image, gender
        else:
            return image
        
    def __len__(self):
        return len(self.img_path)

train_transform = Compose([
    Resize((CFG['IMG_SIZE'], CFG['IMG_SIZE'])), 
    ToTensor(),
    Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))
])

test_transform = Compose([
    Resize((CFG['IMG_SIZE'],CFG['IMG_SIZE'])),
    ToTensor(),
    Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))
])
train_ds=CustomDataset(train_df['img_path'].values,train_df['gender'].values,train_df['maxX'].values,train_df['maxY'].values,train_df['minX'].values,train_df['minY'].values,train_transform)
val_ds=CustomDataset(val_df['img_path'].values,val_df['gender'].values,val_df['maxX'].values,val_df['maxY'].values,val_df['minX'].values,val_df['minY'].values,test_transform)
train_loader=DataLoader(train_ds,batch_size=CFG['BATCH_SIZE'],shuffle=True,num_workers=0)
val_loader=DataLoader(val_ds,batch_size=CFG['BATCH_SIZE'],shuffle=False,num_workers=0)

# 표정 학습

In [None]:
import numpy as np
import torch
from tqdm import tqdm

def train(model, optimizer, train_loader, val_loader, scheduler, device, early_stopping_patience=3):
    model.to(device)
    criterion = torch.nn.CrossEntropyLoss().to(device)
    best_score = 0
    best_model = None
    patience = 0

    for epoch in range(1, CFG['EPOCHS'] + 1):
        model.train()
        train_loss = []
        for imgs, labels in tqdm(iter(train_loader)):
            try:
                imgs = imgs.float().to(device)
                labels = labels.to(device)
                optimizer.zero_grad()
                output = model(imgs)
                loss = criterion(output, labels)
                loss.backward()
                optimizer.step()
                train_loss.append(loss.item())
            except:
                continue

        _val_loss, _val_score = validation(model, criterion, val_loader, device)
        _train_loss = np.mean(train_loss)
        current_lr = optimizer.param_groups[0]["lr"]

        print(f'Epoch [{epoch}], Train Loss: [{_train_loss:.5f}], Val Loss: [{_val_loss:.5f}], Val F1 Score: [{_val_score:.5f}], Learning Rate: {current_lr}')


        if scheduler is not None:
            scheduler.step()


        if best_score < _val_score:
            best_score = _val_score
            best_model = model
            patience = 0
            torch.save(best_model, 'beitv2_large.pt')
        else:
            patience += 1


        if patience >= early_stopping_patience:
            print(f'Early stopping triggered at epoch {epoch}!')
            break

    return best_model

def validation(model, criterion, val_loader, device):
    model.eval()
    val_loss = []
    preds, true_labels = [], []
    with torch.no_grad():
        for imgs, labels in tqdm(iter(val_loader)):
            imgs = imgs.float().to(device)
            labels = labels.to(device)
            pred = model(imgs)
            loss = criterion(pred, labels)
            preds += pred.argmax(1).detach().cpu().numpy().tolist()
            true_labels += labels.detach().cpu().numpy().tolist()
            val_loss.append(loss.item())
        _val_loss = np.mean(val_loss)
        _val_score = f1_score(true_labels, preds, average='macro')
    return _val_loss, _val_score


# 성별 학습

In [68]:
import torch
import torch.nn as nn
import numpy as np
from tqdm import tqdm
from sklearn.metrics import accuracy_score
import copy  # deep copy를 위해 import

def train(model, optimizer, train_loader, val_loader, scheduler, device, early_stopping_patience=3):
    criterion = nn.BCEWithLogitsLoss().to(device)
    model.to(device)
    best_model = None
    best_loss = float('inf')
    patience = 0

    for epoch in range(1, CFG['EPOCHS'] + 1):
        train_loss = []
        train_preds, train_gender_true = [], []
        model.train()

        for img, gender in tqdm(iter(train_loader)):
            img = img.float().to(device)
            gender = gender.float().to(device)
            optimizer.zero_grad()
            output = model(img)

            # 차원 맞추기
            output = output.view(-1)
            gender = gender.view(-1)

            loss = criterion(output, gender)
            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())
            train_preds += (torch.sigmoid(output) > 0.5).int().detach().cpu().numpy().tolist()
            train_gender_true += gender.int().detach().cpu().numpy().tolist()

        _train_loss = np.mean(train_loss)
        _train_acc = accuracy_score(train_gender_true, train_preds)
        _val_acc, _val_loss = validation(model, val_loader, criterion, device)
        current_lr = optimizer.param_groups[0]['lr']

        print(f'Epoch [{epoch}], Train Loss: [{_train_loss:.5f}], Train Acc: [{_train_acc:.5f}], '
              f'Val Loss: [{_val_loss:.5f}], Val Acc: [{_val_acc:.5f}], Learning Rate: {current_lr}')

        if scheduler is not None:
            scheduler.step(_val_loss)

        if best_loss > _val_loss:
            best_loss = _val_loss
            best_model = copy.deepcopy(model)
            torch.save(best_model, 'swinv2_gender.pt')
            patience = 0
        else:
            patience += 1

        if patience >= early_stopping_patience:
            print(f'Early stopping triggered at epoch {epoch}!')
            break

    return best_model

def validation(model, val_loader, criterion, device):
    model.eval()
    val_loss = []
    val_preds, val_gender_true = [], []

    with torch.no_grad():
        for img, gender in tqdm(iter(val_loader)):
            img = img.float().to(device)
            gender = gender.float().to(device)
            pred = model(img)

            # 차원 맞추기
            pred = pred.view(-1)
            gender = gender.view(-1)

            loss = criterion(pred, gender)

            val_preds += (torch.sigmoid(pred) > 0.5).int().detach().cpu().numpy().tolist()
            val_gender_true += gender.int().detach().cpu().numpy().tolist()
            val_loss.append(loss.item())

        _val_loss = np.mean(val_loss)
        _val_acc = accuracy_score(val_gender_true, val_preds)

    return _val_acc, _val_loss


# 나이 학습

In [46]:
import torch
import torch.nn as nn
import numpy as np
from tqdm import tqdm
from sklearn.metrics import mean_absolute_error
import copy

def train(model, optimizer, train_loader, val_loader, scheduler, device, early_stopping_patience=3):
    criterion = nn.MSELoss().to(device)
    model.to(device)
    best_model = None
    best_loss = float('inf')
    patience = 0

    for epoch in range(1,CFG['EPOCHS']+1):
        train_loss = []
        train_preds, train_age_true = [], []
        model.train()

        for img, age in tqdm(iter(train_loader)):
            img = img.float().to(device)
            age = age.float().to(device)
            optimizer.zero_grad()
            output = model(img)
            loss = criterion(output, age) 
            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())
            train_preds += output.detach().cpu().numpy().tolist()  
            train_age_true += age.detach().cpu().numpy().tolist()
        _train_loss=np.mean(train_loss)
        _train_mae = mean_absolute_error(train_age_true, train_preds)
        _val_mae, _val_loss= validation(model, val_loader, criterion, device)
        current_lr = optimizer.param_groups[0]['lr']

        print(f'Epoch [{epoch}], Train MAE: [{_train_mae:.5f}], Train Loss: [{_train_loss:.5f}], Val MAE: [{_val_mae:.5f}], Val Loss: [{_val_loss:.5f}], Learning Rate: {current_lr}')

        if scheduler is not None:
            scheduler.step()

        if best_loss > _val_mae:
            best_loss = _val_mae
            best_model = copy.deepcopy(model)
            torch.save(best_model, 'swinv2_age.pt')
            patience = 0
        else:
            patience += 1

        if patience >= early_stopping_patience:
            print(f'Early stopping triggered at epoch {epoch}!')
            break

    return best_model

def validation(model, val_loader, criterion, device):
    model.eval()
    val_loss = []
    val_preds, val_age_true = [], []

    with torch.no_grad():
        for img, age in tqdm(iter(val_loader)):
            img = img.float().to(device)
            age = age.float().to(device)
            pred = model(img)
            loss = criterion(pred, age)  # pred를 squeeze하여 차원을 맞춤

            val_preds += pred.detach().cpu().numpy().tolist()  # flatten을 사용하여 1차원 배열로 변환
            val_age_true += age.detach().cpu().numpy().tolist()
            val_loss.append(loss.item())
        _val_loss=np.mean(val_loss)
        _val_mae = mean_absolute_error(val_age_true, val_preds)

    return _val_mae, _val_loss


In [69]:
model.eval()
optimizer=torch.optim.AdamW(params=model.parameters(),lr=CFG['LEARNING_RATE'])
scheduler=torch.optim.lr_scheduler.CosineAnnealingLR(optimizer,T_max=10)
infer_model=train(model,optimizer,train_loader,val_loader,scheduler,device)

100%|██████████| 369/369 [17:20<00:00,  2.82s/it]
100%|██████████| 73/73 [02:12<00:00,  1.82s/it]


Epoch [1], Train Loss: [0.01439], Train Acc: [0.99575], Val Loss: [0.02683], Val Acc: [0.99057], Learning Rate: 1e-05


100%|██████████| 369/369 [17:25<00:00,  2.83s/it]
100%|██████████| 73/73 [02:08<00:00,  1.76s/it]


Epoch [2], Train Loss: [0.00439], Train Acc: [0.99830], Val Loss: [0.13300], Val Acc: [0.97942], Learning Rate: 9.999289656187745e-06


100%|██████████| 369/369 [17:24<00:00,  2.83s/it]
100%|██████████| 73/73 [02:17<00:00,  1.89s/it]


Epoch [3], Train Loss: [0.00844], Train Acc: [0.99796], Val Loss: [0.01715], Val Acc: [0.99400], Learning Rate: 9.98255114040809e-06


100%|██████████| 369/369 [17:27<00:00,  2.84s/it]
100%|██████████| 73/73 [02:09<00:00,  1.77s/it]


Epoch [4], Train Loss: [0.00777], Train Acc: [0.99864], Val Loss: [0.03346], Val Acc: [0.99400], Learning Rate: 9.999709810960851e-06


100%|██████████| 369/369 [17:12<00:00,  2.80s/it]
100%|██████████| 73/73 [02:10<00:00,  1.79s/it]

Epoch [5], Train Loss: [0.00069], Train Acc: [0.99983], Val Loss: [0.05577], Val Acc: [0.98542], Learning Rate: 9.998894880263873e-06



