In [None]:
from transformers import AutoProcessor, AutoModelForCausalLM, BitsAndBytesConfig
from PIL import Image
import torch
import cv2
import os
import time
%matplotlib inline  

In [None]:
# Turn on cuDNN's benchmark mode (per the PyTorch docs, cuDNN then times candidate
# convolution algorithms and reuses the fastest one for each input configuration).
# NOTE(review): the first call for a new input shape is slower while benchmarking
# runs, which is presumably why a warmup step exists later in this notebook.
torch.backends.cudnn.benchmark = True

In [None]:
import os
# Ask the PyTorch CUDA caching allocator to use expandable segments.
# `setdefault` only writes the variable when it is not already present in the
# environment, so a value exported before the kernel started takes precedence.
# NOTE(review): this cell runs after `import torch` in the first cell. The setting
# has to be in place before the CUDA allocator is initialised -- TODO confirm it is
# still picked up here, or move this above the torch import.
os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True")

In [None]:
MODEL_ID = "microsoft/Florence-2-base-ft"
DEVICE   = "cuda" if torch.cuda.is_available() else "cpu"
# Inputs are cast to DTYPE before generation (see `.to(DEVICE, DTYPE)` in the helpers).
DTYPE    = torch.float16 if DEVICE == "cuda" else torch.float32

# 1) Load the Florence-2 processor and model once for the whole notebook.
processor = AutoProcessor.from_pretrained(MODEL_ID, trust_remote_code=True)

_load_kwargs = dict(trust_remote_code=True, low_cpu_mem_usage=True)

if DEVICE == "cuda":
    # 4-bit bitsandbytes quantization needs a CUDA device, so it is only requested
    # on the GPU path.
    bnb = BitsAndBytesConfig(
        load_in_4bit=True,
        # Match the fp16 inputs; the default compute dtype is fp32, which
        # bitsandbytes warns is slow when fed fp16 activations.
        bnb_4bit_compute_dtype=DTYPE,
        # Optional knobs left at their defaults:
        #   bnb_4bit_use_double_quant=True
        #   bnb_4bit_quant_type="nf4"
    )
    _load_kwargs.update(
        quantization_config=bnb,
        # Place the whole model on the current GPU at load time. Quantized weights
        # must be placed by the loader, not moved afterwards with `.to()`.
        device_map={"": torch.cuda.current_device()},
    )
else:
    # CPU fallback: load the unquantized model (default dtype, on CPU).
    bnb = None

model = AutoModelForCausalLM.from_pretrained(MODEL_ID, **_load_kwargs).eval()

# No `model.to(DEVICE)` here: the quantized model is already on its device and
# transformers rejects `.to()` on bitsandbytes models in many versions; on the CPU
# path the model is already where it needs to be.

# `hf_device_map` is only set when a device map was used, so read it defensively.
print(f"device_maps: {getattr(model, 'hf_device_map', None)}")

def get_vram():
    """Print `torch.cuda.memory_summary()` when CUDA is available, else a short notice."""
    if not torch.cuda.is_available():
        print("cuda unavailable")
        return
    summary = torch.cuda.memory_summary()
    print(f'Memory summary: {summary}')

def get_targeted_mem_stats():
    """Print allocated and reserved CUDA memory in MB; prints nothing without CUDA."""
    if not torch.cuda.is_available():
        return
    probes = (
        ("allocated", torch.cuda.memory_allocated),
        ("reserved", torch.cuda.memory_reserved),
    )
    for label, read_bytes in probes:
        # Bytes -> MB via two divisions by 1024, printed unrounded.
        print(f'Total {label}: {read_bytes() / 1024 / 1024} MB')

def _warmup(img):
    """Run one short generation so later timed calls are not dominated by first-call cost.

    The warmup uses the same `<CAPTION>` task prompt and the same greedy decoding
    settings as `caption_image`, so it exercises the prompt and decoding path that
    the timed calls actually use (the plain text "Caption" is not a Florence-2
    task token and is passed through as a literal prompt).

    Args:
        img: Image passed to the processor (a PIL image in this notebook).
    """
    print("Warming up model...")
    inputs = processor(text="<CAPTION>", images=img, return_tensors="pt").to(DEVICE, DTYPE)
    with torch.inference_mode():
        # Only a few tokens are needed; the output is discarded.
        _ = model.generate(
            **inputs,
            max_new_tokens=8,
            do_sample=False,
            num_beams=1,
            use_cache=True,
        )
    if DEVICE == "cuda":
        # Wait for queued GPU work so the warmup has really finished before returning.
        torch.cuda.synchronize()
    print("Warmup complete.\n")

def capture_frame(max_side=640):
    """Capture one webcam frame and return it as an RGB PIL.Image.

    The camera (index 0) is opened and released on every call. Frames whose longer
    side exceeds `max_side` are downscaled with the aspect ratio preserved; smaller
    frames are returned at their native size.

    Args:
        max_side: Maximum length in pixels of the longer image side. Must be > 0.

    Returns:
        PIL.Image built from the RGB-converted frame.

    Raises:
        ValueError: If `max_side` is not positive.
        RuntimeError: If the webcam cannot be opened or no frame could be read.
    """
    if max_side <= 0:
        raise ValueError(f"max_side must be positive, got {max_side}")

    # DirectShow is a Windows-only backend (faster there); elsewhere let OpenCV choose.
    backend = cv2.CAP_DSHOW if os.name == "nt" else cv2.CAP_ANY
    cap = cv2.VideoCapture(0, backend)
    try:
        if not cap.isOpened():
            raise RuntimeError("Could not open webcam")

        # Requested capture resolution in pixels; the frame is resized below anyway.
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        t0 = time.time()

        ok, frame = cap.read()
    finally:
        # Always free the device, even if reading fails or raises.
        cap.release()

    if not ok or frame is None:
        raise RuntimeError("Could not read from webcam")

    # OpenCV delivers BGR; PIL and the model processor expect RGB.
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    h, w = frame.shape[:2]
    scale = max_side / max(h, w)
    if scale < 1.0:
        # Clamp to 1 px so an extreme aspect ratio cannot yield a zero-sized image.
        new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
        frame = cv2.resize(frame, new_size, interpolation=cv2.INTER_AREA)

    print(f"[capture] {time.time() - t0:.3f}s")
    return Image.fromarray(frame)

def caption_image(img, max_new_tokens=24):
    """Caption `img` with Florence-2 using greedy decoding and return the text.

    Prints the wall-clock time of the generate and decode phases.

    Args:
        img: Image handed to the processor together with the '<CAPTION>' prompt.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The first decoded sequence with special tokens removed.
    """
    model_inputs = processor(text='<CAPTION>', images=img, return_tensors="pt").to(DEVICE, DTYPE)

    # Single-beam, non-sampling decoding, i.e. greedy.
    generation_options = dict(
        max_new_tokens=max_new_tokens,
        do_sample=False,
        num_beams=1,
        use_cache=True,
        early_stopping=False,
    )

    gen_start = time.time()
    with torch.inference_mode():
        token_ids = model.generate(**model_inputs, **generation_options)
    if DEVICE == "cuda":
        # Block until the GPU is done so the measured time covers the real work.
        torch.cuda.synchronize()
    elapsed_generate = time.time() - gen_start
    print(f"[generate] {elapsed_generate:.3f}s")

    decode_start = time.time()
    decoded = processor.batch_decode(token_ids, skip_special_tokens=True)
    caption = decoded[0]
    print(f"[decode] {time.time() - decode_start:.3f}s")
    return caption

def reset_cache():
    """Call `torch.cuda.empty_cache()` and reset peak-memory stats; no-op without CUDA."""
    if not torch.cuda.is_available():
        return
    torch.cuda.empty_cache()
    torch.cuda.reset_peak_memory_stats()

def main(max_frames=None):
    """Warm the model up, then capture and caption webcam frames in a loop.

    The loop stops on KeyboardInterrupt (kernel interrupt in Jupyter, Ctrl+C in a
    terminal) or after `max_frames` captions. The previous `cv2.waitKey` quit check
    is gone: OpenCV only delivers key events to an active HighGUI window, and this
    notebook never opens one, so that check could never fire.

    Args:
        max_frames: Optional number of frames to caption before returning.
            `None` (the default) runs until interrupted.
    """
    # Start from a clean allocator state so the memory numbers below are comparable.
    reset_cache()

    print("VRAM usage after cache clear")
    get_targeted_mem_stats()

    # One-time warmup on a single frame to stabilise timings.
    warmup_img = capture_frame()
    _warmup(warmup_img)

    print("Starting Florence captioning loop. Interrupt the kernel (Ctrl+C) to quit.\n")
    print("VRAM usage after loading")
    get_targeted_mem_stats()

    frames_done = 0
    try:
        while max_frames is None or frames_done < max_frames:
            img = capture_frame(max_side=512)
            caption = caption_image(img)
            # U+1F4DD (memo emoji), written as an escape so the source stays ASCII
            # and cannot be mis-encoded again.
            print(f"\U0001F4DD Caption: {caption}\n")
            frames_done += 1
    except KeyboardInterrupt:
        print("Quitting...")


In [None]:
# Sanity check: report the device of the first model parameter.
# Expected to be cuda:0 on a single-GPU machine; it will be cpu when CUDA is unavailable.
print("Any param device:", next(model.parameters()).device)  # should be cuda:0

In [None]:
# Start the capture -> caption loop defined in main(); the loop is `while True`,
# so this cell keeps running until the loop is exited or the kernel is interrupted.
main()