In [4]:
import cv2
from ultralytics import YOLO
import numpy as np

# Load your trained model
model = YOLO(r'E:\elevatetrsest\CamEDGE counting\bestv2.pt') # Use your model path

# Define the classes your model was trained on
# CLASSES = ['box', 'box_broken', 'open_package', 'package']  #for best_only_box.pt
CLASSES = ['box', 'box_broken', 'forklift', 'open_package', 'package', 'pallets', 'person'] #bestv2.pt
CLASS_TO_TRACK_EXCLUSIVELY = 'box' 

# For managing our own sequential box IDs
box_id_map = {}  # Maps original tracker ID of a box to our sequential display ID
next_display_box_id = 1 # Counter for our sequential display IDs

def detect_and_custom_track_video(video_path, output_path=None, conf_threshold=0.5, tracker_config="bytetrack.yaml"):
    """
    Detect all objects, display sequential track IDs only for the 'box' class.
    Counts all objects per frame and unique 'box' instances using custom IDs.

    Args:
        video_path: Path to input video
        output_path: Path to save output video (optional)
        conf_threshold: Confidence threshold for detections
        tracker_config: Path to the tracker configuration file
    """
    global box_id_map, next_display_box_id # Use global for map and counter
    box_id_map.clear() # Clear for fresh run if function is called multiple times
    next_display_box_id = 1

    cap = cv2.VideoCapture(video_path)
    writer = None

    if output_path:
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    frame_count = 0
    # For counting all object types per frame
    current_frame_object_counts = {class_name: 0 for class_name in CLASSES}
    # For overall summary of any object detected at least once
    overall_objects_summary = {class_name: 0 for class_name in CLASSES}
    overall_seen_tracker_ids = set() # Uses the tracker's original IDs for this summary

    while True:
        ret, frame = cap.read()
        if not ret:
            print("End of video or error reading frame.")
            break
        frame_count += 1
        annotated_frame = frame.copy()

        results = model.track(frame, conf=conf_threshold, persist=True, tracker=tracker_config, verbose=False)

        current_frame_object_counts = {class_name: 0 for class_name in CLASSES}

        if results[0].boxes is not None:
            boxes_data = results[0].boxes.xyxy.cpu().numpy()
            confs_data = results[0].boxes.conf.cpu().numpy()
            class_ids_data = results[0].boxes.cls.cpu().numpy().astype(int)
            
            original_tracker_ids_data = None
            if results[0].boxes.id is not None:
                original_tracker_ids_data = results[0].boxes.id.cpu().numpy().astype(int)

            for i in range(len(boxes_data)):
                x1, y1, x2, y2 = map(int, boxes_data[i])
                conf = confs_data[i]
                class_id = class_ids_data[i]
                class_name = model.names[class_id]

                current_frame_object_counts[class_name] += 1
                
                label = f"{class_name} {conf:.2f}"
                display_id_for_this_box = None

                if original_tracker_ids_data is not None:
                    original_tracker_id = original_tracker_ids_data[i]

                    # Update overall summary (counts any tracked object once using original ID)
                    if original_tracker_id not in overall_seen_tracker_ids:
                        overall_objects_summary[class_name] = overall_objects_summary.get(class_name, 0) + 1
                        overall_seen_tracker_ids.add(original_tracker_id)

                    if class_name == CLASS_TO_TRACK_EXCLUSIVELY:
                        if original_tracker_id not in box_id_map:
                            box_id_map[original_tracker_id] = next_display_box_id
                            next_display_box_id += 1
                        display_id_for_this_box = box_id_map[original_tracker_id]
                        label = f"ID:{display_id_for_this_box} {class_name} {conf:.2f}"

                # --- Visuals: Bounding box for all ---
                box_color = (0, 255, 0) # Default Green
                if class_name == "box_broken": box_color = (0,0,255)
                elif class_name == "open_package": box_color = (0,165,255)
                elif class_name == "forklift": box_color = (255,0,0)
                elif class_name == "pallets": box_color = (255,255,0)
                elif class_name == "person": box_color = (128,0,128)
                # No specific color for 'box' here as it's handled by default or its ID presence

                cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), box_color, 2)
                cv2.putText(annotated_frame, label, (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, box_color, 2)
        
        y_offset = 30
        cv2.putText(annotated_frame, f"Frame: {frame_count}",
                    (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        y_offset += 25
        for cls_name_txt, count_txt in current_frame_object_counts.items():
            if count_txt > 0:
                cv2.putText(annotated_frame, f"{cls_name_txt}: {count_txt}",
                            (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
                y_offset += 25

        if writer:
            writer.write(annotated_frame)

        cv2.imshow(f'Warehouse Tracking (Sequential IDs for "{CLASS_TO_TRACK_EXCLUSIVELY}")', annotated_frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        if frame_count % 100 == 0:
            # next_display_box_id is 1 more than the highest ID assigned
            unique_box_display_count = next_display_box_id -1 if next_display_box_id > 1 else 0
            print(f"Processed {frame_count} frames. Unique '{CLASS_TO_TRACK_EXCLUSIVELY}' display count: {unique_box_display_count}. Current frame counts: {current_frame_object_counts}")

    cap.release()
    if writer:
        writer.release()
    cv2.destroyAllWindows()

    total_unique_display_boxes = next_display_box_id - 1 if next_display_box_id > 1 else 0

    print(f"\n🎉 Video processing complete!")
    print(f"📝 Total frames processed: {frame_count}")
    print(f"📦 Total unique '{CLASS_TO_TRACK_EXCLUSIVELY}' objects (with sequential display IDs): {total_unique_display_boxes}")
    # print(f"📊 Overall summary of unique objects (any class with original tracker ID) detected: {overall_objects_summary}") # Optional
    
    return total_unique_display_boxes, overall_objects_summary




In [7]:
# --- Usage Example ---
if __name__ == "__main__":
    print(f"🚀 Warehouse Video Detection (Custom Sequential Tracking for '{CLASS_TO_TRACK_EXCLUSIVELY}') System Ready!")
    # --- CONFIGURE YOUR VIDEO PATH HERE ---
    video_input_path = r"E:\elevatetrsest\CamEDGE counting\test\test9_warehouse.mp4"# REPLACE THIS
    video_output_path = r'E:\elevatetrsest\CamEDGE counting\tracking_output\output9.mp4' # REPLACE THIS (OPTIONAL)

    if video_input_path == r'path_to_your_warehouse_video.mp4':
        print("\n⚠️ PLEASE UPDATE 'video_input_path' in the script with your actual video file path.")
    else:
        try:
            print(f"\nProcessing video: {video_input_path}")
            # Example: Process and display only
            unique_boxes_display_count, _ = detect_and_custom_track_video(video_input_path, conf_threshold=0.6)
            # Example: Process and save
            # unique_boxes_display_count, _ = detect_and_custom_track_video(video_input_path, video_output_path, conf_threshold=0.4)
            
            print(f"\nFinal count of unique '{CLASS_TO_TRACK_EXCLUSIVELY}' objects (with sequential display IDs): {unique_boxes_display_count}")

        except FileNotFoundError:
            print(f"ERROR: Video file not found at '{video_input_path}'. Please check the path.")
        except Exception as e:
            print(f"An error occurred: {e}")
            print("Ensure your video path is correct and OpenCV and Ultralytics are installed properly.")

🚀 Warehouse Video Detection (Custom Sequential Tracking for 'box') System Ready!

Processing video: E:\elevatetrsest\CamEDGE counting\test\test9_warehouse.mp4
Processed 100 frames. Unique 'box' display count: 5. Current frame counts: {'box': 3, 'box_broken': 0, 'forklift': 0, 'open_package': 0, 'package': 0, 'pallets': 0, 'person': 0}
Processed 200 frames. Unique 'box' display count: 8. Current frame counts: {'box': 2, 'box_broken': 0, 'forklift': 0, 'open_package': 0, 'package': 0, 'pallets': 0, 'person': 0}
Processed 300 frames. Unique 'box' display count: 14. Current frame counts: {'box': 2, 'box_broken': 0, 'forklift': 0, 'open_package': 0, 'package': 0, 'pallets': 0, 'person': 0}
Processed 400 frames. Unique 'box' display count: 17. Current frame counts: {'box': 2, 'box_broken': 0, 'forklift': 0, 'open_package': 0, 'package': 0, 'pallets': 0, 'person': 0}
Processed 500 frames. Unique 'box' display count: 19. Current frame counts: {'box': 1, 'box_broken': 0, 'forklift': 0, 'open_pa