In [1]:
import os

HOME = os.getcwd()

CHECKPOINT_PATH = os.path.join(HOME, "models", "sam_vit_h_4b8939.pth")
print(CHECKPOINT_PATH, "; exist:", os.path.isfile(CHECKPOINT_PATH))

/home/thebird/Dynamworks/LLM_Module/Hackathon/models/sam_vit_h_4b8939.pth ; exist: True


In [2]:
import torch
from segment_anything import sam_model_registry, SamAutomaticMaskGenerator, SamPredictor

DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
MODEL_TYPE = "vit_h"

sam = sam_model_registry[MODEL_TYPE](checkpoint=CHECKPOINT_PATH).to(device=DEVICE)

In [3]:
torch.cuda.is_available()

True

In [None]:
mask_generator = SamAutomaticMaskGenerator(sam)


In [None]:
import cv2
import supervision as sv # pip install supervision==0.24.0

image_bgr = cv2.imread("/home/thebird/Dynamworks/LLM_Module/Hackathon/Vehicle_Detection_Image_Dataset/sample_image.jpg")
image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)

masks = mask_generator.generate(image_rgb)


In [None]:
print(len(masks))
print(masks[0].keys())


In [None]:
mask_annotator = sv.MaskAnnotator(color_lookup = sv.ColorLookup.INDEX)

detections = sv.Detections.from_sam(sam_result=masks)

annotated_image = mask_annotator.annotate(scene=image_bgr.copy(), detections=detections)

sv.plot_images_grid(
    images=[image_bgr, annotated_image],
    grid_size=(1, 2),
    titles=['source image', 'segmented image']
)

In [None]:
import numpy as np
select_mask  = [mask["segmentation"] for mask in masks[0:25]]

sv.plot_images_grid(
    images=select_mask,
    grid_size= (5,5),
    size=(20,20)
)

In [1]:
from ultralytics import YOLO

from IPython.display import display, Image

model = YOLO('/home/thebird/Dynamworks/LLM_Module/Hackathon/runs/detect/train/weights/best.pt')
model.fuse()

Model summary (fused): 168 layers, 3,005,843 parameters, 0 gradients, 8.1 GFLOPs


In [5]:
mask_predictor = SamPredictor(sam)

In [6]:
CLASS_NAMES_DICT = model.model.names

# class_ids of interest - based on the number of classses
CLASS_ID = [item for item in range(0,len(CLASS_NAMES_DICT))]

CLASS_NAMES_DICT

{0: 'Vehicle'}

In [7]:
def get_video_dimensions(cap):
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    return width, height

def add_color_to_mask(mask, color):
    # Convert the color tensor to CPU
    color = torch.tensor(color).cpu().numpy()

    # Create a binary mask based on the original mask
    color_mask = np.zeros_like(mask.cpu().numpy(), dtype=np.uint8)
    color_mask[mask.cpu().numpy() > 0] = 1  # Set non-zero values to 1

    # Expand the color tensor and apply it to the binary mask
    colored_mask = color_mask[..., None] * color

    return colored_mask

def draw_class_names(frame, class_names, positions, color, font_size=0.5):
    for class_name, position in zip(class_names, positions):
        cv2.putText(frame, class_name, position, cv2.FONT_HERSHEY_SIMPLEX, font_size, color, 2, cv2.LINE_AA)

def draw_yolov8_boxes(frame, boxes, color):
    for box in boxes:
        box = list(map(int, box))
        cv2.rectangle(frame, (box[0], box[1]), (box[2], box[3]), color, 2)

def get_predicted_masks(model, mask_predictor, frame, width, height):
    # Run frame through YOLOv8 to get detections
    detections = model.predict(frame, conf=0.7)

    # Check if there are fish detections
    if len(detections[0].boxes) == 0:
        return None, None, None, None  # Skip processing for frames without fish detections

    # Run frame and detections through SAM to get masks
    transformed_boxes = mask_predictor.transform.apply_boxes_torch(
        detections[0].boxes.xyxy, [width, height]
    )
    mask_predictor.set_image(frame)
    masks, scores, logits = mask_predictor.predict_torch(
        boxes=transformed_boxes,
        multimask_output=False,
        point_coords=None,
        point_labels=None
    )
    return masks, scores, logits, detections

In [8]:
import cv2
import numpy as np
import torch

# Replace the following line with your actual VIDEO_PATH
VIDEO_PATH = "/home/thebird/Dynamworks/LLM_Module/Hackathon/Vehicle_Detection_Image_Dataset/sample_video.mp4"
OUTPUT_VIDEO_PATH = "/home/thebird/Dynamworks/LLM_Module/Hackathon/working/sample_video_yolo_sam.mp4"

# This will contain the resulting mask predictions for local use
mask_frames = []


constant_mask_color = np.array([0, 0, 255], dtype=np.uint8)  # Red color for masks
output_class_color = (0, 255, 0)  # Green color for class names
yolov8_box_color = (255, 0, 0)  # Blue color for YOLOv8 bounding boxes

cap = cv2.VideoCapture(VIDEO_PATH)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

fourcc = cv2.VideoWriter_fourcc(*'XVID')
output_video = cv2.VideoWriter(OUTPUT_VIDEO_PATH, fourcc, 15.0, (width, height))

frame_num = 1
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # Check if the frame is empty or None
    if frame is None:
        continue  # Skip processing for empty frames

    masks, scores, logits, detections = get_predicted_masks(model,
                                                            mask_predictor,
                                                            frame,
                                                            width,
                                                            height)

    if len(detections[0].boxes) == 0:
        continue  # Skip processing for frames without fish detections

    # Check if the mask is empty
    if masks[0][0].numel() == 0:
        continue  # Skip processing for empty masks

    # Combine mask predictions into a single mask, each with the same color
    class_ids = detections[0].boxes.cpu().cls
    merged_with_colors = add_color_to_mask(masks[0][0], constant_mask_color)
    for i in range(1, len(masks)):
        curr_mask_with_colors = add_color_to_mask(masks[i][0], constant_mask_color)
        merged_with_colors = np.bitwise_or(merged_with_colors, curr_mask_with_colors)

    # Draw YOLOv8 bounding boxes on the frame
    draw_yolov8_boxes(frame, detections[0].boxes.xyxy, yolov8_box_color)

    # Draw class names on the frame with a slightly larger font
    class_names = [CLASS_NAMES_DICT[int(class_id)] for class_id in class_ids]
    draw_class_names(frame, class_names, [(int(box[0]), int(box[1])) for box in detections[0].boxes.xyxy], output_class_color, font_size=0.7)

    # Overlay the SAM masks onto the frame
    frame_with_masks = cv2.addWeighted(frame, 1, merged_with_colors, 0.5, 0)

    # Write the frame with masks, YOLOv8 boxes, and class names to the output video
    output_video.write(frame_with_masks)

    frame_num += 1

cap.release()
output_video.release()




OpenCV: FFMPEG: tag 0x44495658/'XVID' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'


0: 384x640 3 Vehicles, 61.3ms
Speed: 3.3ms preprocess, 61.3ms inference, 86.9ms postprocess per image at shape (1, 3, 384, 640)


KeyboardInterrupt: 

In [4]:
from __future__ import annotations

import argparse
import logging
import os
from pathlib import Path
from typing import Final
from urllib.parse import urlparse

import cv2
import numpy as np
import requests
import rerun as rr  # pip install rerun-sdk
import rerun.blueprint as rrb
import torch
import torchvision
from segment_anything import SamAutomaticMaskGenerator, sam_model_registry
from segment_anything.modeling import Sam
from tqdm import tqdm

DESCRIPTION = """
Example of using Rerun to log and visualize the output of [Segment Anything](https://segment-anything.com/).

The full source code for this example is available [on GitHub](https://github.com/rerun-io/rerun/blob/latest/examples/python/segment_anything_model).
""".strip()

MODEL_DIR: Final = os.path.join(os.getcwd(),"models/")
MODEL_URLS: Final = {
    "vit_h": "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_h_4b8939.pth",
    "vit_l": "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_l_0b3195.pth",
    "vit_b": "https://dl.fbaipublicfiles.com/segment_anything/sam_vit_b_01ec64.pth",
}

In [5]:
def create_sam(model: str, device: str, model_name: str) -> Sam:
    """Load the segment-anything model, fetching the model-file as necessary."""
    model_path = os.path.join(MODEL_DIR, model_name)

    logging.info(f"PyTorch version: {torch.__version__}")
    logging.info(f"Torchvision version: {torchvision.__version__}")
    logging.info(f"CUDA is available: {torch.cuda.is_available()}")

    logging.info(f"Building sam from: {model_path}")
    sam = sam_model_registry[model](checkpoint=model_path)
    return sam.to(device=device)

def run_segmentation(mask_generator: SamAutomaticMaskGenerator, image: cv2.typing.MatLike) -> None:
    """Run segmentation on a single image."""
    rr.log("image", rr.Image(image))

    logging.info("Finding masks")
    masks = mask_generator.generate(image)

    logging.info(f"Found {len(masks)} masks")

    # Log all the masks stacked together as a tensor
    # TODO(jleibs): Tensors with class-ids and annotation-coloring would make this much slicker
    mask_tensor = (
        np.dstack([np.zeros((image.shape[0], image.shape[1]))] + [m["segmentation"] for m in masks]).astype("uint8")
        * 128
    )
    rr.log("mask_tensor", rr.Tensor(mask_tensor))

    # Note: for stacking, it is important to sort these masks by area from largest to smallest
    # this is because the masks are overlapping and we want smaller masks to
    # be drawn on top of larger masks.
    # TODO(jleibs): we could instead draw each mask as a separate image layer, but the current layer-stacking
    # does not produce great results.
    masks_with_ids = list(enumerate(masks, start=1))
    print(masks_with_ids[0][1])
    masks_with_ids.sort(key=(lambda x: x[1]["area"]), reverse=True)  # type: ignore[no-any-return]

    # Layer all of the masks together, using the id as class-id in the segmentation
    segmentation_img = np.zeros((image.shape[0], image.shape[1]))
    for id, m in masks_with_ids:
        segmentation_img[m["segmentation"]] = id

    rr.log("image/masks", rr.SegmentationImage(segmentation_img.astype(np.uint8)))

    mask_bbox = np.array([m["bbox"] for _, m in masks_with_ids])
    rr.log(
        "image/boxes",
        rr.Boxes2D(array=mask_bbox, array_format=rr.Box2DFormat.XYWH, class_ids=[id for id, _ in masks_with_ids]),
    )

def load_image(image) -> cv2.typing.MatLike:
    """Conditionally download an image from URL or load it from disk."""
    logging.info(f"Loading: image")

    # Rerun can handle BGR as well, but SAM requires RGB.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    return image

In [None]:
# parser = argparse.ArgumentParser(
#         description="Run the Facebook Research Segment Anything example.",
#         formatter_class=argparse.ArgumentDefaultsHelpFormatter,
#     )

# rr.script_add_args(parser)
# args = parser.parse_args()


blueprint = rrb.Vertical(
        rrb.Spatial2DView(name="Image and segmentation mask", origin="/image"),
        rrb.Horizontal(
            rrb.TextLogView(name="Log", origin="/logs"),
            rrb.TextDocumentView(name="Description", origin="/description"),
            column_shares=[2, 1],
        ),
        row_shares=[3, 1],
    )

# rr.script_setup(args, "rerun_example_segment_anything_model", default_blueprint=blueprint)
logging.getLogger().addHandler(rr.LoggingHandler("logs"))
logging.getLogger().setLevel(logging.INFO)

rr.log("description", rr.TextDocument(DESCRIPTION, media_type=rr.MediaType.MARKDOWN), timeless=True)

sam = create_sam("vit_h", "cuda", "sam_vit_h_4b8939.pth")

In [6]:
mask_config = {"points_per_batch": 32}
mask_generator = SamAutomaticMaskGenerator(sam, **mask_config)

In [7]:
cap = cv2.VideoCapture("/home/thebird/Dynamworks/LLM_Module/Hackathon/working/sample_video.mp4")
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

images_in_video = []
frame_num = 1

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    if frame_num > 8:
        images_in_video.append(frame)
    frame_num += 1
    # if frame_num == 10:
        # break

cap.release()




In [8]:
rr.set_time_sequence("image", 0)
image = load_image(images_in_video[0])
run_segmentation(mask_generator, image)

{'segmentation': array([[ True,  True,  True, ...,  True,  True,  True],
       [ True,  True,  True, ...,  True,  True,  True],
       [ True,  True,  True, ...,  True,  True,  True],
       ...,
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False]]), 'area': 168925, 'bbox': [0, 0, 1279, 197], 'predicted_iou': 1.0428577661514282, 'point_coords': [[180.0, 101.25]], 'stability_score': 0.9829621911048889, 'crop_box': [0, 0, 1280, 720]}


In [10]:
for n, image_uri in enumerate(images_in_video):
    rr.set_time_sequence("image", n)
    image = load_image(image_uri)
    run_segmentation(mask_generator, image)
    print(n)

{'segmentation': array([[ True,  True,  True, ...,  True,  True,  True],
       [ True,  True,  True, ...,  True,  True,  True],
       [ True,  True,  True, ...,  True,  True,  True],
       ...,
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False]]), 'area': 168925, 'bbox': [0, 0, 1279, 197], 'predicted_iou': 1.0428577661514282, 'point_coords': [[180.0, 101.25]], 'stability_score': 0.9829621911048889, 'crop_box': [0, 0, 1280, 720]}
0
{'segmentation': array([[ True,  True,  True, ...,  True,  True,  True],
       [ True,  True,  True, ...,  True,  True,  True],
       [ True,  True,  True, ...,  True,  True,  True],
       ...,
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False]]), 'area': 168913, 'bbox': [0, 0, 1279, 197], 'predicted_iou': 1.0433796644210815,

KeyboardInterrupt: 