# **1. 데이터셋 준비**

In [1]:
import torch
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision import transforms

import os
import json
import zipfile
import random
import numpy as np
from PIL import Image

## **1-1. 데이터셋 불러오기**

In [2]:
# Mount Google Drive at /content/drive; the archive and annotation paths used below live under it.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**이미지 파일 압축 해제**
- 본인 경로를 제대로 설정했는지 확인
- 세션 디스크에 `data > train`, `data > val` 폴더를 만들어두고 압축 해제를 진행하셔야 합니다.
  - 세션 디스크 내 임시 경로입니다(구글 드라이브 경로가 아님).

```
!unzip (데이터 경로) -d (압축 해제할 경로)
```

In [None]:
### Extract the image archives

# Unpack the train/val zip files stored on Drive into /content/data/train and /content/data/val.
!unzip /content/drive/MyDrive/Euron/6th-project/final/data/train.zip -d /content/data/train
!unzip /content/drive/MyDrive/Euron/6th-project/final/data/val.zip -d /content/data/val

In [3]:
### Prepare the annotation files

# Paths to the train/val annotation JSON files on Drive; FashionDataset reads them with json.load.
coco_train_path = '/content/drive/MyDrive/Euron/6th-project/final/data/coco_train.json'
coco_val_path = '/content/drive/MyDrive/Euron/6th-project/final/data/coco_val.json'

## **1-2. Custom Dataset 준비**

In [4]:
### Custom Dataset class definition

class FashionDataset(Dataset):
  """Image-caption dataset backed by a JSON annotation file.

  One item corresponds to one entry of ``coco_data['annotations']``. The
  entry's ``image_id`` is resolved to a file name through the records in
  ``coco_data['images']``.
  """

  def __init__(self, annotation_file, image_dir, transform = None):
    """Load the annotation file and index its image records by id.

    Args:
      annotation_file: Path to a JSON file containing ``'images'`` and
        ``'annotations'`` lists.
      image_dir: Directory that each image record's ``file_name`` is joined to.
      transform: Optional callable applied to the loaded RGB PIL image.
    """
    # Explicit UTF-8 so decoding does not depend on the platform default.
    with open(annotation_file, 'r', encoding = 'utf-8') as f:
      self.coco_data = json.load(f)
    self.image_dir = image_dir
    self.transform = transform

    # Build the id -> image record lookup once, instead of scanning the whole
    # 'images' list on every __getitem__ call. setdefault keeps the first
    # record for a repeated id, the same record a first-match scan would pick.
    self._image_by_id = {}
    for img in self.coco_data['images']:
      self._image_by_id.setdefault(img['id'], img)

  def __len__(self):
    """Return the number of annotation entries (one sample per annotation)."""
    return len(self.coco_data['annotations'])

  def __getitem__(self, idx):
    """Return ``(image, caption)`` for the annotation at position ``idx``.

    Raises:
      KeyError: If the annotation's ``image_id`` has no record in ``'images'``.
    """
    ## Annotation entry
    annotation = self.coco_data['annotations'][idx]

    ## Image
    image_id = annotation['image_id']

    image_info = self._image_by_id.get(image_id)
    if image_info is None:
      # Fail with a message that names the broken reference rather than an
      # opaque "'NoneType' object is not subscriptable".
      raise KeyError(
          f"annotation {idx} references image_id {image_id!r}, "
          "which is not listed in 'images'")
    image_path = os.path.join(self.image_dir, image_info['file_name'])

    image = Image.open(image_path).convert('RGB')
    if self.transform:
      image = self.transform(image)

    ## Caption
    caption = annotation['caption']

    return image, caption

In [5]:
## Dataset preparation

# Image preprocessing: resize to 384x384, then convert to a tensor
transform = transforms.Compose([
    transforms.Resize((384, 384)),
    transforms.ToTensor(),
])

In [6]:
# Training split: train annotations + the images extracted to /content/data/train
train_dataset = FashionDataset(annotation_file = coco_train_path,
                               image_dir = '/content/data/train',
                               transform = transform)

In [7]:
# Validation split: val annotations + the images extracted to /content/data/val
val_dataset = FashionDataset(annotation_file = coco_val_path,
                               image_dir = '/content/data/val',
                               transform = transform)

In [8]:
### DataLoader setup

# Batches of 16 for both splits; only the training loader is shuffled.
train_loader = DataLoader(train_dataset, batch_size = 16, shuffle = True)
val_loader = DataLoader(val_dataset, batch_size = 16, shuffle = False)

# **2. Training(fine-tuning)**

In [None]:
# Install peft, which provides the get_peft_model / LoraConfig names imported in the next cell.
!pip install peft

In [10]:
from transformers import BlipProcessor, BlipForConditionalGeneration
from transformers import AdamW, get_cosine_schedule_with_warmup

from tqdm import tqdm
import matplotlib.pyplot as plt

from peft import get_peft_model, LoraConfig

## **2-1. model 준비**

In [11]:
### Load the pre-trained model

# Processor
# NOTE(review): do_rescale=False is presumably set because the dataset transform
# already applies ToTensor to the images — confirm.
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-large", do_rescale=False)

# Model
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-large")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [12]:
### Device setup

# Use CUDA when it is available, otherwise fall back to the CPU, and move the model there.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

BlipForConditionalGeneration(
  (vision_model): BlipVisionModel(
    (embeddings): BlipVisionEmbeddings(
      (patch_embedding): Conv2d(3, 1024, kernel_size=(16, 16), stride=(16, 16))
    )
    (encoder): BlipEncoder(
      (layers): ModuleList(
        (0-23): 24 x BlipEncoderLayer(
          (self_attn): BlipAttention(
            (dropout): Dropout(p=0.0, inplace=False)
            (qkv): Linear(in_features=1024, out_features=3072, bias=True)
            (projection): Linear(in_features=1024, out_features=1024, bias=True)
          )
          (layer_norm1): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
          (mlp): BlipMLP(
            (activation_fn): GELUActivation()
            (fc1): Linear(in_features=1024, out_features=4096, bias=True)
            (fc2): Linear(in_features=4096, out_features=1024, bias=True)
          )
          (layer_norm2): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
        )
      )
    )
    (post_layernorm): LayerNorm((1024,),

In [13]:
### Collect target layers
# Search by specific name patterns

def get_target_modules(model, patterns):
    """Return the names of the modules in `model` whose name contains any pattern.

    Args:
        model: Object exposing `named_modules()` that yields `(name, module)` pairs.
        patterns: Iterable of substrings to look for in the module names.

    Returns:
        List of matching module names in `named_modules()` order. A name that
        matches several patterns is listed once, not once per pattern.
    """
    # Materialize once so a one-shot iterable can be reused for every module name.
    patterns = tuple(patterns)
    return [
        name
        for name, _ in model.named_modules()
        if any(pattern in name for pattern in patterns)
    ]

In [14]:
## Example
# Must be adjusted for each combination (print the layers to check the structure)

decoder_patterns = ["self.query", "self.key", "self.value", "intermediate.dense", "output.dense"]
encoder_patterns = ["self_attn.qkv", "mlp.fc"]

encoder_modules = get_target_modules(model, encoder_patterns)[-18:] # intended to keep only the last 6 layers (the last 18 matched names)
                                                                    # must be changed depending on the setup
decoder_modules = get_target_modules(model, decoder_patterns)

# LoRA targets: the selected encoder modules followed by every matched decoder module
target_modules = encoder_modules+ decoder_modules

In [15]:
## PEFT + LoRA setup

## LoRA Configuration
lora_config = LoraConfig(
    r = 16, # LoRA rank
    lora_alpha = 32, # LoRA alpha
    lora_dropout = 0.05, # LoRA dropout rate
    target_modules = target_modules,
    bias = "none"
)

# Model with LoRA applied
peft_model = get_peft_model(model, lora_config)

In [16]:
# Print each name in peft_model.targeted_module_names to check which modules were targeted.
for modules in peft_model.targeted_module_names:
  print(modules)

vision_model.encoder.layers.18.self_attn.qkv
vision_model.encoder.layers.18.mlp.fc1
vision_model.encoder.layers.18.mlp.fc2
vision_model.encoder.layers.19.self_attn.qkv
vision_model.encoder.layers.19.mlp.fc1
vision_model.encoder.layers.19.mlp.fc2
vision_model.encoder.layers.20.self_attn.qkv
vision_model.encoder.layers.20.mlp.fc1
vision_model.encoder.layers.20.mlp.fc2
vision_model.encoder.layers.21.self_attn.qkv
vision_model.encoder.layers.21.mlp.fc1
vision_model.encoder.layers.21.mlp.fc2
vision_model.encoder.layers.22.self_attn.qkv
vision_model.encoder.layers.22.mlp.fc1
vision_model.encoder.layers.22.mlp.fc2
vision_model.encoder.layers.23.self_attn.qkv
vision_model.encoder.layers.23.mlp.fc1
vision_model.encoder.layers.23.mlp.fc2
text_decoder.bert.encoder.layer.0.attention.self.query
text_decoder.bert.encoder.layer.0.attention.self.key
text_decoder.bert.encoder.layer.0.attention.self.value
text_decoder.bert.encoder.layer.0.attention.output.dense
text_decoder.bert.encoder.layer.0.crossatt

## **2-2. 평가 지표 준비**

In [None]:
# Fetch and install pycocoevalcap, which provides the Bleu/Meteor/Rouge/Cider/Spice scorers imported in the next cell.
!git clone https://github.com/salaniz/pycocoevalcap
!pip install git+https://github.com/salaniz/pycocoevalcap.git

In [17]:
from pycocoevalcap.bleu.bleu import Bleu
from pycocoevalcap.meteor.meteor import Meteor
from pycocoevalcap.rouge.rouge import Rouge
from pycocoevalcap.cider.cider import Cider
from pycocoevalcap.spice.spice import Spice

In [18]:
### Compute evaluation metrics

def compute_metrics(preds, labels):
  """Score generated captions against their reference captions.

  Args:
    preds: Sequence of generated caption strings.
    labels: Sequence of reference caption strings, aligned with `preds` by index.

  Returns:
    Dict mapping "BLEU-4", "METEOR", "ROUGE", "CIDEr" and "SPICE" to the score
    returned by the corresponding scorer.

  Raises:
    ValueError: If `preds` and `labels` differ in length.
  """
  if len(preds) != len(labels):
    raise ValueError(
        f"preds and labels must have the same length, got {len(preds)} and {len(labels)}")

  # Every scorer receives the same {index: [caption]} dictionaries, so build
  # them once instead of once per scorer.
  references = {i: [label] for i, label in enumerate(labels)}
  candidates = {i: [pred] for i, pred in enumerate(preds)}

  scorers = [
      (Bleu(4), "BLEU-4"),
      (Meteor(), "METEOR"),
      (Rouge(), "ROUGE"),
      (Cider(), "CIDEr"),
      (Spice(), "SPICE")
  ]

  results = {}
  for scorer, method in scorers:
    score, _ = scorer.compute_score(references, candidates)
    # A non-float score is treated as a sequence and its last entry is kept
    # (presumably the 4-gram score for Bleu(4) — confirm against pycocoevalcap).
    results[method] = score if isinstance(score, float) else score[-1]

  return results

## **2-3. 학습 및 검증**

In [19]:
### Define the optimizer and learning-rate schedule

## Optimizer
# Only parameters with requires_grad=True are handed to the optimizer.
# torch.optim.AdamW is used instead of transformers.AdamW, which transformers
# deprecated in favor of the PyTorch implementation. eps = 1e-6 keeps the
# default of the deprecated class (torch.optim.AdamW defaults to 1e-8).
optimizer = torch.optim.AdamW(filter(lambda p: p.requires_grad, peft_model.parameters()),
                              lr = 2e-5, weight_decay = 0.05, eps = 1e-6)

## Learning-rate schedule
# The epoch count here must match the num_epochs later passed to train(),
# because the scheduler is stepped once per training batch.
num_training_steps = len(train_loader) * 10  # 10 epochs
num_warmup_steps = int(0.1 * num_training_steps)  # 10% warmup
scheduler = get_cosine_schedule_with_warmup(optimizer,
                                            num_warmup_steps = num_warmup_steps,
                                            num_training_steps = num_training_steps)



In [20]:
## History of training/validation losses and evaluation metrics

# train() appends one value per epoch to each list and updates best_val_loss through `global`.
train_losses = []
val_losses = []
best_val_loss = float('inf')

bleu_scores = []
meteor_scores = []
rouge_scores = []
cider_scores = []
spice_scores = []

In [21]:
### Training function

def train(model, train_loader, val_loader, optimizer, scheduler, device, num_epochs,
          save_path = "/content/drive/MyDrive/6th-project/final/model/model_trial_xxx.pth"):
  """Fine-tune `model` for `num_epochs` epochs, validating and checkpointing after each one.

  Per epoch: run one pass over `train_loader` (one optimizer step and one
  scheduler step per batch), then compute the validation loss and generate
  captions over `val_loader`, score them with `compute_metrics`, and save
  `model.state_dict()` whenever the average validation loss improves on
  `best_val_loss`.

  Relies on module-level state: `processor`, `compute_metrics`, the history
  lists (`train_losses`, `val_losses`, `bleu_scores`, `meteor_scores`,
  `rouge_scores`, `cider_scores`, `spice_scores`), which are appended to, and
  `best_val_loss`, which is updated through `global`.

  Args:
    model: Model called as `model(**inputs, labels=...)` and `model.generate(...)`.
    train_loader: Sized iterable of `(images, captions)` training batches.
    val_loader: Sized iterable of `(images, captions)` validation batches.
    optimizer: Optimizer stepped once per training batch.
    scheduler: Learning-rate scheduler stepped once per training batch.
    device: Device the images and processor outputs are moved to.
    num_epochs: Number of epochs to run.
    save_path: File the best checkpoint is written to. Change the file name
      to match your trial.
      NOTE(review): the default lives under MyDrive/6th-project, while the
      data paths in this notebook use MyDrive/Euron/6th-project — confirm
      which location is intended.
  """
  global best_val_loss

  # Create the checkpoint directory before training starts, so a missing
  # folder cannot make the first torch.save fail after a full epoch of work.
  save_dir = os.path.dirname(save_path)
  if save_dir:
    os.makedirs(save_dir, exist_ok = True)

  for epoch in range(num_epochs):
    print(f"=== Epoch {epoch+1} ===")
    print("-" * 20)

    ## Training loop
    model.train()  # switch the model to training mode
    epoch_train_loss = 0

    for images, captions in tqdm(train_loader, desc = f"Training Epoch {epoch+1}/{num_epochs}"):
      images = images.to(device)

      # Preprocess the batch (images + tokenized, padded captions)
      inputs = processor(images = images, text = captions, return_tensors = "pt", padding = True).to(device)

      # Forward pass; the caption token ids double as the labels
      outputs = model(**inputs, labels = inputs.input_ids)

      # Loss and backpropagation
      loss = outputs.loss
      loss.backward()
      optimizer.step()
      optimizer.zero_grad()
      scheduler.step()  # learning-rate scheduling, once per batch

      epoch_train_loss += loss.item()

    avg_train_loss = epoch_train_loss / len(train_loader)
    train_losses.append(avg_train_loss)
    print(f"Epoch {epoch+1}, Average Training Loss: {avg_train_loss:.4f}")


    ## Validation loop
    model.eval()  # switch the model to evaluation mode
    epoch_val_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
      for images, captions in tqdm(val_loader, desc = "Validation"):
        images = images.to(device)
        inputs = processor(images = images, text = captions, return_tensors = "pt", padding = True).to(device)

        outputs = model(**inputs, labels = inputs.input_ids)
        loss = outputs.loss
        epoch_val_loss += loss.item()

        # Generate captions from the images alone
        generated_ids = model.generate(pixel_values = inputs.pixel_values, max_length = 30)
        generated_texts = processor.batch_decode(generated_ids, skip_special_tokens = True)

        # Collect predictions and labels for evaluation
        all_preds.extend(generated_texts)
        all_labels.extend(captions)

    avg_val_loss = epoch_val_loss / len(val_loader)
    val_losses.append(avg_val_loss)
    print(f"Epoch {epoch+1}, Average Validation Loss: {avg_val_loss:.4f}")

    ## Calculate evaluation metrics
    metrics = compute_metrics(all_preds, all_labels)

    bleu_scores.append(metrics['BLEU-4'])
    meteor_scores.append(metrics['METEOR'])
    rouge_scores.append(metrics['ROUGE'])
    cider_scores.append(metrics['CIDEr'])
    spice_scores.append(metrics['SPICE'])

    print(f"BLEU: {metrics['BLEU-4']:.4f}, METEOR: {metrics['METEOR']:.4f}, ROUGE: {metrics['ROUGE']:.4f}, CIDEr: {metrics['CIDEr']:.4f}, SPICE: {metrics['SPICE']:.4f}")


    ## Save the best model (lowest average validation loss so far)
    if avg_val_loss < best_val_loss:
      best_val_loss = avg_val_loss
      torch.save(model.state_dict(), save_path)
      print(f"Model saved at Epoch {epoch+1} with Validation Loss: {avg_val_loss:.4f}")

    ## Clear cache
    torch.cuda.empty_cache()

In [24]:
# Train the model

# Run the fine-tuning loop on the LoRA-wrapped model for 10 epochs.
num_epochs = 10
train(peft_model, train_loader, val_loader, optimizer, scheduler, device, num_epochs)

=== Epoch 1 ===
--------------------


Training Epoch 1/10:  13%|█▎        | 79/625 [04:36<31:50,  3.50s/it]


KeyboardInterrupt: 

- layer 풀어주는 개수에 따라서 사용되는 리소스가 달라지니, GPU는 알아서 선택적으로 활용하세요.
  - T4도 버티기는 하는데 조금 불안불안하네요,,
  - GPU 종류 선택 자체가 모델링 성능에 미치는 영향은 다른 조건들이 모두 동일하기에 매우 미미함

## **2-4. 결과 확인**

In [None]:
### Visualization

def _plot_series(values, label):
  """Plot one per-epoch history against 1-based epoch numbers derived from its own length."""
  plt.plot(range(1, len(values) + 1), values, marker='o', label=label)

# Epochs that were actually recorded. The histories are shorter than num_epochs
# when training is stopped early, and plotting them against
# range(1, num_epochs + 1) would then fail on mismatched x/y lengths.
epochs = range(1, len(train_losses) + 1)

plt.figure(figsize = (15, 15))

# Loss curves
plt.subplot(3, 1, 1)
_plot_series(train_losses, 'Training Loss')
_plot_series(val_losses, 'Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()

# Evaluation metrics 1 (BLEU, METEOR)
plt.subplot(3, 1, 2)
_plot_series(bleu_scores, 'BLEU')
_plot_series(meteor_scores, 'METEOR')
plt.xlabel('Epoch')
plt.ylabel('Score')
plt.title('BLEU and METEOR Scores')
plt.legend()

# Evaluation metrics 2 (ROUGE, CIDEr, SPICE)
plt.subplot(3, 1, 3)
_plot_series(rouge_scores, 'ROUGE')
_plot_series(cider_scores, 'CIDEr')
_plot_series(spice_scores, 'SPICE')
plt.xlabel('Epoch')
plt.ylabel('Score')
plt.title('ROUGE, CIDEr and SPICE Scores')
plt.legend()

plt.tight_layout()
plt.show()