In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import VisualBertModel, VisualBertConfig, BertTokenizer
from sklearn.model_selection import train_test_split
import pickle
import tqdm
from sklearn.metrics import accuracy_score

In [2]:
class Cifar100Dataset(Dataset):
    def __init__(self, visual_embeddings, labels, tokenizer):
        self.visual_embeddings = visual_embeddings
        self.labels = labels
        self.tokenizer = tokenizer
        self.num_labels = len(set(self.labels))

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        visual_embedding = self.visual_embeddings[idx]
        # text_inputs = tokenizer(text_inputs, padding="max_length", max_length=32, truncation=True, return_tensors="pt")
        text_inputs = tokenizer("", padding="max_length", max_length=32, truncation=True, return_tensors="pt")
        
        # remove batch dimension
        for k,v in text_inputs.items():
            text_inputs[k] = v.squeeze()
            
        label = torch.zeros(self.num_labels)
        label[self.labels[idx]] = 1
        return visual_embedding, text_inputs, label

In [3]:
config = VisualBertConfig.from_pretrained("uclanlp/visualbert-nlvr2")
visual_bert = VisualBertModel(config)
visual_bert.config.visual_embedding_dim

1024

In [4]:
class VisualBERTForImageClassification(nn.Module):
    def __init__(self, config, num_classes, visual_embedding_size):
        super().__init__()
        self.visual_bert = VisualBertModel(config)
        self.projection = nn.Linear(256*7*7, visual_embedding_size * 256)
        self.classifier = nn.Linear(config.hidden_size, num_classes)

    def forward(self, visual_embeddings, text_inputs):
        # Project to the size that VisualBert expects
        input_visual_embeddings = self.projection(visual_embeddings.view(visual_embeddings.size(0), -1))
        
        # Reshape the visual embeddings back to 3D tensor
        input_visual_embeddings = input_visual_embeddings.view(input_visual_embeddings.size(0), 256, -1)
                
        outputs = self.visual_bert(
            input_ids=text_inputs["input_ids"],
            attention_mask=text_inputs["attention_mask"],
            token_type_ids=text_inputs["token_type_ids"],
            visual_embeds=input_visual_embeddings
        )
        return self.classifier(outputs.pooler_output)

class CollateFunction:
    def __init__(self, tokenizer, max_length=32):
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __call__(self, batch):
        visual_embeddings, text_inputs, labels = zip(*batch)
        
        # Flatten the visual embeddings
        visual_embeddings = [embedding.view(256, -1) for embedding in visual_embeddings]
        
        visual_embeddings = torch.stack(visual_embeddings)
        text_inputs = self.tokenizer(list(text_inputs), padding="max_length", max_length=self.max_length, truncation=True, return_tensors="pt")
        labels = torch.tensor(labels)
        
        return visual_embeddings, text_inputs, labels

In [5]:
def collate_fn(batch):
        visual_embeddings, text_inputs, labels = zip(*batch)
        
        # Flatten the visual embeddings
        visual_embeddings = [embedding.view(embedding.size(0), -1) for embedding in visual_embeddings]
        
        visual_embeddings = torch.stack(visual_embeddings)
        
        labels = torch.stack(labels)
        
        input_ids = torch.stack([item['input_ids'] for item in text_inputs])
        attention_mask = torch.stack([item['attention_mask'] for item in text_inputs])
        token_type_ids = torch.stack([item['token_type_ids'] for item in text_inputs])
        
        text_inputs = {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'token_type_ids': token_type_ids
        }
        
                
        return visual_embeddings, text_inputs, labels

In [6]:
def train(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss = 0.0
    for visual_embeddings, text_inputs, labels in dataloader:
        visual_embeddings = visual_embeddings.to(device)
        text_inputs = {k: v.to(device) for k, v in text_inputs.items()}
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(visual_embeddings, text_inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
    return total_loss / len(dataloader)

In [7]:
with open('cifar100-train-embeddings.pkl', 'rb') as f:
    train_embeddings_all = pickle.load(f)

with open('cifar100-test-embeddings.pkl', 'rb') as f:
    test_embeddings_all = pickle.load(f)

In [8]:
device = "cuda" if torch.cuda.is_available() else "cpu"

train_embeddings = train_embeddings_all['embeddings']
train_labels = train_embeddings_all['fine_labels']

val_embeddings = test_embeddings_all['embeddings']
val_labels = test_embeddings_all['fine_labels']

In [9]:
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

In [10]:
num_classes = len(set(train_labels))

In [11]:
model = VisualBERTForImageClassification(config, num_classes=num_classes, visual_embedding_size=1024)
model = model.to(device)

In [12]:
train_dataset = Cifar100Dataset(train_embeddings, train_labels, tokenizer)
val_dataset = Cifar100Dataset(val_embeddings, val_labels, tokenizer)

train_dataloader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)
val_dataloader = DataLoader(val_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn)

In [13]:
for param in model.visual_bert.parameters():
    param.requires_grad = False
for param in model.classifier.parameters():
    param.requires_grad = True

In [14]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.classifier.parameters(), lr=5e-5)
num_epochs = 5

In [15]:
best_params = None

for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    
    for visual_embeddings, text_inputs, labels in tqdm.tqdm(train_dataloader, desc=f"Epoch {epoch + 1}/{num_epochs}"):
        visual_embeddings = visual_embeddings.to(device)
        text_inputs = {k: v.to(device) for k, v in text_inputs.items()}
        labels = labels.to(device)

        optimizer.zero_grad()
        
        outputs = model(visual_embeddings, text_inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        
    train_loss = total_loss / len(dataloader)
    
    # Evaluate the model on the validation set
    model.eval()
    val_predictions = []
    val_labels = []
    
    with torch.no_grad():
        for visual_embeddings, text_inputs, labels in val_dataloader:
            visual_embeddings = visual_embeddings.to(device)
            text_inputs = {k: v.to(device) for k, v in text_inputs.items()}
            labels = labels.to(device)
            
            outputs = model(visual_embeddings, text_inputs)
            _, preds = torch.max(outputs, 1)
            
            val_predictions.extend(preds.cpu().numpy())
            val_labels.extend(labels.cpu().numpy())
    
    val_accuracy = accuracy_score(val_labels, val_predictions)
    
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {train_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")

    # Save a checkpoint if validation accuracy is greater than the previous best validation accuracy
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        best_params = model.state_dict()
        torch.save(best_params, "best_model.pth")

Epoch 1/5:   0%|          | 0/6250 [00:00<?, ?it/s]


OutOfMemoryError: CUDA out of memory. Tried to allocate 12.25 GiB (GPU 0; 23.68 GiB total capacity; 12.67 GiB already allocated; 9.81 GiB free; 12.73 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF