In [1]:
print("ok")

ok


In [2]:
from ultralytics import YOLO
import cv2

class ObjectDetector:
    def __init__(self, model_name='yolov8n.pt'):
        self.model = YOLO(model_name)

    def detect(self, frame):
        results = self.model(frame)[0]
        detections = []
        for r in results.boxes:
            x1, y1, x2, y2 = map(int, r.xyxy[0])
            conf = float(r.conf)
            cls = int(r.cls)
            label = self.model.names[cls]
            detections.append({
                'bbox': [x1, y1, x2 - x1, y2 - y1],
                'confidence': conf,
                'class': label
            })
        return detections


In [3]:
# tracker.py
from deep_sort_realtime.deepsort_tracker import DeepSort

class ObjectTracker:
    def __init__(self):
        self.tracker = DeepSort(max_age=30)  # adjust for how long to remember

    def update(self, detections):
        track_inputs = [
            (det['bbox'], det['confidence'], det['class']) for det in detections
        ]
        tracks = self.tracker.update_tracks(track_inputs, frame=None)
        tracked = []
        for track in tracks:
            if not track.is_confirmed():
                continue
            bbox = track.to_ltrb()  # [x1, y1, x2, y2]
            tracked.append({
                'track_id': track.track_id,
                'bbox': bbox,
                'class': track.get_class()
            })
        return tracked


In [4]:
class SceneManager:
    def __init__(self):
        self.known_ids = set()

    def analyze(self, tracked_objects):
        current_ids = set(obj['track_id'] for obj in tracked_objects)
        new_objects = current_ids - self.known_ids
        missing_objects = self.known_ids - current_ids
        self.known_ids = current_ids

        return list(new_objects), list(missing_objects)


In [5]:
import cv2
import time



In [7]:
class ObjectDetectionPipeline:
    def __init__(self):
        self.detector = ObjectDetector()
        self.tracker = ObjectTracker()
        self.scene_manager = SceneManager()

    def run(self):
        cap = cv2.VideoCapture(4747)  # Webcam
        if not cap.isOpened():
            print("Error: Could not open webcam.")
            return

        output_fps = []
        while True:
            start_time = time.time()
            ret, frame = cap.read()
            if not ret:
                break

            detections = self.detector.detect(frame)
            tracked_objects = self.tracker.update(detections)
            missing_ids, new_ids = self.scene_manager.update(tracked_objects)

            for obj in tracked_objects:
                x, y, w, h = obj['bbox']
                label = f"{obj['class']} {obj['id']}"
                color = (0, 255, 0)
                if obj['id'] in new_ids:
                    label += " (New)"
                    color = (255, 0, 0)
                elif obj['id'] in missing_ids:
                    label += " (Missing)"
                    color = (0, 0, 255)
                cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
                cv2.putText(frame, label, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

            # Calculate FPS
            end_time = time.time()
            fps = 1 / (end_time - start_time)
            output_fps.append(fps)
            cv2.putText(frame, f"FPS: {fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)

            cv2.imshow("Object Detection", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        cap.release()
        cv2.destroyAllWindows()

        avg_fps = sum(output_fps) / len(output_fps)
        print(f"Average FPS: {avg_fps:.2f}")

if __name__ == '__main__':
    pipeline = ObjectDetectionPipeline()
    pipeline.run()

Error: Could not open webcam.
