Having answered the questions in textual form, let's draw bounding boxes for the obtained object names.

# Image grounding with OFA

Download all the necessary tools and models

In [None]:
!git clone https://github.com/OFA-Sys/OFA.git

In [None]:
!mkdir -p OFA/checkpoints/
!wget https://ofa-silicon.oss-us-west-1.aliyuncs.com/checkpoints/refcocog_large_best.pt
!mv refcocog_large_best.pt OFA/checkpoints/refcocog.pt

In [None]:
!git clone https://github.com/pytorch/fairseq.git -b v0.12.0

In [None]:
cd fairseq

In [None]:
!pip -q install --use-feature=no-binary-enable-wheel-cache ./

In [None]:
cd ../OFA

In [None]:
%%capture
!sed '1d' requirements.txt | xargs -I {} pip install {}

In [None]:
!pip -q install wget

In [None]:
import torch
import numpy as np
from fairseq import utils, tasks
from fairseq import checkpoint_utils
from utils.eval_utils import eval_step
from tasks.mm_tasks.refcoco import RefcocoTask
from PIL import Image

import pandas as pd
import os
import cv2

import requests
from io import BytesIO
from tqdm.notebook import tqdm

from joblib import Parallel, delayed
import wget

In [None]:
# Register refcoco task
# NOTE(review): fairseq's register_task is normally used as a decorator
# factory -- confirm that calling it this way has the intended effect.
tasks.register_task('refcoco', RefcocoTask)

# turn on cuda if GPU is available
use_cuda = torch.cuda.is_available()
# Half precision is switched off: the flag is hard-coded to False regardless
# of GPU availability. model.half() and apply_half are only used when it is True.
use_fp16 = False

In [None]:
# Load pretrained ckpt & config
# Both paths below are relative, so this cell has to run from the directory
# that contains utils/BPE and checkpoints/refcocog.pt.
overrides={"bpe_dir":"utils/BPE"}
models, cfg, task = checkpoint_utils.load_model_ensemble_and_task(
        utils.split_paths('checkpoints/refcocog.pt'),
        arg_overrides=overrides
    )

# Overwrite the seed stored in the loaded config with a fixed value.
cfg.common.seed = 7

# Fix seed for stochastic decoding
# Seeds NumPy and torch, unless the generation config says no seed was provided.
if cfg.common.seed is not None and not cfg.generation.no_seed_provided:
    np.random.seed(cfg.common.seed)
    utils.set_torch_seed(cfg.common.seed)

# Move models to GPU
for model in models:
    # Inference only: switch to eval mode first.
    model.eval()
    # Optional FP16 cast (controlled by use_fp16).
    if use_fp16:
        model.half()
    # Only move the model to CUDA when a GPU is available and pipeline model
    # parallelism is not enabled in the loaded config.
    if use_cuda and not cfg.distributed_training.pipeline_model_parallel:
        model.cuda()
    model.prepare_for_inference_(cfg)

# Initialize generator
# The generator is built by the task from the loaded models and the
# generation section of the config.
generator = task.build_generator(models, cfg.generation)

In [None]:
# Image preprocessing pipeline
from torchvision import transforms

# Per-channel normalisation statistics used by transforms.Normalize below.
mean = [0.5, 0.5, 0.5]
std = [0.5, 0.5, 0.5]


def _force_rgb(image):
    """Return the image converted to RGB mode."""
    return image.convert("RGB")


# Steps, in order: force RGB, resize to a square patch of side
# cfg.task.patch_image_size (bicubic), convert to tensor, normalise.
_transform_steps = [
    _force_rgb,
    transforms.Resize(
        (cfg.task.patch_image_size, cfg.task.patch_image_size),
        interpolation=Image.BICUBIC,
    ),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std),
]
patch_resize_transform = transforms.Compose(_transform_steps)

# Text preprocessing: special-token tensors and the padding index
bos_item = torch.LongTensor([task.src_dict.bos()])
eos_item = torch.LongTensor([task.src_dict.eos()])
pad_idx = task.src_dict.pad()


def encode_text(text, length=None, append_bos=False, append_eos=False):
    """Encode ``text`` into a 1-D LongTensor of dictionary indices.

    The text is lower-cased, passed through ``task.bpe.encode`` and then
    through ``task.tgt_dict.encode_line`` (without adding unknown symbols
    to the dictionary and without an EOS token).

    Args:
        text: The string to encode.
        length: If not None, keep only the first ``length`` tokens. The
            truncation happens before BOS/EOS are attached.
        append_bos: Prepend ``bos_item`` when True.
        append_eos: Append ``eos_item`` when True.

    Returns:
        The encoded token tensor.
    """
    bpe_line = task.bpe.encode(text.lower())
    tokens = task.tgt_dict.encode_line(
        line=bpe_line,
        add_if_not_exist=False,
        append_eos=False,
    ).long()

    if length is not None:
        tokens = tokens[:length]

    pieces = [tokens]
    if append_bos:
        pieces.insert(0, bos_item)
    if append_eos:
        pieces.append(eos_item)

    # Nothing to attach: hand back the token tensor itself.
    if len(pieces) == 1:
        return tokens
    return torch.cat(pieces)

# Construct input for refcoco task
patch_image_size = cfg.task.patch_image_size


def construct_sample(image: Image.Image, text: str):
    """Build a batch-of-one sample dict for the refcoco task.

    Args:
        image: A PIL image (``Image.Image``; ``Image`` alone is the PIL
            module, not a type).
        text: The phrase to ground; it is inserted into the fixed prompt
            template below.

    Returns:
        A dict with:
          * ``id``: a fixed dummy id array,
          * ``net_input``: source tokens, their lengths, the transformed
            image patch and a patch mask,
          * ``w_resize_ratios`` / ``h_resize_ratios``: ``patch_image_size``
            divided by the original width / height,
          * ``region_coords``: a random ``(1, 4)`` tensor.
    """
    w, h = image.size
    # Ratios between the model's patch size and the original image size.
    w_resize_ratio = torch.tensor(patch_image_size / w).unsqueeze(0)
    h_resize_ratio = torch.tensor(patch_image_size / h).unsqueeze(0)
    # unsqueeze(0) adds the batch dimension.
    patch_image = patch_resize_transform(image).unsqueeze(0)
    patch_mask = torch.tensor([True])
    src_text = encode_text(
        ' which region does the text " {} " describe?'.format(text),
        append_bos=True,
        append_eos=True,
    ).unsqueeze(0)
    # Number of non-padding tokens per row.
    src_length = torch.LongTensor([s.ne(pad_idx).long().sum() for s in src_text])
    sample = {
        "id": np.array(['42']),
        "net_input": {
            "src_tokens": src_text,
            "src_lengths": src_length,
            "patch_images": patch_image,
            "patch_masks": patch_mask,
        },
        "w_resize_ratios": w_resize_ratio,
        "h_resize_ratios": h_resize_ratio,
        # NOTE(review): random values -- presumably a placeholder because no
        # ground-truth region is supplied here; confirm how eval_step uses it.
        "region_coords": torch.randn(1, 4),
    }
    return sample
  
# Cast FP32 tensors to FP16; every other dtype passes through untouched
def apply_half(t):
    """Return ``t`` as a half-precision tensor if it is float32, else ``t`` itself."""
    return t.to(dtype=torch.half) if t.dtype is torch.float32 else t

In [None]:
def get_iou(bb1, bb2):
    """
    Calculate the Intersection over Union (IoU) of two bounding boxes.

    Adapted from https://stackoverflow.com/a/42874377

    Parameters
    ----------
    bb1 : dict
        Keys: {'x1', 'x2', 'y1', 'y2'}
        The (x1, y1) position is at the top left corner,
        the (x2, y2) position is at the bottom right corner
    bb2 : dict
        Keys: {'x1', 'x2', 'y1', 'y2'}
        The (x1, y1) position is at the top left corner,
        the (x2, y2) position is at the bottom right corner

    Returns
    -------
    float
        in [0, 1]

    Raises
    ------
    ValueError
        If either box does not satisfy x1 < x2 and y1 < y2.
    """
    # Validate with explicit exceptions: `assert` statements disappear when
    # Python runs with -O, which would let malformed boxes through silently.
    for label, box in (('bb1', bb1), ('bb2', bb2)):
        if not (box['x1'] < box['x2'] and box['y1'] < box['y2']):
            raise ValueError(
                f"{label} must satisfy x1 < x2 and y1 < y2, got "
                f"x1={box['x1']}, y1={box['y1']}, x2={box['x2']}, y2={box['y2']}"
            )

    # determine the coordinates of the intersection rectangle
    x_left = max(bb1['x1'], bb2['x1'])
    y_top = max(bb1['y1'], bb2['y1'])
    x_right = min(bb1['x2'], bb2['x2'])
    y_bottom = min(bb1['y2'], bb2['y2'])

    # No overlap at all
    if x_right < x_left or y_bottom < y_top:
        return 0.0

    # The intersection of two axis-aligned bounding boxes is always an
    # axis-aligned bounding box
    intersection_area = (x_right - x_left) * (y_bottom - y_top)

    # compute the area of both AABBs
    bb1_area = (bb1['x2'] - bb1['x1']) * (bb1['y2'] - bb1['y1'])
    bb2_area = (bb2['x2'] - bb2['x1']) * (bb2['y2'] - bb2['y1'])

    # Intersection over union: the union is the sum of both areas minus the
    # intersection (otherwise counted twice). It is strictly positive here
    # because both boxes were validated to have positive area.
    iou = intersection_area / float(bb1_area + bb2_area - intersection_area)

    # Guard against floating-point round-off pushing the ratio just outside
    # the mathematically valid range.
    return min(1.0, max(0.0, iou))

In [None]:
# Load the VQA answers table from the parent directory. Later cells read the
# columns 'image', 'answer', 'left', 'top', 'right' and 'bottom' from it.
vqa_answers = pd.read_csv('../vqa_answers.csv')

In [None]:
# Download every image referenced in vqa_answers into ../imgs, one joblib
# task per URL. makedirs(exist_ok=True) lets the cell be re-run without
# failing because the directory already exists.
os.makedirs('../imgs/', exist_ok=True)
img_paths = Parallel(n_jobs=100)(
    delayed(wget.download)(img_url, out='../imgs')
    for img_url in tqdm(vqa_answers.image)
)

In [None]:
# Ground every answer in its image with OFA, collecting the predicted boxes
# and showing the running mean IoU (in percent) on the progress bar.
bounding_boxes = []
sum_iou = 0
n = 0
progress = tqdm(vqa_answers.iterrows(), total=len(vqa_answers))
for _, row in progress:
    # Local file name is the last path component of the image URL
    image_path = row['image'].split('/')[-1]
    image = Image.open(os.path.join('../imgs', image_path))
    text = row['answer']

    # Build the model input, then move/cast it only when the flags ask for it
    sample = construct_sample(image, text)
    if use_cuda:
        sample = utils.move_to_cuda(sample)
    if use_fp16:
        sample = utils.apply_to_sample(apply_half, sample)

    # Inference only, so gradients are not tracked
    with torch.no_grad():
        result, scores = eval_step(task, generator, models, sample)

    # Predicted box, truncated to integer coordinates
    pred_box = list(map(int, result[0]["box"]))
    bounding_boxes.append(pred_box)

    # Reference box taken from the table row
    gt_box = [row['left'], row['top'], row['right'], row['bottom']]
    iou = get_iou(
        dict(zip(('x1', 'y1', 'x2', 'y2'), gt_box)),
        dict(zip(('x1', 'y1', 'x2', 'y2'), pred_box)),
    )

    # Update the running average
    n += 1
    sum_iou += iou
    avg_iou = sum_iou / n * 100
    progress.set_description(f'IoU: {round(avg_iou, 2)}')

In [None]:
# Spread the predicted [left, top, right, bottom] boxes over four new columns
for _position, _column in enumerate(('pred_left', 'pred_top', 'pred_right', 'pred_bottom')):
    vqa_answers[_column] = [box[_position] for box in bounding_boxes]

# Same DataFrame object under a second name (no copy is made)
image_grounding = vqa_answers

In [None]:
# Save the table with the added pred_* columns to the parent directory,
# without writing the DataFrame index.
image_grounding.to_csv('../ofa_image_grounding.csv', index=False)

# SAM re-annotation

Now let's improve the tightness of the rectangles with [SAM](https://github.com/facebookresearch/segment-anything):
1. Give SAM the bounding box as a prompt
2. Get the object mask as a result
3. Transform the mask into a bounding box

Download model and install necessary packages

In [None]:
cd ..

In [None]:
!wget "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_h_4b8939.pth"

In [None]:
!pip -q install git+https://github.com/facebookresearch/segment-anything.git

In [None]:
!pip -q install opencv-python pycocotools matplotlib onnxruntime onnx

In [None]:
def get_bounding_box(mask):
    """
    Compute the tight bounding box around the True pixels of a 2-D mask.

    Args:
        mask (NumPy array): The segmentation mask as a NumPy bool array.

    Returns:
        A NumPy array [left, top, right, bottom] holding the first and last
        column / row index that contains at least one True value (both ends
        inclusive).

    Raises:
        IndexError: If the mask contains no True value.
    """
    # Indices of the rows / columns that contain any foreground pixel
    occupied_rows = np.where(np.any(mask, axis=1))[0]
    occupied_cols = np.where(np.any(mask, axis=0))[0]

    left, right = occupied_cols[0], occupied_cols[-1]
    top, bottom = occupied_rows[0], occupied_rows[-1]

    return np.array([left, top, right, bottom])

In [None]:
def get_image_array(url, timeout=30):
    """
    Download an image over HTTP and decode it with OpenCV.

    Args:
        url (str): Address of the image.
        timeout (float): Seconds to wait for the server before giving up.
            Without a timeout a stalled connection would block forever.

    Returns:
        A NumPy array with the decoded image. The image is decoded with
        cv2.IMREAD_UNCHANGED, i.e. as stored in the file.
        NOTE(review): the channel count therefore depends on the file;
        callers that assume 3-channel BGR should confirm their inputs.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        ValueError: If the downloaded bytes cannot be decoded as an image.
    """
    response = requests.get(url, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of trying to decode an error page.
    response.raise_for_status()

    raw_bytes = np.frombuffer(response.content, np.uint8)
    img_cv2 = cv2.imdecode(raw_bytes, cv2.IMREAD_UNCHANGED)
    # imdecode signals failure by returning None rather than raising.
    if img_cv2 is None:
        raise ValueError(f'Could not decode image downloaded from {url}')
    return np.asarray(img_cv2)

In [None]:
import sys
sys.path.append("..")
from segment_anything import sam_model_registry, SamPredictor

# Checkpoint file and the matching registry key for the model variant
sam_checkpoint = "sam_vit_h_4b8939.pth"
model_type = "vit_h"

# Use the GPU when one is present and fall back to CPU otherwise, instead of
# failing in sam.to() on a machine without CUDA.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Build the model from the checkpoint and move it to the chosen device
sam = sam_model_registry[model_type](checkpoint=sam_checkpoint)
sam.to(device=device)

predictor = SamPredictor(sam)

In [None]:
# Refine every OFA box with SAM: prompt SAM with the box, take the bounding
# box of the returned mask, and track the running mean IoU against the
# reference box from the table.
predictions = []
sum_iou = 0
n = 0
progress = tqdm(image_grounding.iterrows(), total=len(image_grounding))
for _, row in progress:
    url = row['image']
    input_box = np.array([row['pred_left'], row['pred_top'], row['pred_right'], row['pred_bottom']])
    gt_box = np.array([row['left'], row['top'], row['right'], row['bottom']])

    # Fetch the image and reorder its channels from BGR to RGB
    image = get_image_array(url)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    predictor.set_image(image)

    # Box-only prompt; a single mask is requested
    masks, _, _ = predictor.predict(
        point_coords=None,
        point_labels=None,
        box=input_box[None, :],
        multimask_output=False,
    )

    predicted_box = get_bounding_box(masks[0])
    iou = get_iou(
        {'x1': gt_box[0], 'y1': gt_box[1], 'x2': gt_box[2], 'y2': gt_box[3]},
        {'x1': predicted_box[0], 'y1': predicted_box[1], 'x2': predicted_box[2], 'y2': predicted_box[3]}
    )
    predictions.append([url] + list(predicted_box))
    sum_iou += iou
    n += 1
    avg_iou = sum_iou / n
    # Scale to percent BEFORE rounding: rounding first and multiplying by 100
    # afterwards yields float artefacts such as 56.99999999999999.
    progress.set_description(f'IoU: {round(avg_iou * 100, 2)}')
predictions = pd.DataFrame(predictions, columns=['image', 'left', 'top', 'right', 'bottom'])

Finally, let's look at the results and save them

In [None]:
# Bare expression as the last statement of the cell: shows the DataFrame
predictions

In [None]:
# Save the refined boxes (columns: image, left, top, right, bottom) to the
# current directory, without writing the DataFrame index.
predictions.to_csv('ofa_sam_result.csv', index=False)