## Settings

In [None]:
import os

# --- Data locations ---------------------------------------------------------
# Dataset identifier; fill this in before running. It is interpolated into the
# Hub dataset name, the local image directory and the output directory below.
dataset_name = ""
hf_dataset_name = f"<hf_username>/{dataset_name}-no-images"
# Read the Hugging Face access token from the environment instead of pasting it
# into the notebook, where it could be committed or shared by accident.
# `None` (rather than an empty string) is used when the variable is unset.
hf_token = os.environ.get("HF_TOKEN") or None
image_dir = f"/path/to/{dataset_name}-images/"

# Everything this notebook writes (cached splits, checkpoints, predictions)
# goes under this directory.
output_dir = f"./outputs/{dataset_name}"
# When True, the transformed splits are written to `output_dir` and read back
# from disk before training.
save_processed_data = True

# --- Training hyper-parameters ----------------------------------------------
num_train_epochs = 30
train_batch_size = 16
eval_batch_size = 16
weight_decay = 0.01
metric_for_best_model = "accuracy"
push_to_hub = True
hub_model_id = ""

# --- Model / feature extractor ----------------------------------------------
device = "cuda"
# Passed to FRCNNEmbedding as `embed_dim` and also used as the tokenizer
# `max_length` in `transforms`.
embed_dim = 50
frcnn_pretrained_name = "unc-nlp/frcnn-vg-finetuned"
visualbert_pretrained_name = "uclanlp/visualbert-vqa-coco-pre"
tokenizer_pretrained_name = "vinai/phobert-large"

In [None]:
from transformers import AutoTokenizer

# Load the question tokenizer named in the settings cell; `transforms` uses it
# to encode the "question" column.
tokenizer = AutoTokenizer.from_pretrained(tokenizer_pretrained_name)
# Last expression of the cell: display the tokenizer's vocabulary size.
tokenizer.vocab_size

## Dataset

In [None]:
import os
from tqdm import tqdm
from PIL import Image
from datasets import load_dataset

# Fetch the three splits of the same Hub dataset in a fixed order
# (train, val, test) and bind each to its own name.
train_dataset, val_dataset, test_dataset = (
    load_dataset(hf_dataset_name, token=hf_token, split=split_name)
    for split_name in ("train", "val", "test")
)

In [None]:
# Build the answer vocabulary from every split so that any answer seen at
# validation/test time has a class id.
#
# The answers are sorted before ids are assigned: iterating a plain `set` of
# strings yields an order that can change between interpreter sessions, which
# would silently re-number the classes and invalidate one-hot labels cached on
# disk as well as any previously trained checkpoint.
all_answers = sorted(
    {
        example["answer"]
        for split in (train_dataset, val_dataset, test_dataset)
        for example in split
    }
)
# class id -> answer string, and its inverse.
label2answer = dict(enumerate(all_answers))
answer2label = {answer: label for label, answer in label2answer.items()}
# Last expression of the cell: number of answer classes.
len(answer2label)

## Preprocess

### Define transforms function

In [None]:
from embedding import FRCNNEmbedding

# Build the image feature extractor that `transforms` calls on batches of
# image paths. FRCNNEmbedding comes from the project-local `embedding` module;
# presumably it wraps the Faster R-CNN checkpoint named above — confirm there.
vision_embed = FRCNNEmbedding(pretrained_name=frcnn_pretrained_name, embed_dim=embed_dim, device=device)

In [None]:
import torch
import torch.nn.functional as F


def transforms(examples):
    """Turn a batch of raw VQA examples into model-ready tensors.

    Relies on the notebook-level `tokenizer`, `vision_embed`, `image_dir`,
    `embed_dim`, `answer2label` and `label2answer`.

    Args:
        examples: Batch mapping providing the "question", "image_filename"
            and "answer" columns, one entry per example.

    Returns:
        The tokenizer output for the questions, extended with
        "visual_embeds", "visual_token_type_ids", "visual_attention_mask"
        and "labels" (one float one-hot row per example).

    Raises:
        KeyError: If an answer is missing from `answer2label`.
    """
    # --- text: pad/truncate every question to `embed_dim` tokens ---
    encoded = tokenizer(
        examples["question"],
        max_length=embed_dim,
        padding="max_length",
        truncation=True,
        return_tensors="pt",
    )

    # --- vision: resolve file names against `image_dir`, then embed ---
    full_paths = []
    for filename in examples["image_filename"]:
        full_paths.append(os.path.join(image_dir, filename))
    visual_embeds = vision_embed(full_paths).detach().cpu()

    # Token-type ids and attention mask cover every leading dimension of the
    # visual embeddings (everything except the last axis) and are all ones.
    leading_shape = visual_embeds.shape[:-1]
    visual_token_type_ids = torch.ones(leading_shape, dtype=torch.long)
    visual_attention_mask = torch.ones(leading_shape, dtype=torch.float)

    # --- labels: answer string -> class id -> float one-hot row ---
    class_ids = [torch.tensor(answer2label[answer]) for answer in examples["answer"]]
    num_classes = len(label2answer)
    one_hot_rows = []
    for class_id in class_ids:
        row = F.one_hot(class_id, num_classes=num_classes)
        one_hot_rows.append(row.type(torch.FloatTensor))
    labels = torch.stack(one_hot_rows)

    encoded["visual_embeds"] = visual_embeds
    encoded["visual_token_type_ids"] = visual_token_type_ids
    encoded["visual_attention_mask"] = visual_attention_mask
    encoded["labels"] = labels
    return encoded

### Transform train set

In [None]:
# Turn off the `datasets` progress bars before the `map` calls in the
# following cells.
from datasets.utils.logging import disable_progress_bar
disable_progress_bar()

In [None]:
# Run `transforms` over the training split batch by batch, dropping the raw
# columns so only the model inputs remain, then expose the result as tensors.
# NOTE(review): no batch_size is given, so the library default applies and
# `vision_embed` receives that many image paths per call — confirm it fits.
train_transformed = train_dataset.map(
    function=transforms,
    batched=True,
    remove_columns=train_dataset.column_names,
)
train_transformed.set_format("torch")

### Transform test set

In [None]:
# Run `transforms` over the test split batch by batch, dropping the raw
# columns so only the model inputs remain, then expose the result as tensors.
test_transformed = test_dataset.map(
    function=transforms,
    batched=True,
    remove_columns=test_dataset.column_names,
)
test_transformed.set_format("torch")

### Transform val set

In [None]:
# Run `transforms` over the validation split batch by batch, dropping the raw
# columns so only the model inputs remain, then expose the result as tensors.
val_transformed = val_dataset.map(
    function=transforms,
    batched=True,
    remove_columns=val_dataset.column_names,
)
val_transformed.set_format("torch")

### Save transformed data

In [None]:
# Optionally cache the preprocessed splits under `output_dir/<split>` so the
# expensive image embedding step does not have to be repeated.
if save_processed_data:
    for split_name, split_data in (
        ("test", test_transformed),
        ("val", val_transformed),
        ("train", train_transformed),
    ):
        split_data.save_to_disk(os.path.join(output_dir, split_name))

In [None]:
# Release the image feature extractor once preprocessing is finished so its
# GPU memory is available for training. `transforms` can no longer be called
# after this cell, because it looks up `vision_embed` at call time.
import gc

# Guarded so that re-running this cell (or running it after the extractor was
# already removed) does not raise NameError.
if "vision_embed" in globals():
    del vision_embed
# Collect unreachable objects first so that any tensors kept alive only by
# reference cycles are freed before the CUDA cache is emptied.
gc.collect()
torch.cuda.empty_cache()

## Modeling

In [None]:
from datasets import load_from_disk

# When the splits were cached to disk, read them back from
# `output_dir/<split>` and continue with the on-disk copies.
if save_processed_data:
    test_transformed, val_transformed, train_transformed = (
        load_from_disk(os.path.join(output_dir, split_name))
        for split_name in ("test", "val", "train")
    )

### Define VisualBERT model for VQA

In [None]:
from transformers import VisualBertConfig, VisualBertForQuestionAnswering

# config = VisualBertConfig.from_pretrained("uclanlp/visualbert-vqa-coco-pre")
# config.vocab_size = tokenizer.vocab_size
# config.num_labels=len(label2answer)

# model = VisualBertForQuestionAnswering(config=config).to(device)

# Start from the pretrained VisualBERT checkpoint, but size the classifier for
# this dataset's answer set and the word embeddings for the question
# tokenizer. Both overrides differ from the checkpoint shapes, hence
# `ignore_mismatched_sizes=True`.
#
# `len(tokenizer)` is used instead of `tokenizer.vocab_size` because the
# latter does not count tokens added on top of the base vocabulary; an
# embedding table sized from it can be too small for the largest token id.
model = VisualBertForQuestionAnswering.from_pretrained(
    visualbert_pretrained_name,
    num_labels=len(label2answer),
    vocab_size=len(tokenizer),
    ignore_mismatched_sizes=True,
)

### Define metrics calculator

In [None]:
from metrics import eval_report

def compute_metrics(eval_preds):
    """Decode predictions and references to answer strings and score them.

    Args:
        eval_preds: Pair `(logits, labels)`. `labels` holds the one-hot
            rows produced by `transforms`; `logits` holds one score per
            answer class for each example.

    Returns:
        Whatever `eval_report` returns for the decoded reference and
        predicted answers.
    """
    # Imported here so the function does not depend on a later notebook cell
    # having imported numpy before the first evaluation runs.
    import numpy as np

    logits, labels = eval_preds

    # Each label row is one-hot, so argmax recovers the class id in a single
    # vectorised call instead of a Python-level scan of every row.
    reference_ids = np.argmax(labels, axis=-1).tolist()
    predicted_ids = np.argmax(logits, axis=-1).tolist()

    references = [label2answer.get(class_id) for class_id in reference_ids]
    predictions = [label2answer.get(class_id) for class_id in predicted_ids]

    return eval_report(references, predictions)

## Train model

In [None]:
import torch
import numpy as np
from transformers import TrainingArguments

# Training configuration, driven entirely by the settings cell.
# Evaluation and checkpointing both happen once per epoch so that the best
# checkpoint (by `metric_for_best_model`) can be restored at the end.
# NOTE(review): `metric_for_best_model` must match a key produced by
# `compute_metrics` / `eval_report` — confirm against the `metrics` module.
training_args = TrainingArguments(
    output_dir=output_dir,
    num_train_epochs=num_train_epochs,
    per_device_train_batch_size=train_batch_size,
    per_device_eval_batch_size=eval_batch_size,
    weight_decay=weight_decay,
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model=metric_for_best_model,
    push_to_hub=push_to_hub,
    # Unfilled placeholders ("") are passed as None so the library defaults
    # apply instead of an empty repository name / empty token being used.
    hub_model_id=hub_model_id or None,
    hub_token=hf_token or None,
)

In [None]:
from transformers import Trainer
# Wire the model, its training configuration, the preprocessed train and
# validation splits, and the metric callback into a single Trainer.
trainer = Trainer(
    args=training_args,
    model=model,
    train_dataset=train_transformed,
    eval_dataset=val_transformed,
    compute_metrics=compute_metrics,
)

In [None]:
# Run the training loop configured by `training_args`; the returned summary
# is shown as the cell output.
trainer.train()

## Test

In [None]:
# Score the held-out test split; `preds` initially holds the raw logits.
preds, labels, metrics = trainer.predict(test_transformed)
# Decode the arg-max class id of every row to its answer string. A plain
# lookup is used instead of `np.vectorize`, which infers its output type from
# the first element and refuses empty input; this also matches the decoding
# done in `compute_metrics`.
preds = [label2answer.get(class_id) for class_id in preds.argmax(-1).tolist()]
# Last expression of the cell: display the test metrics.
metrics

In [None]:
# Persist the decoded test answers, one per line, next to the checkpoints.
predictions_path = os.path.join(output_dir, "predictions.txt")
predictions_text = "\n".join(preds)
with open(predictions_path, "w", encoding="utf-8") as f:
    f.write(predictions_text)