In [None]:
pip install open_clip_torch

In [None]:
from huggingface_hub import login
# Log in to the Hugging Face Hub; called with no arguments here.
# NOTE(review): presumably this prompts interactively for an access token — confirm.
login()

In [None]:
import torch
from PIL import Image
import open_clip

MODEL_NAME = "laion/mscoco_finetuned_CoCa-ViT-L-14-laion2B-s13B-b90k"

# Load the model. The pretrained weights are fetched and the network is built,
# but no forward pass is executed in this cell.
model, _, preprocess = open_clip.create_model_and_transforms('coca_ViT-L-14', pretrained='mscoco_finetuned_laion2B_s13B_b90k')
# The tokenizer is looked up by the same architecture name as the model, so its
# settings come from this model's configuration instead of another model's.
tokenizer = open_clip.get_tokenizer('coca_ViT-L-14')

# image = preprocess(Image.open("docs/CLIP.png")).unsqueeze(0)
text = tokenizer(["a diagram", "a dog", "a cat"])

# Sum the element counts of every parameter tensor of the model.
total_params = sum(p.numel() for p in model.parameters())

print(f"모델: {MODEL_NAME}")
print(f"총 파라미터 수: {total_params:,}")

# Report the size in billions once it reaches 1e9, otherwise in millions.
if total_params >= 1_000_000_000:
    print(f"{total_params / 1_000_000_000:.2f}B 개의 파라미터")
else:
    print(f"{total_params / 1_000_000:.2f}M 개의 파라미터")

In [None]:
import torch
import open_clip
import pandas as pd
from PIL import Image
from pathlib import Path
from tqdm.auto import tqdm
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from torch.nn import CrossEntropyLoss
import zipfile
import os

from google.colab import drive
# Mount Google Drive at /content/drive inside the Colab runtime.
drive.mount('/content/drive')
# Project folder on the mounted drive; the archive path is built from it later.
base_path = "/content/drive/MyDrive/Colab Notebooks/Dacon/"


In [None]:
# Source archive (under the project folder) and the directory it is unpacked into.
zip_path = os.path.join(base_path, 'data/open.zip')
extract_path = '/content/data'
# Re-running the cell is harmless: an existing target directory is accepted.
os.makedirs(extract_path, exist_ok=True)

print(f"Extracting {zip_path} to {extract_path}...")
# Unpack every member of the archive; the handle is closed when the block exits.
with zipfile.ZipFile(zip_path, mode='r') as zip_ref:
    zip_ref.extractall(path=extract_path)
print("Extraction complete.")

In [None]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")


In [None]:
class VQADataset(Dataset):
    """Multiple-choice VQA samples: one image plus four tokenized answer prompts.

    Every row of ``df`` must provide the columns ``img_path``, ``Question``,
    ``A``, ``B``, ``C`` and ``D``. Training rows additionally need ``answer``
    (one of ``'A'``-``'D'``) and test rows need ``ID``.

    Args:
        df: DataFrame holding the samples; rows are addressed by position.
        tokenizer: Callable invoked with the list of four prompt strings; its
            return value is passed through unchanged.
        image_transform: Callable applied to the image after RGB conversion.
        image_base_path: Directory containing the image files. Only the part
            of ``img_path`` after its last ``/`` is used and resolved against
            this directory.
        is_test: When True, items carry the row ``ID`` instead of a label.
    """

    # Shape of the all-zero tensor substituted for an image file that is missing.
    MISSING_IMAGE_SHAPE = (3, 224, 224)

    def __init__(self, df, tokenizer, image_transform, image_base_path, is_test=False):
        self.df = df
        self.tokenizer = tokenizer
        self.image_transform = image_transform
        self.image_base_path = Path(image_base_path)
        self.is_test = is_test
        # Answer letter -> class index used as the training target.
        self.answer_map = {'A': 0, 'B': 1, 'C': 2, 'D': 3}

    def __len__(self):
        """Return the number of rows in the backing DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Build the sample stored at positional index ``idx``.

        Returns:
            ``(ID, image, tokenized_texts)`` when ``is_test`` is True, otherwise
            ``(image, tokenized_texts, label)`` where ``label`` is a scalar
            ``torch.long`` tensor holding the index of the correct option.

        A missing image file does not abort loading: a ``RuntimeWarning``
        naming the path is issued and an all-zero tensor takes its place, so
        absent files are visible instead of silently becoming blank inputs.
        """
        import warnings

        row = self.df.iloc[idx]
        image_path = self.image_base_path / row['img_path'].split('/')[-1]
        try:
            # The context manager releases the file handle once the pixels
            # have been converted and transformed.
            with Image.open(image_path) as raw_image:
                image = self.image_transform(raw_image.convert("RGB"))
        except FileNotFoundError:
            warnings.warn(
                f"Image not found, using an all-zero placeholder: {image_path}",
                RuntimeWarning,
            )
            image = torch.zeros(self.MISSING_IMAGE_SHAPE)

        question = row['Question']
        # One prompt per answer option, always in A, B, C, D order so that the
        # option position lines up with the indices in ``answer_map``.
        text_prompts = [
            f"Based on the given image, the answer to the question: {question} is: {row[key]}"
            for key in ('A', 'B', 'C', 'D')
        ]
        tokenized_texts = self.tokenizer(text_prompts)

        if self.is_test:
            return row['ID'], image, tokenized_texts

        answer_label = self.answer_map[row['answer']]
        return image, tokenized_texts, torch.tensor(answer_label, dtype=torch.long)


In [None]:
# Architecture name and pretrained-weights tag passed to open_clip.
model_name, pretrained = "coca_ViT-L-14", "laion2b_s13b_b90k"

# Build the model on the selected device; the call also returns two image
# preprocessing callables, kept here as the training and validation transforms.
model, preprocess_train, preprocess_val = open_clip.create_model_and_transforms(model_name=model_name, pretrained=pretrained, device=device)

# The tokenizer is looked up by the same architecture name as the model.
tokenizer = open_clip.get_tokenizer(model_name)
print("Model loaded successfully.")

In [None]:
from sklearn.model_selection import train_test_split

# Hyperparameters
EPOCHS, BATCH_SIZE, LEARNING_RATE = 5, 16, 1e-6

# Locations of the training data inside the extracted archive
TRAIN_CSV_PATH = os.path.join(extract_path, 'train.csv')
TRAIN_IMG_DIR = os.path.join(extract_path, 'train_input_images')

# Read the labelled data and hold out 20% of it for validation. Stratifying on
# the 'answer' column keeps the answer proportions the same in both splits.
original_train_df = pd.read_csv(TRAIN_CSV_PATH)
train_df, val_df = train_test_split(original_train_df, test_size=0.2, random_state=42, stratify=original_train_df['answer'])

print(f"훈련 데이터: {len(train_df)}개, 검증 데이터: {len(val_df)}개")

# The training split uses the training transform; the validation split uses
# the validation transform (the preprocessor without data augmentation).
train_dataset = VQADataset(train_df, tokenizer, preprocess_train, TRAIN_IMG_DIR)
val_dataset = VQADataset(val_df, tokenizer, preprocess_val, TRAIN_IMG_DIR)

# Only the training batches are shuffled.
train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Optimizer and loss function
optimizer = AdamW(params=model.parameters(), lr=LEARNING_RATE, weight_decay=0.01)
loss_fn = CrossEntropyLoss()

In [None]:
best_val_accuracy = 0.0
best_model_path = "best_vqa_model.pth"


def _option_logits(images, texts):
    """Score every answer option of every sample against that sample's image.

    ``texts`` holds one row of token ids per option per sample. The option
    axis is folded into the batch axis for the text encoder and unfolded again
    afterwards. Returns the cosine similarities multiplied by the model's
    exponentiated logit scale, one row per sample and one column per option.
    """
    num_options = texts.shape[1]
    flat_texts = texts.view(-1, texts.shape[-1])
    image_features = model.encode_image(images)
    text_features = model.encode_text(flat_texts)
    image_features = image_features / image_features.norm(dim=-1, keepdim=True)
    text_features = text_features / text_features.norm(dim=-1, keepdim=True)
    text_features = text_features.view(-1, num_options, text_features.shape[-1])
    return torch.einsum('bd,bcd->bc', image_features, text_features) * model.logit_scale.exp()


for epoch in range(EPOCHS):
    # ---- Training pass ----
    model.train()
    total_loss = 0.0
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS} [Training]")
    for images, texts, labels in progress_bar:
        images, texts, labels = images.to(device), texts.to(device), labels.to(device)
        optimizer.zero_grad()
        logits = _option_logits(images, texts)
        loss = loss_fn(logits, labels)
        loss.backward()
        optimizer.step()
        batch_loss = loss.item()
        total_loss += batch_loss
        progress_bar.set_postfix({"loss": f"{batch_loss:.4f}"})
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1} Average Training Loss: {avg_loss:.4f}")

    # ---- Validation pass ----
    model.eval()
    val_correct, val_total = 0, 0
    with torch.no_grad():
        for images, texts, labels in tqdm(val_loader, desc=f"Epoch {epoch+1}/{EPOCHS} [Validation]"):
            images, texts, labels = images.to(device), texts.to(device), labels.to(device)
            logits = _option_logits(images, texts)

            # Accuracy bookkeeping: the highest-scoring option is the prediction.
            preds = logits.argmax(dim=-1)
            val_correct += (preds == labels).sum().item()
            val_total += labels.size(0)

    val_accuracy = val_correct / val_total
    print(f"Epoch {epoch+1} Validation Accuracy: {val_accuracy:.4f}")

    # Keep only the weights of the best-scoring epoch so far.
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        torch.save(model.state_dict(), best_model_path)
        print(f"🎉 New best model saved with accuracy: {best_val_accuracy:.4f} at {best_model_path}")

print("Training finished.")
print(f"Best validation accuracy: {best_val_accuracy:.4f}")

In [None]:
# NOTE(review): this cell runs EPOCHS more training passes with no validation
# and no checkpointing. The inference cell reloads best_model_path, which would
# replace the weights produced here — confirm whether this cell is still wanted.
model.train()
for epoch in range(EPOCHS):
    total_loss = 0.0
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS}")
    for images, texts, labels in progress_bar:
        images, labels = images.to(device), labels.to(device)
        # Fold the option axis into the batch axis for the text encoder.
        num_options = texts.shape[1]
        texts = texts.view(-1, texts.shape[-1]).to(device)

        optimizer.zero_grad()

        image_features = model.encode_image(images)
        text_features = model.encode_text(texts)

        # Unit-normalise both embeddings so the dot product is a cosine similarity.
        image_features = image_features / image_features.norm(dim=-1, keepdim=True)
        text_features = text_features / text_features.norm(dim=-1, keepdim=True)

        # Unfold the options again and score each one against its own image.
        text_features = text_features.view(-1, num_options, text_features.shape[-1])
        logits = torch.einsum('bd,bcd->bc', image_features, text_features) * model.logit_scale.exp()

        loss = loss_fn(logits, labels)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        total_loss += batch_loss
        progress_bar.set_postfix({"loss": f"{batch_loss:.4f}"})

    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1} Average Loss: {avg_loss:.4f}")

print("Training finished.")

In [None]:
# Test-set locations inside the extracted archive, and the submission output file.
TEST_CSV_PATH = os.path.join(extract_path, 'test.csv')
TEST_IMG_DIR = os.path.join(extract_path, 'test_input_images')
SUBMISSION_PATH = os.path.join('/content/data', 'submission1.csv')

test_df = pd.read_csv(TEST_CSV_PATH)
# is_test=True makes every item carry the row ID instead of a label.
test_dataset = VQADataset(test_df, tokenizer, preprocess_val, TEST_IMG_DIR, is_test=True)
# No shuffling for the test batches.
test_loader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, shuffle=False)


In [None]:
model.eval()
predictions = []
ids = []
idx_to_answer = {0: 'A', 1: 'B', 2: 'C', 3: 'D'}

# Load the weights of the best-performing checkpoint. map_location remaps the
# stored tensors onto the active device, so a checkpoint written on a GPU
# runtime still loads when only a CPU is available.
model.load_state_dict(torch.load(best_model_path, map_location=device))
print(f"Loaded best model from {best_model_path}")

with torch.no_grad():
    for batch_ids, images, texts in tqdm(test_loader, desc="Predicting"):
        images = images.to(device)
        # Fold the option axis into the batch axis for the text encoder.
        num_options = texts.shape[1]
        texts = texts.view(-1, texts.shape[-1]).to(device)

        image_features = model.encode_image(images)
        text_features = model.encode_text(texts)

        # Unit-normalise both embeddings so the dot product is a cosine similarity.
        image_features /= image_features.norm(dim=-1, keepdim=True)
        text_features /= text_features.norm(dim=-1, keepdim=True)

        # Unfold the options again and score each one against its own image.
        text_features = text_features.view(-1, num_options, text_features.shape[-1])
        logits = torch.einsum('bd,bcd->bc', image_features, text_features) * model.logit_scale.exp()

        # Plain Python ints are used as keys of idx_to_answer.
        preds = logits.argmax(dim=-1).cpu().tolist()

        predictions.extend(idx_to_answer[p] for p in preds)
        ids.extend(batch_ids)

submission_df = pd.DataFrame({'ID': ids, 'answer': predictions})

# When the sample submission is available, reorder the rows to follow its ID order.
sample_submission_path = os.path.join(extract_path, 'sample_submission.csv')
if os.path.exists(sample_submission_path):
    sample_df = pd.read_csv(sample_submission_path)
    submission_df = submission_df.set_index('ID').loc[sample_df['ID']].reset_index()

submission_df.to_csv(SUBMISSION_PATH, index=False)

print(f"Inference complete. Submission file saved to {SUBMISSION_PATH}")