In [1]:
import os
import json
# from Flamingo.utils import pretty_print
import pdb 
from torch.utils.data import Dataset
from PIL import Image 
from rich import print 
import numpy as np 
from Flamingo.lora_tuning import get_tokenizer
from Flamingo.config.baseline import model_config
def preprocess_laion_text(sample, tokenizer, max_tokens=32):
    """
    Wrap captions in the image/chunk markers and tokenize them as one batch.

    Each caption is stripped of surrounding whitespace and rendered as
    ``<image>{caption}<|endofchunk|>{tokenizer.eos_token}``. The batch is
    padded to its longest member and truncated to ``max_tokens`` tokens
    (32 by default).

    Args:
        sample: iterable of caption strings.
        tokenizer: callable tokenizer exposing ``eos_token``; it is called
            with ``return_tensors="pt"``.
        max_tokens: value forwarded as the tokenizer's ``max_length``.

    Returns:
        Tuple ``(input_ids, attention_mask)`` taken from the tokenizer output.
    """
    wrapped_captions = []
    for caption in sample:
        body = caption.strip()
        wrapped_captions.append(
            f"<image>{body}<|endofchunk|>{tokenizer.eos_token}"
        )

    tokenize_kwargs = {
        "max_length": max_tokens,
        "padding": "longest",
        "truncation": "only_first",
        "return_tensors": "pt",
    }
    encoded = tokenizer(wrapped_captions, **tokenize_kwargs)

    input_ids = encoded["input_ids"]
    attention_mask = encoded["attention_mask"]
    return input_ids, attention_mask


class VQADataset(Dataset):
    """Map-style dataset over a VQA question file with optional annotations.

    Items returned by ``__getitem__`` are dicts with the keys ``image``,
    ``question``, ``question_id`` and, when annotations were given,
    ``answers`` (list of answer strings).

    Args:
        image_dir_path: directory that contains the image files. For
            ``vqav2`` / ``ok_vqa`` its last path component must be one of
            ``train2014``, ``val2014`` or ``test2015``.
        question_path: JSON file with a top-level ``"questions"`` list.
        annotations_path: JSON file with a top-level ``"annotations"`` list,
            or ``None`` when no answers are available.
        is_train: stored on the instance; image file naming is the same for
            both values.
        dataset_name: one of ``vqav2``, ``ok_vqa``, ``vizwiz``, ``textvqa``.
        tokenizer: optional tokenizer used only by ``collater``. It may also
            be assigned later through ``dataset.tokenizer``.
    """

    def __init__(
        self,
        image_dir_path,
        question_path,
        annotations_path,
        is_train,
        dataset_name,
        tokenizer=None,
    ):
        self.questions = self._load_json_field(question_path, "questions")
        if annotations_path is not None:
            self.answers = self._load_json_field(annotations_path, "annotations")
        else:
            self.answers = None
        self.image_dir_path = image_dir_path
        self.is_train = is_train
        self.dataset_name = dataset_name
        # collater() needs a tokenizer; keep the attribute defined so a missing
        # one is reported explicitly instead of as a bare attribute lookup error.
        self.tokenizer = tokenizer
        if self.dataset_name in {"vqav2", "ok_vqa"}:
            # COCO file names embed the split, taken from the directory name.
            self.img_coco_split = self.image_dir_path.strip("/").split("/")[-1]
            assert self.img_coco_split in {"train2014", "val2014", "test2015"}, (
                f"Unexpected COCO split directory: {self.img_coco_split!r}"
            )

    @staticmethod
    def _load_json_field(path, key):
        """Read the JSON file at ``path`` and return its top-level ``key`` entry."""
        # The context manager closes the file even if parsing fails.
        with open(path, "r") as json_file:
            return json.load(json_file)[key]

    def __len__(self):
        """Number of questions in the dataset."""
        return len(self.questions)

    def get_img_path(self, question):
        """Return the image file path for one question record.

        Raises:
            ValueError: if ``dataset_name`` is not a supported dataset.
        """
        if self.dataset_name in {"vqav2", "ok_vqa"}:
            # Train and eval use the same naming scheme, so is_train is not consulted.
            file_name = f"COCO_{self.img_coco_split}_{question['image_id']:012d}.jpg"
        elif self.dataset_name == "vizwiz":
            # The image id is used directly as the file name.
            file_name = question["image_id"]
        elif self.dataset_name == "textvqa":
            file_name = f"{question['image_id']}.jpg"
        else:
            raise ValueError(f"Unknown VQA dataset {self.dataset_name}")
        return os.path.join(self.image_dir_path, file_name)

    def collater(self, samples):
        """Collate tokenized samples into one padded batch.

        Each sample must be a dict with the keys ``instruction``, ``answer``,
        ``input_ids``, ``attention_mask`` and ``labels``. Note that the dicts
        produced by ``__getitem__`` do not carry these keys, so the samples
        have to be tokenized by the caller first.

        Returns:
            Dict with padded ``input_ids``, ``attention_mask`` and ``labels``
            (padding positions and the first position set to -100) plus the
            untouched ``instruction`` and ``answer`` lists.

        Raises:
            AttributeError: if no tokenizer has been attached to the dataset.
        """
        tokenizer = self.tokenizer
        if tokenizer is None:
            raise AttributeError(
                "VQADataset.collater needs a tokenizer: pass tokenizer=... to the "
                "constructor or assign dataset.tokenizer before collating"
            )

        question_list, answer_list, input_id_list, attention_mask_list, labels_list = [], [], [], [], []
        for sample in samples:
            question_list.append(sample["instruction"])
            answer_list.append(sample["answer"])
            input_id_list.append(sample["input_ids"])
            attention_mask_list.append(sample["attention_mask"])
            labels_list.append(sample["labels"])

        # We have to pad the labels before calling `tokenizer.pad` as this method won't pad them and needs them of the
        # same length to return tensors.
        max_label_length = max(len(label) for label in labels_list)
        padding_side = tokenizer.padding_side
        padded_labels = []
        for label in labels_list:
            remainder = [-100] * (max_label_length - len(label))
            if isinstance(label, list):
                label = label + remainder if padding_side == "right" else remainder + label
            elif padding_side == "right":
                label = np.concatenate([label, remainder]).astype(np.int64)
            else:
                label = np.concatenate([remainder, label]).astype(np.int64)
            padded_labels.append(label)

        padded_samples = tokenizer.pad(
            {"input_ids": input_id_list, "attention_mask": attention_mask_list, "labels": padded_labels},
            return_tensors="pt",
            padding="longest",
        )

        labels = padded_samples["labels"]
        # Exclude padding tokens and the first position from the labels via -100.
        labels[labels == tokenizer.pad_token_id] = -100
        labels[:, 0] = -100
        return {
            "input_ids": padded_samples["input_ids"],
            "attention_mask": padded_samples["attention_mask"],
            "labels": labels,
            "instruction": question_list,
            "answer": answer_list,
        }

    def __getitem__(self, idx):
        """Load the image and question (and answers, if available) at ``idx``."""
        question = self.questions[idx]
        img_path = self.get_img_path(question)
        image = Image.open(img_path)
        # Force the pixel data to be read now rather than lazily.
        image.load()
        results = {
            "image": image,
            "question": question["question"],
            "question_id": question["question_id"],
        }
        if self.answers is not None:
            # NOTE(review): assumes annotations are stored in the same order as
            # the questions (matched by position, not by question_id) - confirm.
            answers = self.answers[idx]
            results["answers"] = [a["answer"] for a in answers["answers"]]
        return results

[2024-01-02 13:53:05,799] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)


In [2]:
# Local COCO / VQAv2 locations (train2014 split).
image_dir_path = "/home/yunzhi/datasets/COCO/train2014/train2014"
question_path = '/home/yunzhi/datasets/COCO/annotations/v2_OpenEnded_mscoco_train2014_questions.json'
annoatation_path = "/home/yunzhi/datasets/COCO/annotations/v2_mscoco_train2014_annotations.json"

# Raw annotation JSON for manual inspection. It is not passed to VQADataset,
# which reads the same file itself. The context manager closes the handle.
with open(annoatation_path, "r") as annotation_file:
    anno = json.load(annotation_file)

dataset = VQADataset(
    image_dir_path=image_dir_path,
    question_path=question_path,
    annotations_path=annoatation_path,
    is_train=True,
    dataset_name='vqav2',
)

In [3]:
# Create the model together with its image processor and text tokenizer from the baseline config.
from Flamingo.lora_tuning import create_model_and_transforms 
from IPython.display import clear_output
# This mutates the imported model_config in place, so the override stays in effect
# for every later use of model_config in this kernel.
# NOTE(review): presumably this turns LoRA adapters off - confirm in Flamingo.lora_tuning.
model_config['lora_tuning'] = False 
model, image_processor, text_tokenizer = create_model_and_transforms(
    **model_config
)
# Drop whatever the call above wrote to the cell output.
clear_output()

In [4]:
# Move the model to the GPU; the inputs built in later cells are moved with .cuda() as well.
model = model.cuda()

In [5]:
import torch 

In [14]:
# Preprocess the image of example 0 and add leading singleton axes. The recorded
# output of the shape check in the next cell is (1, 1, 1, 3, 224, 224).
# NOTE(review): presumably (batch, images, frames, channels, height, width) - confirm.
img = (
    image_processor(dataset[0]['image'])
    .repeat(1, 1, 1, 1)
    .unsqueeze(1)
    .unsqueeze(0)
    .cuda()
)

In [15]:
# Shape check for the vision input built from example 0.
img.shape

torch.Size([1, 1, 1, 3, 224, 224])

In [16]:
# Inspect the list of reference answers attached to example 1.
dataset[1]['answers']

['pitcher',
 'catcher',
 'pitcher',
 'pitcher',
 'pitcher',
 'pitcher',
 'pitcher',
 'pitcher',
 'pitcher',
 'pitcher']

In [17]:
tokenizer = get_tokenizer(tokenizer_path=model_config['tokenizer_path'], cache_dir=model_config['cache_dir'])
# Fetch each of the first four examples once: every dataset[i] opens and decodes
# the image from disk, so indexing separately for questions and answers would
# load each image twice.
first_samples = [dataset[i] for i in range(4)]
question_sample = [example['question'] for example in first_samples]
answers = [example['answers'] for example in first_samples]
print(answers)


In [12]:
# Few-shot prompt: three solved question/answer demonstrations (using the first
# reference answer of each) followed by the fourth question left open.
prompt = "".join(
    f"<image>Question:{question_sample[i]} Short answer:{answers[i][0]}<|endofchunk|>"
    for i in range(3)
) + f"<image>Question:{question_sample[3]} Short answer:"
prompt

'<image>Question:What is this photo taken looking through? Short answer:net<|endofchunk|><image>Question:What position is this man playing? Short answer:pitcher<|endofchunk|><image>Question:What color is the players shirt? Short answer:orange<|endofchunk|><image>Question:Is this man a professional baseball player? Short answer:'

In [21]:
# Show the question at index 2.
question_sample[2]

'What color is the players shirt?'

In [28]:
# Single-question prompt without in-context demonstrations.
# NOTE(review): `img` was built from dataset[0] while the question is taken from
# index 2 - confirm both refer to the same image.
question = question_sample[2] 
print(question)

prompt_qa = f"<image>Question:{question} Short answer:"
# prompt_imagenet = f"<image>A photo of "
# Tokenize the prompt and move ids and mask to the GPU.
lang_x = tokenizer(prompt_qa, return_tensors='pt')
input_ids = lang_x['input_ids'].cuda()
attention_mask = lang_x['attention_mask'].cuda()
# Beam search with 5 beams, limited to 5 newly generated tokens.
generated_text = model.generate(
    vision_x=img,
    lang_x=input_ids,
    attention_mask=attention_mask,
    max_new_tokens=5,
    num_beams=5,
)
# The first returned sequence is decoded in full; the [white]...[/white] tags are
# markup for the `print` imported from rich.
print("Generated text: ", "[white]" + tokenizer.decode(generated_text[0]) + "[/white]")

Setting `pad_token_id` to `eos_token_id`:50277 for open-end generation.


: 

In [34]:
# Show the four questions collected from the first dataset examples.
question_sample

['What is this photo taken looking through?',
 'What position is this man playing?',
 'What color is the players shirt?',
 'Is this man a professional baseball player?']

In [5]:
# Register a dedicated padding token and pad on the left before batching the questions.
# NOTE(review): if '<pad>' is new to the vocabulary, the model's embedding table
# would need resizing before these ids are fed to the model - confirm.
tokenizer.add_special_tokens({'pad_token': '<pad>'})
tokenizer.padding_side = 'left'
# This rebinds input_ids / attention_mask, which the generation cell also assigns;
# later cells see whichever of the two cells ran last.
input_ids, attention_mask = preprocess_laion_text(question_sample, tokenizer)

In [9]:
# Decode the token ids back to text to check markers and padding.
# input_ids is assigned in two cells (generation prompt and preprocess_laion_text);
# this decodes whichever assignment ran last in the kernel.
tokenizer.batch_decode(input_ids)

['<image>What is this photo taken looking through?<|endofchunk|><|endoftext|>',
 '<image>What position is this man playing?<|endofchunk|><|endoftext|><pad>',
 '<image>What color is the players shirt?<|endofchunk|><|endoftext|><pad>',
 '<image>Is this man a professional baseball player?<|endofchunk|><|endoftext|>']

In [25]:
""" 
Prompts. For captioning tasks, we format demonstrations as 
<image> Output: [caption], replacing [caption] with the ground-truth caption.
 For VQA, we format examples as <image> Question: [question] Short answer: [answer]. For HatefulMemes,
 we prompt the model with <image> is an image with: [text] written on it. Is it hateful? Answer: [answer]

 Decoding parameters. We evaluate captioning and VQA using beam search with 3 beams,
   stopping generation at 20 tokens for captioning,
    5 tokens for VQA, or whenever the model produces an <|endofchunk|> token.
   For HatefulMemes, we compute the log-likelihood of completions “yes” and “no” and answer with the most likely completion
"""
tokenizer.eos_token

'<|endoftext|>'