# tracking the birds
currently only compatible with roboflow's csv output format (working on it) 

bounding boxes can be downloaded from serverless roboflow API, or run locally using:
#### Import the InferencePipeline object
from inference import InferencePipeline
import cv2

def my_sink(result, video_frame):
    """Handle the workflow result for one frame: show its image, then print it.

    ``video_frame`` is part of the callback signature but is not used here.
    """
    output_image = result.get("output_image")
    if output_image:
        # Display an image from the workflow response
        cv2.imshow("Workflow Image", output_image.numpy_image)
        cv2.waitKey(1)

    # do something with the predictions of each frame
    print(result)


#### initialize a pipeline object
import os

# The Roboflow API key is a secret, so it is read from the environment rather
# than being stored in the notebook. Set ROBOFLOW_API_KEY before running.
# NOTE: the key that used to be hard-coded here has been exposed and should be
# revoked/rotated in the Roboflow workspace settings.
pipeline = InferencePipeline.init_with_workflow(
    api_key=os.environ["ROBOFLOW_API_KEY"],
    workspace_name="basis-internship",
    workflow_id="detect-count-and-visualize-4",
    video_reference=0, # Path to video, device id (int, usually 0 for built in webcams), or RTSP stream url
    max_fps=30,
    on_prediction=my_sink
)
pipeline.start() #start the pipeline
pipeline.join() #wait for the pipeline thread to finish


### set up the environment
install and import any necessary packages 
move to GPU if available (i.e. on runpod) 

In [1]:
# ------------------------------
# Environment Setup
# ------------------------------
# Install necessary packages
# !pip install tqdm opencv-python-headless ultralytics matplotlib ipywidgets supervision rfdetr
# !pip install git+https://github.com/kadirnar/bytetrack-pip.git

import os
import json
import numpy as np
import pandas as pd
from pathlib import Path
from tqdm import tqdm
import torch
import cv2
from ultralytics import YOLO
import yaml
import random, shutil
import tempfile

from bytetracker.byte_tracker import BYTETracker

#plotting
import matplotlib.pyplot as plt
from matplotlib.widgets import RectangleSelector
from ipywidgets import interact, IntSlider
from IPython.display import HTML


# Check GPU
cuda_available = torch.cuda.is_available()
print("CUDA available:", cuda_available)
if cuda_available:
    # Report the name of CUDA device 0.
    print("GPU:", torch.cuda.get_device_name(0))


CUDA available: False


### visualize the model output
before tracking, check the bounding boxes and object **detections** 

In [2]:
# start by manually checking the bounding boxes from running the object detection

def visualize_detections_from_csv(
    csv_path: Path,
    frame_dir: Path,
    output_video_path: Path,
    fps: int = 30,
    *,
    frame_pattern: str = "15_*.jpg",
):
    """Draw the detections from a Roboflow CSV onto frames and write a video.

    Row ``i`` of the CSV is paired with the ``i``-th frame in sorted filename
    order, so the frame files must sort in the same order as the CSV rows.

    Args:
        csv_path: CSV with a ``predictions`` column holding one JSON document
            per row; each document has a ``predictions`` list of boxes with
            ``x``/``y`` (box centre), ``width`` and ``height``, and optionally
            ``confidence`` and ``class``.
        frame_dir: Directory containing the frame images.
        output_video_path: Destination of the annotated ``mp4v`` video.
        fps: Frame rate of the output video.
        frame_pattern: Glob pattern selecting the frame files in ``frame_dir``.

    Returns:
        None. If there are no frames, the first frame cannot be read, or the
        video writer cannot be opened, a warning is printed and the function
        returns early.
    """
    df = pd.read_csv(csv_path)
    df["parsed_predictions"] = df["predictions"].apply(json.loads)

    frame_paths = sorted(frame_dir.glob(frame_pattern))
    if not frame_paths:
        print(f"⚠️ No frames found in {frame_dir}")
        return

    # The first frame defines the output video size; imread returns None
    # (rather than raising) when a file cannot be decoded.
    sample_frame = cv2.imread(str(frame_paths[0]))
    if sample_frame is None:
        print(f"⚠️ Could not read frame: {frame_paths[0]}")
        return
    h, w = sample_frame.shape[:2]

    per_frame_predictions = df["parsed_predictions"].tolist()
    if len(frame_paths) != len(per_frame_predictions):
        print(
            f"⚠️ {len(frame_paths)} frames but {len(per_frame_predictions)} CSV rows; "
            "only frames that have a matching row are annotated"
        )

    writer = cv2.VideoWriter(str(output_video_path), cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))
    if not writer.isOpened():
        writer.release()
        print(f"⚠️ Could not open video writer for: {output_video_path}")
        return

    color = (0, 255, 0)
    try:
        for idx, frame_path in enumerate(frame_paths):
            if idx >= len(per_frame_predictions):
                # No CSV rows left, so none of the remaining frames can be annotated.
                break

            frame = cv2.imread(str(frame_path))
            if frame is None:
                continue

            for obj in per_frame_predictions[idx]["predictions"]:
                x, y, w_box, h_box = obj["x"], obj["y"], obj["width"], obj["height"]
                conf = obj.get("confidence", None)

                # Convert centre/size to the corner coordinates cv2.rectangle expects.
                x1 = int(x - w_box / 2)
                y1 = int(y - h_box / 2)
                x2 = int(x + w_box / 2)
                y2 = int(y + h_box / 2)

                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                label = f"{obj.get('class', 'bird')}: {conf:.2f}" if conf is not None else "bird"
                cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

            writer.write(frame)
    finally:
        # Always finalise the file, even if drawing or decoding raised.
        writer.release()

    print(f"✅ Annotated video saved to: {output_video_path}")


Based on the folder structure, there should be an 'extracted_frames' folder under the data root, with one subfolder per session and thermal camera (named `<session>_<thermal>`). These frames are what we will be feeding to the tracker and visualization.

In [None]:
# run it! 

TARGET_ROOT_DIR = Path("Desktop/labeling_data/data_structure")
SESSION_ROOT_DIR = '2024_05_18-session_0004'
thermal = "thermal_1"

# Detections CSV and output video live in <root>/<session>/<thermal>;
# the frames live in <root>/extracted_frames/<session>_<thermal>.
csv_file = TARGET_ROOT_DIR / SESSION_ROOT_DIR / thermal / "thermal_7.0_21.0.csv"
frame_dir = TARGET_ROOT_DIR / "extracted_frames" / f"{SESSION_ROOT_DIR}_{thermal}"
output_video_path = TARGET_ROOT_DIR / SESSION_ROOT_DIR / thermal / "detections.mp4"

visualize_detections_from_csv(csv_file, frame_dir, output_video_path)

FileNotFoundError: [Errno 2] No such file or directory: 'Desktop/labeling_data/data_structure/2024-05-18-session_0004/thermal_1/thermal_7.0_21.0.csv'

## tracking! 

In [None]:
#run the bytetracker tracker! 

def track_objects(csv_path: Path) -> dict:
    """Run BYTETracker over the per-frame detections stored in a CSV.

    The CSV must have a ``predictions`` column of JSON documents, each with a
    ``predictions`` list of boxes given as centre ``x``/``y``, ``width``,
    ``height`` and ``confidence``. Frames are numbered by CSV row position.

    Returns:
        A dict mapping each track id reported by the tracker to a list of
        ``(frame_idx, (cx, cy))`` entries, where ``(cx, cy)`` is the integer
        centre of the tracked box.
    """
    table = pd.read_csv(csv_path)
    table["parsed_predictions"] = table["predictions"].apply(json.loads)

    def _corner_boxes(objects):
        # Convert centre/size boxes to [x1, y1, x2, y2, confidence] rows.
        boxes = []
        for obj in objects:
            cx, cy, bw, bh, score = obj["x"], obj["y"], obj["width"], obj["height"], obj["confidence"]
            boxes.append([cx - bw / 2, cy - bh / 2, cx + bw / 2, cy + bh / 2, score])
        return boxes

    per_frame_boxes = [
        _corner_boxes(parsed["predictions"]) for parsed in table["parsed_predictions"]
    ]

    tracker = BYTETracker()
    track_history = {}

    for frame_idx, boxes in enumerate(per_frame_boxes):
        # Frames without detections are not passed to the tracker at all.
        if not boxes:
            continue

        box_array = np.array(boxes)
        if box_array.ndim != 2 or box_array.shape[1] < 5:
            continue

        # Append a constant class-id column (all zeros) as the sixth column.
        class_column = np.zeros((box_array.shape[0], 1))
        tracker_input = np.hstack((box_array[:, :5], class_column))

        for tracked_box in tracker.update(tracker_input):
            x1, y1, x2, y2, track_id, _ = tracked_box[:6]
            center = (int((x1 + x2) / 2), int((y1 + y2) / 2))
            track_history.setdefault(track_id, []).append((frame_idx, center))

    return track_history


### actually running the tracking!

In [None]:

# Root structure
sessions = [
    entry
    for entry in TARGET_ROOT_DIR.iterdir()
    if entry.is_dir() and entry.name.startswith("2024_05_18-session_")
]

results_dir = TARGET_ROOT_DIR / "tracking_results"
results_dir.mkdir(exist_ok=True)

# Only the first session found is processed.
session_path = sessions[0]
print("Selected session:", session_path.name)

for thermal_sub in ["thermal_1", "thermal_2"]:
    subfolder_path = session_path / thermal_sub
    if not subfolder_path.exists():
        continue

    # Use the first time-cropped CSV in the subfolder, if there is one.
    first_match = next(subfolder_path.glob("time_cropped-*.csv"), None)
    if first_match is None:
        print(f"⚠️ No CSV found in {subfolder_path}")
        continue
    csv_file = first_match

    frame_subdir_name = f"{session_path.name}_{thermal_sub}"
    frame_dir = TARGET_ROOT_DIR / "extracted_frames" / frame_subdir_name

    if not frame_dir.exists():
        print(f"⚠️ Missing frames at: {frame_dir}")
        continue

    # Get tracking history
    track_history = track_objects(csv_file)

    # Save each track's timeline as a CSV in the same folder as the input CSV
    output_csv = subfolder_path / f"{thermal_sub}_tracks.csv"

    with open(output_csv, "w") as f:
        rows = ["track_id,frame,x,y\n"]
        for tid, points in track_history.items():
            rows.extend(f"{tid},{frame_idx},{x},{y}\n" for frame_idx, (x, y) in points)
        f.writelines(rows)


Selected session: 2024_05_18-session_0002


In [None]:
#visualization! 

def overlay_tracks_on_video(
    csv_path: Path,
    frame_dir: Path,
    output_video: Path,
    fps: int = 15,
    *,
    frame_pattern: str = "15_*.jpg",
    color_by_track: bool = False,
):
    """Draw tracked points from a tracks CSV onto frames and write a video.

    The frame number of each image is the integer after the last ``_`` in its
    file stem; it is matched against the ``frame`` column of the CSV.
    NOTE(review): ``track_objects`` numbers frames by CSV row position —
    confirm the frame filenames use the same numbering.

    Args:
        csv_path: CSV with ``track_id``, ``frame``, ``x`` and ``y`` columns.
        frame_dir: Directory containing the frame images.
        output_video: Destination of the annotated ``mp4v`` video.
        fps: Frame rate of the output video.
        frame_pattern: Glob pattern selecting the frame files in ``frame_dir``.
        color_by_track: When True, give every track its own random colour and
            draw an ``ID <n>`` label next to each point. When False (default),
            every point is drawn in one fixed colour with no label.

    Raises:
        FileNotFoundError: If no file in ``frame_dir`` matches ``frame_pattern``.
        OSError: If the first frame cannot be read (it defines the video size).
    """
    df = pd.read_csv(csv_path)
    df["track_id"] = df["track_id"].astype(int)

    frame_paths = sorted(frame_dir.glob(frame_pattern))
    if not frame_paths:
        raise FileNotFoundError(f"No frames found in {frame_dir}")

    # Get frame size; imread returns None (rather than raising) on failure.
    sample_frame = cv2.imread(str(frame_paths[0]))
    if sample_frame is None:
        raise OSError(f"Could not read frame: {frame_paths[0]}")
    h, w = sample_frame.shape[:2]

    # Group the points by frame once instead of filtering the whole table for
    # every frame image.
    tracks_by_frame = {frame_no: group for frame_no, group in df.groupby("frame")}

    default_color = (255, 255, 0)
    track_colors = {}

    writer = cv2.VideoWriter(str(output_video), cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
    try:
        for frame_path in frame_paths:
            frame_idx = int(frame_path.stem.split("_")[-1])
            frame = cv2.imread(str(frame_path))
            if frame is None:
                print(f"⚠️ Skipping unreadable frame: {frame_path}")
                continue

            frame_tracks = tracks_by_frame.get(frame_idx)
            if frame_tracks is not None:
                points = zip(frame_tracks["track_id"], frame_tracks["x"], frame_tracks["y"])
                for tid, x, y in points:
                    tid, x, y = int(tid), int(x), int(y)

                    color = default_color
                    if color_by_track:
                        if tid not in track_colors:
                            # Plain Python ints: OpenCV rejects NumPy scalars as colours.
                            track_colors[tid] = tuple(np.random.randint(0, 256, size=3).tolist())
                        color = track_colors[tid]

                    cv2.circle(frame, (x, y), 5, color, -1)
                    if color_by_track:
                        cv2.putText(frame, f"ID {tid}", (x + 10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

            writer.write(frame)
    finally:
        # Always finalise the file, even if decoding or drawing raised.
        writer.release()

    print(f"✅ Saved annotated video to: {output_video}")


In [None]:
# run it!

TARGET_ROOT_DIR = Path("Desktop/labeling_data/data_structure")
session = "2024_05_18-session_0002"
thermal = "thermal_1"

# The overlay needs the *tracks* CSV written by the tracking step
# (<thermal>_tracks.csv), not the raw detections CSV. These paths must not be
# overwritten with the detection-visualisation paths before the call, otherwise
# the overlay reads a file without a track_id column and writes to
# detections.mp4.
csv_file = TARGET_ROOT_DIR / session / thermal / f"{thermal}_tracks.csv"
frames = TARGET_ROOT_DIR / "extracted_frames" / f"{session}_{thermal}"
output_video_path = TARGET_ROOT_DIR / session / thermal / "tracking_overlay.mp4"

overlay_tracks_on_video(csv_file, frames, output_video_path)


✅ Saved annotated video to: Desktop/labeling_data/data_structure/2024_05_18-session_0002/thermal_1/tracking_overlay.mp4
