In [10]:
from PIL import Image
import requests
requests.__version__

'2.27.1'

In [2]:
url = "./data/flickr30k_images/flickr30k_images/5897297135.jpg"
image = Image.open(requests.get(url, stream=True).raw).convert("RGB")

MissingSchema: Invalid URL './data/flickr30k_images/flickr30k_images/5897297135.jpg': No scheme supplied. Perhaps you meant http://./data/flickr30k_images/flickr30k_images/5897297135.jpg?

In [11]:
import torch
import pandas as pd
from sklearn.model_selection import train_test_split
from transformers import LxmertTokenizer, LxmertConfig, LxmertModel
from modeling_frcnn import GeneralizedRCNN
import utils
from processing_image import Preprocess
from transformers import AdamW
from torch.utils.data import DataLoader

In [12]:
class MyDataset(torch.utils.data.Dataset):
    def __init__(self, hypothesis, images, labels):
        self.hypothesis = hypothesis
        self.images = images
        self.labels = labels
    
    def __getitem__(self, idx):
        item = {'text':self.hypothesis[idx],'img':self.images[idx],
                'label': self.labels[idx]}
        return item

    def __len__(self):
        return len(self.labels)

In [13]:
class MyTrainer():
    def __init__(self,model,device='cpu'):
        self.device = device
        self.train = self.read_dataset(data_path ='../e-Vil/data/', dataset_path='esnlive_train.csv',
                                       img_path='flickr30k_images/flickr30k_images/')
        self.test = self.read_dataset(data_path ='../e-Vil/data/', dataset_path='esnlive_test.csv',
                                       img_path='flickr30k_images/flickr30k_images/')
        self.train_dataset = MyDataset(self.train['hypothesis'].values,
                                 self.train['image'].values,
                                 self.train['label'].values)
        self.test_dataset = MyDataset(self.test['hypothesis'].values,
                                 self.test['image'].values,
                                 self.test['label'].values)
        self.lxmert_tokenizer = LxmertTokenizer.from_pretrained("unc-nlp/lxmert-base-uncased")
        self.rcnn_cfg = utils.Config.from_pretrained("unc-nlp/frcnn-vg-finetuned")
        self.rcnn_cfg.MODEL.DEVICE = self.device
        self.rcnn = GeneralizedRCNN.from_pretrained("unc-nlp/frcnn-vg-finetuned", config=self.rcnn_cfg)
        self.image_preprocess = Preprocess(self.rcnn_cfg)
        self.model = model.to(self.device)
    
    def get_visual_features(self,img):
        #preprocess image
        images, sizes, scales_yx = self.image_preprocess(img)
        output_dict = self.rcnn(
            images, 
            sizes, 
            scales_yx=scales_yx, 
            padding="max_detections",
            max_detections=self.rcnn_cfg.max_detections,
            return_tensors="pt"
        )
        
        #Very important that the boxes are normalized
        normalized_boxes = output_dict.get("normalized_boxes")
        features = output_dict.get("roi_features")
        return normalized_boxes, features
    
    def get_text_features(self,text): 
        #preprocess text
        inputs = self.lxmert_tokenizer(
            text,
            padding="max_length",
            max_length=20,
            truncation=True,
            return_token_type_ids=True,
            return_attention_mask=True,
            add_special_tokens=True,
            return_tensors="pt"
        )
        return inputs
      
    def read_dataset(self,data_path=None,dataset_path=None,img_path=None):
        data = pd.read_csv(data_path+dataset_path)
        labels_encoding = {'contradiction':0,'neutral': 1,
                           'entailment':2}
        data = data[['hypothesis','Flickr30kID','gold_label']]
        data['gold_label']=data['gold_label'].apply(lambda label: labels_encoding[label])
        data['Flickr30kID'] = data['Flickr30kID'].apply(lambda x: data_path+img_path+x)
        data.rename(columns={ data.columns[0]: "hypothesis", data.columns[1]: "image",
                              data.columns[2]: "label" }, inplace = True)
        return data
                
    def train_model(self,epochs=None):
        optim = AdamW(self.model.parameters(), lr=5e-5)
        train_loader = DataLoader(self.train_dataset, batch_size=2, shuffle=True)
        for epoch in range(epochs):
            for item in train_loader:
                text = item['text']
                img = item['img']
                label = item['label']
                inputs = self.get_text_features(text)
                normalized_boxes, features = self.get_visual_features(img)
                inputs = inputs.to(self.device)
                normalized_boxes = normalized_boxes.to(self.device)
                features = features.to(self.device)
                optim.zero_grad()
                outputs = model.forward(
                    input_ids=inputs.input_ids,
                    attention_mask=inputs.attention_mask,
                    features=features,
                    normalized_boxes=normalized_boxes,
                    token_type_ids=inputs.token_type_ids,
                    label = label
                )
                loss = outputs.loss
                loss.backward()
                optim.step()
                break
        self.model.eval()
        return

In [14]:
class Lxmert(LxmertModel):
    def __init__(self,numb_labels=3):
        super().__init__(LxmertConfig.from_pretrained("unc-nlp/lxmert-base-uncased"))
        self.config.problem_type = "single_label_classification"
        self.classification = torch.nn.Linear(self.config.hidden_size, numb_labels)
        self.num_labels = numb_labels
        if self.config.problem_type == "single_label_classification":
          self.loss_fct = torch.nn.CrossEntropyLoss()
          self.output_loss = lambda output,labels : self.loss_fct(output.logits.view(-1, self.num_labels), labels.view(-1)) 
        elif self.config.problem_type == "regression":
          self.loss_fct = torch.nn.MSELoss()
          if self.num_labels == 1: self.output_loss = lambda output,labels : self.loss_fct(output.logits.squeeze(), labels.squeeze())
          else: self.output_loss =  lambda output,labels : self.loss_fct(output.logits, labels)
        elif self.config.problem_type == "multi_label_classification":
          self.loss_fct = torch.nn.BCEWithLogitsLoss()
          self.output_loss = lambda output,labels : self.loss_fct(output.logits, labels)
        # don't forget to init the weights for the new layers
        self.init_weights()
        
    def forward(self,input_ids=None,attention_mask=None,token_type_ids=None,features=None,normalized_boxes=None,label=None):
        output = super().forward(
            input_ids=input_ids,
            attention_mask=attention_mask,
            visual_feats=features,
            visual_pos=normalized_boxes,
            token_type_ids=token_type_ids,
            return_dict=True,
            output_attentions=False,
        )
                
        aux = self.classification(output.pooled_output)
        output.logits = aux
        output.loss=None
        output.loss = self.output_loss(output, label)
        return output
    
    def save_model(self,path):
        torch.save(self.state_dict(), path)
        
    def load_model(self,path):
        self.load_state_dict(torch.load(path))
        self.eval()
        
    def run(self,dataset,trainer):
        img_path1 = dataset.loc[50,'image']
        text1 = dataset.loc[50,'hypothesis'] #"How many people are in the image?"
        label1 = torch.LongTensor([dataset.loc[50,'label']])
        print('SAMPLE1')
        print(img_path1,text1,label1)
        item1 = {'text':[text1], 'img':[img_path1], 'label':label1}
        inputs = trainer.get_text_features(text1)
        normalized_boxes, features = trainer.get_visual_features(img_path1)        
        output = self.forward(
            input_ids=inputs.input_ids,
            attention_mask=inputs.attention_mask,
            features=features,
            normalized_boxes=normalized_boxes,
            token_type_ids=inputs.token_type_ids,
            label = label1
        )
        print(output.logits)
        m = torch.nn.Softmax(dim=1)
        probs = m(output.logits)
        print(probs)
        return output

In [15]:
device = "cpu"
#device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
task = 'test'
device

'cpu'

In [16]:
if task =='train':
    model = Lxmert()
    trainer = MyTrainer(model,device=device)
    trainer.train_model(epochs=1)
    model.save_model("my_model")
elif task =='test':
    model = Lxmert()
    model.load_model("my_model")
    trainer = MyTrainer(model,device=device)
    output = model.run(trainer.test,trainer)

loading configuration file cache
loading weights file https://cdn.huggingface.co/unc-nlp/frcnn-vg-finetuned/pytorch_model.bin from cache at C:\Users\Utilizador/.cache\torch\transformers\57f6df6abe353be2773f2700159c65615babf39ab5b48114d2b49267672ae10f.77b59256a4cf8343ae0f923246a81489fc8d82f98d082edc2d2037c977c0d9d0
All model checkpoint weights were used when initializing GeneralizedRCNN.

All the weights of GeneralizedRCNN were initialized from the model checkpoint at unc-nlp/frcnn-vg-finetuned.
If your task is similar to the task the model of the checkpoint was trained on, you can already use GeneralizedRCNN for predictions without further training.
SAMPLE1
../e-Vil/data/flickr30k_images/flickr30k_images/2731298834.jpg two boys are swimming while the mom is tanning on the beach tensor([0])
cv2


  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


tensor([[-0.9356, -1.9619,  2.8482]], grad_fn=<AddmmBackward0>)
tensor([[0.0221, 0.0079, 0.9700]], grad_fn=<SoftmaxBackward0>)
