In [9]:
pip install -U ultralytics

Note: you may need to restart the kernel to use updated packages.


In [1]:
import cv2
import numpy as np
import os
import time
from collections import defaultdict
from ultralytics import YOLO


In [5]:
 class CCTVAnomalyDetection:
    def __init__(self, video_source=0, output_dir="output", frame_interval=15, use_webcam=False):
        self.video_source = video_source
        self.output_dir = output_dir
        self.frame_interval = frame_interval
        self.use_webcam = use_webcam

        
        os.makedirs(output_dir, exist_ok=True)
        
       
        print("Loading YOLO model...")
        self.yolo_model = YOLO("yolov8n.pt")
        print("YOLO model loaded successfully!")

        
        self.people_tracks = defaultdict(list)
        self.next_person_id = 0
        self.loitering_time_threshold = 5  
        self.loitering_distance_threshold = 50  
        self.loitering_alerts = []

        
        self.detection_history = []
        self.anomaly_frames = []
        self.colors = {}
        
        if use_webcam:
            print(f"Initialization complete. Using webcam as video source.")
        else:
            print(f"Initialization complete. Processing video: {video_source}")

    def _detect_people(self, frame):
        try:
            
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            
            
            results = self.yolo_model.predict(source=rgb_frame, imgsz=416, conf=0.5, verbose=False)

            people = []
            for result in results:
                for box in result.boxes:
                    cls = int(box.cls[0])
                    conf = float(box.conf[0])
                    # Class 0 is person in COCO dataset
                    if cls == 0 and conf > 0.5:
                        x1, y1, x2, y2 = map(int, box.xyxy[0])
                        
                        x = max(0, x1)
                        y = max(0, y1)
                        w = min(frame.shape[1] - x, x2 - x1)
                        h = min(frame.shape[0] - y, y2 - y1)
                        if w > 0 and h > 0:
                            people.append({
                                'bbox': (x, y, w, h),
                                'center': (x + w//2, y + h//2)
                            })
            return people
        except Exception as e:
            print(f"YOLOv8 detection error: {e}")
            return []

    def _get_color_for_id(self, person_id):
        if person_id not in self.colors:
            self.colors[person_id] = tuple(np.random.randint(0, 255, 3).tolist())
        return self.colors[person_id]

    def _track_people(self, people, timestamp):
        if not people:
            return

        
        if not any(self.people_tracks.values()):
            for person in people:
                self.people_tracks[self.next_person_id].append({
                    'center': person['center'],
                    'bbox': person['bbox'],
                    'timestamp': timestamp
                })
                self.next_person_id += 1
            return

      
        unmatched_people = list(range(len(people)))
        tracked_ids = list(self.people_tracks.keys())

        for track_id in tracked_ids:
            last_detection = self.people_tracks[track_id][-1]
            last_center = last_detection['center']
            min_dist = float('inf')
            match_idx = None

          
            for i in unmatched_people:
                curr_center = people[i]['center']
                dist = np.linalg.norm(np.array(curr_center) - np.array(last_center))
                if dist < min_dist and dist < 100:  
                    min_dist = dist
                    match_idx = i

          
            if match_idx is not None:
                self.people_tracks[track_id].append({
                    'center': people[match_idx]['center'],
                    'bbox': people[match_idx]['bbox'],
                    'timestamp': timestamp
                })
                unmatched_people.remove(match_idx)

    
        for idx in unmatched_people:
            self.people_tracks[self.next_person_id].append({
                'center': people[idx]['center'],
                'bbox': people[idx]['bbox'],
                'timestamp': timestamp
            })
            self.next_person_id += 1

    def _draw_tracking_trail(self, frame, person_id, max_length=20):
        if person_id not in self.people_tracks:
            return frame
            
        
        detections = self.people_tracks[person_id][-max_length:]
        color = self._get_color_for_id(person_id)
        
      
        for i in range(1, len(detections)):
            cv2.line(frame, detections[i - 1]['center'], detections[i]['center'], color, 2)
            
        return frame

    def _detect_loitering(self, current_time):
        loiterers = []
        for person_id, detections in self.people_tracks.items():
            
            if len(detections) < 5:
                continue
                
            
            duration = current_time - detections[0]['timestamp']
            
            if duration >= self.loitering_time_threshold:
         
                centers = [d['center'] for d in detections]
                max_distance = max(np.linalg.norm(np.array(c1) - np.array(c2)) 
                                   for i, c1 in enumerate(centers) 
                                   for c2 in centers[i+1:])
                                   
                if max_distance < self.loitering_distance_threshold:
                    loiterers.append({
                        'person_id': person_id,
                        'bbox': detections[-1]['bbox'],
                        'duration': duration
                    })
                    
                    
                    if person_id not in [a['person_id'] for a in self.loitering_alerts]:
                        self.loitering_alerts.append({
                            'person_id': person_id,
                            'timestamp': current_time,
                            'location': detections[-1]['center'],
                            'duration': duration
                        })
                        print(f"ALERT: Person {person_id} loitering for {duration:.1f} seconds")
                        
        return loiterers

    def _add_info_panel(self, frame):
        """Add an information panel with detection stats"""
        h, w = frame.shape[:2]
        panel_height = 70
        panel = np.zeros((panel_height, w, 3), dtype=np.uint8)
        
      
        cv2.rectangle(panel, (0, 0), (w, panel_height), (50, 50, 50), -1)
        
      
        active_tracks = len(self.people_tracks)
        loitering_count = len([a for a in self.loitering_alerts 
                              if a['person_id'] in self.people_tracks])
                              
        cv2.putText(panel, f"Active Tracks: {active_tracks}", (10, 25), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (200, 200, 200), 2)
        cv2.putText(panel, f"Loitering Alerts: {loitering_count}", (250, 25), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (50, 50, 255), 2)
        cv2.putText(panel, f"Total Persons Detected: {self.next_person_id}", (10, 55), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (200, 200, 200), 2)
        
        
        result = np.vstack([panel, frame])
        return result

    def process_video(self):
        
        cap = cv2.VideoCapture(self.video_source)
        if not cap.isOpened():
            print(f"Error opening video source: {self.video_source}")
            return False

        fps = cap.get(cv2.CAP_PROP_FPS)
        if self.use_webcam:
            fps = 30  
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        
        if self.use_webcam:
            total_frames = float('inf')  
            print(f"Webcam started: {width}x{height} at {fps} FPS")
        else:
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            print(f"Video details: {width}x{height}, {fps} FPS, {total_frames} total frames")

        
        out_path = os.path.join(self.output_dir, "output.mp4")
        out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height + 70))

        frame_count = 0
        start_time = time.time()
        
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                if self.use_webcam:
                    print("Error reading from webcam. Attempting to reconnect...")
                    cap = cv2.VideoCapture(self.video_source)
                    if not cap.isOpened():
                        print("Failed to reconnect to webcam. Exiting.")
                        break
                    continue
                else:
                    print("End of video file reached.")
                    break

            process_this_frame = (frame_count % self.frame_interval == 0) if not self.use_webcam else True
            
            if process_this_frame:
                current_time = time.time() - start_time if self.use_webcam else frame_count / fps
                
                
                if not self.use_webcam and frame_count % (self.frame_interval * 10) == 0:
                    elapsed = time.time() - start_time
                    progress = frame_count / total_frames * 100
                    print(f"Processing: {progress:.1f}% complete | Frame {frame_count}/{total_frames} | Time: {elapsed:.1f}s")
                
                
                people = self._detect_people(frame)
                self.detection_history.append(len(people))
                
                
                self._track_people(people, current_time)
                
                
                loiterers = self._detect_loitering(current_time)

            
                for person_id in self.people_tracks:
                    frame = self._draw_tracking_trail(frame, person_id)

               
                for person in people:
                    x, y, w, h = person['bbox']
                    cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

                
                for l in loiterers:
                    x, y, w, h = l['bbox']
                    color = (0, 0, 255)  
                    cv2.rectangle(frame, (x, y), (x + w, y + h), color, 3)
                    cv2.putText(frame, f"Loitering: {l['duration']:.1f}s", (x, y - 10), 
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

               
                frame_with_panel = self._add_info_panel(frame)
                
               
                out.write(frame_with_panel)
                
                
                display_frame = frame_with_panel
                if width > 1280:
                    scale = 1280 / width
                    display_frame = cv2.resize(frame_with_panel, (int(width * scale), int((height + 70) * scale)))
                
                cv2.imshow("CCTV Anomaly Detection", display_frame)
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    print("Quitting - 'q' key pressed")
                    break
                elif key == ord('s') and self.use_webcam:
                    
                    snapshot_path = os.path.join(self.output_dir, f"snapshot_{time.strftime('')}.jpg")
                    cv2.imwrite(snapshot_path, frame)
                    print(f"Snapshot saved to {snapshot_path}")

            frame_count += 1

   
        end_time = time.time()
        processing_time = end_time - start_time
        
        cap.release()
        out.release()
        cv2.destroyAllWindows()
        
        print(f"Video processing completed in {processing_time:.2f} seconds.")
        print(f"Output saved to: {out_path}")
        print(f"Total people detected: {self.next_person_id}")
        print(f"Total loitering alerts: {len(self.loitering_alerts)}")
        
        return True

def check_requirements():
    """Check if all required libraries are installed"""
    try:
        import cv2
        import numpy
        from ultralytics import YOLO
        print("All required packages are installed.")
        return True
    except ImportError as e:
        print(f"Missing requirement: {e}")
        print("Please install required packages with: pip install opencv-python numpy ultralytics")
        return False

def list_sample_videos():
    """Check for sample videos in the current directory"""
    video_extensions = ['.mp4', '.avi', '.mov', '.mkv']
    sample_videos = []
    
    for file in os.listdir('.'):
        if any(file.lower().endswith(ext) for ext in video_extensions):
            sample_videos.append(file)
            
    return sample_videos


if __name__ == "__main__":
    print("CCTV Anomaly Detection System")
    print("-----------------------------")
    
   
    if not check_requirements():
        exit(1)
    
    
    sample_videos = list_sample_videos()
    if sample_videos:
        print("\nAvailable video files in current directory:")
        for i, video in enumerate(sample_videos, 1):
            print(f"{i}. {video}")
    else:
        print("\nNo video files found in the current directory.")
    
    
    print("\nSelect video source:")
    print("1. Use webcam")
    print("2. Enter video file path")
    if sample_videos:
        print("3. Use one of the listed videos")
    
    choice = input("Enter your choice (1/2/3): ")
    
    use_webcam = False
    video_source = None
    
    if choice == '1':
        use_webcam = True
        video_source = 0  
        
   
        test_cam = cv2.VideoCapture(0)
        if test_cam.isOpened():
            test_cam.release()
            cam_choice = input("Enter webcam index (0 for default, or try 1,2,etc. for other cameras): ")
            try:
                video_source = int(cam_choice)
            except ValueError:
                print("Invalid input. Using default webcam (0).")
                video_source = 0
    elif choice == '2':
        video_path = input("Enter complete path to video file: ")
        if os.path.exists(video_path):
            video_source = video_path
        else:
            print(f"Error: File '{video_path}' not found!")
            exit(1)
    elif choice == '3' and sample_videos:
        try:
            idx = int(input(f"Enter video number (1-{len(sample_videos)}): ")) - 1
            if 0 <= idx < len(sample_videos):
                video_source = sample_videos[idx]
            else:
                print("Invalid selection. Exiting.")
                exit(1)
        except ValueError:
            print("Invalid input. Exiting.")
            exit(1)
    else:
        print("Invalid choice. Exiting.")
        exit(1)
        
    output_dir = "output"
    
    
    frame_interval = 1 if use_webcam else 15
    if not use_webcam:
        try:
            interval_input = input("Enter frame processing interval (or press Enter for default 15): ")
            if interval_input:
                frame_interval = int(interval_input)
        except ValueError:
            print("Invalid input. Using default frame interval of 15.")
            frame_interval = 15
    
    print(f"\nStarting detection with settings:")
    if use_webcam:
        print(f"- Video source: Webcam (index {video_source})")
    else:
        print(f"- Video source: {video_source}")
    print(f"- Output directory: {output_dir}")
    print(f"- Frame interval: {frame_interval}")
    
    if use_webcam:
        print("\nControls:")
        print("- Press 'q' to quit")
        print("- Press 's' to save a snapshot")
    
    print("\nStarting detection...")
    
    try:
        detector = CCTVAnomalyDetection(video_source, output_dir, frame_interval, use_webcam)
        success = detector.process_video()
        if not success:
            print("\nDetection failed. Please try again with a different video source.")
    except KeyboardInterrupt:
        print("\nOperation cancelled by user.")
    except Exception as e:
        print(f"\nError running the detector: {e}")
        import traceback
        traceback.print_exc()

CCTV Anomaly Detection System
-----------------------------
All required packages are installed.

Available video files in current directory:
1. 4.0-39.0.mp4
2. Footage of jewellery store robbery.mp4

Select video source:
1. Use webcam
2. Enter video file path
3. Use one of the listed videos


Enter your choice (1/2/3):  3
Enter video number (1-2):  1
Enter frame processing interval (or press Enter for default 15):  3



Starting detection with settings:
- Video source: 4.0-39.0.mp4
- Output directory: output
- Frame interval: 3

Starting detection...
Loading YOLO model...
YOLO model loaded successfully!
Initialization complete. Processing video: 4.0-39.0.mp4
Video details: 640x360, 30.0 FPS, 1011 total frames
Processing: 0.0% complete | Frame 0/1011 | Time: 0.0s
Processing: 3.0% complete | Frame 30/1011 | Time: 0.6s
Processing: 5.9% complete | Frame 60/1011 | Time: 1.0s
Processing: 8.9% complete | Frame 90/1011 | Time: 1.4s
Processing: 11.9% complete | Frame 120/1011 | Time: 1.8s
Processing: 14.8% complete | Frame 150/1011 | Time: 2.2s
Processing: 17.8% complete | Frame 180/1011 | Time: 2.6s
Processing: 20.8% complete | Frame 210/1011 | Time: 3.1s
Processing: 23.7% complete | Frame 240/1011 | Time: 3.6s
Processing: 26.7% complete | Frame 270/1011 | Time: 4.2s
Processing: 29.7% complete | Frame 300/1011 | Time: 4.8s
Processing: 32.6% complete | Frame 330/1011 | Time: 5.5s
Processing: 35.6% complete | 