## Captioning / Scene Description

In [None]:
import cv2
import json
import numpy as np
import re

from datetime import timedelta
from os import listdir, makedirs, path

from PIL import Image as PImage

# Inputs: the video metadata JSON and the directory holding the source videos.
VIDEO_DB_PATH = "./metadata/keyframe-500/videos.json"
VIDEO_PATH = "../../vids/0801-500"

# Matches names that begin with a digit 0-3, any digit, then a dash.
DIR_PATTERN = re.compile("^[0-3][0-9]-")

# Output root for the per-video caption JSON files; created up front.
OUT_PATH = "./metadata/caption-1152"
makedirs(OUT_PATH, exist_ok=True)

### Open Video Data

In [None]:
# Parse the video metadata JSON into `video_data`.
with open(VIDEO_DB_PATH, mode="r") as db_file:
  video_data = json.loads(db_file.read())

### Init Models

In [None]:
from transformers import BlipProcessor, BlipForConditionalGeneration, BlipForQuestionAnswering
from transformers import VisionEncoderDecoderModel, ViTImageProcessor, AutoTokenizer

MODEL_NAMES = [
  "Salesforce/blip-image-captioning-large",
  "nlpconnect/vit-gpt2-image-captioning",
]

# One entry per captioning model:
#   "model"       - generator moved to the GPU
#   "pre"         - processor that turns an image into model inputs
#   "post"        - object whose decode() turns generated ids into text
#   "conditional" - optional text given to "pre" together with the image
CAP_MODELS = [
  dict(
    model=BlipForConditionalGeneration.from_pretrained(MODEL_NAMES[0]).to("cuda"),
    pre=BlipProcessor.from_pretrained(MODEL_NAMES[0]),
    conditional="image of",
  ),
  dict(
    model=VisionEncoderDecoderModel.from_pretrained(MODEL_NAMES[1]).to("cuda"),
    pre=ViTImageProcessor.from_pretrained(MODEL_NAMES[1]),
    post=AutoTokenizer.from_pretrained(MODEL_NAMES[1]),
  ),
]

# Entries without an explicit "post" fall back to their "pre" object.
for entry in CAP_MODELS:
  entry.setdefault("post", entry["pre"])

In [None]:
import torch
from transformers import AutoModel, AutoTokenizer

QA_MODEL_NAME = "openbmb/MiniCPM-V-2"

# NOTE(review): trust_remote_code=True runs code shipped with the checkpoint;
# confirm the model source is trusted.
qa_model = AutoModel.from_pretrained(
  QA_MODEL_NAME,
  trust_remote_code=True,
  torch_dtype=torch.bfloat16,
).to("cuda", dtype=torch.bfloat16)

qa_tokenizer = AutoTokenizer.from_pretrained(
  QA_MODEL_NAME,
  trust_remote_code=True,
)

# Assign the result so the notebook does not echo the returned module.
_ = qa_model.eval()

In [None]:
# Object classes every frame is queried for, in output order.
OSOI = [
  # people
  "people", "police officers", "protesters",
  # vehicles and props
  "cars", "flags", "tables", "chairs", "mirrors",
  # building features
  "windows", "doors", "ramps", "stairs", "elevators", "support columns",
  # artwork
  "paintings", "statues",
]

# One question per object class, in the same order as OSOI.
questions = [
  f"are {obj} in the image?"
  for obj in OSOI
]

### Run Caption

In [None]:
%%time

def _trim_caption(text):
  """Return the first sentence of `text`, minus any ", possibly ..." clause.

  `str.find` returns -1 when the marker is missing, so each cut is applied
  only when its marker is present. Slicing blindly with -1 would empty the
  caption (no period) or chop its last character (no ", possibly").
  """
  sentence_end = text.find(".")
  if sentence_end != -1:
    text = text[:sentence_end + 1]
  hedge_start = text.find(", possibly")
  if hedge_start != -1:
    text = text[:hedge_start]
  return text


def _is_yes(answer):
  """Return True when the text before the first comma is "yes".

  Case, surrounding whitespace and a trailing period are ignored, so
  "Yes", "Yes." and "yes, there are ..." all count.
  """
  return answer.split(',')[0].strip().lower().rstrip('.') == "yes"


input_dirs = sorted([d for d in listdir(VIDEO_PATH) if DIR_PATTERN.search(d) is not None])
mLang = "en"

# NOTE(review): only the first matching directory is processed ([:1]) — confirm this is intended.
for io_dir in input_dirs[:1]:
  output_dir_path = path.join(OUT_PATH, io_dir)
  makedirs(output_dir_path, exist_ok=True)

  input_dir_path = path.join(VIDEO_PATH, io_dir)
  input_files = sorted([f for f in listdir(input_dir_path) if f.endswith("mp4")])

  for io_file in input_files:
    input_file_path = path.join(input_dir_path, io_file)
    # Swap only the extension; str.replace would rewrite every ".mp4" in the name.
    output_file_path = path.join(output_dir_path, path.splitext(io_file)[0] + ".json")

    if io_file not in video_data:
      print(io_file, "not in video_data")
      continue

    # Already captioned in an earlier run.
    if path.isfile(output_file_path):
      continue

    print(io_dir, io_file)

    vid = cv2.VideoCapture(input_file_path)
    try:
      # Kept as notebook-level names; frame_count is read by a later cell.
      vw = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
      vh = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
      frame_count = int(vid.get(cv2.CAP_PROP_FRAME_COUNT))
      fps = int(vid.get(cv2.CAP_PROP_FPS))
      rep_frames = video_data[io_file]["representative_frames"]
      print(len(rep_frames))

      cap_languages = set(video_data[io_file].get("caption_languages", []))
      cap_languages.add(mLang)
      # sorted() keeps the stored list deterministic across runs.
      video_data[io_file]["caption_languages"] = sorted(cap_languages)

      for frame_data in rep_frames:
        frameIdx = frame_data["index"]
        vid.set(cv2.CAP_PROP_POS_FRAMES, frameIdx)
        ok, frame = vid.read()
        if not ok:
          # Fail with a clear message instead of an opaque cv2 error on a None frame.
          raise RuntimeError(f"could not read frame {frameIdx} of {input_file_path}")
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image = PImage.fromarray(frame)

        captions = []

        # 1) Captions from each registered captioning model.
        for m in CAP_MODELS:
          if "conditional" in m:
            model_inputs = m["pre"](image, m["conditional"], return_tensors="pt").to("cuda")
            # Skip the prompt text and one following character in the decoded caption
            # (presumably the separating space).
            cap_offset = len(m["conditional"]) + 1
          else:
            model_inputs = m["pre"](image, return_tensors="pt").to("cuda")
            cap_offset = 0
          cap_out = m["model"].generate(**model_inputs, max_length=50)
          caption = m["post"].decode(cap_out[0], skip_special_tokens=True)
          captions.append(caption[cap_offset:])

        # 2) Short description from the chat model; stored last in the list.
        msgs = [
          {'role': 'user', 'content': "The following image was taken during a protest."},
          {'role': 'user', 'content': "Give a short description of the image."},
          {'role': 'user', 'content': "Don't mention sports or winter."},
        ]

        caption, _, _ = qa_model.chat(
          image=image,
          msgs=msgs,
          max_length=32,
          context=None,
          tokenizer=qa_tokenizer,
          sampling=True,
          temperature=0.1
        )
        captions.append(_trim_caption(caption))

        frame_data.setdefault("captions", {})[mLang] = captions

        # 3) One yes/no question per object class.
        detected = []
        for o in OSOI:
          question = f'using only yes or no, are there any {o} in the image?'
          msgs = [{'role': 'user', 'content': question}]
          res, _, _ = qa_model.chat(
            image=image,
            msgs=msgs,
            context=None,
            tokenizer=qa_tokenizer,
            sampling=True,
            temperature=0.1
          )
          if _is_yes(res):
            detected.append(o)

        frame_data["objects"] = detected
    finally:
      # Release the capture even when a frame fails to decode.
      vid.release()

    # Written only after every frame succeeded, so no partial file is left behind.
    with open(output_file_path, "w") as of:
      json.dump(video_data[io_file]["representative_frames"], of, sort_keys=True, indent=2, separators=(',',':'))

In [None]:
# Bare expression: the notebook shows video_data as the cell output for inspection.
video_data

### Post-Process: add to metadata

In [None]:
import json
import re

from os import listdir, path

# Directory holding the per-video caption JSON files; the merged DB is written here too.
CAPTION_PATH = "./metadata/caption-1152"

# Metadata DB read as input, and the merged DB written as output.
VIDEO_DB_PATH_IN = "./metadata/keyframe-500/videos.json"
VIDEO_DB_PATH_OUT = path.join(CAPTION_PATH, "videos.json")

# Matches names that begin with a digit 0-3, any digit, then a dash.
DIR_PATTERN = re.compile("^[0-3][0-9]-")

In [None]:
# Load every per-video caption file into cap_data, keyed by the video file name.
cap_data = {}

input_dirs = sorted([d for d in listdir(CAPTION_PATH) if DIR_PATTERN.search(d) is not None])

for io_dir in input_dirs:
  input_dir_path = path.join(CAPTION_PATH, io_dir)
  input_files = sorted([f for f in listdir(input_dir_path) if f.endswith(".json")])

  for io_file in input_files:
    input_file_path = path.join(input_dir_path, io_file)
    # Swap only the extension: str.replace("json", "mp4") would also rewrite
    # any "json" inside the base name and yield a key not in video_data.
    video_key = path.splitext(io_file)[0] + ".mp4"
    with open(input_file_path, "r") as f:
      cap_data[video_key] = json.load(f)

In [None]:
# Parse the input metadata JSON into `video_data`.
with open(VIDEO_DB_PATH_IN, mode="r") as db_file:
  video_data = json.loads(db_file.read())

In [None]:
# Replace each video's representative_frames with its captioned version;
# videos without a caption file are reported and left untouched.
for video_key, entry in video_data.items():
  if video_key not in cap_data:
    print(video_key, "has no caption info")
    continue
  entry["representative_frames"] = cap_data[video_key]

In [None]:
# Write the merged metadata as indented JSON with compact separators.
merged_json = json.dumps(video_data, indent=2, separators=(',',':'))
with open(VIDEO_DB_PATH_OUT, "w") as out_file:
  out_file.write(merged_json)

### Post-Process: create objects json

In [None]:
import json

from os import path

# Both the merged DB (input here) and the object index (output) live in the caption directory.
CAPTION_PATH = "./metadata/caption-1152"
OBJ_PATH_OUT = path.join(CAPTION_PATH, "objects.json")
VIDEO_DB_PATH_IN = path.join(CAPTION_PATH, "videos.json")

# Parse the merged metadata JSON into `video_data`.
with open(VIDEO_DB_PATH_IN, mode="r") as db_file:
  video_data = json.loads(db_file.read())

In [None]:
def get_timestamp(pos, seek):
  """Map a position to a timestamp by linear interpolation over `seek`.

  Args:
    pos: position to look up, in the same unit as the second element of
      each `seek` entry.
    seek: sequence of `[timestamp, position]` pairs. Assumed to be ordered
      by increasing position — TODO confirm against the producer.

  Returns:
    The timestamp truncated to int. A position that equals a seek entry
    returns that entry's timestamp. A position before the first or after
    the last entry is extrapolated from the nearest two entries.

  Raises:
    ValueError: if `seek` has too few entries to interpolate or extrapolate.
  """
  # An exact hit needs no interpolation. With strict < and > alone it would
  # be skipped, or raise IndexError at either end of the table.
  for ts, s in seek:
    if s == pos:
      return int(ts)

  below = [(ts, s) for ts, s in seek if s < pos]
  above = [(ts, s) for ts, s in seek if s > pos]

  if below and above:
    (ts0, pos0), (ts1, pos1) = below[-1], above[0]
  elif len(below) >= 2:
    # pos lies past the last entry: extend the final segment.
    (ts0, pos0), (ts1, pos1) = below[-2], below[-1]
  elif len(above) >= 2:
    # pos lies before the first entry: extend the first segment.
    (ts0, pos0), (ts1, pos1) = above[0], above[1]
  else:
    raise ValueError(f"cannot derive a timestamp for position {pos} from {len(seek)} seek entries")

  if pos1 == pos0:
    # Duplicate positions give a zero-length segment; use its timestamp as is.
    return int(ts0)
  return int(((pos - pos0) / (pos1 - pos0)) * (ts1 - ts0) + ts0)

### Files/Frames/Objects

In [None]:
# file_data:  list of "<camera>/<name>" strings, one per video
# frame_data: one record per representative frame, pointing at file_data by index
# obj_data:   object name -> list of {frame index into frame_data, timestamp}
file_data = []
frame_data = []
obj_data = {}

for m_file_key, data in enumerate(video_data.values()):
  # Exactly one entry is appended per video, so the enumerate index is its key.
  file_data.append(f"{data['camera']}/{data['name']}")

  for rep in data["representative_frames"]:
    m_pos = rep["index"] / data["fps"]
    m_timestamp = get_timestamp(m_pos, data["seek"])

    frame_data.append({
      "file": m_file_key,
      "frame": rep["index"],
      "time": m_pos,
      "timestamp": m_timestamp,
      # Last English caption, minus the "The image shows " lead-in.
      "caption": rep["captions"]["en"][-1].replace("The image shows ", ""),
    })
    m_frame_key = len(frame_data) - 1

    for o in rep["objects"]:
      obj_data.setdefault(o, []).append({
        "frame": m_frame_key,
        "timestamp": m_timestamp,
      })

In [None]:
# Per object: order its hits by timestamp and keep only the frame keys.
for obj_name, hits in obj_data.items():
  ordered = list(hits)
  ordered.sort(key=lambda hit: hit["timestamp"])
  obj_data[obj_name] = [hit["frame"] for hit in ordered]

In [None]:
# Bundle the three tables under the keys written to the objects JSON.
out_data = dict(
  files=file_data,
  frames=frame_data,
  objects=obj_data,
)

In [None]:
# Write the object index as compact JSON with sorted keys.
objects_json = json.dumps(out_data, separators=(',',':'), sort_keys=True)
with open(OBJ_PATH_OUT, "w") as out_file:
  out_file.write(objects_json)

### Flat

In [None]:
# Flat variant: object name -> list of self-contained hit records
# (file path, frame index, time, timestamp).
obj_data = {}

for data in video_data.values():
  for rep in data["representative_frames"]:
    m_pos = rep["index"] / data["fps"]
    m_timestamp = get_timestamp(m_pos, data["seek"])

    for o in rep["objects"]:
      obj_data.setdefault(o, []).append({
        "file": f"{data['camera']}/{data['name']}",
        "frame": rep["index"],
        "time": m_pos,
        "timestamp": m_timestamp,
      })

In [None]:
# Order every object's hit records by timestamp.
for obj_name, hits in obj_data.items():
  ordered = list(hits)
  ordered.sort(key=lambda hit: hit["timestamp"])
  obj_data[obj_name] = ordered

In [None]:
# Write the flat index next to the objects file, with a "_flat" suffix.
flat_path = OBJ_PATH_OUT.replace(".json", "_flat.json")
flat_json = json.dumps(obj_data, indent=2, separators=(',',':'), sort_keys=True)
with open(flat_path, "w") as out_file:
  out_file.write(flat_json)

### Model tests

In [None]:
# Grab frame 140 of the first video in the first matching directory as the test image.
input_dir = sorted([d for d in listdir(VIDEO_PATH) if DIR_PATTERN.search(d) is not None])[0]
input_dir_path = path.join(VIDEO_PATH, input_dir)
input_files = sorted([f for f in listdir(input_dir_path) if f.endswith("mp4")])
input_file_path = path.join(input_dir_path, input_files[0])

vid = cv2.VideoCapture(input_file_path)
try:
  vid.set(cv2.CAP_PROP_POS_FRAMES, 140)
  ok, frame = vid.read()
finally:
  # Release the capture whether or not the read succeeded.
  vid.release()

if not ok:
  # Fail with a clear message instead of an opaque cv2 error on a None frame.
  raise RuntimeError(f"could not read frame 140 of {input_file_path}")

# OpenCV returns BGR; convert to RGB before building the PIL image.
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
image = PImage.fromarray(frame)

In [None]:
from IPython.display import display, Image
# Show the current `image` inline in the notebook.
display(image)

In [None]:
from transformers import BlipProcessor, BlipForConditionalGeneration

# Smoke test: BLIP captioning of the test image with the text "image of".
MODEL = "Salesforce/blip-image-captioning-large"

processor = BlipProcessor.from_pretrained(MODEL)
model = BlipForConditionalGeneration.from_pretrained(MODEL).to("cuda")

# Named `inputs`, not `input`, so the built-in input() stays usable in the notebook.
inputs = processor(image, "image of", return_tensors="pt").to("cuda")

out = model.generate(**inputs, max_length=50)
caption = processor.decode(out[0], skip_special_tokens=True)
print(caption)

In [None]:
from transformers import BlipProcessor, BlipForQuestionAnswering

# Smoke test: ask the BLIP VQA model a single question about the test image.
MODEL = "Salesforce/blip-vqa-capfilt-large"

processor = BlipProcessor.from_pretrained(MODEL)
model = BlipForQuestionAnswering.from_pretrained(MODEL).to("cuda")

question = "are people in the image?"
inputs = processor(
  image,
  question,
  return_tensors="pt",
).to("cuda")

out = model.generate(**inputs, max_length=32)
answer_text = processor.decode(out[0], skip_special_tokens=True)
print(answer_text)

In [None]:
%%time
# Ask all object questions in one padded batch; the same image is repeated per question.
images = [image for _ in questions]
inputs = processor(
  images,
  questions,
  padding=True,
  return_tensors="pt",
).to("cuda")

out = model.generate(**inputs, max_length=32)
answers = processor.batch_decode(out, skip_special_tokens=True)

# Keep the objects whose decoded answer is exactly "yes".
objs = []
for o, a in zip(OSOI, answers):
  if a == "yes":
    objs.append(o)
print(objs)

In [None]:
from transformers import AutoProcessor, AutoModelForCausalLM

# Smoke test: GIT captioning of the test image.
MODEL = "microsoft/git-large-coco"

processor = AutoProcessor.from_pretrained(MODEL)
model = AutoModelForCausalLM.from_pretrained(MODEL).to("cuda")

# Named `inputs`, not `input`, so the built-in input() stays usable in the notebook.
inputs = processor(images=image, return_tensors="pt").to("cuda")

out = model.generate(**inputs, max_length=50)
caption = processor.decode(out[0], skip_special_tokens=True)
print(caption)

In [None]:
from transformers import VisionEncoderDecoderModel, ViTImageProcessor, AutoTokenizer

# Smoke test: ViT-GPT2 captioning; the image processor builds the inputs
# and the tokenizer decodes the generated ids.
MODEL = "nlpconnect/vit-gpt2-image-captioning"

model = VisionEncoderDecoderModel.from_pretrained(MODEL).to("cuda")
processor = ViTImageProcessor.from_pretrained(MODEL)
tokenizer = AutoTokenizer.from_pretrained(MODEL)

# Named `inputs`, not `input`, so the built-in input() stays usable in the notebook.
inputs = processor(images=image, return_tensors="pt").to("cuda")

out = model.generate(**inputs, max_length=50)
caption = tokenizer.decode(out[0], skip_special_tokens=True)
print(caption)

In [None]:
import torch
from transformers import AutoModel, AutoTokenizer

# Load MiniCPM-V-2 in bfloat16 on the GPU and switch it to eval mode.
MODEL = "openbmb/MiniCPM-V-2"

model = AutoModel.from_pretrained(
  MODEL,
  trust_remote_code=True,
  torch_dtype=torch.bfloat16,
).to(device='cuda', dtype=torch.bfloat16)

tokenizer = AutoTokenizer.from_pretrained(MODEL, trust_remote_code=True)

# Assign the result so the notebook does not echo the returned module.
_ = model.eval()

In [None]:
%%time

# Three user turns: context, the request, and a constraint on the wording.
prompt_lines = (
    "The following image was taken during a protest.",
    "Give a short description of the image.",
    "Don't mention sports or winter.",
)
msgs = [{'role': 'user', 'content': line} for line in prompt_lines]

# NOTE(review): this cell uses qa_model/qa_tokenizer from the init section,
# while the neighbouring test cells use `model`/`tokenizer` — confirm which is intended.
chat_result = qa_model.chat(
  image=image,
  msgs=msgs,
  max_length=32,
  context=None,
  tokenizer=qa_tokenizer,
  sampling=True,
  temperature=0.1
)
caption, _, _ = chat_result
print(caption)

In [None]:
%%time

# Ask one yes/no question per object class and collect the normalised replies.
answers = []
for o in OSOI:
  question = f'using only yes or no, are there any {o} in the image?'
  msgs = [{'role': 'user', 'content': question}]

  res, context, _ = model.chat(
    image=image,
    msgs=msgs,
    context=None,
    tokenizer=tokenizer,
    sampling=True,
    temperature=0.1
  )
  # Keep only the text before the first comma, lower-cased.
  short_answer = res.split(',')[0].lower()
  print(short_answer)
  answers.append(short_answer)

# Objects whose normalised reply is exactly "yes".
objs = []
for o, a in zip(OSOI, answers):
  if a == "yes":
    objs.append(o)
print(objs)


### Time Tests

In [None]:
# Pick the first matching video directory and list its mp4 files for the timing cells.
matching_dirs = [d for d in listdir(VIDEO_PATH) if DIR_PATTERN.search(d) is not None]
matching_dirs.sort()
input_dir = matching_dirs[0]

input_dir_path = path.join(VIDEO_PATH, input_dir)

input_files = [f for f in listdir(input_dir_path) if f.endswith("mp4")]
input_files.sort()

In [None]:
%%time

# Timing variant A: seek directly to each representative frame and convert it to grey.
for io_file in input_files:
  input_file_path = path.join(input_dir_path, io_file)
  vid = cv2.VideoCapture(input_file_path)
  rep_frames = video_data[io_file]["representative_frames"]

  vid.set(cv2.CAP_PROP_POS_FRAMES, 0)
  for frame_data in rep_frames:
    frameIdx = frame_data["index"]
    # Jump straight to the frame, then decode it.
    vid.set(cv2.CAP_PROP_POS_FRAMES, frameIdx)
    read_result = vid.read()
    frame = read_result[1]
    _ = read_result[0]
    frame_grey = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

  vid.release()

In [None]:
%%time

# Timing variant B: read every frame in order and convert only the representative ones.
for io_file in input_files:
  input_file_path = path.join(input_dir_path, io_file)
  vid = cv2.VideoCapture(input_file_path)
  try:
    rep_frames = video_data[io_file]["representative_frames"]
    # A set gives constant-time membership tests inside the per-frame loop.
    rep_frame_idxs = {f["index"] for f in rep_frames}

    # Take the frame count from this video, not a value left over from another cell.
    frame_count = int(vid.get(cv2.CAP_PROP_FRAME_COUNT))

    vid.set(cv2.CAP_PROP_POS_FRAMES, 0)
    for frameIdx in range(frame_count):
      ok, frame = vid.read()
      if not ok:
        # Stop once a read fails; the reported frame count may exceed the decodable frames.
        break
      if frameIdx in rep_frame_idxs:
        frame_grey = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
  finally:
    vid.release()