In [None]:
!pip install ultralytics gtts opencv-python numpy



In [None]:
import cv2
import torch
import numpy as np
import os
from gtts import gTTS
from ultralytics import YOLO


In [None]:
# Load the pre-trained YOLOv10 model
# The weight file name selects the variant; change it to load a different model size.
model = YOLO("yolov10s.pt")  # Adjust model size if needed


In [None]:
# Mount Google Drive at /content/drive so files stored in Drive are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# === STEP 1: Install Dependencies ===
!pip install ultralytics gtts opencv-python numpy moviepy deep_sort_realtime

# === STEP 2: Mount Google Drive ===
# The input video and both output videos used later in this cell are addressed
# under /content/drive/My Drive, so Drive must be mounted before they are touched.
from google.colab import drive
drive.mount('/content/drive')

# === STEP 3: Import Required Libraries ===
import cv2
import torch
import numpy as np
import os
from gtts import gTTS
from IPython.display import Audio, display
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort
from moviepy.editor import VideoFileClip, AudioFileClip, CompositeAudioClip, AudioClip

# Clear alert audio left over from a previous run: play_audio_alert reuses an
# existing file of the same name instead of re-synthesizing it, so stale clips
# must be removed first.
import os

# Path to the directory where mp3 files are stored
directory_path = '/content/'

# Only delete the clips this notebook generates (alert_*.mp3); any other .mp3
# in the directory is left untouched. A missing directory simply means there is
# nothing to clean up.
if os.path.isdir(directory_path):
    for filename in os.listdir(directory_path):
        if filename.startswith("alert_") and filename.endswith(".mp3"):
            file_path = os.path.join(directory_path, filename)
            os.remove(file_path)
            print(f"Deleted: {file_path}")



# === STEP 4: Global Variables and Constants ===
# Alert log: appended to by play_audio_alert and read back when the audio is merged into the video.
alert_events = []         # To record alert events: list of (timestamp, alert_filename)
# process_video resets a lane's entry to 0 when the lane is not free and again right after
# a "clear" alert has been raised for it.
free_lane_counters = {}   # For free lane detection (lane index -> consecutive free-frame count)

# Areas below are tracked bounding-box areas (box width * box height);
# the ratio thresholds are relative changes between two consecutive frames.
FREE_LANE_MIN_AREA = 15000    # If a vehicle's bbox area >= this, lane is not considered free
FREE_SPEED_THRESHOLD = 0.05    # If area increases >5% (per frame), the vehicle is approaching
AREA_INCREASE_THRESHOLD = 0.2  # 20% increase for overtaking detection
SMALL_VEHICLE_AREA_THRESHOLD = 20000  # Threshold to decide lateral boundaries based on vehicle size

# === STEP 5: Load YOLOv10 Model ===
model = YOLO("yolov10s")  # Pretrained YOLOv10 small
# Run the model on the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)
print("Using device:", device)

# === STEP 6: Define Lane Detection Functions ===
def detect_lanes(frame):
    """Find candidate lane-line x positions in the lower 40% of a BGR frame.

    The frame is converted to grayscale, blurred (5x5 Gaussian) and passed
    through Canny; edges outside the bottom band (from 60% of the height down
    to the bottom) are masked out, and line segments are extracted with the
    probabilistic Hough transform.

    Returns:
        A sorted list of the distinct x midpoints, (x1 + x2) // 2, of the
        detected segments. Empty when no segment is found.
    """
    # Edge map: grayscale -> blur -> Canny.
    grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    edge_map = cv2.Canny(cv2.GaussianBlur(grayscale, (5, 5), 0), 50, 150)

    # Keep only edges inside the rectangular band at the bottom of the image.
    frame_h, frame_w = frame.shape[:2]
    band_top = int(frame_h * 0.6)
    band_corners = np.array(
        [[(0, frame_h), (frame_w, frame_h), (frame_w, band_top), (0, band_top)]],
        np.int32,
    )
    band_mask = np.zeros_like(edge_map)
    cv2.fillPoly(band_mask, band_corners, 255)
    band_edges = cv2.bitwise_and(edge_map, band_mask)

    segments = cv2.HoughLinesP(
        band_edges, 1, np.pi/180, threshold=50, minLineLength=50, maxLineGap=50
    )
    if segments is None:
        return []

    # Each entry wraps one (x1, y1, x2, y2) segment; only the x midpoint is kept.
    midpoints = {(seg[0][0] + seg[0][2]) // 2 for seg in segments}
    return sorted(midpoints)

def divide_lane_regions(lane_centers, frame_width):
    """Divide the frame width into three side-by-side lane regions.

    The split is always three equal thirds (left, ego, right); the detected
    lane centers do not influence it. ``lane_centers`` is kept in the
    signature so existing callers keep working.

    Args:
        lane_centers: Detected lane x positions (currently unused).
        frame_width: Width of the frame in pixels.

    Returns:
        A list of three ``(start, end)`` tuples. The last region ends at
        ``frame_width`` so pixels left over by the integer division are
        included in it.
    """
    region_width = frame_width // 3
    boundaries = (0, region_width, 2 * region_width, frame_width)
    # Pair consecutive boundaries: (b0, b1), (b1, b2), (b2, b3).
    return list(zip(boundaries[:-1], boundaries[1:]))

def get_ego_lane(lane_regions):
    """Return the ego lane index: 1 (the middle region) when at least three
    regions are given, otherwise 0."""
    if len(lane_regions) < 3:
        return 0
    return 1

def assign_to_lane(x_center, lane_regions):
    """Return the index of the first region whose inclusive [start, end] range
    contains x_center, or -1 when no region contains it.

    Because both bounds are inclusive, a value that sits exactly on a shared
    boundary is assigned to the lower-indexed region.
    """
    containing = (
        lane_idx
        for lane_idx, (lo, hi) in enumerate(lane_regions)
        if lo <= x_center <= hi
    )
    return next(containing, -1)

# === STEP 7: Initialize DeepSORT Tracker and Tracking Info ===
# Module-level tracker instance; process_video feeds it the detections of every frame.
tracker = DeepSort(max_age=30)
# Holds the previous frame only: process_video replaces it with the current frame's info after each frame.
tracked_info = {}  # To store previous frame info: {track_id: {'area': float, 'lane': int, 'x_center': int}}

# === STEP 8: Define Audio Alert Function ===
def play_audio_alert(message, current_time):
    """
    Synthesize a spoken alert, record it for the later audio merge, and play it.

    The clip's filename is derived from both the timestamp and the message
    text. Several different alerts can be raised for the same frame (for
    example one per lane); naming the file by timestamp alone would make them
    share one file, and every alert after the first would reuse the first
    message's audio.

    Args:
        message: Text to be spoken (synthesized as English).
        current_time: Video timestamp, in seconds, at which the alert occurred.

    Side effects:
        Writes an ``alert_*.mp3`` file in the current working directory (if it
        does not exist yet), appends ``(current_time, filename)`` to the
        module-level ``alert_events`` list, and displays an autoplaying audio
        widget.
    """
    # Reduce the message to a filesystem-safe slug so it can be part of the name.
    slug = "".join(ch if ch.isalnum() else "_" for ch in message.lower()).strip("_")
    alert_filename = f"alert_{current_time:.2f}_{slug}.mp3"
    # Same timestamp and same message means identical audio, so an existing
    # file can be reused safely.
    if not os.path.exists(alert_filename):
        tts = gTTS(text=message, lang='en')
        tts.save(alert_filename)
    # Record the event for later merging
    alert_events.append((current_time, alert_filename))
    # Play the alert immediately
    display(Audio(alert_filename, autoplay=True))

# === STEP 9: Process Video Function (Merging Free Lane & Overtaking Logic) ===
def process_video(input_path, output_path):
    """
    Detect, track and annotate vehicles in a video and raise lane alerts.

    For every frame the function outlines the lane regions, runs the YOLO
    model, passes detections with confidence >= 0.4 to the DeepSORT tracker
    and applies two rules to the confirmed tracks:

    * Overtaking: a track that was in the ego lane on the previous frame, is
      now in another lane, and whose box area grew by more than
      AREA_INCREASE_THRESHOLD raises a "... lane busy" alert and gets a red
      filled box that stays on screen for OCCUPY_DURATION seconds.
    * Free lane: a lane without any large (>= FREE_LANE_MIN_AREA) or growing
      (> FREE_SPEED_THRESHOLD) track for 3 seconds' worth of consecutive
      frames raises a "... lane clear" alert and gets a green filled region.

    Args:
        input_path: Path of the video to read.
        output_path: Path of the annotated (XVID-encoded) video to write.

    Raises:
        IOError: If the input video cannot be opened.

    Side effects:
        Replaces the module-level ``tracked_info`` after every frame, updates
        ``free_lane_counters``, advances the module-level ``tracker`` and
        appends to ``alert_events`` through ``play_audio_alert``. None of
        these are reset at the start of a call, so state carries over between
        calls.
    """
    global tracked_info, free_lane_counters

    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        # Fail loudly instead of silently writing an empty output file.
        cap.release()
        raise IOError(f"Could not open input video: {input_path}")

    # Keep the frame rate as a float: truncating e.g. 29.97 to 29 skews both
    # the alert timestamps and the playback speed of the written video.
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps > 0:
        # Some containers report 0/NaN; fall back so timestamps can be computed.
        fps = 30.0
        print(f"Warning: input reports no valid FPS, assuming {fps}")
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    frame_idx = 0

    # Dictionaries to store the filled box information with detection timestamp
    overtaking_boxes = {}  # key: track_id, value: (detection_time, (x1, y1, x2, y2))
    free_lane_fills = {}   # key: lane index, value: (detection_time, (start, y_top, end, y_bottom))
    OCCUPY_DURATION = 1.0  # Duration in seconds to keep the fill

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_idx += 1
            current_time = frame_idx / fps
            print(f"Processing frame {frame_idx} at {current_time:.2f} sec")

            # Lane Detection
            lane_centers = detect_lanes(frame)
            lane_regions = divide_lane_regions(lane_centers, width)
            ego_lane = get_ego_lane(lane_regions)

            # Visualize lane regions (outline only)
            for idx, (start, end) in enumerate(lane_regions):
                color = (0, 255, 0) if idx != ego_lane else (255, 255, 0)
                cv2.rectangle(frame, (start, int(height*0.6)), (end, height), color, 2)
                cv2.putText(frame, f"Lane {idx}", (start+5, int(height*0.6)-10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

            # Vehicle Detection via YOLOv10
            # NOTE(review): no class filter is applied, so every detected class is tracked.
            results = model(frame)
            detections = []
            for r in results:
                for box in r.boxes.data:
                    x1, y1, x2, y2, conf, cls = box.tolist()
                    if conf < 0.4:
                        continue
                    # The tracker expects [left, top, width, height]; the model
                    # yields corner coordinates, so convert before handing over.
                    # NOTE(review): the area thresholds may need re-validation
                    # against these correctly sized boxes.
                    detections.append(([x1, y1, x2 - x1, y2 - y1], conf, cls))

            # Update DeepSORT Tracker
            tracks = tracker.update_tracks(detections, frame=frame)
            current_info = {}
            for track in tracks:
                if not track.is_confirmed():
                    continue
                track_id = track.track_id
                ltwh = track.to_ltwh()  # (left, top, width, height)
                x1, y1, w, h = ltwh
                x2, y2 = x1 + w, y1 + h
                x_center = int(x1 + w/2)
                area = w * h
                lane_assignment = assign_to_lane(x_center, lane_regions)

                # Draw a normal (blue) bounding box for display (for debugging or info)
                cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (255, 0, 0), 2)
                cv2.putText(frame, f"ID:{track_id} L:{lane_assignment}", (int(x1), int(y1)-10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)

                current_info[track_id] = {'area': area, 'lane': lane_assignment, 'x_center': x_center}

                # Overtaking Detection: when a vehicle in the ego lane moves to another lane
                # with a significant area increase
                if track_id in tracked_info:
                    prev = tracked_info[track_id]
                    if prev['area'] > 0:
                        area_ratio = (area - prev['area']) / prev['area']
                    else:
                        area_ratio = 0
                    if prev['lane'] == ego_lane and lane_assignment != ego_lane and area_ratio > AREA_INCREASE_THRESHOLD:
                        # Set dynamic lateral thresholds based on vehicle size
                        if area < SMALL_VEHICLE_AREA_THRESHOLD:
                            left_threshold = 0.35 * width
                            right_threshold = 0.65 * width
                        else:
                            left_threshold = 0.30 * width
                            right_threshold = 0.70 * width
                        # NOTE(review): the image-left side is announced as "right" (and vice
                        # versa) — presumably the footage is rear-facing/mirrored; confirm.
                        if lane_assignment < ego_lane and x_center < left_threshold:
                            alert = "right lane busy"
                            cv2.putText(frame, alert, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)
                            play_audio_alert(alert, current_time)
                            # Record detection with bounding box coordinates
                            overtaking_boxes[track_id] = (current_time, (int(x1), int(y1), int(x2), int(y2)))
                        elif lane_assignment > ego_lane and x_center > right_threshold:
                            alert = "left lane busy"
                            cv2.putText(frame, alert, (50, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)
                            play_audio_alert(alert, current_time)
                            overtaking_boxes[track_id] = (current_time, (int(x1), int(y1), int(x2), int(y2)))

            # Draw red filled rectangles for overtaking vehicles if within OCCUPY_DURATION
            # (iterate over a snapshot because expired entries are deleted on the fly)
            for tid, (det_time, bbox) in list(overtaking_boxes.items()):
                if current_time - det_time <= OCCUPY_DURATION:
                    x1_box, y1_box, x2_box, y2_box = bbox
                    cv2.rectangle(frame, (x1_box, y1_box), (x2_box, y2_box), (0, 0, 255), -1)  # Red filled
                else:
                    # Remove expired entries
                    del overtaking_boxes[tid]

            # Free Lane Detection with 3-second rule and safety check:
            for idx, region in enumerate(lane_regions):
                lane_tracks = {tid: info for tid, info in current_info.items() if info['lane'] == idx}
                lane_free = True
                for tid, info in lane_tracks.items():
                    # A large box means a vehicle is close: the lane is occupied.
                    if info['area'] >= FREE_LANE_MIN_AREA:
                        lane_free = False
                        break
                    # A small but growing box means a vehicle is approaching.
                    if tid in tracked_info:
                        prev_area = tracked_info[tid]['area']
                        if prev_area > 0:
                            ratio = (info['area'] - prev_area) / prev_area
                            if ratio > FREE_SPEED_THRESHOLD:
                                lane_free = False
                                break
                if lane_free:
                    free_lane_counters[idx] = free_lane_counters.get(idx, 0) + 1
                else:
                    free_lane_counters[idx] = 0

                # Alert once the lane has been free for 3 seconds' worth of frames.
                if free_lane_counters.get(idx, 0) >= fps * 3:
                    if idx < ego_lane:
                        alert = "right lane clear"
                    elif idx > ego_lane:
                        alert = "left lane clear"
                    else:
                        alert = "both lane clear"
                    cv2.putText(frame, alert, (50, 100 + idx*30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 3)
                    play_audio_alert(alert, current_time)
                    # Record free lane fill region (from y = int(height*0.6) to height)
                    start, end = lane_regions[idx]
                    free_lane_fills[idx] = (current_time, (start, int(height*0.6), end, height))
                    # Restart the count so the alert repeats at most every 3 seconds.
                    free_lane_counters[idx] = 0

            # Draw green filled rectangles for free lanes if within OCCUPY_DURATION
            for idx, (det_time, region_coords) in list(free_lane_fills.items()):
                if current_time - det_time <= OCCUPY_DURATION:
                    start, y_top, end, y_bottom = region_coords
                    cv2.rectangle(frame, (start, y_top), (end, y_bottom), (0, 255, 0), -1)  # Green filled
                else:
                    del free_lane_fills[idx]

            out.write(frame)
            # Only the immediately preceding frame is kept for comparison.
            tracked_info = current_info.copy()
    finally:
        # Release the capture and writer even if a frame fails mid-way, so the
        # output file is finalized and handles are not leaked.
        cap.release()
        out.release()
        cv2.destroyAllWindows()
    print("Video processing complete!")


# === STEP 10: Run Video Processing ===
# Update these paths as per your Drive structure
input_video = "/content/drive/My Drive/deepdrive/freenovrtk.mp4"
temp_output_video = "/content/drive/My Drive/deepdrive/detected_output.avi"
process_video(input_video, temp_output_video)
print("Processing complete! Video saved at:", temp_output_video)

# === STEP 11: Merge Audio with Video Using MoviePy ===
# Lay every recorded alert clip over a silent base track at the timestamp at
# which it was raised, then attach that track to the annotated video.
video_clip = VideoFileClip(temp_output_video)
duration = video_clip.duration
def make_silence(t):
    """Frame function of the silent base track: amplitude 0 for every time t."""
    return 0
silent_audio = AudioClip(make_silence, duration=duration, fps=44100)
alert_clips = []
final_output = "/content/drive/My Drive/deepdrive/final_output.mp4"
try:
    for timestamp, alert_filename in alert_events:
        # Skip events whose audio file is no longer on disk.
        if os.path.exists(alert_filename):
            alert_audio = AudioFileClip(alert_filename)
            alert_audio = alert_audio.set_start(timestamp)
            alert_clips.append(alert_audio)
    if alert_clips:
        # An alert raised close to the end would otherwise make the audio
        # track outlast the picture, so cap the mix at the video's duration.
        composite_audio = CompositeAudioClip([silent_audio] + alert_clips).set_duration(duration)
    else:
        composite_audio = silent_audio
    final_clip = video_clip.set_audio(composite_audio)
    final_clip.write_videofile(final_output, codec="libx264", audio_codec="aac")
finally:
    # Release the file readers held by the clips, even if writing failed.
    for alert_audio in alert_clips:
        alert_audio.close()
    video_clip.close()
print("Final video with audio saved at:", final_output)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Deleted: /content/alert_8.8.mp3
Deleted: /content/alert_3.0.mp3
Using device: cuda
Processing frame 1 at 0.05 sec

0: 384x640 (no detections), 31.0ms
Speed: 3.5ms preprocess, 31.0ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)
Processing frame 2 at 0.09 sec

0: 384x640 (no detections), 30.0ms
Speed: 4.4ms preprocess, 30.0ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)
Processing frame 3 at 0.14 sec

0: 384x640 (no detections), 17.4ms
Speed: 3.5ms preprocess, 17.4ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)
Processing frame 4 at 0.18 sec

0: 384x640 (no detections), 19.0ms
Speed: 3.5ms preprocess, 19.0ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)
Processing frame 5 at 0.23 sec

0: 384x640 (no detections), 17.1ms
Speed: 3.6ms preprocess, 17.1ms inference, 0.5ms postprocess per 

Processing frame 67 at 3.05 sec

0: 384x640 (no detections), 12.1ms
Speed: 3.2ms preprocess, 12.1ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)
Processing frame 68 at 3.09 sec

0: 384x640 (no detections), 12.1ms
Speed: 3.3ms preprocess, 12.1ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)
Processing frame 69 at 3.14 sec

0: 384x640 (no detections), 12.1ms
Speed: 3.2ms preprocess, 12.1ms inference, 0.4ms postprocess per image at shape (1, 3, 384, 640)
Processing frame 70 at 3.18 sec

0: 384x640 (no detections), 12.1ms
Speed: 3.1ms preprocess, 12.1ms inference, 0.4ms postprocess per image at shape (1, 3, 384, 640)
Processing frame 71 at 3.23 sec

0: 384x640 (no detections), 12.1ms
Speed: 3.1ms preprocess, 12.1ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)
Processing frame 72 at 3.27 sec

0: 384x640 (no detections), 20.7ms
Speed: 3.8ms preprocess, 20.7ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)
Processing

Processing frame 194 at 8.82 sec

0: 384x640 2 persons, 13 cars, 1 bus, 2 trucks, 1 traffic light, 12.2ms
Speed: 3.8ms preprocess, 12.2ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)
Processing frame 195 at 8.86 sec

0: 384x640 3 persons, 15 cars, 1 bus, 2 trucks, 15.5ms
Speed: 3.7ms preprocess, 15.5ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)
Processing frame 196 at 8.91 sec

0: 384x640 3 persons, 15 cars, 1 bus, 2 trucks, 18.2ms
Speed: 3.7ms preprocess, 18.2ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)
Processing frame 197 at 8.95 sec

0: 384x640 2 persons, 12 cars, 1 bus, 3 trucks, 13.8ms
Speed: 4.0ms preprocess, 13.8ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)
Processing frame 198 at 9.00 sec

0: 384x640 2 persons, 12 cars, 1 bus, 3 trucks, 13.8ms
Speed: 3.7ms preprocess, 13.8ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)
Processing frame 199 at 9.05 sec

0: 384x640 1 person



MoviePy - Done.
Moviepy - Writing video /content/drive/My Drive/deepdrive/final_output.mp4





Moviepy - Done !
Moviepy - video ready /content/drive/My Drive/deepdrive/final_output.mp4
Final video with audio saved at: /content/drive/My Drive/deepdrive/final_output.mp4
