In [None]:
import os
from base64 import b64encode

import lovely_numpy as ln
import lovely_tensors as lt
import matplotlib.pyplot as plt
import numpy as np
import torch
from cotracker.utils.visualizer import Visualizer, read_video_from_path
from IPython.display import HTML
from PIL import Image
import torch.nn.functional as F

In [None]:
# Input clip: decoded by read_video_from_path and previewed with show_video in the cells below.
video_file_path = "/home/017534556/projects/cmpe_297/object_tracker/test_2.mp4"

In [None]:
# Decode the clip to a NumPy array of frames.
video = read_video_from_path(video_file_path)

# Independent copy of the first frame, used later for plotting and as the SAM / YOLO input.
image = video[0].copy()

# Reorder the four axes (0, 3, 1, 2), prepend a singleton axis and cast to float.
# Presumably (T, H, W, C) -> (1, T, C, H, W) as CoTracker expects — TODO confirm.
video = torch.from_numpy(video).permute(0, 3, 1, 2).unsqueeze(0).float()

In [None]:
# Preview the first frame (copied from the decoded clip above) as a sanity check.
plt.imshow(image)

In [None]:
def show_video(video_path):
    """Embed a video file in the notebook as an inline, looping HTML5 player.

    The whole file is read into memory and base64-encoded into a ``data:`` URL,
    so the player does not depend on the file being served separately.

    Args:
        video_path: Path of the video file to embed.

    Returns:
        An ``HTML`` display object wrapping a ``<video>`` element.

    Raises:
        OSError: If the file cannot be opened for reading.
    """
    # Read-only binary mode is enough (the previous "r+b" also demanded write
    # permission), and the context manager guarantees the handle is closed.
    with open(video_path, "rb") as video_file:
        video_bytes = video_file.read()
    video_url = f"data:video/mp4;base64,{b64encode(video_bytes).decode()}"
    return HTML(
        f"""<video width="640" height="480" autoplay loop controls><source src="{video_url}"></video>"""
    )


# Play the raw input clip inline before any tracking is applied.
show_video(video_file_path)

In [None]:
from cotracker.predictor import CoTrackerPredictor

# Location of the CoTracker2 weights on disk.
cotracker_checkpoint = "/home/017534556/projects/cmpe_297/object_tracker/co-tracker/notebooks/checkpoints/cotracker2.pth"

# Build the point-tracking predictor from that checkpoint.
model = CoTrackerPredictor(checkpoint=cotracker_checkpoint)

In [None]:
# `video_interp` is consumed by this cell and by the regular-inference cell, but nothing in
# the notebook ever assigned it, so a fresh "Restart & Run All" stopped with NameError.
# The notebook contains no resizing step, so it is bound to the full-resolution clip; that
# also keeps the predicted tracks in the pixel space of `video`, which is the tensor later
# handed to the visualizer.
video_interp = video

if torch.cuda.is_available():
    model = model.cuda()
    video = video.cuda()
    # Re-point the alias at the GPU copy so both names refer to the same tensor.
    video_interp = video

### Regular inference

In [None]:
# Run CoTracker without explicit query points.
# NOTE(review): grid_size=10 presumably seeds a regular 10x10 grid of points and
# backward_tracking=True presumably also follows them back in time — confirm against
# CoTrackerPredictor.
pred_tracks, pred_visibility = model(video_interp, grid_size=10, backward_tracking=True)

In [None]:
# Overlay the grid tracks on the clip; the rendering is saved under output_dir with the
# base name "teaser" (the next cell plays teaser.mp4 from that directory).
output_dir = "/home/017534556/projects/cmpe_297/object_tracker/notebooks/cotracker_output_videos"
vis = Visualizer(save_dir=output_dir, pad_value=100, tracks_leave_trace=-1, linewidth=6)
visual = vis.visualize(
    video=video,
    tracks=pred_tracks,
    visibility=pred_visibility,
    filename="teaser",
)

In [None]:
# Play back the "teaser" rendering from the visualizer's output directory.
show_video(
    "/home/017534556/projects/cmpe_297/object_tracker/notebooks/cotracker_output_videos/teaser.mp4"
)

In [None]:
# Inspect the dimensions of the value returned by vis.visualize.
visual.shape

### Prompt Based Inference [point queries]

In [None]:
# Hand-picked points to track. Every row needs exactly three values, laid out the same way
# the RoboFlow section builds its queries: a leading 0 followed by the x and y pixel
# coordinates. The leading value is presumably the start-frame index — confirm against
# CoTrackerPredictor.
queries = torch.tensor(
    [
        # [0.0, 350.0, 300.0], # apple
        [0.0, 400.0, 400.0],
        [0.0, 180.0, 420.0],
        [0.0, 250.0, 280.0],
        # FIXME: this entry was left as `[0.0, 900.0, ]` with no y value, which made
        # torch.tensor raise ValueError on the ragged list. Fill in y and uncomment it.
        # [0.0, 900.0, <y>],
    ]
)
if torch.cuda.is_available():
    queries = queries.cuda()

In [None]:
# Track only the hand-picked query points; [None] prepends an axis of size 1
# (presumably the batch axis — confirm against CoTrackerPredictor).
pred_tracks, pred_visibility = model(video, queries=queries[None])

In [None]:
# Overlay the query-point tracks on the clip; the rendering is saved under output_dir with
# the base name "queries" (the next cell plays queries.mp4 from that directory).
output_dir = "/home/017534556/projects/cmpe_297/object_tracker/notebooks/cotracker_output_videos"
vis = Visualizer(save_dir=output_dir, pad_value=100, tracks_leave_trace=-1, linewidth=6)
visual = vis.visualize(
    video=video,
    tracks=pred_tracks,
    visibility=pred_visibility,
    filename="queries",
)

In [None]:
# Play back the "queries" rendering from the visualizer's output directory.
show_video(
    "/home/017534556/projects/cmpe_297/object_tracker/notebooks/cotracker_output_videos/queries.mp4"
)

### Prompt Based Inference [segmentation mask] [SAM]

In [None]:
from segment_anything import sam_model_registry, SamAutomaticMaskGenerator, SamPredictor
import numpy as np

In [None]:
# SAM weights and the registry key that selects the matching architecture.
sam_checkpoint = "/scratch/cmpe297-sp24/sam_weights/sam_vit_h_4b8939.pth"
model_type = "vit_h"

# Fall back to the CPU when no GPU is present, consistent with the
# torch.cuda.is_available() guards used for CoTracker elsewhere in this notebook.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Look up the constructor for the chosen architecture and load the checkpoint.
sam = sam_model_registry[model_type](checkpoint=sam_checkpoint)

In [None]:
# Place SAM on the selected device, then build both front-ends on the same model:
# a fully automatic mask generator and a prompt-driven predictor.
sam.to(device)
mask_generator = SamAutomaticMaskGenerator(sam)
predictor = SamPredictor(sam)

In [None]:
def show_anns(anns):
    """Paint every annotation's mask onto the current Matplotlib axes.

    Annotations are drawn from largest to smallest "area" so that smaller masks
    are painted over larger ones. Each mask gets a random RGB colour with an
    alpha of 0.35; pixels covered by no mask stay fully transparent.

    Args:
        anns: Sequence of dicts providing an "area" value and a "segmentation"
            array used to index the overlay (presumably a boolean H x W mask —
            confirm against the mask generator's output).

    Returns:
        None. Nothing is drawn when ``anns`` is empty.
    """
    if not len(anns):
        return
    by_area = sorted(anns, key=lambda ann: ann["area"], reverse=True)
    axes = plt.gca()
    axes.set_autoscale_on(False)

    # RGBA canvas sized like the first mask: white everywhere, alpha zeroed.
    mask_shape = by_area[0]["segmentation"].shape
    overlay = np.ones((mask_shape[0], mask_shape[1], 4))
    overlay[:, :, 3] = 0
    for ann in by_area:
        overlay[ann["segmentation"]] = np.concatenate([np.random.random(3), [0.35]])
    axes.imshow(overlay)


def show_mask(mask, ax, random_color=False):
    """Draw a single mask on ``ax`` as a translucent coloured overlay.

    Args:
        mask: Array whose last two dimensions are the mask height and width.
        ax: Axes-like object whose ``imshow`` receives the RGBA overlay.
        random_color: When true, use a random RGB colour; otherwise use the
            fixed colour (30, 144, 255) / 255. Alpha is 0.6 in both cases.

    Returns:
        None.
    """
    alpha = np.array([0.6])
    if random_color:
        rgb = np.random.random(3)
    else:
        rgb = np.array([30 / 255, 144 / 255, 255 / 255])
    rgba = np.concatenate([rgb, alpha], axis=0)

    rows, cols = mask.shape[-2:]
    # Broadcast (rows, cols, 1) * (1, 1, 4): zero mask pixels become fully transparent.
    ax.imshow(mask.reshape(rows, cols, 1) * rgba.reshape(1, 1, -1))


def show_points(coords, labels, ax, marker_size=375):
    """Scatter prompt points on ``ax``, coloured by their label.

    Points labelled 1 are drawn as green stars and points labelled 0 as red
    stars, both with a white edge.

    Args:
        coords: 2-D array of points; column 0 is used as x and column 1 as y.
        labels: 1-D array with one entry per row of ``coords``.
        ax: Axes-like object whose ``scatter`` is called once per label group.
        marker_size: Marker size passed to ``scatter`` as ``s``.

    Returns:
        None.
    """
    # Select both groups up front, positives first, then draw them in that order.
    groups = [
        (coords[labels == 1], "green"),
        (coords[labels == 0], "red"),
    ]
    for points, colour in groups:
        ax.scatter(
            points[:, 0],
            points[:, 1],
            color=colour,
            marker="*",
            s=marker_size,
            edgecolor="white",
            linewidth=1.25,
        )


def show_box(box, ax):
    """Outline a box on ``ax`` with a green, unfilled rectangle.

    Args:
        box: Indexable holding (x0, y0, x1, y1); width and height are derived
            as x1 - x0 and y1 - y0.
        ax: Axes the rectangle patch is added to.

    Returns:
        None.
    """
    x0, y0, x1, y1 = box[0], box[1], box[2], box[3]
    outline = plt.Rectangle(
        (x0, y0),
        x1 - x0,
        y1 - y0,
        edgecolor="green",
        facecolor=(0, 0, 0, 0),
        lw=2,
    )
    ax.add_patch(outline)

In [None]:
# Render the first frame with lovely_numpy's plot helper.
ln.plot(image)

In [None]:
# Check the frame's dimensions before handing it to SAM.
image.shape

In [None]:
# Register the first frame with the SAM predictor before prompting it.
predictor.set_image(image)

In [None]:
# input_point = np.array([[450, 300]])
# Point prompts must be a 2-D array with one (x, y) row per point. The flat
# np.array([400, 400]) used before broke show_points, which masks rows with
# `coords[labels == 1]` and then reads columns via `[:, 0]` / `[:, 1]`.
input_point = np.array([[400, 400]])
# One label per prompt row; show_points draws label 1 in green and label 0 in red.
input_label = np.array([1])

In [None]:
# Prompt SAM with the point above, asking for a single output (multimask_output=False).
sam_prediction = predictor.predict(
    point_coords=input_point,
    point_labels=input_label,
    multimask_output=False,
)
masks, scores, logits = sam_prediction

# Keep the first returned mask as uint8; it is passed to CoTracker as segm_mask below.
segm_mask = masks[0].astype(np.uint8)

In [None]:
# One figure per returned mask: the frame, the mask overlay, the prompt point and the score.
for i, (mask, score) in enumerate(zip(masks, scores)):
    plt.figure(figsize=(10, 10))
    plt.imshow(image)
    current_axes = plt.gca()
    show_mask(mask, current_axes)
    show_points(input_point, input_label, current_axes)
    plt.title(f"Mask {i+1}, Score: {score:.3f}", fontsize=18)
    plt.axis("on")
    plt.show()

In [None]:
# Track a grid of points, passing the SAM mask as segm_mask.
# NOTE(review): presumably only grid points inside the mask are kept — confirm against
# CoTrackerPredictor.
grid_size = 100
segm_mask_tensor = torch.from_numpy(segm_mask)[None, None]  # prepend two singleton axes
pred_tracks, pred_visibility = model(video, grid_size=grid_size, segm_mask=segm_mask_tensor)

# Overlay the tracks on the clip; saved under output_dir with the base name "segm_grid".
output_dir = "/home/017534556/projects/cmpe_297/object_tracker/notebooks/cotracker_output_videos"
vis = Visualizer(save_dir=output_dir, pad_value=100, linewidth=4, tracks_leave_trace=-1)
visual = vis.visualize(
    video=video, tracks=pred_tracks, visibility=pred_visibility, filename="segm_grid"
)

In [None]:
# Play back the "segm_grid" rendering from the visualizer's output directory.
show_video(
    "/home/017534556/projects/cmpe_297/object_tracker/notebooks/cotracker_output_videos/segm_grid.mp4"
)

### Prompt Based Inference [segmentation mask] [YOLOv8]

In [None]:
from ultralytics import YOLO

In [None]:
# Use the segmentation checkpoint: the cells below read `results[0].masks`, which only
# segmentation models populate. The oriented-bounding-box ("-obb") weights loaded here
# before leave `masks` as None, so those cells failed with AttributeError.
yolo_model = YOLO("yolov8x-seg.pt")

In [None]:
# Run YOLO on the first frame, wrapped as a PIL image via Image.fromarray.
results = yolo_model.predict(Image.fromarray(image))

In [None]:
# Show what results[0].plot() returns (presumably the frame with predictions drawn on it).
plt.imshow(results[0].plot())

In [None]:
# Visualise all predicted masks collapsed into one image by summing over the first axis.
plt.imshow(results[0].masks.data.cpu().numpy().sum(axis=0))

In [None]:
# The tracker call below takes a single segmentation mask, so sum all predicted masks
# together into one array.
segm_mask = results[0].masks.data.cpu().numpy().sum(axis=0)

In [None]:
# Track a grid of points, passing the combined YOLO mask as segm_mask.
# NOTE(review): presumably only grid points inside the mask are kept — confirm against
# CoTrackerPredictor.
grid_size = 100
segm_mask_tensor = torch.from_numpy(segm_mask)[None, None]  # prepend two singleton axes
pred_tracks, pred_visibility = model(video, grid_size=grid_size, segm_mask=segm_mask_tensor)

# Overlay the tracks on the clip; saved under output_dir with the base name "yolo_segm_grid".
output_dir = "/home/017534556/projects/cmpe_297/object_tracker/notebooks/cotracker_output_videos"
vis = Visualizer(save_dir=output_dir, pad_value=100, linewidth=4, tracks_leave_trace=-1)
visual = vis.visualize(
    video=video, tracks=pred_tracks, visibility=pred_visibility, filename="yolo_segm_grid"
)

In [None]:
# Play back the "yolo_segm_grid" rendering from the visualizer's output directory.
show_video(
    "/home/017534556/projects/cmpe_297/object_tracker/notebooks/cotracker_output_videos/yolo_segm_grid.mp4"
)

### RoboFlow

In [None]:
from inference import get_model
import supervision as sv
from roboflow import Roboflow
import cv2
# Read the Roboflow key from the environment instead of committing it in the notebook:
# export ROBOFLOW_API_KEY before starting the kernel (a missing variable raises KeyError).
# The key that used to be hard-coded on this line should be treated as leaked and rotated.
rf = Roboflow(api_key=os.environ["ROBOFLOW_API_KEY"])

In [None]:
# Resolve the hosted project in the default workspace, then take version 1's model.
workspace = rf.workspace()
project = workspace.project("yolov8_detfly-02")
rb_model = project.version(1).model

In [None]:
# Show the frame that is about to be sent to the Roboflow model.
plt.imshow(image)

In [None]:
# Run hosted inference on the first frame and keep the JSON response.
# NOTE(review): confidence=50 / overlap=30 look like percentages — confirm. Also check which
# channel order predict() expects for NumPy input; `image` comes from the video reader and
# is presumably RGB.
results = rb_model.predict(image, confidence=50, overlap=30).json()

In [None]:
# Convert the raw inference JSON into a supervision Detections object.
detections = sv.Detections.from_inference(results)

In [None]:
# Display the parsed detections.
detections

In [None]:
# Take a private copy of the detection boxes (the `xyxy` attribute); the midpoint
# computation below treats the first two columns and the last two columns as opposite corners.
drones_bboxes = detections.xyxy.copy()

In [None]:
# Given an (N, 4) array of x, y, x, y boxes, the centre of each box is the average of its
# two opposite corners.
corner_a = drones_bboxes[:, :2]
corner_b = drones_bboxes[:, 2:]
midpoints = (corner_a + corner_b) / 2

In [None]:
# Prefix every midpoint with a 0 column (presumably the start-frame index — confirm against
# CoTrackerPredictor), then convert the result to a float32 tensor.
zero_column = np.zeros((midpoints.shape[0], 1))
queries = torch.from_numpy(np.hstack([zero_column, midpoints])).to(torch.float32)

In [None]:
# Move the queries to the GPU under the same guard that moved the model and the video.
if torch.cuda.is_available():
    queries = queries.cuda()

In [None]:
# Track the detection midpoints; [None] prepends an axis of size 1
# (presumably the batch axis — confirm against CoTrackerPredictor).
pred_tracks, pred_visibility = model(video, queries=queries[None])

In [None]:
# Overlay the detection-seeded tracks using the "optical_flow" mode; the rendering is saved
# under output_dir with the base name "queries_rb".
output_dir = "/home/017534556/projects/cmpe_297/object_tracker/notebooks/cotracker_output_videos"
vis = Visualizer(
    save_dir=output_dir,
    pad_value=100,
    tracks_leave_trace=-1,
    linewidth=6,
    mode="optical_flow",
)
visual = vis.visualize(
    video=video,
    tracks=pred_tracks,
    visibility=pred_visibility,
    filename="queries_rb",
)

In [None]:
# Play back the "queries_rb" rendering from the visualizer's output directory.
show_video(
    "/home/017534556/projects/cmpe_297/object_tracker/notebooks/cotracker_output_videos/queries_rb.mp4"
)