In [1]:
import torch
from transformers import VisionEncoderDecoderModel, AutoTokenizer, ViTFeatureExtractor
from PIL import Image
import cv2
import asyncio
import time
import threading
import queue

In [2]:
# Configuration
MODEL_DIR = "VIT_large_gpt2"
# Prefer the GPU, but fall back to the CPU so this cell does not crash on a
# machine without CUDA. Any code that moves inputs to a device must use the same
# device the model ends up on.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Load the image preprocessor and the captioning model
# NOTE(review): the preprocessor comes from "google/vit-base-patch16-224" while the
# model is loaded from MODEL_DIR — confirm both expect the same preprocessing.
feature_extractor = ViTFeatureExtractor.from_pretrained("google/vit-base-patch16-224")
model = VisionEncoderDecoderModel.from_pretrained(MODEL_DIR)
model = model.to(DEVICE)



In [3]:
def build_inputs_with_special_tokens(self, token_ids_0, token_ids_1=None):
    """Wrap a token id list with the BOS and EOS ids of ``self``.

    Args:
        self: object exposing ``bos_token_id`` and ``eos_token_id``.
        token_ids_0: list of token ids to wrap.
        token_ids_1: accepted for signature compatibility; not used.

    Returns:
        A new list: ``[bos_token_id, *token_ids_0, eos_token_id]``.
    """
    wrapped = [self.bos_token_id]
    wrapped = wrapped + token_ids_0
    wrapped = wrapped + [self.eos_token_id]
    return wrapped


# Assign the custom build_inputs_with_special_tokens (defined above) as an
# attribute of the AutoTokenizer class, then load the pretrained "gpt2" tokenizer.
# The assignment is done before the tokenizer is loaded.
# NOTE(review): confirm that the object returned by AutoTokenizer.from_pretrained
# actually picks up an attribute set on AutoTokenizer itself; if it is an instance
# of a different tokenizer class, this patch may have no effect on `tokenizer`.
AutoTokenizer.build_inputs_with_special_tokens = build_inputs_with_special_tokens
tokenizer = AutoTokenizer.from_pretrained("gpt2")



In [4]:
# Prediction function
def _first_sentence(decoded):
    """Strip the "<|endoftext|>" marker and keep the text up to the first period.

    If the text contains no period, the whole cleaned text is returned instead of
    an empty string.
    """
    text = decoded.replace("<|endoftext|>", "").strip()
    period_index = text.find(".")
    if period_index == -1:
        return text
    return text[: period_index + 1]


def predict_step(image):
    """Generate a caption for a single image.

    Uses the module-level ``feature_extractor``, ``model`` and ``tokenizer``.

    Args:
        image: image object exposing ``mode`` and ``convert`` (callers pass a
            PIL image). Non-RGB images are converted to RGB first.

    Returns:
        str: the decoded caption with "<|endoftext|>" removed, cut after the
        first period when one is present.
    """
    if image.mode != "RGB":
        image = image.convert(mode="RGB")
    # Preprocess once and reuse the tensor for generation.
    pixel_values = feature_extractor(images=[image], return_tensors="pt").pixel_values
    # Follow whatever device the model currently lives on.
    pixel_values = pixel_values.to(model.device)
    token_ids = model.generate(pixel_values)[0]
    return _first_sentence(tokenizer.decode(token_ids))


# Multithreading for running captioning separately
def _publish_latest_caption(caption_queue, caption):
    """Put ``caption`` on the queue without blocking, replacing a stale entry if full."""
    while True:
        try:
            caption_queue.put_nowait(caption)
            return
        except queue.Full:
            # Drop the older caption to make room; the reader may have taken it
            # in the meantime, in which case we simply retry the put.
            try:
                caption_queue.get_nowait()
            except queue.Empty:
                pass


def _discard_pending(frame_queue):
    """Remove every item still waiting on ``frame_queue``, marking each as done."""
    while True:
        try:
            frame_queue.get_nowait()
        except queue.Empty:
            return
        frame_queue.task_done()


def caption_thread(frame_queue, caption_queue, stop_event):
    """Worker loop: caption frames from ``frame_queue`` into ``caption_queue``.

    Args:
        frame_queue: queue of frames to caption (converted from BGR to RGB
            before captioning); a ``None`` item tells the worker to stop.
        caption_queue: queue receiving caption strings. The worker never blocks
            on it: when it is full, the older caption is replaced by the new one.
        stop_event: event checked before each fetch; when set, the loop ends.

    Every item taken from ``frame_queue`` is acknowledged with ``task_done()``,
    including the sentinel and frames whose captioning raised. On exit, frames
    still queued are discarded so a producer blocked on a bounded
    ``frame_queue`` is released.
    """
    try:
        while not stop_event.is_set():
            try:
                # Short timeout so stop_event is re-checked about once a second.
                frame = frame_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                if frame is None:
                    break
                pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                _publish_latest_caption(caption_queue, predict_step(pil_image))
            finally:
                frame_queue.task_done()
    finally:
        _discard_pending(frame_queue)

In [5]:
# Opening Webcam and making new thread
cap = cv2.VideoCapture(0)
# Both queues hold a single item: one frame waiting to be captioned and one
# caption waiting to be displayed.
frame_queue = queue.Queue(maxsize=1)
caption_queue = queue.Queue(maxsize=1)
stop_event = threading.Event()

thread = threading.Thread(
    target=caption_thread, args=(frame_queue, caption_queue, stop_event), daemon=True
)
thread.start()

last_caption_time = time.time()
caption = ""

# Running model on webcam
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # This loop is the only producer, so a put on an empty queue cannot block.
        if frame_queue.empty():
            frame_queue.put(frame)

        # Mirror the frame (the worker receives the unmirrored original)
        frame = cv2.flip(frame, 1)

        # Pick up a new caption at most once per second
        current_time = time.time()
        if current_time - last_caption_time >= 1:
            last_caption_time = current_time
            try:
                caption = caption_queue.get_nowait()
            except queue.Empty:
                pass

        if caption:
            # Draw the caption on a half-transparent green box at the bottom left
            (w, h), _ = cv2.getTextSize(caption, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1)
            x, y = 10, frame.shape[0] - 10
            overlay = frame.copy()
            cv2.rectangle(
                overlay, (x, y - h - 10), (x + w + 10, y + 10), (0, 255, 0), -1
            )
            cv2.addWeighted(overlay, 0.5, frame, 0.5, 0, frame)
            cv2.putText(
                frame,
                caption,
                (x + 5, y - 5),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.6,
                (0, 0, 0),
                1,
                cv2.LINE_AA,
            )

        cv2.imshow("Webcam Captioning", frame)
        # Press "q" to quit
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    # Runs on normal exit, on errors and on a kernel interrupt, so the camera
    # and the window are always released.
    stop_event.set()
    # Free a slot in case the worker is blocked putting a caption.
    try:
        caption_queue.get_nowait()
    except queue.Empty:
        pass
    # Wake an idle worker with the sentinel, but never block here: frame_queue is
    # usually full, and a blocking put would hang once the worker has stopped.
    try:
        frame_queue.put_nowait(None)
    except queue.Full:
        pass
    # Bounded wait; the worker is a daemon thread, so it cannot keep the process alive.
    thread.join(timeout=10)
    cap.release()
    cv2.destroyAllWindows()

The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


: 