# new code complete

In [1]:
# Import necessary libraries
import os
import cv2
import torch
from transformers import AutoModelForCausalLM, AutoProcessor
from PIL import Image, ImageDraw, ImageFont
import supervision as sv
import numpy as np
from IPython.display import HTML
import base64
import logging

# Set HOME directory
HOME = os.getcwd()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


In [2]:
def initialize_model(checkpoint="microsoft/Florence-2-large-ft", device=None):
    """
    Load a Florence-2 checkpoint together with its processor.

    Parameters:
    - checkpoint: Checkpoint identifier handed to `from_pretrained`.
    - device: Target torch device. When None, CUDA is selected if
      `torch.cuda.is_available()` reports it, otherwise the CPU.

    Returns:
    - model: The loaded model, already moved to `device`.
    - processor: The processor loaded from the same checkpoint.
    - device: The device that was actually used.
    """
    if device is None:
        backend = "cuda" if torch.cuda.is_available() else "cpu"
        device = torch.device(backend)
    logger.info(f"Using device: {device}")

    # Both loaders need trust_remote_code=True, so share the keyword set.
    hub_kwargs = {"trust_remote_code": True}
    loaded_model = AutoModelForCausalLM.from_pretrained(checkpoint, **hub_kwargs)
    loaded_model = loaded_model.to(device)
    loaded_processor = AutoProcessor.from_pretrained(checkpoint, **hub_kwargs)

    return loaded_model, loaded_processor, device


In [3]:
# Florence-2 task tokens grouped by how their post-processed output is drawn.
_BOX_TASKS = frozenset({
    "<OD>",
    "<OPEN_VOCABULARY_DETECTION>",
    "<CAPTION_TO_PHRASE_GROUNDING>",
    "<REGION_PROPOSAL>",
    "<DENSE_REGION_CAPTION>",
    "<REGION_TO_CATEGORY>",
    "<REGION_TO_DESCRIPTION>",
})
_MASK_TASKS = frozenset({
    "<REFERRING_EXPRESSION_SEGMENTATION>",
    "<REGION_TO_SEGMENTATION>",
})
_OCR_REGION_TASKS = frozenset({"<OCR_WITH_REGION>"})
_TEXT_TASKS = frozenset({
    "<CAPTION>",
    "<OCR>",
    "<DETAILED_CAPTION>",
    "<MORE_DETAILED_CAPTION>",
})


def process_frame(frame, model, processor, device, task, text):
    """
    Process a single video frame with the specified task.

    The frame is converted from BGR to an RGB PIL image, passed through the
    model with the prompt built from `task` and `text`, and the post-processed
    response is drawn onto the image: boxes or masks plus labels for
    detection-style tasks, wrapped red text for caption/OCR tasks.

    Parameters:
    - frame: The video frame to process (converted with COLOR_BGR2RGB).
    - model: The initialized model.
    - processor: The initialized processor.
    - device: The device to run the model on.
    - task: The task to perform (e.g., "<OD>", "<DETAILED_CAPTION>", etc.).
    - text: The text input for the task. None is treated as "". If it
      already starts with the task token it is used as the full prompt.

    Returns:
    - frame_bgr: The processed frame in BGR format. For an unrecognised task
      a warning is logged and the frame content is returned unannotated.
    """
    import textwrap  # only needed for caption wrapping

    # Convert the frame to RGB and then to a PIL Image
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    image = Image.fromarray(frame_rgb)

    # Build the prompt without repeating the task token: callers sometimes
    # pass text="<OD>" alongside task="<OD>", which would yield "<OD><OD>".
    text = text or ""
    prompt = text if text.startswith(task) else task + text
    inputs = processor(text=prompt, images=image, return_tensors="pt").to(device)

    # Generate results
    generated_ids = model.generate(
        input_ids=inputs["input_ids"],
        pixel_values=inputs["pixel_values"],
        max_new_tokens=1024,
        num_beams=3
    )

    # Post-process the generated text
    generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
    response = processor.post_process_generation(generated_text, task=task, image_size=image.size)

    if task in _BOX_TASKS or task in _MASK_TASKS or task in _OCR_REGION_TASKS:
        # Tasks whose response is converted into supervision detections
        detections = sv.Detections.from_lmm(sv.LMM.FLORENCE_2, response, resolution_wh=image.size)

        if task in _MASK_TASKS:
            shape_annotator = sv.MaskAnnotator(color_lookup=sv.ColorLookup.INDEX)
        else:
            shape_annotator = sv.BoundingBoxAnnotator(color_lookup=sv.ColorLookup.INDEX)

        if task in _OCR_REGION_TASKS:
            # OCR labels are drawn larger than ordinary class labels
            label_annotator = sv.LabelAnnotator(
                color_lookup=sv.ColorLookup.INDEX, text_scale=1.5, text_thickness=2
            )
        else:
            label_annotator = sv.LabelAnnotator(color_lookup=sv.ColorLookup.INDEX)

        image = shape_annotator.annotate(image, detections)
        image = label_annotator.annotate(image, detections)

    elif task in _TEXT_TASKS:
        # Tasks that output plain text
        caption = str(response.get(task, ""))
        # Rough wrap width for the small default font so long captions stay
        # inside the frame; draw.text renders embedded newlines as lines.
        wrap_width = max(20, image.width // 7)
        wrapped = "\n".join(textwrap.wrap(caption, width=wrap_width))
        draw = ImageDraw.Draw(image)
        font = ImageFont.load_default()
        text_position = (10, 10)
        draw.text(text_position, wrapped, fill="red", font=font)

    else:
        # Unknown task: leave the frame untouched but make it visible in logs
        logger.warning(f"Unhandled task or output format for task: {task}")

    # Convert back to OpenCV format (BGR) for saving the video
    frame_bgr = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

    return frame_bgr


In [4]:
def process_video(input_video_path, output_video_path, model, processor, device, task, text, frame_step=1):
    """
    Process a video with the specified task.

    Every source frame is written to the output. Frames whose index is a
    multiple of `frame_step` are annotated by `process_frame`; the others are
    copied through unchanged. Because no frame is dropped, the output keeps
    the source frame rate.

    Parameters:
    - input_video_path: Path to the input video file.
    - output_video_path: Path to save the processed video.
    - model: The initialized model.
    - processor: The initialized processor.
    - device: Device to run the model on.
    - task: The task to perform (e.g., "<OD>", "<DETAILED_CAPTION>", etc.).
    - text: The text input for the task.
    - frame_step: Process every Nth frame (default is 1, i.e., process every frame).

    Raises:
    - ValueError: If `frame_step` is smaller than 1.
    """
    if frame_step < 1:
        raise ValueError(f"frame_step must be >= 1, got {frame_step}")

    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        logger.error(f"Error opening video file {input_video_path}")
        return

    out = None
    processed_frames = 0
    try:
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        logger.info(f"Total number of frames: {frame_count}")

        # Get video properties
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Prepare to save the processed video. All frames are written, so the
        # source fps is kept; dividing by frame_step would slow playback.
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for MP4
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
        if not out.isOpened():
            logger.error(f"Error opening video writer for {output_video_path}")
            return

        # Process frames
        frame_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_idx % frame_step == 0:
                logger.info(f"Processing frame {frame_idx+1}/{frame_count}")
                processed_frame = process_frame(frame, model, processor, device, task, text)
                processed_frames += 1
            else:
                processed_frame = frame  # Use the original frame if not processing

            # Write the frame to the output video
            out.write(processed_frame)
            frame_idx += 1
    finally:
        # Release capture and writer even if inference fails midway, so the
        # partially written file is finalised and handles are not leaked.
        cap.release()
        if out is not None:
            out.release()

    logger.info(f"Processed video saved to: {output_video_path}")
    logger.info(f"Total processed frames: {processed_frames}")


In [5]:
def display_video(video_path, video_width=600):
    """
    Display a video inside the notebook.

    The whole file is read and embedded as a base64 `data:` URL, so the
    returned HTML grows with the size of the video.

    Parameters:
    - video_path: Path to the video file.
    - video_width: Width of the video in the display.

    Returns:
    - HTML object to display the video.
    """
    # NOTE(review): files written with the 'mp4v' codec may not play in every
    # browser; confirm playback or re-encode if the player stays blank.
    # Read through a context manager so the file handle is closed promptly.
    with open(video_path, "rb") as video_file:
        encoded = base64.b64encode(video_file.read()).decode()
    data_url = "data:video/mp4;base64," + encoded
    return HTML(f"""
    <video width={video_width} controls>
          <source src="{data_url}" type="video/mp4">
    </video>
    """)


In [13]:
# Initialize model and processor once; later cells reuse these three globals.
# With no arguments, initialize_model() uses its default checkpoint and picks
# CUDA when torch reports it as available, otherwise the CPU.
model, processor, device = initialize_model()


Using device: cuda


In [7]:
# Create data directory if it doesn't exist
data_dir = os.path.join(HOME, "data")
os.makedirs(data_dir, exist_ok=True)

# Download the video
video_url = "https://videos.pexels.com/video-files/3015510/3015510-hd_1920_1080_24fps.mp4"
input_video_path = os.path.join(data_dir, "hot_air_balloons.mp4")
!wget -q {video_url} -O {input_video_path}


In [12]:
# Define task and text
# `task` is the Florence-2 task token; `text` is only the extra input that
# follows it (process_frame builds the prompt as task + text). Tasks that take
# no extra input, such as <OD>, must therefore use an empty text; repeating
# the token would produce the prompt "<OD><OD>".
# For example, Object Detection:
task = "<OD>"
text = ""

# For Detailed Captioning:
# task = "<DETAILED_CAPTION>"
# text = ""

# For Referring Expression Segmentation:
# task = "<REFERRING_EXPRESSION_SEGMENTATION>"
# text = "person"

# Output video path
output_video_path = os.path.join(data_dir, "processed_hot_air_balloons.mp4")

# Process the video
process_video(
    input_video_path=input_video_path,
    output_video_path=output_video_path,
    model=model,
    processor=processor,
    device=device,
    task=task,
    text=text,
    frame_step=1  # Process every frame; increase for faster processing
)


Total number of frames: 464
Processing frame 464/464...
Processed video saved to: /home/user1/hamza/Large-Vision-Models/data/processed_hot_air_balloons1.mp4
Total processed frames: 464


In [16]:
# Tasks to run, as a list of {"task": ..., "text": ...} dicts built from
# (task token, extra text) pairs. Object Detection ("<OD>") is intentionally
# left out of this batch.
# NOTE(review): the REGION_TO_* tasks presumably expect a region encoded in
# `text`; an empty string is passed here. Confirm against the model card.
tasks = [
    {"task": task_token, "text": extra_text}
    for task_token, extra_text in (
        ("<CAPTION_TO_PHRASE_GROUNDING>", "person"),       # Caption to Phrase Grounding
        ("<DENSE_REGION_CAPTION>", ""),                    # Dense Region Captioning
        ("<REGION_PROPOSAL>", ""),                         # Region Proposal
        ("<OCR_WITH_REGION>", ""),                         # OCR with Region
        ("<REFERRING_EXPRESSION_SEGMENTATION>", "person"),  # Referring Expression Segmentation
        ("<REGION_TO_SEGMENTATION>", ""),                  # Region to Segmentation
        ("<OPEN_VOCABULARY_DETECTION>", "person"),         # Open Vocabulary Detection
        ("<REGION_TO_CATEGORY>", ""),                      # Region to Category
        ("<REGION_TO_DESCRIPTION>", ""),                   # Region to Description
    )
]

# Run the whole video once per task, writing one output file per task.
for item in tasks:
    task, text = item["task"], item["text"]

    # File name suffix: the task token without angle brackets or underscores,
    # lower-cased (e.g. "<OCR_WITH_REGION>" -> "ocrwithregion").
    task_name = task.strip("<>").replace("_", "").lower()
    output_video_path = os.path.join(data_dir, f"processed_hot_air_balloons_{task_name}.mp4")

    print(f"Processing task: {task} with text: '{text}'")

    # frame_step=1 annotates every frame; raise it for faster processing.
    process_video(
        input_video_path,
        output_video_path,
        model,
        processor,
        device,
        task,
        text,
        frame_step=1,
    )

    # To preview a result inline, call display_video(output_video_path) here.


Processing task: <CAPTION_TO_PHRASE_GROUNDING> with text: 'person'
Total number of frames: 464
Processing frame 464/464...
Processed video saved to: /home/user1/hamza/Large-Vision-Models/data/processed_hot_air_balloons_captiontophrasegrounding.mp4
Total processed frames: 464
Processing task: <DENSE_REGION_CAPTION> with text: ''
Total number of frames: 464
Processing frame 464/464...
Processed video saved to: /home/user1/hamza/Large-Vision-Models/data/processed_hot_air_balloons_denseregioncaption.mp4
Total processed frames: 464
Processing task: <REGION_PROPOSAL> with text: ''
Total number of frames: 464
Processing frame 464/464...
Processed video saved to: /home/user1/hamza/Large-Vision-Models/data/processed_hot_air_balloons_regionproposal.mp4
Total processed frames: 464
Processing task: <OCR_WITH_REGION> with text: ''
Total number of frames: 464
Processing frame 464/464...
Processed video saved to: /home/user1/hamza/Large-Vision-Models/data/processed_hot_air_balloons_ocrwithregion.mp4


In [None]:
# Point back at the single-task output file: the multi-task loop above
# reassigns output_video_path on every iteration.
output_video_path = os.path.join(data_dir, "processed_hot_air_balloons.mp4")

# Display the processed video inside the notebook
display_video(output_video_path)


### **Usage Notes**

- **Different Tasks**: You can change the `task` and `text` variables to perform different tasks. For example:
  - **Object Detection**:
    ```python
    task = "<OD>"
    text = ""  # <OD> takes no extra text; the prompt is built as task + text
    ```
  - **Detailed Captioning**:
    ```python
    task = "<DETAILED_CAPTION>"
    text = ""
    ```
  - **Referring Expression Segmentation**:
    ```python
    task = "<REFERRING_EXPRESSION_SEGMENTATION>"
    text = "person"
    ```
  - **OCR**:
    ```python
    task = "<OCR>"
    text = ""
    ```
  - **Open Vocabulary Detection**:
    ```python
    task = "<OPEN_VOCABULARY_DETECTION>"
    text = "hot air balloon"
    ```
- **Processing Fewer Frames**: To process fewer frames for faster processing, increase the `frame_step` parameter in the `process_video` function. For example, `frame_step=5` will process every 5th frame.
- **Viewing the Video**: The `display_video` function embeds the video directly into the Jupyter notebook for easy viewing.


In [1]:
# Import necessary libraries
import os
import cv2
import torch
from transformers import AutoModelForCausalLM, AutoProcessor
from PIL import Image
import supervision as sv
import numpy as np

# Set HOME directory
HOME = os.getcwd()

# Function to initialize model and processor
def initialize_model(checkpoint="microsoft/Florence-2-large-ft", device=None):
    """Load the model and processor for `checkpoint` and return them with the device.

    When `device` is None, CUDA is used if `torch.cuda.is_available()` is
    true, otherwise the CPU. The chosen device is printed. Returns the tuple
    `(model, processor, device)` with the model already moved to the device.
    """
    if device is None:
        backend = "cuda" if torch.cuda.is_available() else "cpu"
        device = torch.device(backend)
    print(f"Using device: {device}")

    # Both loaders need trust_remote_code=True, so share the keyword set.
    hub_kwargs = {"trust_remote_code": True}
    net = AutoModelForCausalLM.from_pretrained(checkpoint, **hub_kwargs)
    net = net.to(device)
    proc = AutoProcessor.from_pretrained(checkpoint, **hub_kwargs)
    return net, proc, device

# Function to run inference on an image
def run_inference(image: "Image.Image", model, processor, device, task: str, text: str = ""):
    """Run one task on an image and return the post-processed response.

    Parameters:
    - image: Image passed to the processor; its `.size` is forwarded to
      `post_process_generation` as `image_size`.
    - model: Object providing `generate(...)`.
    - processor: Callable that builds the model inputs and also provides
      `batch_decode` and `post_process_generation`.
    - device: Device the processed inputs are moved to.
    - task: Task token, e.g. "<OD>".
    - text: Extra input appended after the task token. None is treated as
      "". If it already starts with `task` it is used as the full prompt, so
      the token is never duplicated.

    Returns:
    - Whatever `processor.post_process_generation` returns for the first
      decoded sequence.
    """
    text = text or ""
    prompt = text if text.startswith(task) else task + text
    inputs = processor(text=prompt, images=image, return_tensors="pt").to(device)
    generated_ids = model.generate(
        input_ids=inputs["input_ids"],
        pixel_values=inputs["pixel_values"],
        max_new_tokens=1024,
        num_beams=3
    )
    # Special tokens are kept: they are passed on to post-processing.
    generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
    return processor.post_process_generation(generated_text, task=task, image_size=image.size)

# Function to process a single frame
def process_frame(frame, model, processor, device, task, text):
    """Annotate one frame with masks and labels and return it in BGR order.

    The frame is converted BGR -> RGB, wrapped in a PIL image and sent through
    `run_inference`. The response is turned into supervision detections, which
    are drawn first by a mask annotator and then by a label annotator.
    """
    pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

    result = run_inference(
        image=pil_image, model=model, processor=processor, device=device, task=task, text=text
    )
    detections = sv.Detections.from_lmm(sv.LMM.FLORENCE_2, result, resolution_wh=pil_image.size)

    # Masks are drawn before labels so the labels stay on top.
    annotators = (
        sv.MaskAnnotator(color_lookup=sv.ColorLookup.INDEX),
        sv.LabelAnnotator(color_lookup=sv.ColorLookup.INDEX),
    )
    annotated = pil_image
    for annotator in annotators:
        annotated = annotator.annotate(annotated, detections)

    return cv2.cvtColor(np.array(annotated), cv2.COLOR_RGB2BGR)

# Function to process a video
def process_video(input_video_path, output_video_path, model, processor, device, task, text, frame_step=1):
    """Annotate a video and write the result to `output_video_path`.

    Every source frame is written to the output. Frames whose index is a
    multiple of `frame_step` go through `process_frame`; the rest are copied
    unchanged. Because no frame is dropped, the output keeps the source fps.

    Prints an error and returns early if the input cannot be opened or the
    writer cannot be created. Raises ValueError if `frame_step` is below 1.
    """
    if frame_step < 1:
        raise ValueError(f"frame_step must be >= 1, got {frame_step}")

    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        print(f"Error opening video file {input_video_path}")
        return

    out = None
    processed_frames = 0
    try:
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"Total number of frames: {frame_count}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # All frames are written, so the source fps is kept; dividing by
        # frame_step would slow playback.
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
        if not out.isOpened():
            print(f"Error opening video writer for {output_video_path}")
            return

        frame_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_idx % frame_step == 0:
                # "\r" keeps the progress message on a single console line.
                print(f"Processing frame {frame_idx+1}/{frame_count}...", end="\r")
                processed_frame = process_frame(frame, model, processor, device, task, text)
                processed_frames += 1
            else:
                processed_frame = frame

            out.write(processed_frame)
            frame_idx += 1
    finally:
        # Release capture and writer even if inference fails midway.
        cap.release()
        if out is not None:
            out.release()

    print(f"\nProcessed video saved to: {output_video_path}")
    print(f"Total processed frames: {processed_frames}")

# Main code execution
import subprocess  # used only by the download step below

model, processor, device = initialize_model()

# Create data directory if it doesn't exist
data_dir = os.path.join(HOME, "data")
os.makedirs(data_dir, exist_ok=True)

# Download the video unless a non-empty copy is already present.
video_url = "https://videos.pexels.com/video-files/3015510/3015510-hd_1920_1080_24fps.mp4"
input_video_path = os.path.join(data_dir, "hot_air_balloons.mp4")
if os.path.exists(input_video_path) and os.path.getsize(input_video_path) > 0:
    print("Video already downloaded.")
else:
    print("Downloading video...")
    # Argument list instead of a shell string: no quoting problems when the
    # path contains spaces or shell metacharacters.
    download = subprocess.run(["wget", "-q", video_url, "-O", input_video_path], check=False)
    if download.returncode != 0:
        # `wget -O` creates the target even on failure; remove it so the next
        # run does not mistake it for a finished download.
        if os.path.exists(input_video_path):
            os.remove(input_video_path)
        raise RuntimeError(f"Download failed (wget exit code {download.returncode}): {video_url}")

# Define task and text
task = "<REFERRING_EXPRESSION_SEGMENTATION>"
text = "person"

# Output video path
output_video_path = os.path.join(data_dir, "referring_expression_segmentation.mp4")

# Process the video
process_video(
    input_video_path=input_video_path,
    output_video_path=output_video_path,
    model=model,
    processor=processor,
    device=device,
    task=task,
    text=text,
    frame_step=1
)


Using device: cuda


Florence2LanguageForConditionalGeneration has generative capabilities, as `prepare_inputs_for_generation` is explicitly overwritten. However, it doesn't directly inherit from `GenerationMixin`. From 👉v4.50👈 onwards, `PreTrainedModel` will NOT inherit from `GenerationMixin`, and this model will lose the ability to call `generate` and other related functions.
  - If you are the owner of the model architecture code, please modify your model class such that it inherits from `GenerationMixin` (after `PreTrainedModel`, otherwise you'll get an exception).
  - If you are not the owner of the model architecture class, please contact the model code owner to update it.


Video already downloaded.
Total number of frames: 464
Processing frame 3/464...

This is a friendly reminder - the current text generation call will exceed the model's predefined maximum length (1024). Depending on the model, you may observe exceptions, performance degradation, or nothing at all.


Processing frame 464/464...
Processed video saved to: /home/user1/hamza/Large-Vision-Models/data/referring_expression_segmentation.mp4
Total processed frames: 464


Captions

In [None]:
# Import necessary libraries
import os
import cv2
import torch
from transformers import AutoModelForCausalLM, AutoProcessor
from PIL import Image
import supervision as sv
import numpy as np

# Set HOME directory
HOME = os.getcwd()

# Function to initialize model and processor
def initialize_model(checkpoint="microsoft/Florence-2-large-ft", device=None):
    """Return `(model, processor, device)` for the given checkpoint.

    If `device` is None, CUDA is chosen when `torch.cuda.is_available()` is
    true and the CPU otherwise. The chosen device is printed, and the model
    is moved to it before the processor is loaded.
    """
    if device is None:
        device_name = "cuda" if torch.cuda.is_available() else "cpu"
        device = torch.device(device_name)
    print(f"Using device: {device}")

    # Both loaders need trust_remote_code=True.
    load_options = dict(trust_remote_code=True)
    florence = AutoModelForCausalLM.from_pretrained(checkpoint, **load_options)
    florence = florence.to(device)
    florence_processor = AutoProcessor.from_pretrained(checkpoint, **load_options)
    return florence, florence_processor, device

# Function to run inference on an image
def run_inference(image: "Image.Image", model, processor, device, task: str, text: str = ""):
    """Run one task on an image and return the post-processed response.

    Parameters:
    - image: Image passed to the processor; its `.size` is forwarded to
      `post_process_generation` as `image_size`.
    - model: Object providing `generate(...)`.
    - processor: Callable that builds the model inputs and also provides
      `batch_decode` and `post_process_generation`.
    - device: Device the processed inputs are moved to.
    - task: Task token, e.g. "<DETAILED_CAPTION>".
    - text: Extra input appended after the task token. None is treated as
      "". If it already starts with `task` it is used as the full prompt, so
      the token is never duplicated.

    Returns:
    - Whatever `processor.post_process_generation` returns for the first
      decoded sequence.
    """
    text = text or ""
    prompt = text if text.startswith(task) else task + text
    inputs = processor(text=prompt, images=image, return_tensors="pt").to(device)
    generated_ids = model.generate(
        input_ids=inputs["input_ids"],
        pixel_values=inputs["pixel_values"],
        max_new_tokens=1024,
        num_beams=3
    )
    # Special tokens are kept: they are passed on to post-processing.
    generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
    return processor.post_process_generation(generated_text, task=task, image_size=image.size)

# Function to process a single frame and overlay caption
def process_frame(frame, model, processor, device, task, text):
    """Caption one frame and return it in BGR order with the caption drawn on it.

    The frame is converted BGR -> RGB, wrapped in a PIL image and sent through
    `run_inference`. The caption is the value stored under `task` when the
    response is a dict; any other response is converted with `str`. It is
    word-wrapped to the frame width and drawn in white from position (50, 50).
    """
    import textwrap  # only needed for caption wrapping

    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    image = Image.fromarray(frame_rgb)

    # Generate caption for the frame. The response is not passed to
    # cv2.putText directly, because putText needs a plain string.
    response = run_inference(image=image, model=model, processor=processor, device=device, task=task, text=text)
    caption = response.get(task, "") if isinstance(response, dict) else response
    caption = str(caption).strip()

    # Convert back to BGR for OpenCV drawing and video writing
    frame_bgr = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

    # Overlay caption text on the frame. cv2.putText does not break lines, so
    # wrap the caption and draw it line by line.
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 1
    thickness = 2
    margin = 50
    (char_width, line_height), baseline = cv2.getTextSize("M", font, font_scale, thickness)
    usable_width = frame_bgr.shape[1] - 2 * margin
    max_chars = max(1, usable_width // max(1, char_width))

    y = margin
    for line in textwrap.wrap(caption, width=max_chars):
        cv2.putText(frame_bgr, line, (margin, y), font, font_scale, (255, 255, 255), thickness, cv2.LINE_AA)
        y += line_height + baseline + 8

    return frame_bgr

# Function to process a video with captions
def process_video(input_video_path, output_video_path, model, processor, device, task, text, frame_step=1):
    """Caption a video and write the result to `output_video_path`.

    Every source frame is written to the output. Frames whose index is a
    multiple of `frame_step` go through `process_frame`; the rest are copied
    unchanged. Because no frame is dropped, the output keeps the source fps.

    Prints an error and returns early if the input cannot be opened or the
    writer cannot be created. Raises ValueError if `frame_step` is below 1.
    """
    if frame_step < 1:
        raise ValueError(f"frame_step must be >= 1, got {frame_step}")

    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        print(f"Error opening video file {input_video_path}")
        return

    out = None
    processed_frames = 0
    try:
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"Total number of frames: {frame_count}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # All frames are written, so the source fps is kept; dividing by
        # frame_step would slow playback.
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
        if not out.isOpened():
            print(f"Error opening video writer for {output_video_path}")
            return

        frame_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_idx % frame_step == 0:
                # "\r" keeps the progress message on a single console line.
                print(f"Processing frame {frame_idx+1}/{frame_count}...", end="\r")
                processed_frame = process_frame(frame, model, processor, device, task, text)
                processed_frames += 1
            else:
                processed_frame = frame

            out.write(processed_frame)
            frame_idx += 1
    finally:
        # Release capture and writer even if inference fails midway.
        cap.release()
        if out is not None:
            out.release()

    print(f"\nProcessed video saved to: {output_video_path}")
    print(f"Total processed frames: {processed_frames}")

# Main code execution
import subprocess  # used only by the download step below

model, processor, device = initialize_model()

# Create data directory if it doesn't exist
data_dir = os.path.join(HOME, "data")
os.makedirs(data_dir, exist_ok=True)

# Download the video unless a non-empty copy is already present.
video_url = "https://videos.pexels.com/video-files/3015510/3015510-hd_1920_1080_24fps.mp4"
input_video_path = os.path.join(data_dir, "hot_air_balloons.mp4")
if os.path.exists(input_video_path) and os.path.getsize(input_video_path) > 0:
    print("Video already downloaded.")
else:
    print("Downloading video...")
    # Argument list instead of a shell string: no quoting problems when the
    # path contains spaces or shell metacharacters.
    download = subprocess.run(["wget", "-q", video_url, "-O", input_video_path], check=False)
    if download.returncode != 0:
        # `wget -O` creates the target even on failure; remove it so the next
        # run does not mistake it for a finished download.
        if os.path.exists(input_video_path):
            os.remove(input_video_path)
        raise RuntimeError(f"Download failed (wget exit code {download.returncode}): {video_url}")

# Define task and text for captioning
task = "<DETAILED_CAPTION>"
text = ""  # Additional prompt text can be added if needed

# Output video path
output_video_path = os.path.join(data_dir, "detailed_caption_output.mp4")

# Process video with captions
process_video(
    input_video_path=input_video_path,
    output_video_path=output_video_path,
    model=model,
    processor=processor,
    device=device,
    task=task,
    text=text,
    frame_step=1
)


2024-11-11 13:08:25.060821: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-11-11 13:08:25.471243: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1731326905.663241 1130744 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1731326905.706276 1130744 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-11-11 13:08:26.097000: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instr

Using device: cuda


Florence2LanguageForConditionalGeneration has generative capabilities, as `prepare_inputs_for_generation` is explicitly overwritten. However, it doesn't directly inherit from `GenerationMixin`. From 👉v4.50👈 onwards, `PreTrainedModel` will NOT inherit from `GenerationMixin`, and this model will lose the ability to call `generate` and other related functions.
  - If you are the owner of the model architecture code, please modify your model class such that it inherits from `GenerationMixin` (after `PreTrainedModel`, otherwise you'll get an exception).
  - If you are not the owner of the model architecture class, please contact the model code owner to update it.


NameError: name 'input_video_path' is not defined