In [None]:
# @title Cell 0: Mount Google Drive (for Colab use)
# Colab-only cell: it imports from `google.colab`, so it is meant to be run
# inside Google Colab.
from google.colab import drive
# Mount Drive at /content/drive. The install cell that follows reads its
# requirements file from /content/drive/MyDrive/Product_Detector/.
drive.mount('/content/drive')

In [None]:
# @title Cell 1: Install Dependencies (for Colab use)
!pip install -r /content/drive/MyDrive/Product_Detector/requirements.txt

In [None]:
# @title Cell 2: Imports

# File and path handling
import os

# Image and video processing
import cv2
import numpy as np

# Detection framework (YOLOv8)
import torch
from ultralytics import YOLO

# Dataset loading (COCO) for class verification
from datasets import load_dataset

# Quick web interface
import gradio as gr

# Typing and utilities
from typing import List, Tuple, Dict

In [None]:
# @title Step 0: Select the COCO classes to detect

# Load the YOLOv8 model to access the full list of COCO class names
model = YOLO("yolov8n.pt")

# List of class names you want to exclude from detection
EXCLUDED_CLASSES = ["apple", "orange", "hot dog", "skateboard", "laptop"]  # <-- edit this list to suit your needs

# A misspelled entry would otherwise be ignored silently and the class would
# still be detected, so report any excluded name the model does not know.
_known_classes = set(model.names.values())
_unknown_excluded = [name for name in EXCLUDED_CLASSES if name not in _known_classes]
if _unknown_excluded:
    print(f"Warning: excluded names not found among the model classes: {_unknown_excluded}")

# Generate the list of target classes excluding the ones you don't want
# (kept as a list, in the model's own class order).
TARGET_CLASSES = [name for name in model.names.values() if name not in EXCLUDED_CLASSES]

# Report how many classes were actually removed, which can be fewer than
# len(EXCLUDED_CLASSES) when some entries did not match any class name.
_excluded_count = len(model.names) - len(TARGET_CLASSES)
print(f"Using {len(TARGET_CLASSES)} COCO classes (excluding {_excluded_count}):")
print(TARGET_CLASSES)



In [None]:
# @title Step 1: Load and configure the YOLOv8 model

# This re-creates the `model` object that Step 0 already built from the same
# weights file.
model = YOLO("yolov8n.pt")  # Nano version pretrained on COCO
# The threshold is stored as an attribute on the model object; process_frame()
# reads it back and drops detections whose score is below it.
# NOTE(review): within this notebook the value is only used by that manual
# filter; whether Ultralytics itself honours `model.conf` is not shown here.
model.conf = 0.38           # Confidence threshold: discard detections with score < 0.38

# Echo the configured threshold so the cell output documents it.
print(f"Model loaded with confidence threshold {model.conf}")

In [None]:
# @title Step 2: IoU-based tracker for product detection
from typing import List
import numpy as np

def compute_iou(box1: np.ndarray, box2: np.ndarray) -> float:
    """
    Return the Intersection over Union (IoU) of two axis-aligned boxes.

    Both boxes are indexable as [x1, y1, x2, y2]. Negative widths or heights
    are clamped to zero, and 0.0 is returned when the union has no area.
    """
    def _area(b) -> float:
        # Width and height are clamped so inverted boxes count as empty.
        return max(0, b[2] - b[0]) * max(0, b[3] - b[1])

    # Corners of the overlapping rectangle (may be inverted if disjoint).
    left, top = max(box1[0], box2[0]), max(box1[1], box2[1])
    right, bottom = min(box1[2], box2[2]), min(box1[3], box2[3])

    overlap = max(0, right - left) * max(0, bottom - top)
    combined = _area(box1) + _area(box2) - overlap

    if combined > 0:
        return overlap / combined
    return 0.0

class SimpleIoUTracker:
    """
    Greedy IoU matcher that gives every box an integer ID and tries to keep
    that ID stable from one frame to the next.

    Only the boxes passed to the most recent ``update`` call are remembered.
    """

    def __init__(self, iou_threshold: float = 0.5):
        # Minimum IoU for a new box to inherit the ID of a previous box.
        self.iou_threshold = iou_threshold
        # Next ID that has never been handed out.
        self.next_id = 0
        # Boxes and IDs from the latest update() call, index-aligned.
        self.prev_boxes: List[np.ndarray] = []
        self.prev_ids: List[int] = []

    def update(self, boxes: List[np.ndarray]) -> List[int]:
        """
        Match the current frame's boxes against the previous frame's boxes.

        boxes: [x1, y1, x2, y2] boxes of the current frame.
        Returns: one ID per input box, in input order. A box reuses the ID of
        the still-unclaimed previous box it overlaps most, provided that IoU
        reaches ``iou_threshold``; otherwise it receives a fresh ID.
        """
        free_slots = list(range(len(self.prev_boxes)))
        result: List[int] = []

        for current in boxes:
            # Look for the unclaimed previous box with the largest overlap;
            # an IoU of exactly zero never counts as a candidate.
            top_iou, top_slot = 0.0, -1
            for slot in free_slots:
                overlap = compute_iou(current, self.prev_boxes[slot])
                if overlap > top_iou:
                    top_iou, top_slot = overlap, slot

            if top_iou >= self.iou_threshold and top_slot != -1:
                # Inherit the ID and make the previous box unavailable to
                # the remaining boxes of this frame.
                result.append(self.prev_ids[top_slot])
                free_slots.remove(top_slot)
            else:
                result.append(self.next_id)
                self.next_id += 1

        # Remember this frame for the next call.
        self.prev_boxes = boxes.copy()
        self.prev_ids = result.copy()
        return result

# Smoke-test the tracker on two synthetic frames.
# A dedicated variable name is used so that re-running this cell can never
# replace the global `tracker` instance that process_frame() relies on.
demo_tracker = SimpleIoUTracker(iou_threshold=0.3)
boxes_frame1 = [np.array([10,10,50,50]), np.array([100,100,150,150])]
ids1 = demo_tracker.update(boxes_frame1)
# The first box moves slightly (keeps ID 0); the second one is far from any
# previous box (gets the fresh ID 2).
boxes_frame2 = [np.array([12,12,52,52]), np.array([200,200,250,250])]
ids2 = demo_tracker.update(boxes_frame2)
print(ids1, ids2)  # expect [0, 1] then [0, 2]
# Fail loudly if the tracker ever stops behaving as documented above.
assert (ids1, ids2) == ([0, 1], [0, 2]), f"Unexpected tracker IDs: {ids1}, {ids2}"

In [None]:
# @title Step 3: Initialize the tracker and global accumulator, corrected to use TARGET_CLASSES

# Global tracker that process_frame() uses to assign IDs to boxes across frames.
tracker = SimpleIoUTracker(iou_threshold=0.5)
# Global set of class names seen so far; process_frame() only ever adds to it.
detected_products = set()

def process_frame(frame: np.ndarray) -> np.ndarray:
    """
    Detect products in one frame, track them, and draw the annotations.

    Runs the global YOLOv8 ``model`` on ``frame`` and keeps the detections
    whose class name is in ``TARGET_CLASSES`` (the model's classes minus the
    excluded ones) and whose score is at least ``model.conf``. Each kept box
    gets an ID from the global ``tracker``, and its class name is added to the
    global ``detected_products`` set.

    Args:
        frame: image array to analyse, e.g. a frame returned by
            ``cv2.VideoCapture.read()`` in ``process_video()``.

    Returns:
        The same ``frame`` object, annotated in place with one rectangle and
        ``name-id`` label per kept detection, plus the sorted list of all
        class names accumulated so far in the top-left corner.
    """
    # YOLOv8 inference. verbose=False stops Ultralytics from printing a log
    # line for every frame, which would flood the output on a whole video.
    results = model(frame, verbose=False)
    det = results[0]
    boxes = det.boxes.xyxy.cpu().numpy()
    class_ids = det.boxes.cls.cpu().numpy().astype(int)
    scores = det.boxes.conf.cpu().numpy()

    # Keep only detections for our TARGET_CLASSES above the confidence threshold
    filtered_boxes = []
    filtered_names = []
    for box, cls_id, score in zip(boxes, class_ids, scores):
        name = model.names[cls_id]
        if name in TARGET_CLASSES and score >= model.conf:
            filtered_boxes.append(box.astype(int))
            filtered_names.append(name)

    # IoU tracking: one ID per kept box, in the same order
    ids = tracker.update(filtered_boxes)

    # Accumulate unique detected products
    detected_products.update(filtered_names)

    # Draw bounding boxes and labels
    for box, name, id_ in zip(filtered_boxes, filtered_names, ids):
        # Plain Python ints are the safest coordinate type for OpenCV calls.
        x1, y1, x2, y2 = (int(v) for v in box)
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        # Clamp the text baseline so the label stays inside the image when the
        # box touches (or nearly touches) the top edge.
        label_y = max(y1 - 10, 15)
        cv2.putText(frame, f"{name}-{id_}", (x1, label_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    # Display the list of detected classes in the top-left corner
    y0 = 30
    for prod in sorted(detected_products):
        cv2.putText(frame, prod, (10, y0),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)
        y0 += 25

    return frame


In [None]:
# @title Step 4: Video processing function
import cv2
import tempfile

def process_video(video_path: str) -> str:
    """
    Annotate a whole video and return the path of the annotated copy.

    Every frame of ``video_path`` is passed through ``process_frame()`` and
    written to a new temporary ``.mp4`` file. The global ``tracker`` and
    ``detected_products`` are reset first, so IDs and the product list of a
    previously processed video do not leak into this one.

    Args:
        video_path: path of the input video.

    Returns:
        Path of the temporary annotated video. The file is not deleted
        automatically.

    Raises:
        RuntimeError: if the input video or the output writer cannot be opened.
    """
    global tracker

    # Open input video
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Could not open video: {video_path}")

    # Start from a clean slate, keeping the configured IoU threshold.
    tracker = SimpleIoUTracker(iou_threshold=tracker.iou_threshold)
    detected_products.clear()

    writer = None
    try:
        # Video properties (fall back to 24 fps when the container reports 0)
        width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps    = cap.get(cv2.CAP_PROP_FPS) or 24.0

        # Reserve a temporary file name, then close our own handle right away:
        # only the path is needed, and OpenCV opens the file by itself.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_file:
            out_path = tmp_file.name

        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(out_path, fourcc, fps, (width, height))
        if not writer.isOpened():
            raise RuntimeError(f"Could not open video writer for: {out_path}")

        # Processing loop: read until the capture reports no more frames
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            writer.write(process_frame(frame))
    finally:
        # Release resources even when a frame fails to process
        cap.release()
        if writer is not None:
            writer.release()

    return out_path

In [40]:
# @title Step 5: Web interface with Gradio
import gradio as gr
from gradio.components import Video

# Single-function Gradio app: the uploaded video is handed to process_video()
# and the annotated video it returns is shown as the result.
interface = gr.Interface(
    title="Product Detector in Video",
    description="Upload a supermarket shelf clip and the AI will list the detected products.",
    fn=process_video,
    inputs=gr.Video(label="Upload your shelf video"),
    outputs=gr.Video(label="Annotated Video"),
    allow_flagging="never",
)

# Start the app with Gradio's debug mode enabled.
interface.launch(debug=True)

Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://fd5f21a839eff8729f.gradio.live


