In [5]:
import os
import pandas as pd

import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

from transformers import ViTImageProcessor, ViTForImageClassification
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer # 텍스트

from tqdm.auto import tqdm
from PIL import Image


In [6]:
# Choose the compute backend in priority order: CUDA, then Apple MPS, then CPU.
# The MPS check is only evaluated when CUDA is unavailable.
device = torch.device(
    'cuda' if torch.cuda.is_available()
    else 'mps' if torch.backends.mps.is_available()
    else 'cpu'
)
print(f'Using {device}')

Using mps


In [7]:
class VQADataset(Dataset):
    """Dataset yielding one (image, question[, answer]) sample per DataFrame row.

    Args:
        df: table read with ``iloc``; rows must provide ``image_id`` and
            ``question`` columns, plus ``answer`` when ``is_test`` is False.
        tokenizer: callable applied to the question and answer text; its result
            is indexed with ``'input_ids'`` and ``'attention_mask'``.
        transform: callable applied to the RGB PIL image.
        img_path: directory containing ``<image_id>.jpg`` files.
        is_test: when True, samples carry no ``'answer'`` entry.
    """

    def __init__(self, df, tokenizer, transform, img_path, is_test=False):
        self.df = df
        self.tokenizer = tokenizer
        self.transform = transform
        self.img_path = img_path
        self.is_test = is_test

    def __len__(self):
        """Number of rows in the underlying DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load, transform and tokenize the sample at position ``idx``.

        Returns a dict with ``'image'``, ``'question'`` and
        ``'question_attention_mask'``; training samples additionally carry
        ``'answer'`` (the answer's token ids).
        """
        record = self.df.iloc[idx]

        image_file = os.path.join(self.img_path, record['image_id'] + '.jpg')
        pixels = self.transform(Image.open(image_file).convert('RGB'))

        # Questions are padded/truncated to 70 tokens.
        encoded_question = self.tokenizer(
            record['question'],
            truncation=True,
            padding='max_length',
            max_length=70,
            return_tensors="pt",
        )

        # squeeze() removes every size-1 dimension; presumably this strips the
        # leading batch dimension added by return_tensors="pt" -- TODO confirm.
        sample = {
            'image': pixels.squeeze(),
            'question': encoded_question['input_ids'].squeeze(),
            'question_attention_mask': encoded_question["attention_mask"].squeeze(),
        }
        if self.is_test:
            return sample

        # Answers are padded/truncated to 100 tokens.
        encoded_answer = self.tokenizer(
            record['answer'],
            truncation=True,
            padding='max_length',
            max_length=100,
            return_tensors="pt",
        )
        sample['answer'] = encoded_answer['input_ids'].squeeze()
        return sample


In [8]:
# Load data
train_df = pd.read_csv('trainsformed_train.csv')
test_df = pd.read_csv('trainsformed_test.csv')
sample_submission = pd.read_csv('sample_submission.csv')
train_img_path = 'image/train'
test_img_path = 'image/test'

# dataset & dataloader
tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-base")
vocab_size = len(tokenizer)

# Augmentations must run BEFORE ToTensor/Normalize: ColorJitter on float tensors
# assumes values in [0, 1] and clamps to that range, so applying it to an
# already-normalized tensor (which has negative values) corrupts the image.
# NOTE(review): the mean/std below are the usual ImageNet statistics; the ViT
# checkpoint used later may have been trained with different values -- confirm
# against its image processor config.
# NOTE(review): horizontal flips can invalidate questions about left/right --
# confirm this augmentation suits the dataset.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

train_dataset = VQADataset(train_df, tokenizer, transform, train_img_path, is_test=False)
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)

In [9]:
class VQAModel(nn.Module):
    """Visual question answering model fusing ViT image logits with FLAN-T5 logits.

    For every answer position, the ViT classification logits of the image are
    concatenated with the FLAN-T5 decoder logits of that position and projected
    to ``vocab_size`` scores by a linear layer.

    Args:
        vocab_size: number of output token classes produced per answer position.
    """

    def __init__(self, vocab_size):
        super().__init__()
        # Kept for backward compatibility; forward() takes the device from its
        # inputs rather than from this attribute.
        if torch.cuda.is_available():
            self.device = torch.device('cuda')
        elif torch.backends.mps.is_available():
            self.device = torch.device('mps')
        else:
            self.device = torch.device('cpu')

        self.vocab_size = vocab_size

        self.vit = ViTForImageClassification.from_pretrained('google/vit-base-patch16-224')
        # Kept as an attribute for callers that want to preprocess raw images;
        # forward() expects images that are already preprocessed tensors.
        self.vit_processor = ViTImageProcessor.from_pretrained('google/vit-base-patch16-224')

        self.flan = AutoModelForSeq2SeqLM.from_pretrained('google/flan-t5-base')

        # NOTE(review): the fusion layer maps (image logits + LM logits) to
        # vocab_size, which is likely a very large weight matrix -- consider
        # fusing lower-dimensional hidden states instead if memory is tight.
        combined_features_size = self.vit.classifier.out_features + self.flan.lm_head.out_features

        self.classifier = nn.Linear(combined_features_size, vocab_size)

    @staticmethod
    def _concat_image_and_token_features(image_features, token_logits):
        """Repeat per-image features along the sequence axis and append token logits.

        Args:
            image_features: tensor of shape [batch, image_dim].
            token_logits: tensor of shape [batch, sequence, token_dim].

        Returns:
            Tensor of shape [batch, sequence, image_dim + token_dim].
        """
        steps = token_logits.size(1)
        tiled = image_features.unsqueeze(1).expand(-1, steps, -1)
        return torch.cat([tiled, token_logits], dim=-1)

    def _fuse(self, image_features, token_logits):
        """Project concatenated image/token features to [batch, sequence, vocab_size]."""
        return self.classifier(self._concat_image_and_token_features(image_features, token_logits))

    def _greedy_decode(self, image_features, question, question_attention_mask, max_answer_length):
        """Generate answer scores token by token when no reference answer exists.

        The question is encoded once; the decoder is then re-run on the tokens
        chosen so far (arg-max of the fused scores). Sequences that have emitted
        EOS are forced to predict the pad token, and decoding stops early once
        every sequence in the batch has finished.
        """
        if max_answer_length < 1:
            raise ValueError("max_answer_length must be at least 1")

        config = self.flan.config
        batch_size = question.size(0)

        encoder_outputs = self.flan.get_encoder()(
            input_ids=question, attention_mask=question_attention_mask
        )
        decoder_input_ids = torch.full(
            (batch_size, 1), config.decoder_start_token_id,
            dtype=torch.long, device=question.device,
        )
        finished = torch.zeros(batch_size, dtype=torch.bool, device=question.device)

        step_scores = []
        for _ in range(max_answer_length):
            # Only the newest position is needed; earlier ones were scored already.
            token_logits = self.flan(
                encoder_outputs=encoder_outputs,
                attention_mask=question_attention_mask,
                decoder_input_ids=decoder_input_ids,
                use_cache=False,
            ).logits[:, -1:, :]
            fused = self._fuse(image_features, token_logits)  # [batch, 1, vocab_size]

            if finished.any():
                # After EOS, make the arg-max deterministic: pad token only.
                pad_only = torch.full_like(fused, float('-inf'))
                pad_only[..., config.pad_token_id] = 0.0
                fused = torch.where(finished.view(-1, 1, 1), pad_only, fused)

            step_scores.append(fused)
            next_tokens = fused.argmax(dim=-1)  # [batch, 1]
            decoder_input_ids = torch.cat([decoder_input_ids, next_tokens], dim=1)
            finished = finished | (next_tokens.squeeze(1) == config.eos_token_id)
            if finished.all():
                break

        return torch.cat(step_scores, dim=1)

    def forward(self, images, question, question_attention_mask, answer=None, max_answer_length=100):
        """Score answer tokens for a batch of image/question pairs.

        Args:
            images: preprocessed image tensors passed to ViT as ``pixel_values``
                (no further resizing/normalization is applied here).
            question: question token ids, [batch, question_length].
            question_attention_mask: attention mask matching ``question``.
            answer: reference answer token ids, [batch, answer_length]. They are
                passed to FLAN-T5 as ``labels`` so the model builds the
                right-shifted decoder inputs itself (teacher forcing). When
                ``None``, the answer is generated greedily instead.
            max_answer_length: maximum number of generated positions when
                ``answer`` is ``None``.

        Returns:
            Tensor of shape [batch, sequence, vocab_size]; ``sequence`` equals
            ``answer_length`` with teacher forcing, or the number of generated
            steps (at most ``max_answer_length``) otherwise.
        """
        image_features = self.vit(pixel_values=images).logits  # [batch, image_classes]

        if answer is None:
            return self._greedy_decode(
                image_features, question, question_attention_mask, max_answer_length
            )

        token_logits = self.flan(
            input_ids=question,
            attention_mask=question_attention_mask,
            labels=answer,
        ).logits  # [batch, answer_length, lm_vocab]

        return self._fuse(image_features, token_logits)
        
            

In [10]:
# Load data
# NOTE(review): this cell repeats the earlier data-loading cell and rebinds the
# same names; keep only one of them once the notebook is cleaned up.
train_df = pd.read_csv('trainsformed_train.csv')
test_df = pd.read_csv('trainsformed_test.csv')
sample_submission = pd.read_csv('sample_submission.csv')
train_img_path = 'image/train'
test_img_path = 'image/test'

# dataset & dataloader
tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-base")
vocab_size = len(tokenizer)

# Augmentations must run BEFORE ToTensor/Normalize: ColorJitter on float tensors
# assumes values in [0, 1] and clamps to that range, so applying it to an
# already-normalized tensor (which has negative values) corrupts the image.
# NOTE(review): the mean/std below are the usual ImageNet statistics; the ViT
# checkpoint used by the model may have been trained with different values --
# confirm against its image processor config.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

train_dataset = VQADataset(train_df, tokenizer, transform, train_img_path, is_test=False)
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)

In [11]:
def train(model, loader, optimizer, criterion):
    """Run one optimization pass over ``loader`` and return the mean batch loss.

    Args:
        model: module called with ``images``, ``question``,
            ``question_attention_mask`` and ``answer`` keyword arguments.
        loader: iterable of batches (dicts keyed by ``'image'``, ``'question'``,
            ``'question_attention_mask'`` and ``'answer'``); must support ``len``.
        optimizer: optimizer stepped once per batch.
        criterion: loss called as ``criterion(flat_scores, flat_targets)``.

    Returns:
        Sum of per-batch loss values divided by the number of batches.
    """
    model.train()
    running_loss = 0
    batch_keys = ('image', 'question', 'question_attention_mask', 'answer')

    for batch in tqdm(loader, total=len(loader)):
        # Move every tensor of the batch to the notebook-level `device`.
        images, question, question_attention_mask, answer = (
            batch[key].to(device) for key in batch_keys
        )

        optimizer.zero_grad()

        outputs = model(
            images=images,
            question=question,
            question_attention_mask=question_attention_mask,
            answer=answer,
        )

        # Expected layout: outputs [batch, sequence, vocab], answer [batch, sequence];
        # both are flattened so each token position becomes one classification row.
        flat_scores = outputs.view(-1, outputs.size(-1))
        flat_targets = answer.view(-1)
        loss = criterion(flat_scores, flat_targets)
        running_loss += loss.item()

        loss.backward()
        optimizer.step()

    return running_loss / len(loader)

In [12]:
def inference(model, loader):
    """Predict token ids for every batch in ``loader`` without tracking gradients.

    Args:
        model: module called with ``images``, ``question``,
            ``question_attention_mask`` keyword arguments and ``answer=None``;
            its output is reduced over dimension 2.
        loader: iterable of batches (dicts keyed by ``'image'``, ``'question'``
            and ``'question_attention_mask'``); must support ``len``.

    Returns:
        List with one NumPy array of predicted token ids per sample.
    """
    model.eval()
    predictions = []

    with torch.no_grad():
        for batch in tqdm(loader, total=len(loader)):
            model_inputs = {
                'images': batch['image'].to(device),
                'question': batch['question'].to(device),
                'question_attention_mask': batch['question_attention_mask'].to(device),
            }

            # Expected layout: [batch, sequence, vocab]
            outputs = model(**model_inputs, answer=None)

            # Keep only the positions of the maxima along the vocab axis.
            token_ids = torch.max(outputs, dim=2).indices
            predictions.extend(token_ids.cpu().numpy())

    return predictions

In [13]:
# Build the VQA model first, then move its parameters to the selected device.
model = VQAModel(vocab_size)
model = model.to(device)


In [14]:
os.makedirs("models", exist_ok=True)
# Criterion and Optimizer
# Answers are padded to a fixed length; ignoring the pad token keeps the loss
# from being dominated by padding positions.
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)
optimizer = optim.AdamW(model.parameters(), lr=5e-5)

# Training loop
best_loss = float('inf')  # start with an infinite best loss so the first epoch always saves
num_epochs = 100000      # upper bound on the number of epochs to run

# Bounded by num_epochs (previously defined but unused, so the loop never ended).
for epoch in range(1, num_epochs + 1):
    avg_loss = train(model, train_loader, optimizer, criterion)
    print(f"Epoch: {epoch}, Loss: {avg_loss:.4f}")

    # Save a checkpoint every time the average training loss improves.
    if avg_loss < best_loss:
        torch.save(model.state_dict(), f"models/{epoch}_{avg_loss}.pth")
        best_loss = avg_loss


  0%|          | 0/89881 [00:00<?, ?it/s]

: 

: 