In [24]:
import cv2
import threading
import queue
import time
from ultralytics import YOLO
from collections import defaultdict

# --- CONFIGURATION ---
# RECOMMENDATION: Use .engine (TensorRT) or .onnx files for 2x-5x speedup
MODEL_PATH = 'data/model/detect_yolo_small_v3.pt'  # Change to 'best5.engine' after exporting
MODEL_CLASSIFY_PATH = 'data/model/classify_yolo_nano_v2.pt.pt'  # Change to 'best_classify.engine' after exporting
VIDEO_PATH = "output.mp4"
TRACKER_CONFIG = 'custom_bytetrack.yaml'

N_FRAMES_OPEN_CONFIRM = 3
M_FRAMES_CLOSED_CONFIRM = 4

# Reduced visualization resolution (width) for faster display
DISPLAY_WIDTH = 1280
# --- END OF CONFIGURATION ---

# --- Shared Resources for Threading ---
classification_queue = queue.Queue()
class_counts = defaultdict(int)
counts_lock = threading.Lock() # Prevents race conditions when reading/writing counts

def classifier_worker():
    """
    Worker thread that waits for ROIs (images of bags) and classifies them.
    This runs in the background so the video doesn't stutter.
    """
    print("Loading classification model in worker thread...")
    # Load model inside the thread or pass it in.
    # NOTE: For TensorRT/Exported models, loading here is safer for thread isolation.
    model_classify = YOLO(MODEL_CLASSIFY_PATH)

    while True:
        try:
            # Wait for an item, but allow checking for exit every second
            item = classification_queue.get(timeout=1)
        except queue.Empty:
            continue

        if item is None: # Sentinel to kill thread
            break

        track_id, roi_img = item

        # Run inference (verbose=False is slightly faster)
        results = model_classify(roi_img, verbose=False)

        detected_label = "Unknown"
        # Logic to extract label
        if results[0].probs is not None:
            detected_label = results[0].names[results[0].probs.top1]
        elif len(results[0].boxes) > 0:
            top_cls = int(results[0].boxes.cls[0])
            detected_label = model_classify.names[top_cls]

        with counts_lock:
            class_counts[detected_label] += 1

        # print(f" [Async] Track {track_id} -> {detected_label}")
        classification_queue.task_done()

# Start the classification thread
t = threading.Thread(target=classifier_worker, daemon=True)
t.start()

# --- Main Tracker Setup ---
model = YOLO(MODEL_PATH)
class_names = model.names

try:
    OPEN_CLASS_NAME = 'bread-bag-opened'
    CLOSED_CLASS_NAME = 'bread-bag-closed'
    names_to_ids = {v: k for k, v in class_names.items()}
    open_class_id = names_to_ids[OPEN_CLASS_NAME]
    closed_class_id = names_to_ids[CLOSED_CLASS_NAME]
except KeyError as e:
    print(f"Error: Class {e} not found.")
    exit()

track_states = {}
cap = cv2.VideoCapture(VIDEO_PATH)

# Optimization: Pre-calculate resize factor for display to avoid resizing full arrays constantly if not needed
# Or simply resize the final image.
frame_count = 0
start_time = time.time()

while cap.isOpened():
    success, frame = cap.read()
    if not success:
        break

    # Optimization: Run tracking.
    # 'persist=True' is required for tracking.
    # 'verbose=False' suppresses printing to console (saves I/O time)
    results = model.track(frame, persist=True, tracker=TRACKER_CONFIG, verbose=False)

    # Extract data locally to avoid repeated access
    boxes = results[0].boxes
    current_detections = {}

    # Optimization: Manual Drawing is faster than results[0].plot()
    # We will draw on 'frame' directly.
    if boxes.id is not None:
        # Move tensors to CPU once and convert to numpy/list
        track_ids = boxes.id.int().cpu().tolist()
        cls_ids = boxes.cls.int().cpu().tolist()
        xyxys = boxes.xyxy.cpu().tolist()

        for track_id, cls_id, box in zip(track_ids, cls_ids, xyxys):
            current_detections[track_id] = cls_id

            # Draw Bounding Box (Green for Closed, Red for Open, White for others)
            x1, y1, x2, y2 = map(int, box)
            color = (200, 200, 200)
            if cls_id == open_class_id: color = (0, 0, 255)     # Red
            elif cls_id == closed_class_id: color = (0, 255, 0) # Green

            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

            # Draw Label (Simplified)
            label = f"{track_id}: {class_names[cls_id]}"
            cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

            # --- State Machine Logic ---
            # Retrieve or create state
            if track_id not in track_states:
                track_states[track_id] = {'state': 'detecting_open', 'open_count': 0, 'closed_count': 0}

            state_info = track_states[track_id]

            # Logic
            if state_info['state'] == 'detecting_open':
                if cls_id == open_class_id:
                    state_info['open_count'] += 1
                else:
                    state_info['open_count'] = 0

                if state_info['open_count'] >= N_FRAMES_OPEN_CONFIRM:
                    state_info['state'] = 'detecting_closed'
                    state_info['open_count'] = 0

            elif state_info['state'] == 'detecting_closed':
                if cls_id == closed_class_id:
                    state_info['closed_count'] += 1
                else:
                    state_info['closed_count'] = 0

                if state_info['closed_count'] >= M_FRAMES_CLOSED_CONFIRM:
                    state_info['state'] = 'counted'

                    # --- ASYNC CLASSIFICATION TRIGGER ---
                    # Validate coordinates
                    h, w, _ = frame.shape
                    cx1, cy1 = max(0, x1), max(0, y1)
                    cx2, cy2 = min(w, x2), min(h, y2)

                    if cx2 > cx1 and cy2 > cy1:
                        # Copy ROI to avoid memory issues when frame changes
                        roi = frame[cy1:cy2, cx1:cx2].copy()
                        # Push to queue
                        classification_queue.put((track_id, roi))

    # Clean up old tracks from state dictionary
    # (Optional: Keeps dictionary small)
    for tid in list(track_states.keys()):
        if tid not in current_detections:
            # You might want to keep them briefly or delete immediately
            # Here we just reset counts if lost
            track_states[tid]['open_count'] = 0
            track_states[tid]['closed_count'] = 0

    # --- Display Stats ---
    # Read counts safely
    with counts_lock:
        display_counts = list(class_counts.items())

    y_offset = 60
    cv2.putText(frame, "FPS: calculate if needed", (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 1.6, (0, 255, 255), 3)

    sorted_counts = sorted(display_counts)
    print(f"sorted_counts: {sorted_counts}")
    for cls_name, count in sorted_counts:
        y_offset += 70
        text = f"{cls_name}: {count}"
        cv2.putText(frame, text, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 255, 255), 3)

    # Resize only for display (keeps processing on original resolution)
    if DISPLAY_WIDTH:
        h, w = frame.shape[:2]
        scale = DISPLAY_WIDTH / w
        frame_disp = cv2.resize(frame, (int(w*scale), int(h*scale)))
    else:
        frame_disp = frame

    cv2.imshow("Optimized Tracker", frame_disp)

    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

# Cleanup
cap.release()
cv2.destroyAllWindows()

Loading classification model in worker thread...


Exception in thread Thread-12 (classifier_worker):
Traceback (most recent call last):
  File "C:\Python312\Lib\threading.py", line 1052, in _bootstrap_inner
    self.run()
  File "C:\Python312\Lib\threading.py", line 989, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Khaled\AppData\Local\Temp\ipykernel_21952\2566700008.py", line 35, in classifier_worker
  File "C:\Users\Khaled\PyCharmMiscProject\.venv\Lib\site-packages\ultralytics\models\yolo\model.py", line 81, in __init__
    super().__init__(model=model, task=task, verbose=verbose)
  File "C:\Users\Khaled\PyCharmMiscProject\.venv\Lib\site-packages\ultralytics\engine\model.py", line 149, in __init__
    self._load(model, task=task)
  File "C:\Users\Khaled\PyCharmMiscProject\.venv\Lib\site-packages\ultralytics\engine\model.py", line 288, in _load
    self.model, self.ckpt = load_checkpoint(weights)
                            ^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Khaled\PyCharmMiscProject\.venv\Lib\site-pac

In [18]:
from ultralytics import YOLO
model = YOLO('data/model/classify_yolo_nano_v2.pt')

# 2. Get the class names dictionary
class_dict = model.names
print(f"Detected Classes: {class_dict}")

Detected Classes: {0: 'Blue_Yellow', 1: 'Bran', 2: 'Brown_Orange_Overlay', 3: 'Brown_Orange_Small', 4: 'Green_Yellow', 5: 'Red_Yellow', 6: 'Wheatberry'}
