### 모델 로드

In [1]:
import timm
model = timm.create_model('mixer_b16_224', pretrained=True)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import torch
import torch.nn as nn
checkpoint = torch.load("C:/Users/USER/Downloads/mlp_mixer_img21_b16.pth", map_location='cpu') # GPU 환경이 아닌 경우 'cpu'를 사용합니다.
model.load_state_dict(checkpoint)

# 모델의 마지막 분류기 부분을 새로운 클래스 수에 맞게 변경
num_classes = 3  # 새로운 클래스 수
model.head = nn.Linear(model.head.in_features, num_classes)

In [3]:
model

MlpMixer(
  (stem): PatchEmbed(
    (proj): Conv2d(3, 768, kernel_size=(16, 16), stride=(16, 16))
    (norm): Identity()
  )
  (blocks): Sequential(
    (0): MixerBlock(
      (norm1): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
      (mlp_tokens): Mlp(
        (fc1): Linear(in_features=196, out_features=384, bias=True)
        (act): GELU(approximate='none')
        (drop1): Dropout(p=0.0, inplace=False)
        (norm): Identity()
        (fc2): Linear(in_features=384, out_features=196, bias=True)
        (drop2): Dropout(p=0.0, inplace=False)
      )
      (drop_path): Identity()
      (norm2): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
      (mlp_channels): Mlp(
        (fc1): Linear(in_features=768, out_features=3072, bias=True)
        (act): GELU(approximate='none')
        (drop1): Dropout(p=0.0, inplace=False)
        (norm): Identity()
        (fc2): Linear(in_features=3072, out_features=768, bias=True)
        (drop2): Dropout(p=0.0, inplace=False)
      

### 데이터 셋

In [4]:
import logging
import os
import json
from PIL import Image
from torchvision.transforms import ToTensor
from torchvision import transforms, datasets
from torch.utils.data import DataLoader, RandomSampler, DistributedSampler, SequentialSampler

class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, img_dir,annotations_dir,  transform=None):
        """
        annotation_dir (string): 메타데이터가 있는 JSON 파일의 경로
        img_dir (string): 모든 이미지가 있는 디렉토리의 경로
        transform (callable, optional): 샘플에 적용될 선택적 변환
        """
        self.img_dir = img_dir
        self.transform = transform
        
        self.annotation_dir= annotations_dir
        

    def __len__(self):
        label_list= os.listdir(self.annotation_dir)
        return len(label_list)

    def __getitem__(self, idx):
        label_list= os.listdir(self.annotation_dir)
        
        img_path = os.path.join(self.img_dir, label_list[idx].split('.')[0]+'.jpg')
        image = Image.open(img_path)
        
        # faceExp_uploader 부분만 라벨로 사용
        with open(self.annotation_dir+'/'+label_list[idx],'r', encoding='utf-8') as f:
            self.image_labels=json.load(f)
        label = self.image_labels['faceExp_uploader']
        label_to_int = {'기쁨': 0, '당황': 1, '중립': 2}

        # 문자열 라벨을 정수로 매핑
        label_int = label_to_int[label]
        label_tensor = torch.tensor(label_int, dtype=torch.long)
        
        if self.transform:
            image_tensor = self.transform(image)
        else:
            # 기본적으로 이미지를 Tensor로 변환
            transform = ToTensor()
            image_tensor = transform(image)
        
        return image_tensor, label_tensor

In [5]:
def get_loader(img_size, train_img_dir,train_annotation_dir, test_img_dir, test_annotation_dir, train_batch_size, eval_batch_size):

    transform_train = transforms.Compose([
        #주어진 이미지에서 임의의 크기 및 비율로 크롭한 후, 지정된 크기로 이미지를 리사이즈합니다, 5%~100% 사이의 영역을 crop 하여 resizing, crop되는 부분은 랜덤, ex) 중앙하단, 왼쪽 상단 등
        transforms.RandomResizedCrop((img_size, img_size), scale=(0.05, 1.0)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]), # 픽셀에 normalize 적용, 각 채널의 평균과 표준 편차, pre trained된 데이터의 통계를 기반으로 설정
    ])
    transform_test = transforms.Compose([
        transforms.Resize((img_size, img_size)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ])

    trainset = CustomDataset(img_dir=train_img_dir,
                             annotations_dir=train_annotation_dir,
                             transform=transform_train)
    testset = CustomDataset(img_dir=test_img_dir,
                            annotations_dir=test_annotation_dir,
                            transform=transform_test)
    

    train_sampler = RandomSampler(trainset) # 데이터셋에서 무작위로 샘플을 선택,  데이터셋의 인덱스를 무작위로 섞어서 데이터의 순서를 랜덤하게 배치, 모델이 순서에 의존하지 않고 학습함
    test_sampler = SequentialSampler(testset) # 데이터셋에서 순차적으로 샘플을 선택, 데이터를 처음부터 끝까지 순서대로 샘플링
    train_loader = DataLoader(trainset,
                              sampler=train_sampler,
                              batch_size=train_batch_size,
                              num_workers=0,
                              pin_memory=True)
    test_loader = DataLoader(testset,
                             sampler=test_sampler,
                             batch_size=eval_batch_size,
                             num_workers=0,
                             pin_memory=True) if testset is not None else None

    return train_loader, test_loader

### 학습

In [6]:
#학습 시 하이퍼 파라미터 지정
num_epochs=10
train_batch_size = 64  # 훈련 배치 크기
eval_batch_size = 64  # 평가 배치 크기
train_img_dir= 'C:/Users/USER/Desktop/train'
train_annotation_dir= 'C:/Users/USER/Desktop/train_label'
test_img_dir='C:/Users/USER/Desktop/valid'
test_annotaion_dir='C:/Users/USER/Desktop/valid_label'
name= 'yong_mlp'
output_dir= 'output'
eval_every = 113  # 몇 스텝마다 평가를 할 것인지
learning_rate = 3e-2  # 초기 학습률
weight_decay = 0  # 가중치 감소율
sweight_decay = 0  # 가중치 감소율
num_steps = 113 * num_epochs  # 총 훈련 스텝
decay_type = "cosine"  # 학습률 감소 방식
seed = 42  # 초기화를 위한 랜덤 시드
n_gpu=1
decay_type = "cosine"  # 학습률 감소 방식
gradient_accumulation_steps = 1  # 업데이트를 위해 누적할 스텝 수
warmup_steps = 500  # 웜업을 위한 스텝 수
max_grad_norm = 1.0  # 최대 그래디언트 노름

In [7]:
cd C:/Users/USER/Downloads/MLP-Mixer-Pytorch/MLP-Mixer-Pytorch-main

C:\Users\USER\Downloads\MLP-Mixer-Pytorch\MLP-Mixer-Pytorch-main


In [8]:
pwd

'C:\\Users\\USER\\Downloads\\MLP-Mixer-Pytorch\\MLP-Mixer-Pytorch-main'

In [9]:
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
import math
from torch.optim.lr_scheduler import LambdaLR
from torch.nn.utils import clip_grad_norm_
import random
import numpy as np
import logging

from torch.utils.tensorboard import SummaryWriter

logger = logging.getLogger(__name__)

#스케줄러
class WarmupCosineSchedule(LambdaLR):
    """ Linear warmup and then cosine decay.
        Linearly increases learning rate from 0 to 1 over `warmup_steps` training steps.
        Decreases learning rate from 1. to 0. over remaining `t_total - warmup_steps` steps following a cosine curve.
        If `cycles` (default=0.5) is different from default, learning rate follows cosine function after warmup.
    """
    def __init__(self, optimizer, warmup_steps, t_total, cycles=.5, last_epoch=-1):
        self.warmup_steps = warmup_steps
        self.t_total = t_total
        self.cycles = cycles
        super(WarmupCosineSchedule, self).__init__(optimizer, self.lr_lambda, last_epoch=last_epoch)

    def lr_lambda(self, step):
        if step < self.warmup_steps:
            return float(step) / float(max(1.0, self.warmup_steps))
        # progress after warmup
        progress = float(step - self.warmup_steps) / float(max(1, self.t_total - self.warmup_steps))
        return max(0.0, 0.5 * (1. + math.cos(math.pi * float(self.cycles) * 2.0 * progress)))

class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

def valid(model, writer, test_loader,eval_batch_size, global_step,device):
    # Validation!
    eval_losses = AverageMeter()

    logger.info("***** Running Validation *****")
    logger.info("  Num steps = %d", len(test_loader))
    logger.info("  Batch size = %d", eval_batch_size)

    model.eval()
    all_preds, all_label = [], []
    epoch_iterator = tqdm(test_loader,
                          desc="Validating... (loss=X.X)",
                          bar_format="{l_bar}{r_bar}",
                          dynamic_ncols=True)
    loss_fct = torch.nn.CrossEntropyLoss()
    for step, batch in enumerate(epoch_iterator):
        batch = tuple(t.to(device) for t in batch)
        x, y = batch
        with torch.no_grad():
            
            logits = model(x)
            eval_loss = loss_fct(logits, y)
            eval_losses.update(eval_loss.item())

            preds = torch.argmax(logits, dim=-1)

        if len(all_preds) == 0:
            all_preds.append(preds.detach().cpu().numpy())
            all_label.append(y.detach().cpu().numpy())
        else:
            all_preds[0] = np.append(
                all_preds[0], preds.detach().cpu().numpy(), axis=0
            )
            all_label[0] = np.append(
                all_label[0], y.detach().cpu().numpy(), axis=0
            )
        epoch_iterator.set_description("Validating... (loss=%2.5f)" % eval_losses.val)

    all_preds, all_label = all_preds[0], all_label[0]
    accuracy = simple_accuracy(all_preds, all_label)
    print("Global Steps: %d" % global_step)
    print("Valid Loss: %2.5f" % eval_losses.avg)
    print("Valid Accuracy: %2.5f" % accuracy)

    logger.info("/n")
    logger.info("Validation Results")
    logger.info("Global Steps: %d" % global_step)
    logger.info("Valid Loss: %2.5f" % eval_losses.avg)
    logger.info("Valid Accuracy: %2.5f" % accuracy)

    writer.add_scalar("test/accuracy", scalar_value=accuracy, global_step=global_step)
    return accuracy

def save_model(model, output_dir, name):
    model_to_save = model.module if hasattr(model, 'module') else model
    model_checkpoint = os.path.join(output_dir, "%s_checkpoint.bin" % name)
    torch.save(model_to_save.state_dict(), model_checkpoint)
    logger.info("Saved model checkpoint to [DIR: %s]", output_dir)

def simple_accuracy(preds, labels):
    return (preds == labels).mean()

 



device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

os.makedirs(output_dir, exist_ok=True)
writer = SummaryWriter(log_dir=os.path.join("logs", name))

train_loader, test_loader = get_loader(224, train_img_dir, train_annotation_dir, test_img_dir, test_annotaion_dir, train_batch_size, eval_batch_size)

optimizer = torch.optim.SGD(model.parameters(),
                            lr=learning_rate,
                            momentum=0.9,
                            weight_decay=weight_decay)
t_total = num_steps

# 스케줄러
scheduler = WarmupCosineSchedule(optimizer, warmup_steps=warmup_steps, t_total=t_total)

model.zero_grad()
# set seed
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if n_gpu > 0:
    torch.cuda.manual_seed_all(seed)

losses = AverageMeter()
global_step, best_acc = 0, 0

# epoch 단위로 학습 시작
for epoch in range(num_epochs):
    model.train()
    epoch_iterator = tqdm(train_loader,
                          desc=f"Epoch {epoch + 1}/{num_epochs}",
                          bar_format="{l_bar}{r_bar}",
                          dynamic_ncols=True)
    for step, batch in enumerate(epoch_iterator):
        batch = tuple(t.to(device) for t in batch)
        x, y = batch
        outputs = model(x)
        loss_fn = nn.CrossEntropyLoss()
        loss = loss_fn(outputs, y)
        loss.backward()

        if (step + 1) % gradient_accumulation_steps == 0:
            losses.update(loss.item() * gradient_accumulation_steps)
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)

            scheduler.step()
            optimizer.step()
            optimizer.zero_grad()
            global_step += 1

            epoch_iterator.set_description(
                f"Epoch {epoch + 1}/{num_epochs} (Step {global_step} / {t_total}) (Loss={losses.val:.5f})"
            )
            writer.add_scalar("train/loss", scalar_value=losses.val, global_step=global_step)
            writer.add_scalar("train/lr", scalar_value=scheduler.get_lr()[0], global_step=global_step)

            if global_step % eval_every == 0:
                accuracy = valid(model, writer, test_loader, eval_batch_size, global_step, device)

                if best_acc < accuracy:
                    save_model(model, output_dir, name)
                    best_acc = accuracy
                model.train()
    
    losses.reset()

writer.close()



Validating... (loss=0.79865): 100%|| 29/29 [02:51<00:00,  5.92s/it]00:06,  6.96s/it]


Global Steps: 113
Valid Loss: 0.91199
Valid Accuracy: 0.54500


Epoch 1/10 (Step 113 / 1130) (Loss=0.82764): 100%|| 113/113 [15:43<00:00,  8.35s/it]
Validating... (loss=0.54619): 100%|| 29/29 [02:50<00:00,  5.89s/it]00:06,  6.99s/it]
Epoch 2/10 (Step 226 / 1130) (Loss=0.66722): 100%|| 113/113 [15:42<00:00, 57.27s/it]

Global Steps: 226
Valid Loss: 0.68390
Valid Accuracy: 0.70056


Epoch 2/10 (Step 226 / 1130) (Loss=0.66722): 100%|| 113/113 [15:42<00:00,  8.34s/it]
Validating... (loss=0.44157): 100%|| 29/29 [02:50<00:00,  5.89s/it]00:06,  6.84s/it]


Global Steps: 339
Valid Loss: 0.55903
Valid Accuracy: 0.76667


Epoch 3/10 (Step 339 / 1130) (Loss=0.68471): 100%|| 113/113 [15:41<00:00,  8.33s/it]
Validating... (loss=0.62090): 100%|| 29/29 [02:50<00:00,  5.89s/it]00:06,  6.82s/it]
Epoch 4/10 (Step 452 / 1130) (Loss=0.37589): 100%|| 113/113 [15:41<00:00,  8.33s/it]


Global Steps: 452
Valid Loss: 0.54766
Valid Accuracy: 0.76556


Validating... (loss=0.79781): 100%|| 29/29 [02:50<00:00,  5.90s/it]00:06,  6.82s/it]
Epoch 5/10 (Step 565 / 1130) (Loss=0.45882): 100%|| 113/113 [15:45<00:00,  8.36s/it]


Global Steps: 565
Valid Loss: 0.55578
Valid Accuracy: 0.75833


Validating... (loss=0.33925): 100%|| 29/29 [02:51<00:00,  5.91s/it]00:06,  6.69s/it]
Epoch 6/10 (Step 678 / 1130) (Loss=0.64397): 100%|| 113/113 [15:52<00:00, 57.17s/it]

Global Steps: 678
Valid Loss: 0.48137
Valid Accuracy: 0.80500


Epoch 6/10 (Step 678 / 1130) (Loss=0.64397): 100%|| 113/113 [15:52<00:00,  8.43s/it]
Validating... (loss=0.26935): 100%|| 29/29 [02:50<00:00,  5.88s/it]00:06,  6.83s/it]


Global Steps: 791
Valid Loss: 0.43961
Valid Accuracy: 0.81111


Epoch 7/10 (Step 791 / 1130) (Loss=0.52183): 100%|| 113/113 [15:47<00:00,  8.39s/it]
Epoch 8/10 (Step 820 / 1130) (Loss=0.31907):  26%|| 29/113 [03:19<09:54,  7.08s/it]