In [44]:
# Enhanced Real-time Vehicle Counter with Detection and Frequency Improvements
# Requirements: pip install ultralytics opencv-python
# Tracker: BoT-SORT
# Counts both incoming and outgoing vehicles.
import cv2
from ultralytics import YOLO
from collections import defaultdict
import time
import numpy as np

class VehicleCounter:
    """Counts unique vehicles in a video stream with YOLO detection and BoT-SORT tracking.

    Every track id reported by the tracker is counted exactly once, the first
    time it appears, regardless of the direction the vehicle travels in.
    """

    # Maximum number of centroid points remembered per track (drawn as a trail).
    MAX_TRAIL_POINTS = 30
    # Statistics panel geometry, in pixels. The minimum size matches the
    # historical fixed panel; the panel grows when its text needs more room.
    PANEL_X = 10
    PANEL_Y = 10
    PANEL_MIN_WIDTH = 280
    PANEL_MIN_HEIGHT = 150
    PANEL_LINE_HEIGHT = 20
    PANEL_PADDING = 10

    def __init__(self, model_path='yolov10s.pt', conf_threshold=0.3, device='cpu', use_clahe=True,
                 tracker='botsort.yaml'):
        """
        Initializes the VehicleCounter.

        Args:
            model_path (str): Path to the YOLO model file (e.g., 'yolov10s.pt').
            conf_threshold (float): Confidence threshold for object detection.
            device (str): Device to run the model on ('cpu' or '0' for GPU).
            use_clahe (bool): Whether to apply CLAHE pre-processing for better contrast.
            tracker (str): Tracker configuration passed to ``model.track``.
        """
        self.model = YOLO(model_path)
        self.conf_threshold = conf_threshold
        self.device = device
        self.use_clahe = use_clahe
        self.tracker = tracker

        # Always define the attribute so toggling `use_clahe` later cannot
        # raise AttributeError; the object is created lazily when missing.
        self.clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) if use_clahe else None

        # Detector class id -> label for the classes that are tracked.
        self.vehicle_classes = {
            2: 'car', 3: 'motorcycle', 5: 'bus', 7: 'truck'
        }

        # Tracking and counting attributes
        self.track_history = defaultdict(list)   # track id -> recent centroids
        self.seen_ids = set()                    # every track id counted so far
        self.vehicle_counts = defaultdict(int)   # label -> cumulative count
        self.current_vehicles_in_frame = {}      # track id -> label, latest frame only

        # Performance metrics (restarted by run()).
        self.start_time = time.time()
        self.frame_count = 0

    def preprocess_frame(self, frame):
        """Returns the frame with CLAHE applied to its lightness channel.

        The input frame is returned untouched when `use_clahe` is false.
        """
        if not self.use_clahe:
            return frame
        if self.clahe is None:
            self.clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))

        # Equalize only the L channel of LAB so colours are left alone.
        lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
        l, a, b = cv2.split(lab)
        lab_clahe = cv2.merge((self.clahe.apply(l), a, b))
        return cv2.cvtColor(lab_clahe, cv2.COLOR_LAB2BGR)

    @staticmethod
    def _extract_detections(results):
        """Returns (boxes, track_ids, classes) arrays, or three empty tuples when nothing is tracked."""
        detections = results[0].boxes
        if detections.id is None:
            return (), (), ()
        return (
            detections.xyxy.cpu().numpy(),
            detections.id.cpu().numpy().astype(int),
            detections.cls.cpu().numpy().astype(int),
        )

    def _update_tracks(self, boxes, track_ids, classes):
        """Updates per-frame state, cumulative counts and centroid trails.

        Args:
            boxes: Iterable of (x1, y1, x2, y2) boxes.
            track_ids: Iterable of tracker ids, parallel to `boxes`.
            classes: Iterable of detector class ids, parallel to `boxes`.
        """
        self.current_vehicles_in_frame.clear()
        for box, track_id, cls in zip(boxes, track_ids, classes):
            track_id = int(track_id)
            class_name = self.vehicle_classes.get(int(cls), 'unknown')
            self.current_vehicles_in_frame[track_id] = class_name

            # A vehicle is counted once, the first time its id shows up.
            if track_id not in self.seen_ids:
                self.seen_ids.add(track_id)
                self.vehicle_counts[class_name] += 1

            trail = self.track_history[track_id]
            trail.append((int((box[0] + box[2]) / 2), int((box[1] + box[3]) / 2)))
            if len(trail) > self.MAX_TRAIL_POINTS:
                del trail[:-self.MAX_TRAIL_POINTS]

    def process_frame(self, frame):
        """
        Processes a single frame for vehicle detection, tracking, and counting.

        Returns the input frame with annotations drawn on it (the frame is
        modified in place).
        """
        self.frame_count += 1

        # 1. Pre-process the frame for better detection
        preprocessed_frame = self.preprocess_frame(frame)

        # 2. Run tracking on the pre-processed frame
        results = self.model.track(
            preprocessed_frame,
            classes=list(self.vehicle_classes),
            conf=self.conf_threshold,
            device=self.device,
            persist=True,
            tracker=self.tracker,
            verbose=False,
        )
        self._update_tracks(*self._extract_detections(results))

        # 3. Draw annotations on the ORIGINAL frame, not the pre-processed one
        return self.draw_annotations(frame, results)

    def draw_annotations(self, frame, results):
        """Draws bounding boxes, tracking IDs, trajectories, and statistics onto `frame`."""
        font = cv2.FONT_HERSHEY_SIMPLEX
        color = (0, 255, 0)

        for box, track_id, cls in zip(*self._extract_detections(results)):
            x1, y1, x2, y2 = map(int, box)
            track_id = int(track_id)
            class_name = self.vehicle_classes.get(int(cls), 'unknown')

            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            label = f'ID:{track_id} {class_name}'

            (text_w, text_h), _ = cv2.getTextSize(label, font, 0.5, 2)
            # Keep the label inside the image when the box touches the top edge.
            label_bottom = max(y1, text_h + 5)
            cv2.rectangle(frame, (x1, label_bottom - text_h - 5), (x1 + text_w, label_bottom), color, -1)
            cv2.putText(frame, label, (x1, label_bottom - 5), font, 0.5, (0, 0, 0), 2)

            points = self.track_history.get(track_id, [])
            if len(points) > 1:
                cv2.polylines(frame, [np.array(points, dtype=np.int32)], isClosed=False, color=color, thickness=2)

        self.draw_statistics(frame)
        return frame

    def _statistics_layout(self):
        """Computes the statistics panel content without touching OpenCV.

        Returns:
            tuple: ``(lines, panel_height)`` where each line is
            ``(text, baseline_offset_y, font_scale, color, thickness)`` with the
            offset measured from the top of the panel, and ``panel_height`` is
            tall enough to contain every line.
        """
        lh = self.PANEL_LINE_HEIGHT
        headline_style = (0.6, (0, 255, 255), 2)

        offset = 25
        lines = [(f"Total Unique Vehicles: {len(self.seen_ids)}", offset) + headline_style]
        offset += lh + 5
        lines.append((f"Current in Frame: {len(self.current_vehicles_in_frame)}", offset) + headline_style)
        offset += lh + 10

        for vehicle_type, count in self.vehicle_counts.items():
            lines.append((f"- {vehicle_type.capitalize()}: {count}", offset, 0.5, (255, 255, 255), 1))
            offset += lh

        elapsed_time = time.time() - self.start_time
        fps = self.frame_count / elapsed_time if elapsed_time > 0 else 0
        fps_offset = offset + 5
        lines.append((f"FPS: {fps:.2f}", fps_offset, 0.6, (0, 255, 0), 2))

        # The panel used to be a fixed 150 px tall, which the FPS line overran
        # once more than a few vehicle classes had been counted.
        panel_h = max(self.PANEL_MIN_HEIGHT, fps_offset + self.PANEL_PADDING)
        return lines, panel_h

    def draw_statistics(self, frame):
        """Draws the semi-transparent statistics panel onto `frame` in place."""
        font = cv2.FONT_HERSHEY_SIMPLEX
        lines, panel_h = self._statistics_layout()

        # Widen the panel if any line would not fit in the default width.
        panel_w = self.PANEL_MIN_WIDTH
        for text, _, scale, _, thickness in lines:
            (text_w, _), _ = cv2.getTextSize(text, font, scale, thickness)
            panel_w = max(panel_w, text_w + 2 * self.PANEL_PADDING)

        overlay = frame.copy()
        cv2.rectangle(overlay, (self.PANEL_X, self.PANEL_Y),
                      (self.PANEL_X + panel_w, self.PANEL_Y + panel_h), (0, 0, 0), -1)
        cv2.addWeighted(overlay, 0.6, frame, 0.4, 0, frame)

        text_x = self.PANEL_X + self.PANEL_PADDING
        for text, offset, scale, color, thickness in lines:
            cv2.putText(frame, text, (text_x, self.PANEL_Y + offset), font, scale, color, thickness)

    def run(self, source_path, display_size=(1280, 720)):
        """Runs the vehicle counting process on a video file or camera source.

        Args:
            source_path: Anything accepted by ``cv2.VideoCapture``.
            display_size (tuple): ``(width, height)`` every frame is resized to
                before processing and display.
        """
        cap = cv2.VideoCapture(source_path)
        if not cap.isOpened():
            print(f"Error: Could not open video source: {source_path}")
            return

        # Restart the FPS clock so time spent before run() is not averaged in.
        self.start_time = time.time()
        self.frame_count = 0
        print("Vehicle counter started. Press 'q' to quit.")

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    print("End of video stream.")
                    break

                frame = cv2.resize(frame, tuple(display_size))
                cv2.imshow("Vehicle Counter", self.process_frame(frame))

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Release the capture and windows even if processing raised.
            cap.release()
            cv2.destroyAllWindows()

        self.print_final_counts()

    def print_final_counts(self):
        """Prints the final cumulative vehicle counts to the console."""
        print("\n--- Final Cumulative Counts ---")
        print(f"Total Unique Vehicles Detected: {len(self.seen_ids)}")
        for vt, count in self.vehicle_counts.items():
            print(f"- {vt.capitalize()}: {count}")

def main():
    """Builds a VehicleCounter from the settings below and runs it on the sample video."""
    # --- Configuration ---

    # Video file to analyse.
    video_source = "adi.mp4"

    settings = {
        # LEVEL 1: model weights, trading speed for accuracy.
        #   'yolov10n.pt': fastest, baseline accuracy.
        #   'yolov10s.pt': good balance of speed and accuracy.
        #   'yolov10m.pt': high accuracy, best with GPU.
        'model_path': 'yolov10n.pt',
        # Use '0' for GPU for a massive FPS boost.
        'device': 'cpu',
        'conf_threshold': 0.3,
        # LEVEL 2: image pre-processing. True improves detection in difficult
        # lighting and may slightly decrease FPS.
        'use_clahe': True,
    }

    # ADVANCED: for TensorRT, export the model first, e.g.
    #   YOLO('yolov10s.pt').export(format='tensorrt', device='0')
    # which creates 'yolov10s.engine'; then load that '.engine' file as the
    # model path instead of the '.pt' file.

    counter = VehicleCounter(**settings)
    counter.run(source_path=video_source)


if __name__ == "__main__":
    main()

Vehicle counter started. Press 'q' to quit.

--- Final Cumulative Counts ---
Total Unique Vehicles Detected: 57
- Car: 48
- Truck: 8
- Bus: 1


In [None]:
# Requirements: pip install ultralytics opencv-python
# Tracker: ByteTrack
# Counts both incoming and outgoing vehicles.
import cv2
from ultralytics import YOLO
from collections import defaultdict
import time
import numpy as np

class VehicleCounter:
    """Counts unique vehicles in a video stream with YOLO detection and ByteTrack tracking.

    Every track id reported by the tracker is counted exactly once, the first
    time it appears, regardless of the direction the vehicle travels in.
    """

    # Maximum number of centroid points remembered per track (drawn as a trail).
    MAX_TRAIL_POINTS = 30
    # Statistics panel geometry, in pixels. The minimum size matches the
    # historical fixed panel; the panel grows when its text needs more room.
    PANEL_X = 10
    PANEL_Y = 10
    PANEL_MIN_WIDTH = 280
    PANEL_MIN_HEIGHT = 150
    PANEL_LINE_HEIGHT = 20
    PANEL_PADDING = 10

    def __init__(self, model_path='yolov10s.pt', conf_threshold=0.3, device='cpu', use_clahe=True,
                 tracker='bytetrack.yaml'):
        """
        Initializes the VehicleCounter.

        Args:
            model_path (str): Path to the YOLO model file (e.g., 'yolov10s.pt').
            conf_threshold (float): Confidence threshold for object detection.
            device (str): Device to run the model on ('cpu' or '0' for GPU).
            use_clahe (bool): Whether to apply CLAHE pre-processing for better contrast.
            tracker (str): Tracker configuration passed to ``model.track``;
                point it at a custom yaml file for advanced tracker tuning.
        """
        self.model = YOLO(model_path)
        self.conf_threshold = conf_threshold
        self.device = device
        self.use_clahe = use_clahe
        self.tracker = tracker

        # Always define the attribute so toggling `use_clahe` later cannot
        # raise AttributeError; the object is created lazily when missing.
        self.clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) if use_clahe else None

        # Detector class id -> label for the classes that are tracked.
        self.vehicle_classes = {
            2: 'car', 3: 'motorcycle', 5: 'bus', 7: 'truck'
        }

        # Tracking and counting attributes
        self.track_history = defaultdict(list)   # track id -> recent centroids
        self.seen_ids = set()                    # every track id counted so far
        self.vehicle_counts = defaultdict(int)   # label -> cumulative count
        self.current_vehicles_in_frame = {}      # track id -> label, latest frame only

        # Performance metrics (restarted by run()).
        self.start_time = time.time()
        self.frame_count = 0

    def preprocess_frame(self, frame):
        """Returns the frame with CLAHE applied to its lightness channel.

        The input frame is returned untouched when `use_clahe` is false.
        """
        if not self.use_clahe:
            return frame
        if self.clahe is None:
            self.clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))

        # Equalize only the L channel of LAB so colours are left alone.
        lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
        l, a, b = cv2.split(lab)
        lab_clahe = cv2.merge((self.clahe.apply(l), a, b))
        return cv2.cvtColor(lab_clahe, cv2.COLOR_LAB2BGR)

    @staticmethod
    def _extract_detections(results):
        """Returns (boxes, track_ids, classes) arrays, or three empty tuples when nothing is tracked."""
        detections = results[0].boxes
        if detections.id is None:
            return (), (), ()
        return (
            detections.xyxy.cpu().numpy(),
            detections.id.cpu().numpy().astype(int),
            detections.cls.cpu().numpy().astype(int),
        )

    def _update_tracks(self, boxes, track_ids, classes):
        """Updates per-frame state, cumulative counts and centroid trails.

        Args:
            boxes: Iterable of (x1, y1, x2, y2) boxes.
            track_ids: Iterable of tracker ids, parallel to `boxes`.
            classes: Iterable of detector class ids, parallel to `boxes`.
        """
        self.current_vehicles_in_frame.clear()
        for box, track_id, cls in zip(boxes, track_ids, classes):
            track_id = int(track_id)
            class_name = self.vehicle_classes.get(int(cls), 'unknown')
            self.current_vehicles_in_frame[track_id] = class_name

            # A vehicle is counted once, the first time its id shows up.
            if track_id not in self.seen_ids:
                self.seen_ids.add(track_id)
                self.vehicle_counts[class_name] += 1

            trail = self.track_history[track_id]
            trail.append((int((box[0] + box[2]) / 2), int((box[1] + box[3]) / 2)))
            if len(trail) > self.MAX_TRAIL_POINTS:
                del trail[:-self.MAX_TRAIL_POINTS]

    def process_frame(self, frame):
        """
        Processes a single frame for vehicle detection, tracking, and counting.

        Returns the input frame with annotations drawn on it (the frame is
        modified in place).
        """
        self.frame_count += 1

        # 1. Pre-process the frame for better detection
        preprocessed_frame = self.preprocess_frame(frame)

        # 2. Run tracking on the pre-processed frame
        results = self.model.track(
            preprocessed_frame,
            classes=list(self.vehicle_classes),
            conf=self.conf_threshold,
            device=self.device,
            persist=True,
            tracker=self.tracker,
            verbose=False,
        )
        self._update_tracks(*self._extract_detections(results))

        # 3. Draw annotations on the ORIGINAL frame, not the pre-processed one
        return self.draw_annotations(frame, results)

    def draw_annotations(self, frame, results):
        """Draws bounding boxes, tracking IDs, trajectories, and statistics onto `frame`."""
        font = cv2.FONT_HERSHEY_SIMPLEX
        color = (0, 255, 0)

        for box, track_id, cls in zip(*self._extract_detections(results)):
            x1, y1, x2, y2 = map(int, box)
            track_id = int(track_id)
            class_name = self.vehicle_classes.get(int(cls), 'unknown')

            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            label = f'ID:{track_id} {class_name}'

            (text_w, text_h), _ = cv2.getTextSize(label, font, 0.5, 2)
            # Keep the label inside the image when the box touches the top edge.
            label_bottom = max(y1, text_h + 5)
            cv2.rectangle(frame, (x1, label_bottom - text_h - 5), (x1 + text_w, label_bottom), color, -1)
            cv2.putText(frame, label, (x1, label_bottom - 5), font, 0.5, (0, 0, 0), 2)

            points = self.track_history.get(track_id, [])
            if len(points) > 1:
                cv2.polylines(frame, [np.array(points, dtype=np.int32)], isClosed=False, color=color, thickness=2)

        self.draw_statistics(frame)
        return frame

    def _statistics_layout(self):
        """Computes the statistics panel content without touching OpenCV.

        Returns:
            tuple: ``(lines, panel_height)`` where each line is
            ``(text, baseline_offset_y, font_scale, color, thickness)`` with the
            offset measured from the top of the panel, and ``panel_height`` is
            tall enough to contain every line.
        """
        lh = self.PANEL_LINE_HEIGHT
        headline_style = (0.6, (0, 255, 255), 2)

        offset = 25
        lines = [(f"Total Unique Vehicles: {len(self.seen_ids)}", offset) + headline_style]
        offset += lh + 5
        lines.append((f"Current in Frame: {len(self.current_vehicles_in_frame)}", offset) + headline_style)
        offset += lh + 10

        for vehicle_type, count in self.vehicle_counts.items():
            lines.append((f"- {vehicle_type.capitalize()}: {count}", offset, 0.5, (255, 255, 255), 1))
            offset += lh

        elapsed_time = time.time() - self.start_time
        fps = self.frame_count / elapsed_time if elapsed_time > 0 else 0
        fps_offset = offset + 5
        lines.append((f"FPS: {fps:.2f}", fps_offset, 0.6, (0, 255, 0), 2))

        # The panel used to be a fixed 150 px tall, which the FPS line overran
        # once more than a few vehicle classes had been counted.
        panel_h = max(self.PANEL_MIN_HEIGHT, fps_offset + self.PANEL_PADDING)
        return lines, panel_h

    def draw_statistics(self, frame):
        """Draws the semi-transparent statistics panel onto `frame` in place."""
        font = cv2.FONT_HERSHEY_SIMPLEX
        lines, panel_h = self._statistics_layout()

        # Widen the panel if any line would not fit in the default width.
        panel_w = self.PANEL_MIN_WIDTH
        for text, _, scale, _, thickness in lines:
            (text_w, _), _ = cv2.getTextSize(text, font, scale, thickness)
            panel_w = max(panel_w, text_w + 2 * self.PANEL_PADDING)

        overlay = frame.copy()
        cv2.rectangle(overlay, (self.PANEL_X, self.PANEL_Y),
                      (self.PANEL_X + panel_w, self.PANEL_Y + panel_h), (0, 0, 0), -1)
        cv2.addWeighted(overlay, 0.6, frame, 0.4, 0, frame)

        text_x = self.PANEL_X + self.PANEL_PADDING
        for text, offset, scale, color, thickness in lines:
            cv2.putText(frame, text, (text_x, self.PANEL_Y + offset), font, scale, color, thickness)

    def run(self, source_path, display_size=(1280, 720)):
        """Runs the vehicle counting process on a video file or camera source.

        Args:
            source_path: Anything accepted by ``cv2.VideoCapture``.
            display_size (tuple): ``(width, height)`` every frame is resized to
                before processing and display.
        """
        cap = cv2.VideoCapture(source_path)
        if not cap.isOpened():
            print(f"Error: Could not open video source: {source_path}")
            return

        # Restart the FPS clock so time spent before run() is not averaged in.
        self.start_time = time.time()
        self.frame_count = 0
        print("Vehicle counter started. Press 'q' to quit.")

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    print("End of video stream.")
                    break

                frame = cv2.resize(frame, tuple(display_size))
                cv2.imshow("Vehicle Counter", self.process_frame(frame))

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Release the capture and windows even if processing raised.
            cap.release()
            cv2.destroyAllWindows()

        self.print_final_counts()

    def print_final_counts(self):
        """Prints the final cumulative vehicle counts to the console."""
        print("\n--- Final Cumulative Counts ---")
        print(f"Total Unique Vehicles Detected: {len(self.seen_ids)}")
        for vt, count in self.vehicle_counts.items():
            print(f"- {vt.capitalize()}: {count}")

def main():
    """Builds a VehicleCounter from the settings below and runs it on the sample video."""
    # --- Configuration ---

    # Video file to analyse.
    video_source = "adi.mp4"

    settings = {
        # LEVEL 1: model weights, trading speed for accuracy.
        #   'yolov10n.pt': fastest, baseline accuracy.
        #   'yolov10s.pt': good balance of speed and accuracy.
        #   'yolov10m.pt': high accuracy, best with GPU.
        'model_path': 'yolov10n.pt',
        # Use '0' for GPU for a massive FPS boost.
        'device': 'cpu',
        'conf_threshold': 0.3,
        # LEVEL 2: image pre-processing. True improves detection in difficult
        # lighting and may slightly decrease FPS.
        'use_clahe': True,
    }

    # ADVANCED: for TensorRT, export the model first, e.g.
    #   YOLO('yolov10s.pt').export(format='tensorrt', device='0')
    # which creates 'yolov10s.engine'; then load that '.engine' file as the
    # model path instead of the '.pt' file.

    counter = VehicleCounter(**settings)
    counter.run(source_path=video_source)


if __name__ == "__main__":
    main()

Vehicle counter started. Press 'q' to quit.

--- Final Cumulative Counts ---
Total Unique Vehicles Detected: 76
- Car: 63
- Truck: 12
- Bus: 1


In [100]:
# Enhanced Real-time Vehicle Counter (Incoming Vehicles Only)
# Requirements: pip install ultralytics opencv-python
# ----------------------------------------------------------------

import cv2
from ultralytics import YOLO
from collections import defaultdict
import time
import numpy as np

class VehicleCounter:
    """Counts incoming vehicles in a video stream with YOLO detection and ByteTrack tracking.

    A track is counted once, the first time its centroid has moved downward in
    the image by more than `min_downward_shift` pixels across its most recent
    `direction_window` positions. Tracks that never do so are drawn but not
    counted.
    """

    # Maximum number of centroid points remembered per track (drawn as a trail).
    MAX_TRAIL_POINTS = 30
    # Statistics panel geometry, in pixels. The minimum size matches the
    # historical fixed panel; the panel grows when its text needs more room.
    PANEL_X = 10
    PANEL_Y = 10
    PANEL_MIN_WIDTH = 280
    PANEL_MIN_HEIGHT = 150
    PANEL_LINE_HEIGHT = 20
    PANEL_PADDING = 10

    def __init__(self, model_path='yolov10s.pt', conf_threshold=0.3, device='cpu', use_clahe=True,
                 tracker='bytetrack.yaml', direction_window=10, min_downward_shift=15):
        """
        Initializes the VehicleCounter.

        Args:
            model_path (str): Path to the YOLO model file (e.g., 'yolov10s.pt').
            conf_threshold (float): Confidence threshold for object detection.
            device (str): Device to run the model on ('cpu' or '0' for GPU).
            use_clahe (bool): Whether to apply CLAHE pre-processing for better contrast.
            tracker (str): Tracker configuration passed to ``model.track``.
            direction_window (int): Number of most recent centroids compared
                when deciding whether a track moves downward.
            min_downward_shift (int): Downward displacement in pixels, across
                the window, that must be exceeded for a track to be counted.

        Raises:
            ValueError: If `direction_window` is smaller than 2 or larger than
                `MAX_TRAIL_POINTS` (a longer window could never be filled).
        """
        if not 2 <= direction_window <= self.MAX_TRAIL_POINTS:
            raise ValueError(
                f"direction_window must be between 2 and {self.MAX_TRAIL_POINTS}, got {direction_window}"
            )

        self.model = YOLO(model_path)
        self.conf_threshold = conf_threshold
        self.device = device
        self.use_clahe = use_clahe
        self.tracker = tracker
        self.direction_window = direction_window
        self.min_downward_shift = min_downward_shift

        # Always define the attribute so toggling `use_clahe` later cannot
        # raise AttributeError; the object is created lazily when missing.
        self.clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) if use_clahe else None

        # Detector class id -> label for the classes that are tracked.
        self.vehicle_classes = {
            2: 'car', 3: 'motorcycle', 5: 'bus', 7: 'truck'
        }

        self.track_history = defaultdict(list)   # track id -> recent centroids
        self.counted_ids = set()                 # track ids already counted as incoming
        self.vehicle_counts = defaultdict(int)   # label -> cumulative incoming count
        self.current_vehicles_in_frame = {}      # track id -> label, latest frame only

        # Performance metrics (restarted by run()).
        self.start_time = time.time()
        self.frame_count = 0

    def preprocess_frame(self, frame):
        """Returns the frame with CLAHE applied to its lightness channel.

        The input frame is returned untouched when `use_clahe` is false.
        """
        if not self.use_clahe:
            return frame
        if self.clahe is None:
            self.clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))

        # Equalize only the L channel of LAB so colours are left alone.
        lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
        l, a, b = cv2.split(lab)
        lab_clahe = cv2.merge((self.clahe.apply(l), a, b))
        return cv2.cvtColor(lab_clahe, cv2.COLOR_LAB2BGR)

    @staticmethod
    def _extract_detections(results):
        """Returns (boxes, track_ids, classes) arrays, or three empty tuples when nothing is tracked."""
        detections = results[0].boxes
        if detections.id is None:
            return (), (), ()
        return (
            detections.xyxy.cpu().numpy(),
            detections.id.cpu().numpy().astype(int),
            detections.cls.cpu().numpy().astype(int),
        )

    def _update_tracks(self, boxes, track_ids, classes):
        """Updates per-frame state, centroid trails and the incoming counts.

        Args:
            boxes: Iterable of (x1, y1, x2, y2) boxes.
            track_ids: Iterable of tracker ids, parallel to `boxes`.
            classes: Iterable of detector class ids, parallel to `boxes`.
        """
        self.current_vehicles_in_frame.clear()
        for box, track_id, cls in zip(boxes, track_ids, classes):
            track_id = int(track_id)
            class_name = self.vehicle_classes.get(int(cls), 'unknown')
            self.current_vehicles_in_frame[track_id] = class_name

            trail = self.track_history[track_id]
            trail.append((int((box[0] + box[2]) / 2), int((box[1] + box[3]) / 2)))

            # Count incoming only: the centroid must have moved down (image y
            # grows downward) by more than the threshold across the window.
            if track_id not in self.counted_ids and len(trail) > self.direction_window:
                window = trail[-self.direction_window:]
                if window[-1][1] - window[0][1] > self.min_downward_shift:
                    self.counted_ids.add(track_id)
                    self.vehicle_counts[class_name] += 1

            if len(trail) > self.MAX_TRAIL_POINTS:
                del trail[:-self.MAX_TRAIL_POINTS]

    def process_frame(self, frame):
        """Detects, tracks and counts vehicles in one frame.

        Returns the input frame with annotations drawn on it (the frame is
        modified in place).
        """
        self.frame_count += 1
        preprocessed = self.preprocess_frame(frame)
        results = self.model.track(
            preprocessed,
            classes=list(self.vehicle_classes),
            conf=self.conf_threshold,
            device=self.device,
            persist=True,
            tracker=self.tracker,
            verbose=False,
        )
        self._update_tracks(*self._extract_detections(results))

        # Annotations go on the original frame, not the pre-processed one.
        return self.draw_annotations(frame, results)

    def draw_annotations(self, frame, results):
        """Draws bounding boxes, tracking IDs, trajectories, and statistics onto `frame`."""
        font = cv2.FONT_HERSHEY_SIMPLEX
        color = (0, 255, 0)

        for box, track_id, cls in zip(*self._extract_detections(results)):
            x1, y1, x2, y2 = map(int, box)
            track_id = int(track_id)
            class_name = self.vehicle_classes.get(int(cls), 'unknown')
            label = f'ID:{track_id} {class_name}'

            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            (text_w, text_h), _ = cv2.getTextSize(label, font, 0.5, 2)
            # Keep the label inside the image when the box touches the top edge.
            label_bottom = max(y1, text_h + 5)
            cv2.rectangle(frame, (x1, label_bottom - text_h - 5), (x1 + text_w, label_bottom), color, -1)
            cv2.putText(frame, label, (x1, label_bottom - 5), font, 0.5, (0, 0, 0), 2)

            points = self.track_history.get(track_id, [])
            if len(points) > 1:
                cv2.polylines(frame, [np.array(points, dtype=np.int32)], False, color, 2)

        self.draw_statistics(frame)
        return frame

    def _statistics_layout(self):
        """Computes the statistics panel content without touching OpenCV.

        Returns:
            tuple: ``(lines, panel_height)`` where each line is
            ``(text, baseline_offset_y, font_scale, color, thickness)`` with the
            offset measured from the top of the panel, and ``panel_height`` is
            tall enough to contain every line.
        """
        lh = self.PANEL_LINE_HEIGHT
        headline_style = (0.6, (0, 255, 255), 2)

        offset = 25
        lines = [(f"Incoming Vehicles Counted: {len(self.counted_ids)}", offset) + headline_style]
        offset += lh + 5
        lines.append((f"Current in Frame: {len(self.current_vehicles_in_frame)}", offset) + headline_style)
        offset += lh + 10

        for vt, count in self.vehicle_counts.items():
            lines.append((f"- {vt.capitalize()}: {count}", offset, 0.5, (255, 255, 255), 1))
            offset += lh

        elapsed_time = time.time() - self.start_time
        fps = self.frame_count / elapsed_time if elapsed_time > 0 else 0
        fps_offset = offset + 5
        lines.append((f"FPS: {fps:.2f}", fps_offset, 0.6, (0, 255, 0), 2))

        # The panel used to be a fixed 150 px tall, which the FPS line overran
        # once more than a few vehicle classes had been counted.
        panel_h = max(self.PANEL_MIN_HEIGHT, fps_offset + self.PANEL_PADDING)
        return lines, panel_h

    def draw_statistics(self, frame):
        """Draws the semi-transparent statistics panel onto `frame` in place."""
        font = cv2.FONT_HERSHEY_SIMPLEX
        lines, panel_h = self._statistics_layout()

        # Widen the panel if any line (e.g. the long headline) would not fit
        # in the default width.
        panel_w = self.PANEL_MIN_WIDTH
        for text, _, scale, _, thickness in lines:
            (text_w, _), _ = cv2.getTextSize(text, font, scale, thickness)
            panel_w = max(panel_w, text_w + 2 * self.PANEL_PADDING)

        overlay = frame.copy()
        cv2.rectangle(overlay, (self.PANEL_X, self.PANEL_Y),
                      (self.PANEL_X + panel_w, self.PANEL_Y + panel_h), (0, 0, 0), -1)
        cv2.addWeighted(overlay, 0.6, frame, 0.4, 0, frame)

        text_x = self.PANEL_X + self.PANEL_PADDING
        for text, offset, scale, color, thickness in lines:
            cv2.putText(frame, text, (text_x, self.PANEL_Y + offset), font, scale, color, thickness)

    def run(self, source_path, display_size=(1280, 720)):
        """Runs the incoming-vehicle counter on a video file or camera source.

        Args:
            source_path: Anything accepted by ``cv2.VideoCapture``.
            display_size (tuple): ``(width, height)`` every frame is resized to
                before processing and display.
        """
        cap = cv2.VideoCapture(source_path)
        if not cap.isOpened():
            print(f"Error: Could not open video source: {source_path}")
            return

        # Restart the FPS clock so time spent before run() is not averaged in.
        self.start_time = time.time()
        self.frame_count = 0
        print("Incoming vehicle counter started. Press 'q' to quit.")

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    print("End of video stream.")
                    break

                frame = cv2.resize(frame, tuple(display_size))
                cv2.imshow("Incoming Vehicle Counter", self.process_frame(frame))

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Release the capture and windows even if processing raised.
            cap.release()
            cv2.destroyAllWindows()

        self.print_final_counts()

    def print_final_counts(self):
        """Prints the final cumulative incoming-vehicle counts to the console."""
        print("\n--- Final Incoming Vehicle Counts ---")
        print(f"Total Incoming Vehicles Detected: {len(self.counted_ids)}")
        for vt, count in self.vehicle_counts.items():
            print(f"- {vt.capitalize()}: {count}")


def main():
    """Builds the incoming-vehicle counter and runs it on the sample video."""
    settings = {
        # Change to 'yolov10s.pt' or 'yolov10m.pt' for better accuracy if needed.
        'model_path': 'yolov10n.pt',
        # Use '0' for GPU.
        'device': 'cpu',
        'conf_threshold': 0.3,
        'use_clahe': True,
    }

    counter = VehicleCounter(**settings)
    counter.run(source_path="adi.mp4")


if __name__ == "__main__":
    main()


Incoming vehicle counter started. Press 'q' to quit.

--- Final Incoming Vehicle Counts ---
Total Incoming Vehicles Detected: 7
- Car: 7


In [92]:
# Real-time Vehicle Counter (Incoming - Strict Heuristic, Outgoing - Reference Line)
# Requirements: pip install ultralytics opencv-python
# ----------------------------------------------------------------byteTrack ALGO---------------------

import cv2
from ultralytics import YOLO
from collections import defaultdict
import time
import numpy as np

class VehicleCounter:
    """Track vehicles with YOLO + ByteTrack and flag incoming/outgoing traffic per frame.

    * "Outgoing": the track's centroid crosses a horizontal reference line
      upwards; the track stays flagged for ``outgoing_retention_frames`` frames
      after the crossing.
    * "Incoming": no line is used; the track must show sustained downward
      centroid movement that started in the upper part of the frame.

    All counts are per-frame (real-time); no cumulative totals are kept.
    """

    def __init__(self, model_path='yolov10s.pt', conf_threshold=0.3, device='cpu', use_clahe=True):
        """
        Initializes the VehicleCounter.

        Args:
            model_path (str): Path to the YOLO model file (e.g., 'yolov10s.pt').
            conf_threshold (float): Confidence threshold for object detection.
            device (str): Device to run the model on ('cpu' or '0' for GPU).
            use_clahe (bool): Whether to apply CLAHE pre-processing for better contrast.
        """
        self.model = YOLO(model_path)
        self.conf_threshold = conf_threshold
        self.device = device
        self.use_clahe = use_clahe

        if self.use_clahe:
            self.clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))

        # Class ids passed to the tracker, mapped to the label drawn on screen
        self.vehicle_classes = {
            2: 'car', 3: 'motorcycle', 5: 'bus', 7: 'truck'
        }

        # Tracking and movement history for centroids: track_id -> [(cx, cy), ...]
        self.track_history = defaultdict(list)

        # Real-time counts for current frame
        self.current_vehicles_in_frame_all = {}  # track_id -> class name for every vehicle in the current frame
        self.current_outgoing_vehicles = set()   # IDs of vehicles flagged as outgoing in current frame
        self.current_incoming_vehicles = set()   # IDs of vehicles flagged as incoming in current frame

        # To track the state of outgoing vehicles across frames (for retention)
        self.outgoing_tracking_status = defaultdict(lambda: {'status': 'unknown', 'last_crossing_frame': -1})
        self.outgoing_retention_frames = 30  # Frames a vehicle remains 'outgoing' after crossing (e.g., 1 second at 30 FPS)

        # Reference line for OUTGOING vehicles; 0 means "not set yet" and is
        # replaced by 40% of the frame height on the first processed frame.
        self.outgoing_line_y = 0

        # Parameters for strict "incoming" movement (no fixed line, heuristic based on direction)
        self.min_history_for_incoming = 15       # Number of recent centroids inspected for the heuristic
        self.incoming_movement_threshold_y = 20  # Minimum downward displacement (pixels) over that window
        self.incoming_start_y_ratio = 0.70       # Window must start above this fraction of the frame height

        # Performance metrics
        self.start_time = time.time()
        self.frame_count = 0

    def preprocess_frame(self, frame):
        """Applies pre-processing steps to the frame.

        When CLAHE is enabled, the frame is converted BGR -> LAB, CLAHE is applied
        to the L channel only, and the result is converted back to BGR. When
        disabled, the input frame object is returned untouched.
        """
        if not self.use_clahe:
            return frame

        lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
        l, a, b = cv2.split(lab)
        l_clahe = self.clahe.apply(l)
        lab_clahe = cv2.merge((l_clahe, a, b))
        frame_clahe = cv2.cvtColor(lab_clahe, cv2.COLOR_LAB2BGR)
        return frame_clahe

    def process_frame(self, frame):
        """
        Processes a single frame for vehicle detection, tracking, and real-time counts.

        Args:
            frame: BGR image array; ``frame.shape[0]`` is used as the frame height.

        Returns:
            The input frame with the reference line, boxes, trajectories and
            statistics drawn on it (annotations go on the original frame, not
            on the CLAHE-enhanced copy fed to the model).
        """
        self.frame_count += 1

        preprocessed_frame = self.preprocess_frame(frame)

        results = self.model.track(
            preprocessed_frame,
            classes=list(self.vehicle_classes.keys()),
            conf=self.conf_threshold,
            device=self.device,
            persist=True,
            tracker="bytetrack.yaml",
            verbose=False
        )

        frame_height = frame.shape[0]

        # Set outgoing line if not already set (e.g., first frame)
        if self.outgoing_line_y == 0:
            self.outgoing_line_y = int(frame_height * 0.4)  # Adjust this value (0.4 = 40% from top)

        detections = results[0].boxes
        if detections.id is not None:
            boxes = detections.xyxy.cpu().numpy()
            track_ids = detections.id.cpu().numpy().astype(int)
            classes = detections.cls.cpu().numpy().astype(int)
        else:
            # No tracked objects this frame: still reset counts and prune lost tracks.
            boxes, track_ids, classes = (), (), ()

        self._update_tracks(boxes, track_ids, classes, frame_height)

        # Draw annotations on the ORIGINAL frame
        return self.draw_annotations(frame, results)

    def _update_tracks(self, boxes, track_ids, classes, frame_height):
        """Refresh all per-frame tracking state from one frame's detections.

        Args:
            boxes: Iterable of ``(x1, y1, x2, y2)`` boxes.
            track_ids: Iterable of integer track ids, aligned with ``boxes``.
            classes: Iterable of integer class ids, aligned with ``boxes``.
            frame_height: Height of the frame in pixels.
        """
        # Reset real-time counts for this frame
        self.current_vehicles_in_frame_all.clear()
        self.current_incoming_vehicles.clear()
        self.current_outgoing_vehicles = set()

        # Keep a limited history for visualization and movement analysis
        max_history = max(self.min_history_for_incoming, 30)
        active_ids = set()

        for box, track_id, cls in zip(boxes, track_ids, classes):
            self.current_vehicles_in_frame_all[track_id] = self.vehicle_classes.get(cls, 'unknown')
            active_ids.add(track_id)

            centroid = (int((box[0] + box[2]) / 2), int((box[1] + box[3]) / 2))
            history = self.track_history[track_id]
            history.append(centroid)
            if len(history) > max_history:
                del history[:-max_history]

            self._update_outgoing_status(track_id)
            if self._is_incoming(track_id, frame_height):
                self.current_incoming_vehicles.add(track_id)

        self._collect_outgoing(active_ids)
        self._prune_lost_tracks(active_ids)

    def _update_outgoing_status(self, track_id):
        """Update a track's line-crossing status from its two most recent centroids."""
        history = self.track_history[track_id]
        if len(history) < 2:
            return

        prev_y, last_y = history[-2][1], history[-1][1]
        line_y = self.outgoing_line_y

        if prev_y >= line_y > last_y:
            # Moved from on/below the line to above it (image y grows downwards).
            state = self.outgoing_tracking_status[track_id]
            state['status'] = 'crossed_up'
            state['last_crossing_frame'] = self.frame_count
        elif last_y > line_y:
            # Below the line: clears any earlier 'crossed_up' state.
            self.outgoing_tracking_status[track_id]['status'] = 'below_line'

    def _is_incoming(self, track_id, frame_height):
        """Return True if the track's recent centroids satisfy the strict incoming heuristic."""
        history = self.track_history[track_id]
        window = self.min_history_for_incoming
        if not history or len(history) < window:
            return False

        y_positions = [point[1] for point in history[-window:]]
        start_y = y_positions[0]
        dy = y_positions[-1] - start_y  # Positive if moving down

        # Significant downward movement AND started from the upper portion of the frame
        return (dy > self.incoming_movement_threshold_y
                and start_y < frame_height * self.incoming_start_y_ratio)

    def _collect_outgoing(self, active_ids):
        """Flag active tracks that crossed the line upwards within the retention window."""
        for track_id in active_ids:
            status_info = self.outgoing_tracking_status[track_id]
            frames_since_crossing = self.frame_count - status_info['last_crossing_frame']
            if (status_info['status'] == 'crossed_up'
                    and frames_since_crossing < self.outgoing_retention_frames):
                self.current_outgoing_vehicles.add(track_id)

    def _prune_lost_tracks(self, active_ids):
        """Drop history and outgoing status for tracks absent from the current frame."""
        lost_ids = [tid for tid in self.track_history if tid not in active_ids]
        for tid in lost_ids:
            del self.track_history[tid]
            self.outgoing_tracking_status.pop(tid, None)

    def draw_annotations(self, frame, results):
        """Draws bounding boxes, tracking IDs, trajectories, and statistics.

        Args:
            frame: Image to draw on.
            results: Tracker output; only ``results[0].boxes`` is read.

        Returns:
            The same ``frame`` object, annotated.
        """
        # Draw the outgoing reference line first
        line_y = self.outgoing_line_y
        cv2.line(frame, (0, line_y), (frame.shape[1], line_y), (0, 255, 255), 2)  # Yellow line
        cv2.putText(frame, "Outgoing Line", (10, line_y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)

        detections = results[0].boxes
        if detections.id is not None:
            boxes = detections.xyxy.cpu().numpy()
            track_ids = detections.id.cpu().numpy().astype(int)
            classes = detections.cls.cpu().numpy().astype(int)

            for box, track_id, cls in zip(boxes, track_ids, classes):
                x1, y1, x2, y2 = map(int, box)
                class_name = self.vehicle_classes.get(cls, 'unknown')

                # Outgoing takes precedence over incoming when a track is in both sets.
                if track_id in self.current_outgoing_vehicles:
                    color = (0, 0, 255)  # Red for vehicles currently identified as outgoing
                elif track_id in self.current_incoming_vehicles:
                    color = (255, 0, 0)  # Blue for vehicles currently identified as incoming (strict)
                else:
                    color = (0, 255, 0)  # Default green

                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                label = f'ID:{track_id} {class_name}'

                # Filled label background sized to the text, placed just above the box
                label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)
                cv2.rectangle(frame, (x1, y1 - label_size[1] - 5), (x1 + label_size[0], y1), color, -1)
                cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 2)

                # Centroid trajectory
                points = self.track_history.get(track_id, [])
                if len(points) > 1:
                    cv2.polylines(frame, [np.array(points, dtype=np.int32)], isClosed=False, color=color, thickness=2)

        self.draw_statistics(frame)
        return frame

    def draw_statistics(self, frame):
        """Draws the statistics panel (per-frame counts and average FPS) onto ``frame``."""
        panel_x, panel_y, panel_w, panel_h = 10, 10, 350, 150  # Adjust panel size

        # Semi-transparent black panel: blend a filled copy back into the frame
        overlay = frame.copy()
        cv2.rectangle(overlay, (panel_x, panel_y), (panel_x + panel_w, panel_y + panel_h), (0, 0, 0), -1)
        cv2.addWeighted(overlay, 0.6, frame, 0.4, 0, frame)

        y = panel_y + 25
        lh = 20
        text_x = panel_x + 10

        # Real-time (current frame only) figures
        realtime_incoming_count = len(self.current_incoming_vehicles)
        total_current_in_frame = len(self.current_vehicles_in_frame_all)
        realtime_outgoing_count = len(self.current_outgoing_vehicles)

        cv2.putText(frame, f"Real-time Incoming: {realtime_incoming_count}", (text_x, y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
        y += lh + 5
        cv2.putText(frame, f"Total in Frame: {total_current_in_frame}", (text_x, y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
        y += lh + 5
        cv2.putText(frame, f"Real-time Outgoing: {realtime_outgoing_count}", (text_x, y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
        y += lh + 10

        # Average FPS since the counter was constructed
        elapsed_time = time.time() - self.start_time
        fps = self.frame_count / elapsed_time if elapsed_time > 0 else 0
        cv2.putText(frame, f"FPS: {fps:.2f}", (text_x, y + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    def run(self, source_path):
        """Runs the vehicle counting process on a video file or camera source.

        Frames are resized to 1280x720, processed, and shown in a window until
        the stream ends or 'q' is pressed. The capture and all windows are
        released even if processing raises or the run is interrupted.
        """
        cap = cv2.VideoCapture(source_path)
        if not cap.isOpened():
            print(f"Error: Could not open video source: {source_path}")
            cap.release()
            return

        display_width, display_height = 1280, 720
        print("Real-time vehicle counter started. Press 'q' to quit.")

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    print("End of video stream.")
                    break

                frame = cv2.resize(frame, (display_width, display_height))
                processed_frame = self.process_frame(frame)
                cv2.imshow("Real-time Vehicle Traffic Analysis", processed_frame)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Always free the capture device and GUI windows, including on errors / Ctrl-C.
            cap.release()
            cv2.destroyAllWindows()

        print("\nReal-time counting session ended.")


def main():
    """Build the real-time traffic counter with its fixed settings and run it on the sample video."""
    settings = {
        'model_path': 'yolov10n.pt',
        'device': 'cpu',
        'conf_threshold': 0.3,
        'use_clahe': True,
    }
    analyser = VehicleCounter(**settings)
    analyser.run(source_path="adi.mp4")


if __name__ == "__main__":
    main()

Real-time vehicle counter started. Press 'q' to quit.

Real-time counting session ended.
