In [None]:
import cv2
from collections import defaultdict
import supervision as sv
from ultralytics import YOLO
import math

# Load the YOLOv8 model
model = YOLO('yolov8n.pt')

# Set up video capture
VIDEO_PATH = "/content/videoplayback.mp4"
cap = cv2.VideoCapture(VIDEO_PATH)

# cv2.VideoCapture does not raise when the file is missing or unreadable;
# it just returns a capture whose reads fail. Fail loudly here so the
# problem is reported at its source instead of as an empty run later on.
if not cap.isOpened():
    raise IOError(f"Could not open video file: {VIDEO_PATH}")

# Endpoints (pixel coordinates) of the horizontal line used for counting
START_LINE = sv.Point(x=2, y=254)
END_LINE = sv.Point(x=550, y=254)

# Endpoints (pixel coordinates) of the single combined lane boundary;
# the lane-change check splits this span at its horizontal midpoint
LANE_START = sv.Point(x=0, y=254)
LANE_END = sv.Point(x=560, y=254)

# Bookkeeping shared by the processing loop (keyed by tracker ID unless noted)
track_history = defaultdict(list)   # recent centre points of each track
crossed_objects = dict()            # tracks already counted at the line
lane_changes = defaultdict(int)     # 0 = unseen, 1 = left half, 2 = moved right
vehicle_speeds = dict()             # most recent speed estimate per track
vehicle_counts = defaultdict(int)   # keyed by vehicle type name, not track ID

# Function to calculate speed (assuming frame rate and real-world scale)
def calculate_speed(x1, y1, x2, y2, fps, distance_scale):
    """Estimate speed from the displacement between two points one frame apart.

    Args:
        x1, y1: Coordinates of the earlier point.
        x2, y2: Coordinates of the later point.
        fps: Frames per second of the video.
        distance_scale: Divisor applied to the pixel distance
            (presumably pixels per metre -- confirm against the camera setup).

    Returns:
        The straight-line distance divided by ``distance_scale``, multiplied
        by ``fps`` and by 3.6 (the m/s -> km/h factor).
    """
    dx = x2 - x1
    dy = y2 - y1
    pixels_moved = math.sqrt(dx ** 2 + dy ** 2)
    scaled_per_frame = pixels_moved / distance_scale
    return scaled_per_frame * fps * 3.6

# Open a video sink for output video
video_info = sv.VideoInfo.from_video_path("/content/videoplayback.mp4")

# Box colours (BGR) by vehicle type; any type not listed uses the fallback.
type_colors = {'car': (0, 255, 0), 'bus': (0, 0, 255)}
fallback_color = (255, 255, 0)

try:
    with sv.VideoSink("output_with_single_lane_enhanced.mp4", video_info) as sink:

        # Keep a fractional frame rate (e.g. 29.97) instead of truncating it,
        # and fall back to the sink's frame rate if OpenCV reports 0.
        fps = cap.get(cv2.CAP_PROP_FPS) or video_info.fps
        distance_scale = 50  # Adjust this for real-world scale

        # Loop-invariant geometry, computed once.
        count_line_from = (int(START_LINE.x), int(START_LINE.y))
        count_line_to = (int(END_LINE.x), int(END_LINE.y))
        lane_line_from = (int(LANE_START.x), int(LANE_START.y))
        lane_line_to = (int(LANE_END.x), int(LANE_END.y))
        lane_mid_x = (LANE_START.x + LANE_END.x) / 2

        while cap.isOpened():
            success, frame = cap.read()
            if not success:
                break

            # Run YOLOv8 tracking on the frame. save=True additionally makes
            # Ultralytics write its own output under runs/detect.
            results = model.track(frame, classes=[2, 3, 5, 7], persist=True, save=True, tracker="bytetrack.yaml")
            result = results[0]

            # Always start from the plotted frame so that frames without any
            # tracked object are still written and the output stays in sync
            # with the input video.
            annotated_frame = result.plot()
            track_ids = result.boxes.id

            if track_ids is not None:
                track_ids = track_ids.int().cpu().tolist()
                # One class ID per box, in the same order as the boxes and IDs.
                class_ids = result.boxes.cls.int().cpu().tolist()
                boxes = result.boxes.xywh.cpu().tolist()

                for (x, y, w, h), track_id, class_id in zip(boxes, track_ids, class_ids):
                    # Type of *this* box (not of the first detection in the frame).
                    vehicle_type = result.names[class_id]

                    track = track_history[track_id]
                    current_center = (x, y)
                    track.append(current_center)

                    # Keep only the last 30 tracks for smoothing
                    if len(track) > 30:
                        track.pop(0)

                    # Calculate speed if there is enough history
                    if len(track) > 1:
                        prev_center = track[-2]
                        vehicle_speeds[track_id] = calculate_speed(
                            prev_center[0], prev_center[1],
                            current_center[0], current_center[1],
                            fps, distance_scale,
                        )

                    # Check if object crosses the counting line
                    if START_LINE.x < x < END_LINE.x and abs(y - START_LINE.y) < 5:
                        # Count each track only the first time it reaches the line.
                        if track_id not in crossed_objects:
                            crossed_objects[track_id] = True
                            vehicle_counts[vehicle_type] += 1
                            print(f"Vehicle {track_id} ({vehicle_type}) crossed the line")

                        # Color the bounding box
                        color = type_colors.get(vehicle_type, fallback_color)
                        cv2.rectangle(
                            annotated_frame,
                            (int(x - w / 2), int(y - h / 2)),
                            (int(x + w / 2), int(y + h / 2)),
                            color, 2,
                        )

                    # Lane change detection within the single large lane boundary.
                    # State: 0 = not yet seen, 1 = seen in left half, 2 = moved right.
                    if LANE_START.x < x < LANE_END.x:
                        if x < lane_mid_x and lane_changes[track_id] == 0:
                            lane_changes[track_id] = 1  # Left half of the lane
                            print(f"Vehicle {track_id} entered Left Half of Lane")
                        elif x > lane_mid_x and lane_changes[track_id] == 1:
                            lane_changes[track_id] = 2  # Right half of the lane (lane change detected)
                            print(f"Vehicle {track_id} changed to Right Half of Lane")
                            cv2.putText(annotated_frame, f"Lane Change: {track_id}", (int(x), int(y - 30)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

                    # Speed alert if vehicle exceeds 80 km/h
                    if track_id in vehicle_speeds and vehicle_speeds[track_id] > 80:
                        cv2.putText(annotated_frame, f"Speeding: {int(vehicle_speeds[track_id])} km/h", (int(x), int(y - 10)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

            # Draw the counting line and single lane boundary once per frame
            cv2.line(annotated_frame, count_line_from, count_line_to, (0, 255, 0), 2)
            cv2.line(annotated_frame, lane_line_from, lane_line_to, (255, 0, 0), 2)

            # Display count of different vehicle types
            count_text = f"Cars: {vehicle_counts['car']} Buses: {vehicle_counts['bus']} Trucks: {vehicle_counts['truck']}"
            cv2.putText(annotated_frame, count_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

            # Write the annotated frame to the output video
            sink.write_frame(annotated_frame)
finally:
    # Release video capture even if processing fails part-way through
    cap.release()


0: 384x640 2 cars, 1 truck, 135.2ms
Speed: 3.2ms preprocess, 135.2ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)
Results saved to [1mruns/detect/track3[0m

0: 384x640 2 cars, 1 truck, 135.8ms
Speed: 3.1ms preprocess, 135.8ms inference, 1.1ms postprocess per image at shape (1, 3, 384, 640)
Results saved to [1mruns/detect/track3[0m

0: 384x640 1 car, 1 bus, 2 trucks, 135.6ms
Speed: 4.5ms preprocess, 135.6ms inference, 1.1ms postprocess per image at shape (1, 3, 384, 640)
Results saved to [1mruns/detect/track3[0m

0: 384x640 2 cars, 2 trucks, 130.9ms
Speed: 4.3ms preprocess, 130.9ms inference, 1.1ms postprocess per image at shape (1, 3, 384, 640)
Results saved to [1mruns/detect/track3[0m

0: 384x640 2 cars, 1 truck, 131.1ms
Speed: 3.7ms preprocess, 131.1ms inference, 1.1ms postprocess per image at shape (1, 3, 384, 640)
Results saved to [1mruns/detect/track3[0m

0: 384x640 1 car, 2 trucks, 146.1ms
Speed: 3.5ms preprocess, 146.1ms inference, 1.1ms postproces