### DATA

In [1]:
# Download the processed test videos from Google Drive (saved locally as
# processed_videos.zip) and unpack them under /content.
# The archive itself contains a leading content/ folder, so the videos end up
# in /content/content/processed_videos/.
!gdown https://drive.google.com/uc?id=1aa7_wxMHVVX1b1Djfgp2JoA8R5Xc19cI -O /content/processed_videos.zip
!unzip /content/processed_videos.zip -d /content

# Download and unzip the YOLO weights archive (pigs-yolov8-weights.zip) into /content.
# NOTE(review): presumably this provides /content/v9/weights/best.pt, which the
# model-loading cell reads — confirm after unzipping.
!gdown https://drive.google.com/uc?id=1YTbpI-m27oDJdGivKA1x01MA37Kv8_5- -O /content/pigs-yolov8-weights.zip
!unzip /content/pigs-yolov8-weights.zip -d /content

Downloading...
From (original): https://drive.google.com/uc?id=1aa7_wxMHVVX1b1Djfgp2JoA8R5Xc19cI
From (redirected): https://drive.google.com/uc?id=1aa7_wxMHVVX1b1Djfgp2JoA8R5Xc19cI&confirm=t&uuid=8df7124e-5298-457e-b095-794f30ebf83c
To: /content/processed_videos.zip
100% 250M/250M [00:05<00:00, 44.1MB/s]
Archive:  /content/processed_videos.zip
   creating: /content/content/processed_videos/
  inflating: /content/content/processed_videos/Test Vid 1_processed.mp4  
  inflating: /content/content/processed_videos/Test Vid 7_processed.mp4  
  inflating: /content/content/processed_videos/Test Vid 6_processed.mp4  
  inflating: /content/content/processed_videos/Test Vid 8_processed.mp4  
  inflating: /content/content/processed_videos/Test Vid 3_processed.mp4  
  inflating: /content/content/processed_videos/Test Vid 5_processed.mp4  
  inflating: /content/content/processed_videos/Test Vid 2_processed.mp4  
Downloading...
From (original): https://drive.google.com/uc?id=1YTbpI-m27oDJdGivKA1x01MA

In [2]:
!pip install ultralytics[track]

Collecting ultralytics[track]
  Downloading ultralytics-8.3.182-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics[track])
  Downloading ultralytics_thop-2.0.16-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics_thop-2.0.16-py3-none-any.whl (28 kB)
Downloading ultralytics-8.3.182-py3-none-any.whl (1.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m68.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.182 ultralytics-thop-2.0.16


In [3]:
# Sanity check: list the extracted videos in the folder the processing cell reads from.
!ls /content/content/processed_videos/

'Test Vid 1_processed.mp4'  'Test Vid 6_processed.mp4'
'Test Vid 2_processed.mp4'  'Test Vid 7_processed.mp4'
'Test Vid 3_processed.mp4'  'Test Vid 8_processed.mp4'
'Test Vid 5_processed.mp4'


### SORT Tracker (Simplified Version)

This implementation maintains a fixed number of trackers and updates them
based on **Intersection over Union (IoU)** with the detections.

---

### 1. Intersection over Union (IoU)

For a predicted bounding box $B_{pred}$ and a detection box $B_{det}$:

$$
IoU(B_{pred}, B_{det}) =
\frac{|B_{pred} \cap B_{det}|}{|B_{pred} \cup B_{det}| + \epsilon}
$$

Where:

- $|B_{pred} \cap B_{det}|$ is the **intersection area**  
- $|B_{pred} \cup B_{det}|$ is the **union area**  
- $\epsilon$ is a small constant to avoid division by zero

---

### 2. Tracker Update Rule

Each tracker maintains:

- **hits** → number of successful matches  
- **no\_losses** → number of consecutive frames without match  

Update process:

$$
\text{If } IoU(B_{trk}, B_{det}) \geq \theta
\;\;\Rightarrow\;\;
B_{trk} \leftarrow B_{det}, \;\; hits \mathrel{+}= 1, \;\; no\_losses = 0
$$

$$
\text{Else } \;\;\Rightarrow\;\; no\_losses \mathrel{+}= 1
$$

Where $\theta$ is the IoU threshold (`iou_threshold`).  

---

### 3. Tracker Reset (Unmatched Detections)

If a detection does not match any tracker:

$$
B_{trk} \leftarrow B_{det}, \quad
hits = 1, \quad
no\_losses = 0
$$

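The detection is written into the lowest-numbered tracker that was not matched in the current frame, so new objects take over idle trackers (the tracker keeps its ID).  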

---

### 4. Output Condition

A tracker is considered valid if:

$$
hits \geq min\_hits
\quad \text{ or } \quad
frame\_count \leq min\_hits
$$

### 5. Pipeline

* Run YOLO → get bounding boxes
* Track objects with SORT
* Pass YOLO bounding boxes into SAM to get segmentation masks
* Draw boxes and SAM masks on frames → save GIF (plus a CSV of the tracked boxes)

In [4]:
import os
import gc
import cv2
import torch
import numpy as np
import imageio
import pandas as pd
from ultralytics import YOLO, SAM
from tqdm.notebook import tqdm

# ==============================
# SORT Tracker Implementation
# ==============================
class KalmanBoxTracker:
    """State for one tracked object: its latest box, its ID and match counters.

    Despite the name, no Kalman filter is involved: ``predict`` hands back the
    stored box unchanged and only counts one more frame without an update.
    """

    def __init__(self, bbox, tracker_id):
        """Start a track at ``bbox`` identified by ``tracker_id``."""
        self.id = tracker_id
        self.bbox = bbox
        # A fresh track counts as one hit and has no missed frames yet.
        self.hits, self.no_losses = 1, 0

    def update(self, bbox):
        """Adopt ``bbox`` as the current box, add a hit and clear the miss counter."""
        self.hits = self.hits + 1
        self.no_losses = 0
        self.bbox = bbox

    def predict(self):
        """Increment the miss counter and return the last stored box."""
        self.no_losses = self.no_losses + 1
        return self.bbox


class Sort:
    """Fixed-slot, IoU-only multi-object tracker (a simplified take on SORT).

    ``max_ids`` tracker slots with IDs ``1..max_ids`` are created up front.  On
    every ``update`` each slot, in ID order, greedily claims the still-unclaimed
    detection it overlaps most; detections nobody claimed then overwrite slots
    that were not matched in that frame.

    Args:
        max_age: Kept for interface compatibility.  A slot that was not matched
            in the current frame may be reused immediately, so this value does
            not currently influence tracking.
            NOTE(review): decide whether tracks should expire after ``max_age``
            consecutive missed frames.
        min_hits: Hits a track needs before it is reported (waived while
            ``frame_count <= min_hits``).
        iou_threshold: Minimum IoU for a detection to be matched to a track.
        max_ids: Number of tracker slots, i.e. the maximum number of objects.
    """

    def __init__(self, max_age=10, min_hits=3, iou_threshold=0.3, max_ids=5):
        self.max_age = max_age
        self.min_hits = min_hits
        self.iou_threshold = iou_threshold
        self.max_ids = max_ids
        self.frame_count = 0

        # Pre-allocate every slot with a zero-area placeholder box; IDs are 1-based.
        self.trackers = [
            KalmanBoxTracker(bbox=np.array([0, 0, 0, 0]), tracker_id=slot + 1)
            for slot in range(self.max_ids)
        ]
        # Slots that have received a real detection at least once.  Placeholder
        # slots are excluded from the output so no phantom [0, 0, 0, 0] boxes
        # are reported.
        self._assigned = set()

    def iou(self, bb_test, bb_gt):
        """Return the intersection-over-union of two ``[x1, y1, x2, y2]`` boxes."""
        xx1 = np.maximum(bb_test[0], bb_gt[0])
        yy1 = np.maximum(bb_test[1], bb_gt[1])
        xx2 = np.minimum(bb_test[2], bb_gt[2])
        yy2 = np.minimum(bb_test[3], bb_gt[3])
        w = np.maximum(0., xx2 - xx1)
        h = np.maximum(0., yy2 - yy1)
        wh = w * h
        # The 1e-6 term keeps the division defined for zero-area boxes.
        o = wh / ((bb_test[2] - bb_test[0]) * (bb_test[3] - bb_test[1]) +
                  (bb_gt[2] - bb_gt[0]) * (bb_gt[3] - bb_gt[1]) - wh + 1e-6)
        return o

    def update(self, dets=np.empty((0, 5))):
        """Advance the tracker by one frame.

        Args:
            dets: Sequence of detections; the first four values of each row are
                read as ``[x1, y1, x2, y2]`` (a trailing score is ignored).

        Returns:
            ``(n, 5)`` array of ``[x1, y1, x2, y2, id]`` rows ordered by ID, or
            an empty ``(0, 5)`` array when no track qualifies for output.
        """
        self.frame_count += 1
        unmatched_dets = set(range(len(dets)))
        matched_tracker_indices = set()

        # Greedy association: each slot takes its best remaining detection.
        for trk_idx, trk in enumerate(self.trackers):
            best_iou = 0
            best_det_idx = -1
            # sorted() makes the tie-break (lowest detection index) explicit.
            for det_idx in sorted(unmatched_dets):
                iou_score = self.iou(dets[det_idx][:4], trk.bbox)
                if iou_score > best_iou:
                    best_iou = iou_score
                    best_det_idx = det_idx

            if best_det_idx != -1 and best_iou >= self.iou_threshold:
                trk.update(dets[best_det_idx][:4])
                unmatched_dets.remove(best_det_idx)
                matched_tracker_indices.add(trk_idx)
                self._assigned.add(trk_idx)
            else:
                trk.predict()

        # Leftover detections restart slots that were not matched this frame,
        # lowest slot first.  A matched slot always has no_losses == 0, so for
        # any non-negative max_age the former `no_losses > max_age` clause could
        # never add a slot to this list.
        free_slots = [i for i in range(self.max_ids) if i not in matched_tracker_indices]
        for det_idx, slot in zip(sorted(unmatched_dets), free_slots):
            trk = self.trackers[slot]
            trk.bbox = dets[det_idx][:4]
            trk.hits = 1
            trk.no_losses = 0
            self._assigned.add(slot)

        warming_up = self.frame_count <= self.min_hits
        results = []
        for slot, trk in enumerate(self.trackers):
            if slot not in self._assigned:
                continue  # still the initial placeholder, nothing real to report
            if trk.hits >= self.min_hits or warming_up:
                results.append(np.concatenate((trk.bbox, [trk.id])))

        # Slots are stored in ID order and there are exactly max_ids of them, so
        # the rows are already sorted by ID and capped at max_ids.
        if not results:
            return np.empty((0, 5))
        return np.array(results)


# ==============================
# Load YOLO + SAM2.1
# ==============================
# Path to the trained YOLO detector weights.
# NOTE(review): presumably unpacked from pigs-yolov8-weights.zip by the download
# cell — confirm the path after unzipping.
YOLO_MODEL_PATH = "/content/v9/weights/best.pt"
yolo_model = YOLO(YOLO_MODEL_PATH)
# SAM 2.1 checkpoint referenced by file name; the recorded run shows Ultralytics
# downloading sam2.1_b.pt when it is not present locally.
sam_model = SAM("sam2.1_b.pt")
# Use the GPU when available.  process_video reads this module-level `device`
# when it builds the tensors for the mask overlay.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
sam_model.to(device)


# ==============================
# Process Video with CSV + YOLO->SAM
# ==============================
def _detection_confidence(track_box, det_boxes, det_scores):
    """Return the score of the detection whose box coincides with ``track_box``.

    The tracker stores the coordinates of the detection it matched (or was
    reset to) in the current frame, so comparing boxes recovers the right score
    regardless of the order in which tracks are returned.  Tracks that have no
    detection in this frame yield NaN.
    """
    if len(det_boxes) == 0:
        return float("nan")
    same_box = np.all(np.isclose(det_boxes, track_box), axis=1)
    hit_rows = np.flatnonzero(same_box)
    return float(det_scores[hit_rows[0]]) if hit_rows.size else float("nan")


def process_video(video_path, yolo_model, sam_model, output_gif_path, tracker_params=None, fps=5):
    """Detect, track and segment objects in a video; write an annotated GIF and a CSV.

    Per frame: YOLO produces boxes, ``Sort`` assigns IDs, each tracked box is
    drawn and logged, and SAM is prompted with the box to obtain a mask that is
    blended into the frame in blue.  Frames are halved in size for the GIF.

    Args:
        video_path: Path of the input video.
        yolo_model: Detector; called as ``yolo_model(frame, verbose=False)``.
        sam_model: Segmenter; called with ``bboxes=[[x1, y1, x2, y2]]``.
        output_gif_path: Destination of the GIF.  The CSV is written next to it
            with the extension swapped to ``.csv``.
        tracker_params: Keyword arguments for ``Sort``; defaults to
            ``max_age=10, min_hits=2, iou_threshold=0.3, max_ids=2``.
        fps: Frame rate passed to ``imageio.mimsave``.

    The CSV columns are ``frame_id, object_id, x1, y1, x2, y2, confidence``.
    ``confidence`` is the score of the detection the track holds in that frame,
    or NaN when the track has no detection in that frame.

    Uses the module-level ``device`` for the overlay tensors.
    """
    if tracker_params is None:
        tracker_params = dict(max_age=10, min_hits=2, iou_threshold=0.3, max_ids=2)

    tracker = Sort(**tracker_params)
    frames = []
    csv_data = []
    # Mask tint in BGR order (blue); built once instead of once per mask.
    overlay_color = torch.tensor([255, 0, 0], dtype=torch.uint8, device=device)

    cap = cv2.VideoCapture(video_path)
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        for frame_id in tqdm(range(total_frames), desc=f"Processing {os.path.basename(video_path)}"):
            ret, frame = cap.read()
            if not ret:
                break

            result = yolo_model(frame, verbose=False)[0]

            boxes = np.empty((0, 4))
            scores = np.empty((0,))
            tracked_objects = []

            # NOTE(review): as before, the tracker is only advanced on frames
            # that have at least one detection; frames without detections
            # leave its counters untouched and draw nothing.
            det_boxes = getattr(result, "boxes", None)
            if det_boxes is not None and len(det_boxes) > 0:
                boxes = det_boxes.xyxy.cpu().numpy()
                scores = det_boxes.conf.cpu().numpy()
                # Make sure there is one score per box.
                if len(scores) < len(boxes):
                    scores = np.ones((len(boxes),), dtype=float)
                tracked_objects = tracker.update(np.hstack((boxes, scores[:, None])))

            # Annotations go on a copy; `frame` stays pristine for SAM.
            frame_np = frame.copy()

            # tracker.max_ids always exists, unlike tracker_params["max_ids"].
            for obj in tracked_objects[:tracker.max_ids]:
                x1, y1, x2, y2, obj_id = obj.astype(int)
                # Tracks come back ordered by ID, not by detection index, so the
                # score has to be looked up by box rather than by position.
                confidence = _detection_confidence(obj[:4], boxes, scores)

                # Draw bounding box (Green)
                cv2.rectangle(frame_np, (x1, y1), (x2, y2), (0, 255, 0), 2)

                # Draw object ID (Red)
                cv2.putText(frame_np, f"ID {obj_id}", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)

                # Save CSV info
                csv_data.append([frame_id, int(obj_id), int(x1), int(y1), int(x2), int(y2), float(confidence)])

                # Tracked bbox → SAM mask.  SAM sees the untouched frame, not
                # the copy that already carries boxes, labels and earlier masks.
                sam_results = sam_model(frame, bboxes=[[x1, y1, x2, y2]], verbose=False)
                masks = getattr(sam_results[0], "masks", None)
                if masks is not None:
                    mask = masks.data[0].bool()

                    # Blend 50 % frame + 50 % blue inside the mask.
                    frame_tensor = torch.from_numpy(frame_np).to(device)
                    frame_tensor[mask] = (0.5 * frame_tensor[mask] + 0.5 * overlay_color).to(torch.uint8)
                    frame_np = frame_tensor.cpu().numpy()

            # Resize and convert BGR→RGB
            frame_resized = cv2.resize(frame_np, (frame_np.shape[1] // 2, frame_np.shape[0] // 2))
            frames.append(cv2.cvtColor(frame_resized, cv2.COLOR_BGR2RGB))

            # Free memory
            torch.cuda.empty_cache()
            gc.collect()
    finally:
        # Release the capture even if detection or segmentation raises.
        cap.release()

    # Save GIF
    imageio.mimsave(output_gif_path, frames, fps=fps)
    print(f"✅ Saved GIF at: {output_gif_path}")

    # Save CSV.  Swap only the extension: str.replace would rewrite every
    # ".gif" in the path and would return the GIF path itself when the output
    # name has no ".gif" in it.
    csv_path = os.path.splitext(output_gif_path)[0] + ".csv"
    df = pd.DataFrame(csv_data, columns=["frame_id", "object_id", "x1", "y1", "x2", "y2", "confidence"])
    df.to_csv(csv_path, index=False)
    print(f"✅ Saved CSV at: {csv_path}")


# ==============================
# Run on Videos
# ==============================
# Videos to process, by file name inside input_dir.
video_list = [
    "Test Vid 1_processed.mp4",
    "Test Vid 6_processed.mp4",
    "Test Vid 2_processed.mp4",
    "Test Vid 7_processed.mp4",
    "Test Vid 3_processed.mp4",
    "Test Vid 8_processed.mp4",
    "Test Vid 5_processed.mp4",
]

input_dir = "/content/content/processed_videos"
output_dir = "/content/output_gifs"
os.makedirs(output_dir, exist_ok=True)

for vid_name in video_list:
    video_path = os.path.join(input_dir, vid_name)
    if not os.path.isfile(video_path):
        # A missing download should not abort the remaining videos.
        print(f"⚠️ Skipping missing video: {video_path}")
        continue

    # Swap only the extension; str.replace would also rewrite ".mp4" elsewhere in the name.
    output_gif_path = os.path.join(output_dir, os.path.splitext(vid_name)[0] + ".gif")
    process_video(video_path, yolo_model, sam_model, output_gif_path)

    # --- free memory after each video ---
    torch.cuda.empty_cache()
    gc.collect()

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/sam2.1_b.pt to 'sam2.1_b.pt': 100%|██████████| 154M/154M [00:05<00:00, 31.8MB/s]


Processing Test Vid 2_processed.mp4:   0%|          | 0/1701 [00:00<?, ?it/s]

✅ Saved GIF at: /content/output_gifs/Test Vid 2_processed.gif
✅ Saved CSV at: /content/output_gifs/Test Vid 2_processed.csv


Processing Test Vid 7_processed.mp4:   0%|          | 0/4684 [00:00<?, ?it/s]

✅ Saved GIF at: /content/output_gifs/Test Vid 7_processed.gif
✅ Saved CSV at: /content/output_gifs/Test Vid 7_processed.csv


Processing Test Vid 3_processed.mp4:   0%|          | 0/2063 [00:00<?, ?it/s]

✅ Saved GIF at: /content/output_gifs/Test Vid 3_processed.gif
✅ Saved CSV at: /content/output_gifs/Test Vid 3_processed.csv


Processing Test Vid 8_processed.mp4:   0%|          | 0/3532 [00:00<?, ?it/s]

✅ Saved GIF at: /content/output_gifs/Test Vid 8_processed.gif
✅ Saved CSV at: /content/output_gifs/Test Vid 8_processed.csv


Processing Test Vid 5_processed.mp4:   0%|          | 0/1672 [00:00<?, ?it/s]

✅ Saved GIF at: /content/output_gifs/Test Vid 5_processed.gif
✅ Saved CSV at: /content/output_gifs/Test Vid 5_processed.csv


In [5]:
# Bundle every GIF and CSV into a single archive; -r recurses into the folder,
# and entries keep their content/output_gifs/ path prefix inside the zip.
!zip -r /content/output_gifs.zip /content/output_gifs

  adding: content/output_gifs/ (stored 0%)
  adding: content/output_gifs/Test Vid 7_processed.csv (deflated 59%)
  adding: content/output_gifs/Test Vid 8_processed.csv (deflated 61%)
  adding: content/output_gifs/Test Vid 5_processed.csv (deflated 62%)
  adding: content/output_gifs/Test Vid 5_processed.gif (deflated 2%)
  adding: content/output_gifs/Test Vid 2_processed.csv (deflated 60%)
  adding: content/output_gifs/Test Vid 7_processed.gif (deflated 2%)
  adding: content/output_gifs/Test Vid 8_processed.gif (deflated 2%)
  adding: content/output_gifs/Test Vid 3_processed.gif (deflated 2%)
  adding: content/output_gifs/Test Vid 2_processed.gif (deflated 2%)
  adding: content/output_gifs/Test Vid 3_processed.csv (deflated 60%)


### Save

Run the cells below (uncommenting them first if needed) to connect to Google Drive and save the archive to your own Drive.

In [None]:
# Colab-only: mount Google Drive at /content/drive so the archive can be copied there.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!cp -v /content/output_gifs.zip /content/drive/MyDrive/Data

In [None]:
# Colab-only alternative to copying to Drive: hand the archive to files.download.
from google.colab import files
files.download("/content/output_gifs.zip")

In [None]:
# from IPython.display import Image, display
# import os

# output_dir = "/content/output_gifs"

# # List of GIF filenames
# gif_list = [
#     "Test Vid 1_processed.gif",
#     "Test Vid 6_processed.gif",
#     "Test Vid 2_processed.gif",
#     "Test Vid 7_processed.gif",
#     "Test Vid 3_processed.gif",
#     "Test Vid 8_processed.gif",
#     "Test Vid 5_processed.gif"
# ]

# # Display each GIF in its own cell
# for gif_name in gif_list:
#     gif_path = os.path.join(output_dir, gif_name)
#     display(Image(filename=gif_path))