# Key Features:
- YOLOv8 Object Detection - YOLO model for accuracy
- Line Crossing Detection - Counts people, cars, trucks, motorcycles crossing lines
- OpenCV Integration - Full video processing pipeline
- IP Camera Ready - RTSP stream support for cameras
- Real-time Processing - Webcam and live stream analysis

In [2]:
# Install and Import Dependencies
!pip install ultralytics opencv-python pillow matplotlib numpy
!pip install requests urllib3

import cv2
import numpy as np
import matplotlib.pyplot as plt
from ultralytics import YOLO
import time
from collections import defaultdict, deque
from PIL import Image
import requests
from io import BytesIO
import os
from datetime import datetime

# Set up matplotlib for video display
plt.rcParams['figure.figsize'] = [12, 8]

print("All dependencies installed and imported successfully!")
print(f"OpenCV version: {cv2.__version__}")


Collecting ultralytics
  Downloading ultralytics-8.3.146-py3-none-any.whl.metadata (37 kB)
Collecting opencv-python
  Downloading opencv_python-4.11.0.86-cp37-abi3-macosx_13_0_arm64.whl.metadata (20 kB)
Collecting scipy>=1.4.1 (from ultralytics)
  Using cached scipy-1.15.3-cp312-cp312-macosx_14_0_arm64.whl.metadata (61 kB)
Collecting torch>=1.8.0 (from ultralytics)
  Using cached torch-2.7.0-cp312-none-macosx_11_0_arm64.whl.metadata (29 kB)
Collecting torchvision>=0.9.0 (from ultralytics)
  Downloading torchvision-0.22.0-cp312-cp312-macosx_11_0_arm64.whl.metadata (6.1 kB)
Collecting tqdm>=4.64.0 (from ultralytics)
  Using cached tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
Collecting py-cpuinfo (from ultralytics)
  Downloading py_cpuinfo-9.0.0-py3-none-any.whl.metadata (794 bytes)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.14-py3-none-any.whl.metadata (9.4 kB)
Collecting filelock (from torch>=1.8.0->ultralytics)
  Using ca

In [3]:
# Load pretrained YOLOv8 model and set up detection parameters
class CCTVDetector:
    def __init__(self, model_size='n'):
        """
        Initialize CCTV detector with YOLO model
        model_size: 'n' (nano), 's' (small), 'm' (medium), 'l' (large), 'x' (extra large)
        """
        self.model = YOLO(f'yolov8{model_size}.pt')

        # Target classes for CCTV monitoring (COCO dataset indices)
        self.target_classes = {
            0: 'person',
            1: 'bicycle',
            2: 'car',
            3: 'motorcycle',
            5: 'bus',
            7: 'truck'
        }

        # Detection parameters
        self.confidence_threshold = 0.5
        self.nms_threshold = 0.4

        # Tracking parameters
        self.max_disappeared = 30
        self.max_distance = 100

        print(f"YOLO model initialized: YOLOv8{model_size}")
        print(f"Monitoring classes: {list(self.target_classes.values())}")

# Initialize detector
detector = CCTVDetector(model_size='s')  # Used s for speed on my laptop

Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8s.pt to 'yolov8s.pt'...


#######################################################################   99.8%                                                     29.7%################                              61.6%####################################################     96.5%

YOLO model initialized: YOLOv8s
Monitoring classes: ['person', 'bicycle', 'car', 'motorcycle', 'bus', 'truck']


######################################################################## 100.0%


In [4]:
# Implement line crossing detection and object counting logic
class LineCrossingCounter:
    def __init__(self, line_start, line_end, direction="both"):
        """
        Initialize line crossing counter
        line_start: (x1, y1) start point of counting line
        line_end: (x2, y2) end point of counting line
        direction: "both", "up", "down", "left", "right"
        """
        self.line_start = line_start
        self.line_end = line_end
        self.direction = direction

        # Counting variables
        self.crossings = defaultdict(int)  # Count by object class
        self.total_crossings = 0

        # Tracking variables
        self.tracked_objects = {}
        self.next_object_id = 0

        # History for direction detection
        self.position_history = defaultdict(lambda: deque(maxlen=10))

    def point_line_side(self, point, line_start, line_end):
        """Determine which side of line a point is on"""
        return ((line_end[0] - line_start[0]) * (point[1] - line_start[1]) -
                (line_end[1] - line_start[1]) * (point[0] - line_start[0]))

    def has_crossed_line(self, prev_pos, curr_pos):
        """Check if object crossed the counting line"""
        if prev_pos is None or curr_pos is None:
            return False

        prev_side = self.point_line_side(prev_pos, self.line_start, self.line_end)
        curr_side = self.point_line_side(curr_pos, self.line_start, self.line_end)

        # Check if signs are different (crossed line)
        return (prev_side > 0) != (curr_side > 0)

    def update_tracking(self, detections):
        """Update object tracking and check for line crossings"""
        current_centroids = []
        current_classes = []

        # Extract centroids and classes from detections
        for detection in detections:
            x1, y1, x2, y2, conf, cls = detection
            centroid = (int((x1 + x2) / 2), int((y1 + y2) / 2))
            current_centroids.append(centroid)
            current_classes.append(int(cls))

        # Simple tracking based on nearest neighbor
        if len(current_centroids) == 0:
            return

        # Update existing tracks or create new ones
        for i, (centroid, cls) in enumerate(zip(current_centroids, current_classes)):
            # Find closest existing track
            min_distance = float('inf')
            closest_id = None

            for obj_id, (prev_centroid, prev_cls) in self.tracked_objects.items():
                if cls == prev_cls:  # Only match same class
                    distance = np.sqrt((centroid[0] - prev_centroid[0])**2 +
                                     (centroid[1] - prev_centroid[1])**2)
                    if distance < min_distance and distance < self.next_object_id:
                        min_distance = distance
                        closest_id = obj_id

            # Update existing track or create new one
            if closest_id is not None and min_distance < 100:
                obj_id = closest_id
                prev_centroid = self.tracked_objects[obj_id][0]

                # Check for line crossing
                if self.has_crossed_line(prev_centroid, centroid):
                    class_name = detector.target_classes.get(cls, f'class_{cls}')
                    self.crossings[class_name] += 1
                    self.total_crossings += 1
                    print(f"🚨 {class_name} crossed the line! Total: {self.total_crossings}")

                self.tracked_objects[obj_id] = (centroid, cls)
            else:
                # Create new track
                self.tracked_objects[self.next_object_id] = (centroid, cls)
                self.next_object_id += 1

In [5]:
# Core video processing and detection pipeline
def process_frame(frame, detector, line_counter, show_line=True):
    """
    Process single frame: detect objects, draw bounding boxes, update counting
    """
    # Run YOLO detection
    results = detector.model(frame, conf=detector.confidence_threshold)

    # Extract detections
    detections = []
    annotated_frame = frame.copy()

    for result in results:
        boxes = result.boxes
        if boxes is not None:
            for box in boxes:
                # Extract box coordinates and info
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                confidence = box.conf[0].cpu().numpy()
                class_id = int(box.cls[0].cpu().numpy())

                # Only process target classes
                if class_id in detector.target_classes:
                    detections.append([x1, y1, x2, y2, confidence, class_id])

                    # Draw bounding box
                    color = (0, 255, 0)  # Green
                    cv2.rectangle(annotated_frame, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)

                    # Draw label
                    label = f"{detector.target_classes[class_id]}: {confidence:.2f}"
                    label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0]
                    cv2.rectangle(annotated_frame, (int(x1), int(y1) - label_size[1] - 5),
                                (int(x1) + label_size[0], int(y1)), color, -1)
                    cv2.putText(annotated_frame, label, (int(x1), int(y1) - 5),
                              cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

    # Update line crossing detection
    line_counter.update_tracking(detections)

    # Draw counting line
    if show_line:
        cv2.line(annotated_frame, line_counter.line_start, line_counter.line_end, (0, 0, 255), 3)
        cv2.putText(annotated_frame, "COUNTING LINE",
                   (line_counter.line_start[0], line_counter.line_start[1] - 10),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

    # Draw statistics
    y_offset = 30
    cv2.putText(annotated_frame, f"Total Crossings: {line_counter.total_crossings}",
               (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

    y_offset += 30
    for class_name, count in line_counter.crossings.items():
        cv2.putText(annotated_frame, f"{class_name}: {count}",
                   (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        y_offset += 25

    return annotated_frame, detections

In [6]:
# Download sample CCTV traffic video for demonstration
def download_free_cctv_videos():
    """Download free CCTV footage - Just run this!"""

    # Free CCTV-style videos (no copyright issues)
    free_videos = {
        'traffic_intersection.mp4': 'https://videos.pexels.com/video-files/2103099/2103099-hd_1920_1080_30fps.mp4',
        'street_crossing.mp4': 'https://videos.pexels.com/video-files/1721294/1721294-hd_1920_1080_25fps.mp4',
        'parking_lot.mp4': 'https://videos.pexels.com/video-files/2169880/2169880-hd_1920_1080_30fps.mp4',
        'busy_street.mp4': 'https://videos.pexels.com/video-files/3571264/3571264-hd_1920_1080_30fps.mp4'
    }

    # Create folder
    os.makedirs('cctv_test_videos', exist_ok=True)

    print("Downloading CCTV footage...")

    for filename, url in free_videos.items():
        try:
            print(f"Downloading {filename}...")
            response = requests.get(url, stream=True)

            if response.status_code == 200:
                filepath = f'cctv_test_videos/{filename}'
                with open(filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                print(f" Downloaded: {filename}")
            else:
                print(f" Failed: {filename}")

        except Exception as e:
            print(f" Error downloading {filename}: {e}")

    return 'cctv_test_videos'

video_folder = download_free_cctv_videos()

Downloading CCTV footage...
Downloading traffic_intersection.mp4...
 Downloaded: traffic_intersection.mp4
Downloading street_crossing.mp4...
 Downloaded: street_crossing.mp4
Downloading parking_lot.mp4...
 Downloaded: parking_lot.mp4
Downloading busy_street.mp4...
 Downloaded: busy_street.mp4

 Testing detection on cctv_test_videos/traffic_intersection.mp4


NameError: name 'analyze_cctv_video' is not defined

In [7]:
# Process sample video with object detection and line crossing
def analyze_cctv_video(video_path, line_start=(400, 200), line_end=(400, 400),
                      max_frames=300, save_output=True):
    """
    Analyze CCTV video with object detection and line crossing
    """
    if not os.path.exists(video_path):
        print(f"Video file not found: {video_path}")
        return None

    # Initialize video capture
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error opening video file")
        return None

    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    print(f"Video Info: {width}x{height}, {fps} FPS, {total_frames} frames")

    # Initialize line counter (we can adjust coordinates based on video)
    line_counter = LineCrossingCounter(line_start, line_end)

    # Setup output video writer
    output_path = "analyzed_cctv_output.mp4"
    if save_output:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    # Process video frames
    frame_count = 0
    start_time = time.time()

    print("Processing video frames...")

    try:
        while frame_count < max_frames:
            ret, frame = cap.read()
            if not ret:
                break

            # Process frame
            processed_frame, detections = process_frame(frame, detector, line_counter)

            # Save frame if output enabled
            if save_output:
                out.write(processed_frame)

            # Display progress
            if frame_count % 30 == 0:
                elapsed = time.time() - start_time
                fps_current = frame_count / elapsed if elapsed > 0 else 0
                print(f"Frame {frame_count}/{max_frames}, FPS: {fps_current:.1f}, Crossings: {line_counter.total_crossings}")

            frame_count += 1

    except KeyboardInterrupt:
        print("Processing interrupted by user")

    # Cleanup
    cap.release()
    if save_output:
        out.release()
        print(f"Output saved to: {output_path}")

    # Print final statistics
    print("\n Final Analysis Results:")
    print(f"Total frames processed: {frame_count}")
    print(f"Total line crossings: {line_counter.total_crossings}")
    print("Crossings by object type:")
    for class_name, count in line_counter.crossings.items():
        print(f"  {class_name}: {count}")

    return line_counter


In [None]:
# Real-time object detection using built-in webcam for later don't run this cell !!!
def real_time_detection(camera_index=0, line_position=0.5):
    """
    Real-time object detection using webcam
    camera_index: 0 for built-in camera, 1 for external
    line_position: vertical position of counting line (0.0-1.0)
    """
    # Initialize camera
    cap = cv2.VideoCapture(camera_index)
    if not cap.isOpened():
        print("❌ Cannot open camera")
        return

    # Get camera resolution
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Setup counting line (horizontal line across middle)
    line_y = int(height * line_position)
    line_counter = LineCrossingCounter((0, line_y), (width, line_y))

    print("📹 Starting real-time detection...")
    print("Press 'q' to quit, 'r' to reset counters")

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Process frame
            processed_frame, _ = process_frame(frame, detector, line_counter)

            # Display frame
            cv2.imshow('CCTV Object Detection', processed_frame)

            # Handle key presses
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break
            elif key == ord('r'):
                line_counter.crossings.clear()
                line_counter.total_crossings = 0
                print("Counters reset")

    except KeyboardInterrupt:
        print("Detection stopped by user")

    # Cleanup
    cap.release()
    cv2.destroyAllWindows()

    return line_counter

# Uncomment to run real-time detection
# real_time_results = real_time_detection()

In [None]:
# Functions for connecting to IP cameras and RTSP streams don't this until previous cell is up and running guys !!

class IPCameraHandler:
    def __init__(self):
        self.supported_formats = [
            "rtsp://username:password@ip_address:port/stream",
            "http://ip_address:port/video.mjpg",
            "rtsp://ip_address:port/live.sdp"
        ]

    def connect_ip_camera(self, camera_url, timeout=10):
        """
        Connect to IP camera using URL
        Common formats:
        - RTSP: rtsp://admin:password@192.168.1.100:554/stream1
        - HTTP: http://192.168.1.100:8080/video.mjpg
        """
        try:
            cap = cv2.VideoCapture(camera_url)
            cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # Reduce latency

            # Test connection
            ret, frame = cap.read()
            if ret:
                print(f"Successfully connected to IP camera: {camera_url}")
                height, width = frame.shape[:2]
                print(f"Stream resolution: {width}x{height}")
                return cap
            else:
                print(f"Failed to read from IP camera: {camera_url}")
                cap.release()
                return None

        except Exception as e:
            print(f"Error connecting to IP camera: {e}")
            return None

    def analyze_ip_stream(self, camera_url, line_start=None, line_end=None,
                         duration_minutes=5):
        """
        Analyze IP camera stream for specified duration
        """
        cap = self.connect_ip_camera(camera_url)
        if cap is None:
            return None

        # Get stream properties
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Default line position if not specified
        if line_start is None:
            line_start = (width // 4, height // 2)
        if line_end is None:
            line_end = (3 * width // 4, height // 2)

        # Initialize line counter
        line_counter = LineCrossingCounter(line_start, line_end)

        # Analysis parameters
        start_time = time.time()
        duration_seconds = duration_minutes * 60
        frame_count = 0

        print(f"Analyzing IP stream for {duration_minutes} minutes...")
        print("Press Ctrl+C to stop early")

        try:
            while time.time() - start_time < duration_seconds:
                ret, frame = cap.read()
                if not ret:
                    print("⚠️ Lost connection to IP camera")
                    break

                # Process frame
                processed_frame, _ = process_frame(frame, detector, line_counter)

                # Optional: Display frame (comment out for headless operation)
                # cv2.imshow('IP Camera Analysis', processed_frame)
                # if cv2.waitKey(1) & 0xFF == ord('q'):
                #     break

                frame_count += 1

                # Progress update every 30 seconds
                if frame_count % 900 == 0:  # Assuming ~30 FPS
                    elapsed = time.time() - start_time
                    print(f"Elapsed: {elapsed/60:.1f}min, Crossings: {line_counter.total_crossings}")

        except KeyboardInterrupt:
            print("⏹️ Analysis stopped by user")

        # Cleanup
        cap.release()
        cv2.destroyAllWindows()

        return line_counter

# Initialize IP camera handler
ip_camera = IPCameraHandler()

# Example usage (uncomment and modify URL to use)
# ip_results = ip_camera.analyze_ip_stream("rtsp://admin:password@192.168.1.100:554/stream1")


In [8]:
# Configure detection parameters and advanced features
class AdvancedDetectionConfig:
    def __init__(self):
        self.detection_zones = []
        self.alert_settings = {
            'max_objects': 10,
            'crowding_threshold': 5,
            'speed_detection': True
        }
        self.recording_settings = {
            'record_detections': True,
            'save_snapshots': True,
            'alert_video_length': 10  # seconds
        }

    def add_detection_zone(self, points, zone_name="Zone1"):
        """Add polygon detection zone"""
        self.detection_zones.append({
            'name': zone_name,
            'points': np.array(points, dtype=np.int32),
            'crossings': 0
        })

    def point_in_polygon(self, point, polygon):
        """Check if point is inside polygon zone"""
        return cv2.pointPolygonTest(polygon, point, False) >= 0

    def analyze_zones(self, detections):
        """Analyze detections within defined zones"""
        zone_counts = {}

        for zone in self.detection_zones:
            count = 0
            for detection in detections:
                x1, y1, x2, y2, conf, cls = detection
                center = ((x1 + x2) / 2, (y1 + y2) / 2)

                if self.point_in_polygon(center, zone['points']):
                    count += 1

            zone_counts[zone['name']] = count

        return zone_counts

# Initialize advanced configuration
advanced_config = AdvancedDetectionConfig()

# Example: Add detection zones (modify coordinates as needed)
# advanced_config.add_detection_zone([(100, 100), (300, 100), (300, 300), (100, 300)], "Entrance")
# advanced_config.add_detection_zone([(400, 200), (600, 200), (600, 400), (400, 400)], "Parking")


In [9]:
# Monitor detection performance and generate statistics
class PerformanceMonitor:
    def __init__(self):
        self.frame_times = deque(maxlen=100)
        self.detection_counts = deque(maxlen=1000)
        self.start_time = time.time()

    def update(self, frame_time, detection_count):
        """Update performance metrics"""
        self.frame_times.append(frame_time)
        self.detection_counts.append(detection_count)

    def get_fps(self):
        """Calculate current FPS"""
        if len(self.frame_times) < 2:
            return 0
        return len(self.frame_times) / sum(self.frame_times)

    def get_average_detections(self):
        """Get average detections per frame"""
        if not self.detection_counts:
            return 0
        return np.mean(self.detection_counts)

    def get_runtime(self):
        """Get total runtime in minutes"""
        return (time.time() - self.start_time) / 60

    def generate_report(self):
        """Generate performance report"""
        report = {
            'runtime_minutes': self.get_runtime(),
            'average_fps': self.get_fps(),
            'average_detections_per_frame': self.get_average_detections(),
            'total_frames_processed': len(self.frame_times),
            'peak_detections': max(self.detection_counts) if self.detection_counts else 0
        }
        return report

# Initialize performance monitor
performance_monitor = PerformanceMonitor()

In [10]:
# Export detection data and create analysis reports
class DataExporter:
    def __init__(self, output_dir="cctv_analysis_output"):
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)
        self.detection_log = []

    def log_detection(self, timestamp, object_type, position, confidence):
        """Log individual detection"""
        self.detection_log.append({
            'timestamp': timestamp,
            'object_type': object_type,
            'position': position,
            'confidence': confidence
        })

    def export_crossing_data(self, line_counter, filename=None):
        """Export line crossing data to CSV"""
        if filename is None:
            filename = f"crossings_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

        filepath = os.path.join(self.output_dir, filename)

        with open(filepath, 'w') as f:
            f.write("Object_Type,Crossings\n")
            for obj_type, count in line_counter.crossings.items():
                f.write(f"{obj_type},{count}\n")
            f.write(f"Total,{line_counter.total_crossings}\n")

        print(f"📊 Crossing data exported to: {filepath}")
        return filepath

    def create_summary_report(self, line_counter, performance_monitor):
        """Create comprehensive analysis report"""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        performance_data = performance_monitor.generate_report()

        report = f"""
CCTV Analysis Report
Generated: {timestamp}

=== DETECTION SUMMARY ===
Total Line Crossings: {line_counter.total_crossings}

Crossings by Object Type:
"""
        for obj_type, count in line_counter.crossings.items():
            report += f"  {obj_type}: {count}\n"

        report += f"""
=== PERFORMANCE METRICS ===
Runtime: {performance_data['runtime_minutes']:.1f} minutes
Average FPS: {performance_data['average_fps']:.1f}
Frames Processed: {performance_data['total_frames_processed']}
Average Detections/Frame: {performance_data['average_detections_per_frame']:.1f}
Peak Detections: {performance_data['peak_detections']}

=== CONFIGURATION ===
Model: YOLOv8 Nano
Confidence Threshold: {detector.confidence_threshold}
Target Classes: {list(detector.target_classes.values())}
"""

        # Save report
        report_path = os.path.join(self.output_dir, f"analysis_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt")
        with open(report_path, 'w') as f:
            f.write(report)

        print(f"📋 Analysis report saved to: {report_path}")
        return report_path

# Initialize data exporter
data_exporter = DataExporter()

In [11]:
# Tools for configuring detection parameters and camera calibration
def interactive_line_setup(video_path_or_camera=0):
    """
    Interactive tool to set up counting line position
    Click two points to define the counting line
    """
    # Global variables for mouse callback
    points = []

    def mouse_callback(event, x, y, flags, param):
        if event == cv2.EVENT_LBUTTONDOWN:
            points.append((x, y))
            print(f"Point {len(points)}: ({x}, {y})")

    # Open video or camera
    if isinstance(video_path_or_camera, str):
        cap = cv2.VideoCapture(video_path_or_camera)
    else:
        cap = cv2.VideoCapture(video_path_or_camera)

    if not cap.isOpened():
        print("Cannot open video source")
        return None, None

    # Get first frame
    ret, frame = cap.read()
    if not ret:
        print("Cannot read from video source")
        return None, None

    # Setup window and mouse callback
    cv2.namedWindow('Setup Counting Line')
    cv2.setMouseCallback('Setup Counting Line', mouse_callback)

    print("Click two points to define the counting line")
    print("Press 'r' to reset, 'q' to confirm and quit")

    while len(points) < 2:
        display_frame = frame.copy()

        # Draw existing points
        for i, point in enumerate(points):
            cv2.circle(display_frame, point, 5, (0, 255, 0), -1)
            cv2.putText(display_frame, f"P{i+1}", (point[0]+10, point[1]),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        # Draw line if we have both points
        if len(points) == 2:
            cv2.line(display_frame, points[0], points[1], (0, 0, 255), 2)
            cv2.putText(display_frame, "Press 'q' to confirm", (10, 30),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        cv2.imshow('Setup Counting Line', display_frame)

        key = cv2.waitKey(1) & 0xFF
        if key == ord('r'):
            points.clear()
            print("Points reset")
        elif key == ord('q') and len(points) == 2:
            break

    cv2.destroyAllWindows()
    cap.release()

    if len(points) == 2:
        print(f"Counting line configured: {points[0]} to {points[1]}")
        return points[0], points[1]
    else:
        return None, None

def calibrate_detection_settings():
    """
    Interactive calibration for detection settings
    """
    print("Detection Settings Calibration")
    print("Current settings:")
    print(f"  Confidence threshold: {detector.confidence_threshold}")
    print(f"  Target classes: {list(detector.target_classes.values())}")

    # Allow user to adjust settings
    new_confidence = input(f"Enter new confidence threshold (current: {detector.confidence_threshold}): ")
    if new_confidence:
        try:
            detector.confidence_threshold = float(new_confidence)
            print(f"Confidence threshold updated to: {detector.confidence_threshold}")
        except ValueError:
            print("Invalid value, keeping current setting")

    print("Calibration complete")

# Example: Set up counting line interactively (uncomment to use)
# line_start, line_end = interactive_line_setup(sample_video_path)

In [12]:
# Functions to test with my provided video files
def test_custom_video(video_path, line_coordinates=None, max_duration_minutes=2):
    """
    Test detection system with custom video file
    """
    if not os.path.exists(video_path):
        print(f"Video file not found: {video_path}")
        return None

    # Get video properties
    cap = cv2.VideoCapture(video_path)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    cap.release()

    print(f" Testing video: {video_path}")
    print(f"   Resolution: {width}x{height}")
    print(f"   FPS: {fps}")

    # Set default line coordinates if not provided
    if line_coordinates is None:
        line_start = (width // 4, height // 2)
        line_end = (3 * width // 4, height // 2)
    else:
        line_start, line_end = line_coordinates

    # Run analysis
    max_frames = int(fps * max_duration_minutes * 60)
    results = analyze_cctv_video(video_path, line_start, line_end, max_frames)

    return results

def batch_process_videos(video_directory, output_summary=True):
    """
    Process multiple video files in a directory
    """
    if not os.path.exists(video_directory):
        print(f"Directory not found: {video_directory}")
        return

    video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.wmv']
    video_files = []

    # Find video files
    for file in os.listdir(video_directory):
        if any(file.lower().endswith(ext) for ext in video_extensions):
            video_files.append(os.path.join(video_directory, file))

    if not video_files:
        print(f"No video files found in {video_directory}")
        return

    print(f"Found {len(video_files)} video files to process")

    # Process each video
    all_results = {}
    for video_path in video_files:
        print(f"\n Processing: {os.path.basename(video_path)}")
        result = test_custom_video(video_path, max_duration_minutes=1)
        if result:
            all_results[os.path.basename(video_path)] = result

    # Create summary report
    if output_summary and all_results:
        print(f"\n Batch Processing Summary:")
        print("-" * 50)
        for filename, result in all_results.items():
            print(f"{filename}: {result.total_crossings} crossings")

    return all_results

# Example usage
def test_with_downloaded_video():
    """Test detection with downloaded video"""
    test_video = 'cctv_test_videos/traffic_intersection.mp4'

    if os.path.exists(test_video):
        print(f"\n Testing detection on {test_video}")

        # Set counting line (adjust these coordinates as needed)
        line_start = (400, 300)  # Left side of line
        line_end = (800, 300)    # Right side of line

        # Run analysis
        results = analyze_cctv_video(test_video, line_start, line_end, max_frames=200)

        if results:
            print(f"\n Results:")
            print(f"Total crossings: {results.total_crossings}")
            for obj_type, count in results.crossings.items():
                print(f"{obj_type}: {count}")
    else:
        print("Video not found. Run download_free_cctv_videos() first")

# Run the test:
test_with_downloaded_video()


 Testing detection on cctv_test_videos/traffic_intersection.mp4
Video Info: 1920x1080, 30 FPS, 1800 frames
🔄 Processing video frames...

0: 384x640 13 cars, 2 trucks, 46.5ms
Speed: 1.9ms preprocess, 46.5ms inference, 1.1ms postprocess per image at shape (1, 3, 384, 640)
Frame 0/200, FPS: 0.0, Crossings: 0

0: 384x640 12 cars, 3 trucks, 38.2ms
Speed: 1.5ms preprocess, 38.2ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 cars, 4 trucks, 36.6ms
Speed: 1.3ms preprocess, 36.6ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 cars, 4 trucks, 35.8ms
Speed: 1.1ms preprocess, 35.8ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 cars, 3 trucks, 35.8ms
Speed: 1.2ms preprocess, 35.8ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 12 cars, 3 trucks, 36.3ms
Speed: 1.0ms preprocess, 36.3ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)
🚨 truck crossed t

In [13]:
import torch


# Optimize settings for any device (CUDA, MPS, CPU)
def optimize_for_device():
    """
    Automatically detect and optimize YOLO detection for available hardware
    """
    print(" Detecting optimal device configuration...")

    # Detect best available device
    if torch.cuda.is_available():
        device = 'cuda'
        device_name = torch.cuda.get_device_name(0)
        print(f"Using CUDA GPU: {device_name}")
        memory_gb = torch.cuda.get_device_properties(0).total_memory / 1024**3
        print(f"   GPU Memory: {memory_gb:.1f} GB")
    elif hasattr(torch, 'backends') and hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
        device = 'mps'
        print("Using Metal Performance Shaders (MPS) - Apple Silicon")
    else:
        device = 'cpu'
        print(" Using CPU")

    # Device-specific optimization settings
    if device == 'cuda':
        optimal_settings = {
            'model_size': 's',  # Small model for CUDA
            'confidence_threshold': 0.5,
            'input_resolution': (832, 832),  # Higher resolution for GPU
            'batch_size': 4,
            'max_fps': 60
        }
        performance_tips = [
            "GPU detected - using optimized settings for CUDA",
            "Higher resolution and batch size enabled",
            "Monitor GPU temperature and memory usage",
            "Consider using larger model (YOLOv8s/m) for better accuracy"
        ]
    elif device == 'mps':
        optimal_settings = {
            'model_size': 'n',  # Nano for MPS
            'confidence_threshold': 0.5,
            'input_resolution': (640, 640),
            'batch_size': 1,
            'max_fps': 30
        }
        performance_tips = [
            "Apple Silicon detected - optimized for MPS",
            "Nano model recommended for real-time performance",
            "Monitor thermal throttling on sustained loads",
            "Close other applications to maximize performance"
        ]
    else:  # CPU
        optimal_settings = {
            'model_size': 'n',  # Nano for CPU
            'confidence_threshold': 0.6,  # Slightly higher to reduce processing
            'input_resolution': (416, 416),  # Lower resolution for CPU
            'batch_size': 1,
            'max_fps': 15
        }
        performance_tips = [
            "CPU mode - optimized for lower computational load",
            "Reduced resolution and FPS for better performance",
            "Consider upgrading to GPU for real-time applications",
            "Use threading for better CPU utilization"
        ]

    # Apply optimizations
    detector.confidence_threshold = optimal_settings['confidence_threshold']

    print(f"\n Optimization settings for {device.upper()}:")
    for setting, value in optimal_settings.items():
        print(f"   {setting}: {value}")

    print(f"\n Device-specific Performance Tips:")
    for tip in performance_tips:
        print(f"   • {tip}")

    return device, optimal_settings

# Apply device-specific optimizations
current_device, device_settings = optimize_for_device()

🔧 Detecting optimal device configuration...
Using Metal Performance Shaders (MPS) - Apple Silicon

 Optimization settings for MPS:
   model_size: n
   confidence_threshold: 0.5
   input_resolution: (640, 640)
   batch_size: 1
   max_fps: 30

 Device-specific Performance Tips:
   • Apple Silicon detected - optimized for MPS
   • Nano model recommended for real-time performance
   • Monitor thermal throttling on sustained loads
   • Close other applications to maximize performance


In [14]:
# Comprehensive demo and instructions
def run_complete_demo():
    """
    Complete demonstration of the CCTV analysis system
    """
    print(" CCTV Object Detection and Line Crossing Analysis Demo")
    print("=" * 60)

    # Demo configuration
    demo_config = {
        'model': 'YOLOv8 Nano',
        'target_objects': list(detector.target_classes.values()),
        'features': [
            'Real-time object detection',
            'Line crossing detection and counting',
            'Multi-object tracking',
            'Performance monitoring',
            'IP camera support',
            'Data export and reporting'
        ]
    }

    print(" System Capabilities:")
    for feature in demo_config['features']:
        print(f"   {feature}")

    print(f"\n🔧 Configuration:")
    print(f"   Model: {demo_config['model']}")
    print(f"   Target Objects: {', '.join(demo_config['target_objects'])}")
    print(f"   Confidence Threshold: {detector.confidence_threshold}")

    print(f"\n Usage Instructions:")
    print("1. For video analysis:")
    print("   results = analyze_cctv_video('your_video.mp4')")

    print("\n2. For real-time webcam:")
    print("   real_time_detection()")

    print("\n3. For IP camera:")
    print("   ip_camera.analyze_ip_stream('rtsp://your.camera.url')")

    print("\n4. Export results:")
    print("   data_exporter.export_crossing_data(results)")
    print("   data_exporter.create_summary_report(results, performance_monitor)")

    print(f"\n Demo Complete! System ready for presentation.")
    return demo_config


def get_ip_camera_examples():
    """
    Provide example IP camera URLs for common camera brands
    """
    examples = {
        'Generic RTSP': 'rtsp://username:password@192.168.1.100:554/stream1',
        'Hikvision': 'rtsp://admin:password@192.168.1.100:554/Streaming/Channels/101',
        'Dahua': 'rtsp://admin:password@192.168.1.100:554/cam/realmonitor?channel=1&subtype=0',
        'Axis': 'rtsp://root:password@192.168.1.100/axis-media/media.amp',
        'Generic HTTP': 'http://192.168.1.100:8080/video.mjpg'
    }

    print(" IP Camera URL Examples:")
    for brand, url in examples.items():
        print(f"   {brand}: {url}")

    print("\n Remember to:")
    print("   • Replace username/password with actual credentials")
    print("   • Replace IP address with your camera's IP")
    print("   • Check your camera's documentation for exact URL format")

    return examples

# Run the complete demo
demo_results = run_complete_demo()
ip_examples = get_ip_camera_examples()

print("\n CCTV Analysis System Ready!")

 CCTV Object Detection and Line Crossing Analysis Demo
 System Capabilities:
   Real-time object detection
   Line crossing detection and counting
   Multi-object tracking
   Performance monitoring
   IP camera support
   Data export and reporting

🔧 Configuration:
   Model: YOLOv8 Nano
   Target Objects: person, bicycle, car, motorcycle, bus, truck
   Confidence Threshold: 0.5

 Usage Instructions:
1. For video analysis:
   results = analyze_cctv_video('your_video.mp4')

2. For real-time webcam:
   real_time_detection()

3. For IP camera:
   ip_camera.analyze_ip_stream('rtsp://your.camera.url')

4. Export results:
   data_exporter.export_crossing_data(results)
   data_exporter.create_summary_report(results, performance_monitor)

 Demo Complete! System ready for presentation.
 IP Camera URL Examples:
   Generic RTSP: rtsp://username:password@192.168.1.100:554/stream1
   Hikvision: rtsp://admin:password@192.168.1.100:554/Streaming/Channels/101
   Dahua: rtsp://admin:password@192.168.1.1