## Settings & Check

In [4]:
# 2. 필요한 라이브러리 설치
# %pip install -q kaggle

# 3. Kaggle API 설정
# import os

!which python
!pip show torch
import torch
print("cuda 버전:", torch.version.cuda)
!echo $PATH
!echo $LD_LIBRARY_PATH

/opt/anaconda3/bin/python
Name: torch
Version: 2.0.1
Summary: Tensors and Dynamic neural networks in Python with strong GPU acceleration
Home-page: https://pytorch.org/
Author: PyTorch Team
Author-email: packages@pytorch.org
License: BSD-3
Location: /opt/anaconda3/lib/python3.11/site-packages
Requires: filelock, jinja2, networkx, sympy, typing-extensions
Required-by: 
cuda 버전: 12.1
/usr/local/cuda-12.2/bin:/usr/local/cuda-12.2/bin:/usr/local/cuda-12.2/bin:/usr/local/cuda-12.2/bin:/home/gpu_04/.vscode-server/cli/servers/Stable-fabdb6a30b49f79a7aba0f2ad9df9b399473380f/server/bin/remote-cli:/opt/anaconda3/bin:/opt/anaconda3/bin:/usr/local/cuda-12.2/bin:/opt/anaconda3/condabin:/usr/local/cuda-12.2/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/home/gpu_04/.vscode-server/data/User/globalStorage/github.copilot-chat/debugCommand
/usr/local/cuda-12.2/lib64


In [5]:
import torch
print("PyTorch 버전:", torch.__version__)
print("CUDA 사용 가능 여부:", torch.cuda.is_available())
print("현재 디바이스:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CPU")
print("CUDA 버전:", torch.version.cuda if torch.cuda.is_available() else "None")

# GPU 메모리 단편화 문제 완화
import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# GPU 캐시 비우기
import torch
import gc

# 1. 불필요한 변수 삭제
# del variable

# 2. 가비지 컬렉터 실행
gc.collect()

# 3. PyTorch 캐시 메모리 해제
torch.cuda.empty_cache()

# 4. 메모리 사용 상태 출력
print(f"Allocated memory: {torch.cuda.memory_allocated() / (1024 ** 2):.2f} MB")
print(f"Reserved memory: {torch.cuda.memory_reserved() / (1024 ** 2):.2f} MB")

PyTorch 버전: 2.5.1+cu121
CUDA 사용 가능 여부: True
현재 디바이스: NVIDIA RTX A6000
CUDA 버전: 12.1
Allocated memory: 0.00 MB
Reserved memory: 0.00 MB


In [15]:
from datasets import load_dataset

# Flickr30k 데이터셋 다운로드
dataset = load_dataset("nlphuji/flickr30k")
train_dataset = dataset.filter(lambda x: x["split"] == "train")
valid_dataset = dataset.filter(lambda x: x["split"] == "val")
test_dataset = dataset.filter(lambda x: x["split"] == "test")

In [6]:
print(train_dataset)
print(valid_dataset)
print(test_dataset)

DatasetDict({
    test: Dataset({
        features: ['image', 'caption', 'sentids', 'split', 'img_id', 'filename'],
        num_rows: 29000
    })
})
DatasetDict({
    test: Dataset({
        features: ['image', 'caption', 'sentids', 'split', 'img_id', 'filename'],
        num_rows: 1014
    })
})
DatasetDict({
    test: Dataset({
        features: ['image', 'caption', 'sentids', 'split', 'img_id', 'filename'],
        num_rows: 1000
    })
})


In [8]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import pytorch_lightning as pl
from pytorch_lightning import seed_everything
from torch.utils.data import Dataset, DataLoader

from transformers import BertTokenizer, BertModel, ViTModel
from torchvision import transforms

from datasets import load_dataset

# 시드 고정
seed_everything(42)

Seed set to 42


42

## DataSet Structure (Lightning)

### 모든 데이터셋 쌍 (이미지-캡션 5개) 사용

In [22]:
class Flickr30KCustomDataset(Dataset):
    def __init__(self, hf_dataset, tokenizer, image_transform):
        """
        Flickr30K (nlphuji/flickr30k)에서 image/5개 caption 리스트를 받아
        (image, caption) 형태로 중복 샘플링해 pairs 리스트를 만든다.
        """
        self.tokenizer = tokenizer
        self.image_transform = image_transform

        self.pairs = []
        for item in hf_dataset:
            pil_image = item["image"]  # PIL.Image
            captions = item["caption"] # 최대 5개의 캡션 리스트

            # 5개 캡션 각각에 대해 (image, caption) 쌍 생성
            for c in captions:
                self.pairs.append((pil_image, c))

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        pil_image, caption = self.pairs[idx]

        # 1) 이미지 변환
        image = self.image_transform(pil_image)

        # 2) 캡션(문자열) 토큰화
        tokenized = self.tokenizer(
            caption,
            max_length=64,
            truncation=True,
            padding="max_length",
            return_tensors="pt"
        )
        # batch 차원이 [1, seq_len] 형태이므로 squeeze(0) 처리
        input_ids = tokenized["input_ids"].squeeze(0)          # (seq_len,)
        attention_mask = tokenized["attention_mask"].squeeze(0)# (seq_len,)

        return {
            "image": image,  # (3, 224, 224)
            "input_ids": input_ids,  # (seq_len,)
            "attention_mask": attention_mask
        }

### 이미지 당 하나의 캡션만 사용

In [23]:
import random

class Flickr30KCustomDatasetSingleCaption(Dataset):
    def __init__(self, hf_dataset, tokenizer, image_transform):
        self.hf_dataset = hf_dataset
        self.tokenizer = tokenizer
        self.image_transform = image_transform

    def __len__(self):
        return len(self.hf_dataset)

    def __getitem__(self, idx):
        data = self.hf_dataset[idx]
        pil_image = data["image"]  # PIL.Image
        captions = data["caption"] # 5개 캡션 리스트

        # 랜덤하게 1개 선택 (항상 첫 번째 캡션 사용하려면 captions[0]로 고정)
        caption = random.choice(captions)

        # 1) 이미지 전처리
        image = self.image_transform(pil_image)

        # 2) 캡션 토큰화
        tokenized = self.tokenizer(
            caption,
            max_length=64,
            truncation=True,
            padding="max_length",
            return_tensors="pt"
        )
        input_ids = tokenized["input_ids"].squeeze(0)
        attention_mask = tokenized["attention_mask"].squeeze(0)

        return {
            "image": image,
            "input_ids": input_ids,
            "attention_mask": attention_mask
        }


## Data Module

In [28]:
class Flickr30KDataModule(pl.LightningDataModule):
    def __init__(self,
                 train_dataset_hf,
                 valid_dataset_hf,
                 test_dataset_hf,
                 batch_size=32,
                 num_workers=4,
                 use_all_captions=True):
        super().__init__()
        self.train_dataset_hf = train_dataset_hf
        self.valid_dataset_hf = valid_dataset_hf
        self.test_dataset_hf = test_dataset_hf
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.use_all_captions = use_all_captions

        # 이미지 변환
        self.image_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.5, 0.5, 0.5),
                                 std=(0.5, 0.5, 0.5))
        ])

        # BERT 토크나이저
        self.tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

    def setup(self, stage=None):
        # train
        if stage == "fit" or stage is None:
            if self.use_all_captions:
                self.train_dataset = Flickr30KCustomDataset(
                    self.train_dataset_hf["test"],
                    tokenizer=self.tokenizer,
                    image_transform=self.image_transform
                )
                self.valid_dataset = Flickr30KCustomDataset(
                    self.valid_dataset_hf["test"],
                    tokenizer=self.tokenizer,
                    image_transform=self.image_transform
                )
            else:
                self.train_dataset = Flickr30KCustomDatasetSingleCaption(
                    self.train_dataset_hf["test"],
                    tokenizer=self.tokenizer,
                    image_transform=self.image_transform
                )
                self.valid_dataset = Flickr30KCustomDatasetSingleCaption(
                    self.valid_dataset_hf["test"],
                    tokenizer=self.tokenizer,
                    image_transform=self.image_transform
                )

        # test
        if stage == "test" or stage is None:
            if self.use_all_captions:
                self.test_dataset = Flickr30KCustomDataset(
                    self.test_dataset_hf["test"],
                    tokenizer=self.tokenizer,
                    image_transform=self.image_transform
                )
            else:
                self.test_dataset = Flickr30KCustomDatasetSingleCaption(
                    self.test_dataset_hf["test"],
                    tokenizer=self.tokenizer,
                    image_transform=self.image_transform
                )

    def train_dataloader(self):
        return DataLoader(
            self.train_dataset,
            batch_size=self.batch_size,
            shuffle=True,
            num_workers=self.num_workers
        )

    def val_dataloader(self):
        return DataLoader(
            self.valid_dataset,
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=self.num_workers
        )

    def test_dataloader(self):
        return DataLoader(
            self.test_dataset,
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=self.num_workers
        )


## Model Structure (Lightning)

In [43]:
class ImageTextLightningModel(pl.LightningModule):
    def __init__(self,
                 image_encoder_name="google/vit-base-patch16-224",
                 text_encoder_name="bert-base-uncased",
                 embed_dim=256,
                 temperature=0.07,
                 learning_rate=5e-5):
        """
        Args:
            image_encoder_name: ViT pretrained 모델 이름 (HuggingFace)
            text_encoder_name:  BERT pretrained 모델 이름 (HuggingFace)
            embed_dim: 최종 임베딩 차원
            temperature: InfoNCE에서 사용하는 스케일 파라미터
            learning_rate: 기본 학습률
        """
        super().__init__()
        self.save_hyperparameters()

        # Image Encoder (ViT)
        self.image_encoder = ViTModel.from_pretrained(image_encoder_name)
        # Text Encoder (BERT)
        self.text_encoder = BertModel.from_pretrained(text_encoder_name)

        # Projection layers: 768 -> embed_dim
        self.image_proj = nn.Linear(768, embed_dim)
        self.text_proj = nn.Linear(768, embed_dim)

        # 온도 (learnable하게 설정할 수도 있음)
        self.temperature = temperature

        # 학습률
        self.learning_rate = learning_rate
        
        # 테스트 시 임베딩 저장할 버퍼
        self.test_image_embeds = []
        self.test_text_embeds = []

    def forward(self, images, input_ids, attention_mask):
        """
        Forward 단계에서 이미지 임베딩, 텍스트 임베딩을 모두 구함
        """
        # ---- 1. 이미지 임베딩 ----
        # ViTModel은 pooler_output이 없을 수도 있으므로, 마지막 hidden state를 평균/CLS 등 취해 사용
        # ViTModel의 경우, [CLS] 위치(hidden_states[:, 0, :])를 임베딩으로 사용 가능
        image_outputs = self.image_encoder(pixel_values=images)
        # [batch_size, sequence_length=197, hidden_size=768] (vit-base기준)
        image_cls = image_outputs.last_hidden_state[:, 0, :]  
        image_embeds = self.image_proj(image_cls)  
        # L2 정규화
        image_embeds = F.normalize(image_embeds, p=2, dim=-1)

        # ---- 2. 텍스트 임베딩 ----
        text_outputs = self.text_encoder(input_ids=input_ids,
                                         attention_mask=attention_mask)
        # [CLS] 토큰 위치 hidden state (batch, hidden_size=768)
        text_cls = text_outputs.last_hidden_state[:, 0, :]
        text_embeds = self.text_proj(text_cls)
        # L2 정규화
        text_embeds = F.normalize(text_embeds, p=2, dim=-1)

        return image_embeds, text_embeds

    def compute_contrastive_loss(self, image_embeds, text_embeds):
        """
        Symmetric InfoNCE Loss 계산
        - logits: (batch, batch) = image_embeds @ text_embeds.T / temperature
        - 정답 라벨: 0..batch-1 (각각 diagonal이 positive pair)
        """
        # 1) similarity matrix
        logits_per_image = image_embeds @ text_embeds.t()
        logits_per_image = logits_per_image / self.temperature

        logits_per_text = logits_per_image.t()

        # 2) 대각선이 정답인 cross entropy
        #   라벨 = [0, 1, 2, ..., B-1]
        batch_size = image_embeds.size(0)
        labels = torch.arange(batch_size, device=self.device)

        loss_i = F.cross_entropy(logits_per_image, labels)
        loss_t = F.cross_entropy(logits_per_text, labels)
        loss = (loss_i + loss_t) / 2.0
        return loss

    def training_step(self, batch, batch_idx):
        # batch 구성: {"image", "input_ids", "attention_mask"}
        images = batch["image"]
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]

        image_embeds, text_embeds = self(images, input_ids, attention_mask)
        loss = self.compute_contrastive_loss(image_embeds, text_embeds)

        self.log("train_loss", loss, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        images = batch["image"]
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]

        image_embeds, text_embeds = self(images, input_ids, attention_mask)
        loss = self.compute_contrastive_loss(image_embeds, text_embeds)

        self.log("val_loss", loss, prog_bar=True)
        return loss

    def configure_optimizers(self):
        """
        AdamW + Linear warmup(첫 10% 스텝) + Cosine decay 스케줄링 등을 적용하려면
        Lightning에서 lr_scheduler를 함께 반환
        """
        optimizer = torch.optim.AdamW(self.parameters(), lr=self.learning_rate)

        # 스텝 수가 epoch 길이에 의존하므로, 실제 구현 시 max_steps 혹은 T_mult 등을 잘 설정해야 함
        # 여기서는 단순 예시로, OneCycleLR를 사용하거나 CosineAnnealingLR를 적용하는 예시
        # (Lightning에서 자동 스텝)
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
            optimizer,
            T_max=self.trainer.max_epochs
        )
        return [optimizer], [scheduler]
    
    #!---- Test ----
    def test_step(self, batch, batch_idx):
        """
        배치별로 이미지/텍스트 임베딩을 계산해 리턴
        """
        images = batch["image"]
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]

        image_embeds, text_embeds = self(images, input_ids, attention_mask)
        return {"image_embeds": image_embeds, "text_embeds": text_embeds}

    def on_test_batch_end(self, outputs, batch, batch_idx, dataloader_idx=0):
        """
        test_step에서 return한 값(outputs)을 멤버 리스트에 쌓음
        """
        self.test_image_embeds.append(outputs["image_embeds"])
        self.test_text_embeds.append(outputs["text_embeds"])

    def on_test_epoch_end(self) -> None:
        """
        에폭 마지막에 모은 임베딩들을 이용해 Recall@K 계산
        """
        # 1) 모든 배치 임베딩 합치기
        all_image_embeds = torch.cat(self.test_image_embeds, dim=0)
        all_text_embeds = torch.cat(self.test_text_embeds, dim=0)

        # 2) Cosine 유사도 행렬 계산
        similarity_matrix = all_text_embeds @ all_image_embeds.T  # (N, N)

        # 3) Recall@K 계산
        recall_at_k = self.compute_recall(similarity_matrix)
        for k, v in recall_at_k.items():
            self.log(f"test_recall@{k}", v, prog_bar=True)
        print(f"[on_test_epoch_end] Test Recall: {recall_at_k}")

        # 4) 계산 후 리스트 초기화 (재사용 시 필요)
        self.test_image_embeds.clear()
        self.test_text_embeds.clear()

    def compute_recall(self, similarity_matrix, ks=[1, 5, 10]):
        """
        Recall@K 계산
        similarity_matrix: (N, N) (text vs image)
        """
        device = similarity_matrix.device
        n = similarity_matrix.size(0)
        ground_truth = torch.arange(n, device=device)  # 정답 인덱스

        # 정렬된 인덱스 (내림차순)
        sorted_indices = similarity_matrix.argsort(dim=1, descending=True)

        recall_scores = {}
        for k in ks:
            top_k = sorted_indices[:, :k]
            match = (top_k == ground_truth.unsqueeze(1)).any(dim=1)
            recall_scores[k] = match.float().mean().item()
        return recall_scores

## Train

In [46]:
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
from pytorch_lightning.loggers import TensorBoardLogger

# 1) DataModule 생성 (예: 중복샘플링 버전)
data_module = Flickr30KDataModule(
    train_dataset_hf=train_dataset,
    valid_dataset_hf=valid_dataset,
    test_dataset_hf=test_dataset,
    batch_size=32,
    num_workers=4,
    use_all_captions=True  # True -> 중복 샘플링, False -> 랜덤 1개 선택
)
data_module.setup("fit")

# 2) 모델 초기화
model = ImageTextLightningModel(
    image_encoder_name="google/vit-base-patch16-224",
    text_encoder_name="bert-base-uncased",
    embed_dim=256,
    temperature=0.07,
    learning_rate=5e-5
)

# 3) 로거와 콜백 설정
logger = TensorBoardLogger(
    save_dir="lightning_logs",  # 로그가 저장될 폴더
    name="dual_encoder_demo"    # 실험 이름
)

checkpoint_callback = ModelCheckpoint(
    monitor="val_loss",         # val_loss 기준으로 체크
    dirpath="checkpoints",      # 체크포인트 저장 경로
    filename="best-checkpoint", # 파일명
    save_top_k=5,               # 가장 좋은 모델 k개만 저장
    mode="min"                  # val_loss가 작을수록 좋음
)

early_stopping_callback = EarlyStopping(
    monitor="val_loss",
    patience=3,  # 3 에폭 연속 개선 없으면 종료
    mode="min"
)

# 4) Trainer 설정
trainer = pl.Trainer(
    max_epochs=10,
    accelerator="gpu",  # GPU 사용
    devices=1,
    precision=16,       # fp16
    logger=logger,
    callbacks=[checkpoint_callback, early_stopping_callback]
)


Some weights of ViTModel were not initialized from the model checkpoint at google/vit-base-patch16-224 and are newly initialized: ['vit.pooler.dense.bias', 'vit.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
/home/gpu_04/.conda/envs/DL/lib/python3.10/site-packages/lightning_fabric/connector.py:572: `precision=16` is supported for historical reasons but its usage is discouraged. Please set your precision to 16-mixed instead!
Using 16bit Automatic Mixed Precision (AMP)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


In [44]:
# 5) 모델 학습
trainer.fit(model, data_module)


Detected KeyboardInterrupt, attempting graceful shutdown ...


NameError: name 'exit' is not defined

In [47]:
trainer.test(model, data_module)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [4]


Testing DataLoader 0: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 157/157 [00:06<00:00, 22.68it/s]

TypeError: ImageTextLightningModel.on_test_epoch_end() missing 1 required positional argument: 'outputs'