In [2]:
!pip install ultralytics supervision pyttsx3 pygame opencv-python numpy --quiet

import cv2, time, torch, numpy as np, threading, math, os
from ultralytics import YOLO
import supervision as sv
import pyttsx3, pygame
from tkinter import Tk, filedialog
from queue import Queue

# Startup diagnostics: confirm the imports succeeded and report GPU availability.
print("‚úî Packages Loaded")
print("CUDA Available:", torch.cuda.is_available())

# =============== TTS QUEUE (thread-safe) ===============
# Phrases waiting to be spoken; consumed by tts_worker() on a background thread.
tts_queue = Queue()
# True while tts_worker() is in the middle of speaking a phrase.
tts_active = False

def tts_worker():
    """Speak phrases from ``tts_queue`` one at a time, forever.

    Meant to be the target of a daemon thread. Each iteration blocks until a
    phrase is available, sets the module-level ``tts_active`` flag while
    ``tts_engine`` is speaking, and marks the queue item done afterwards.

    Speech errors are printed and do not stop the loop.
    """
    global tts_active
    while True:
        text = tts_queue.get()
        tts_active = True
        try:
            tts_engine.say(text)
            tts_engine.runAndWait()
        except Exception as e:
            print("TTS error:", e)
        finally:
            # Always clear the busy flag and balance the get(): the main loop
            # skips speech while tts_active is True, so a flag left stuck by
            # an unexpected exit would silence every later announcement.
            tts_active = False
            tts_queue.task_done()
# =======================================================

# === USER CONFIG ===
# Feature switches read by the rest of the script.
use_webcam = False   # set True for live mode
audio_feedback = True
tts_feedback = True

# === STATIC VIDEO INPUT ===
video_path = "sample3.mp4"

# Open the selected capture source: camera index 0 or the video file above.
if use_webcam:
    print("üì∑ Webcam Mode")
    cap = cv2.VideoCapture(0)
else:
    print("üé¨ Source:", video_path)
    cap = cv2.VideoCapture(video_path)

# Fail fast when the source could not be opened.
if not cap.isOpened():
    source_name = 'Webcam' if use_webcam else video_path
    raise RuntimeError(f"‚ùå Failed to open: {source_name}")

# =============== AUDIO (three mp3s) ====================
# One sound per hazard level. Each stays None if the mixer or any file fails
# to load before it is assigned; play_alert_tone() skips sounds that are None.
print("Initializing Audio...")
slow_beep = None    # CAUTION_NEARBY
long_beep = None    # DANGER_PROBABLE
rapid_beep = None   # DANGER_IMMINENT
try:
    pygame.mixer.init(frequency=22050, size=-16, channels=2)
    slow_beep = pygame.mixer.Sound("nearby.mp3")
    long_beep = pygame.mixer.Sound("danger_probable.mp3")
    rapid_beep = pygame.mixer.Sound("danger_imminent.mp3")
    print("‚úî Pygame Audio Ready")
except Exception as audio_err:
    # Audio is optional: report the failure and carry on without it.
    print("‚ö† Audio init failed:", audio_err)

def stop_all_sounds():
    """Stop playback on every mixer channel; any mixer error is ignored."""
    try:
        pygame.mixer.stop()
    except Exception:
        # Best effort: silencing audio must never raise into the caller.
        return

def play_alert_tone(state, pan_x=None, frame_w=None):
    """Start the looping alert tone for ``state``, replacing any current tone.

    Args:
        state: Hazard state name. "CAUTION_NEARBY", "DANGER_PROBABLE" and
            "DANGER_IMMINENT" each map to their own preloaded sound. Any
            other value (e.g. "SAFE") leaves the mixer silent.
        pan_x: Horizontal pixel position of the hazard, or None if unknown.
            Only used for "DANGER_IMMINENT".
        frame_w: Frame width in pixels, used to normalise ``pan_x`` to 0..1.

    Returns immediately when ``audio_feedback`` is disabled. Whatever was
    playing is always stopped first. Mixer errors are printed and swallowed
    so that audio trouble never interrupts detection.
    """
    if not audio_feedback:
        return
    stop_all_sounds()

    sound = {
        "CAUTION_NEARBY": slow_beep,
        "DANGER_PROBABLE": long_beep,
        "DANGER_IMMINENT": rapid_beep,
    }.get(state)
    if not sound:
        # SAFE / unknown state, or the sound failed to load -> stay silent.
        return

    try:
        ch = pygame.mixer.find_channel(True)
        ch.play(sound, loops=-1)
        # Directional panning. This must happen AFTER play(): pygame resets a
        # channel's volume when it starts playing, so a pan applied beforehand
        # is discarded.
        if state == "DANGER_IMMINENT" and pan_x is not None and frame_w:
            pan = max(0.0, min(1.0, pan_x / frame_w))
            ch.set_volume(1.0 - pan, pan)  # (left, right)
    except Exception as e:
        print("Audio play error:", e)
# =======================================================

# =============== TTS ======================
# Speech engine used by tts_worker(); the speaking rate property is set to 175.
# NOTE(review): the engine is created on the main thread but driven from the
# worker thread -- confirm the pyttsx3 driver in use tolerates that.
tts_engine = pyttsx3.init()
tts_engine.setProperty('rate', 175)
# start worker ONCE
# Daemon thread, so it will not keep the process alive at exit.
threading.Thread(target=tts_worker, daemon=True).start()

def speak(txt):
    """Queue ``txt`` for speech, discarding any phrases still waiting.

    Only the newest phrase matters for live alerts, so everything that has
    not yet been picked up by the worker is dropped before ``txt`` is added.
    Does nothing when ``tts_feedback`` is disabled.

    Args:
        txt: The phrase to speak.
    """
    from queue import Empty  # local import: the file only imports Queue

    if not tts_feedback:
        return
    while True:
        try:
            tts_queue.get_nowait()
        except Empty:
            break
        # Balance the get() so the queue's unfinished-task count stays
        # accurate and a future tts_queue.join() cannot hang.
        tts_queue.task_done()
    tts_queue.put(txt)

# =========================================

# Load Model
# Run on the GPU when torch reports CUDA, otherwise on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
model = YOLO("yolov8m.pt").to(device)   
# Class-id -> name mapping taken from the loaded model; used to build labels.
CLASS_NAMES = model.model.names
tracker = sv.ByteTrack()
print("‚úî YOLOv8 Loaded on", device.upper())

# Class ids kept after detection; everything else is filtered out.
# NOTE(review): presumably COCO ids (0 person, 1 bicycle, 2 car, 3 motorcycle,
# 5 bus, 6 train, 7 truck, 9 traffic light) -- confirm against CLASS_NAMES.
TARGET_CLASS_IDS = [0,1,2,3,5,6,7,9]
# Subset of the ids above that the main loop records in its `vehicles` list.
VEHICLE_CLASS_IDS = [1,2,3,5,6,7]

# tracker id -> [center_x, center_y, area, growth counter, last frame index seen]
object_history = {}
# Minimum gap between two alerts, compared against time.time() differences (seconds).
TTS_COOLDOWN = 1
last_alert_time = 0
# State of the most recent alert that was actually announced.
last_announced = "SAFE"
# The stability counter must reach this value before a state is announced
# (DANGER_IMMINENT bypasses the wait).
STATE_STABLE_FRAMES = 3

def box_center_area(box):
    """Return ``(center_x, center_y, area)`` for an ``(x1, y1, x2, y2)`` box.

    The area is clamped to a minimum of 1, so degenerate boxes never report
    zero area.
    """
    left, top, right, bottom = box
    center_x = (left + right) / 2
    center_y = (top + bottom) / 2
    width = right - left
    height = bottom - top
    return center_x, center_y, max(1, width * height)

# Confidence threshold passed to the model as `conf=` on every frame.
YOLO_CONF = 0.35

# Smoothing state: how many frames in a row the hazard state has repeated,
# and what that state was on the previous frame.
stable_counter = 0
stable_state = "SAFE"
# State whose tone is currently looping ("SAFE" means no tone).
last_audio_state = "SAFE"   
# 1-based index of the frame being processed (incremented before use).
frame_index = 0
# NOTE(review): fps_time is not read anywhere in the visible code.
fps_time = time.time()

# Main processing loop: read a frame, detect + track objects, derive a hazard
# state, raise audio/TTS alerts on state changes, and draw the annotated frame.
# Runs until the source is exhausted or 'q' is pressed in the preview window.

# The annotators take only constant arguments, so build them once instead of
# once per frame.
box_anno = sv.BoxAnnotator()
label_anno = sv.LabelAnnotator(text_scale=0.5, text_thickness=1)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    frame_index += 1
    h,w,_ = frame.shape

    # Use polygon zones only for demo video, NOT webcam mode.
    # Built once, on the first frame, because they depend on the frame size.
    if not use_webcam and frame_index == 1:
        danger_pts = np.array([
            (int(w*0.1),h),(int(w*0.9),h),(int(w*0.70),int(h*0.875)),(int(w*0.30),int(h*0.875))
        ])
        caution_pts = np.array([
            (int(w*-0.2),h),(int(w*1.2),h),(int(w*0.80),int(h*0.75)),(int(w*0.20),int(h*0.75))
        ])
        aware_pts = np.array([
            (int(w*-0.5),h),(int(w*1.5),h),(int(w*0.90),int(h*0.65)),(int(w*0.10),int(h*0.65))
        ])
        danger_zone = sv.PolygonZone(danger_pts)
        caution_zone = sv.PolygonZone(caution_pts)
        aware_zone   = sv.PolygonZone(aware_pts)
        dz_anno = sv.PolygonZoneAnnotator(danger_zone, color=sv.Color.RED)
        cz_anno = sv.PolygonZoneAnnotator(caution_zone, color=sv.Color.YELLOW)
        az_anno = sv.PolygonZoneAnnotator(aware_zone,   color=sv.Color.GREEN)

    # === YOLO DETECT ===
    res = model(frame, conf=YOLO_CONF, verbose=False)[0]
    det = sv.Detections.from_ultralytics(res)
    det = det[np.isin(det.class_id, TARGET_CLASS_IDS)]
    tracked = tracker.update_with_detections(det)

    # Build exactly one label per tracked detection, in detection order, so
    # the label annotator pairs every label with the right box.
    labels=[]; vehicles=[]
    for xyxy,cls,conf,tid in zip(tracked.xyxy,tracked.class_id,tracked.confidence,tracked.tracker_id):
        x1,y1,x2,y2 = map(float,xyxy)
        cx,cy,area = box_center_area((x1,y1,x2,y2))

        if tid not in object_history:
            # First sighting: there is no previous sample to compare against.
            moving = False
            approaching = False
            pcnt = 0
        else:
            px,py,pa,pcnt,_ = object_history[tid]
            # Center moved more than 4 px since the previous frame.
            moving = math.hypot(cx-px,cy-py) > 4
            # The counter rises when the box grew by >4% and decays otherwise;
            # 3 or more counts as "approaching".
            if area > pa*1.04: pcnt = min(10,pcnt+1)
            else: pcnt = max(0,pcnt-1)
            approaching = pcnt>=3

        object_history[tid] = [cx,cy,area,pcnt,frame_index]
        labels.append(f"{CLASS_NAMES[int(cls)]} {conf:.2f}" +
                      (" moving" if moving else "") +
                      (" approaching" if approaching else ""))
        if cls in VEHICLE_CLASS_IDS:
            vehicles.append({"cx":cx,"moving":moving,"approach":approaching})

    # Forget tracks that have not been seen for more than 30 frames.
    stale=[tid for tid,v in object_history.items() if frame_index-v[4]>30]
    for s in stale: del object_history[s]

    # === Dynamic proximity for webcam mode ===
    proximity_score = 0

    if use_webcam:
        for xyxy in tracked.xyxy:
            x1,y1,x2,y2 = xyxy
            box_area = (x2-x1)*(y2-y1)
            frame_area = w*h
            area_ratio = box_area / frame_area  # Bigger = closer
            bottom_pos = y2 / h  # Lower in frame = closer to feet

            # Weighted score; the highest-scoring object decides the state.
            score = area_ratio*1.2 + bottom_pos*0.8
            proximity_score = max(proximity_score, score)

    # === Hazard Logic ===
    obstacles = tracked[np.isin(tracked.class_id,TARGET_CLASS_IDS)]

    if use_webcam:
        # Webcam mode: the state comes from the proximity score alone.
        if proximity_score > 0.40:
            state, msg = "DANGER_IMMINENT", "Immediate obstacle ahead!"
        elif proximity_score > 0.22:
            state, msg = "DANGER_PROBABLE", "Obstacle very close"
        elif proximity_score > 0.10:
            state, msg = "CAUTION_NEARBY", "Object nearby"
        else:
            state, msg = "SAFE", None
        ctx = None  # no directional info yet for wearable mode

    else:
        # Video/demo mode: use polygon zones, innermost zone first.
        in_d = danger_zone.trigger(obstacles)
        in_c = caution_zone.trigger(obstacles)
        in_a = aware_zone.trigger(obstacles)

        state = "SAFE"; msg=None; ctx=None

        if np.any(in_d):
            # Escalate to IMMINENT only if the previous frame was already a
            # danger state; otherwise start at PROBABLE.
            if stable_state in ["DANGER_PROBABLE","DANGER_IMMINENT"]:
                state="DANGER_IMMINENT"; msg="Collision imminent!"
            else:
                state="DANGER_PROBABLE"; msg="Caution, possible collision!"
            # Horizontal position of the first object in the danger zone,
            # used to pan the alert tone.
            ctx,_,_ = box_center_area(obstacles[in_d].xyxy[0])

        elif np.any(in_c):
            state="DANGER_PROBABLE"; msg="Too close!"

        elif np.any(in_a):
            state="CAUTION_NEARBY"; msg="Obstacle near"

    # Stability smoothing: count consecutive frames with an unchanged state.
    stable_counter = stable_counter+1 if state == stable_state else 0
    stable_state = state
    stable_enough = (stable_counter >= STATE_STABLE_FRAMES) or (state=="DANGER_IMMINENT")

    now = time.time()
    if stable_enough and state != last_announced and now-last_alert_time>TTS_COOLDOWN and msg:
        print(f"[{frame_index}] ALERT ‚Üí {state}: {msg}")
        # AUDIO: start/adjust looping tone on state change
        if state != last_audio_state:
            play_alert_tone(state, pan_x=ctx, frame_w=w)
            last_audio_state = state
        # TTS queued (no overlap)
        if tts_feedback and not tts_active:
            tts_queue.put(msg)
        last_alert_time = now
        last_announced = state

    if state == "SAFE":
        # If we returned to SAFE and audio was playing, stop it
        if last_audio_state != "SAFE":
            stop_all_sounds()
            last_audio_state = "SAFE"
        # Forget the last announcement too. Otherwise the same hazard state
        # recurring after a safe gap compares equal to last_announced and is
        # never announced again, leaving the user without tone or speech.
        last_announced = "SAFE"

    # === DRAW ===
    if not use_webcam:
        frame = dz_anno.annotate(frame,"DANGER")
        frame = cz_anno.annotate(frame,"CAUTION")
        frame = az_anno.annotate(frame,"SAFE")

    # boxes + labels (labels has one entry per tracked detection)
    frame = box_anno.annotate(scene=frame, detections=tracked)
    frame = label_anno.annotate(scene=frame, detections=tracked, labels=labels)

    # Status text (ASCII); colors are passed to OpenCV, which expects BGR.
    if state == "DANGER_IMMINENT":
        status_text, color = "DANGER: COLLISION IMMINENT", (0,0,255)
    elif state == "DANGER_PROBABLE":
        status_text, color = "WARNING: POSSIBLE COLLISION", (0,255,255)
    elif state == "CAUTION_NEARBY":
        status_text, color = "CAUTION: OBSTACLE NEAR", (0,255,255)
    else:
        status_text, color = "SAFE", (0,255,0)
    cv2.putText(frame, status_text, (40,60), cv2.FONT_HERSHEY_SIMPLEX, 1.2, color, 3, cv2.LINE_AA)

    if use_webcam:
        cv2.putText(frame, f"Proximity: {proximity_score:.2f}",
                    (40,100), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2)

    cv2.imshow("Assistive Vision", frame)
    # 'q' in the preview window ends the session.
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    

# cleanup
# Silence any looping tone, release the capture source and close the preview
# window once the main loop has ended.
stop_all_sounds()
cap.release()
cv2.destroyAllWindows()
print("‚úÖ Finished")



[notice] A new release of pip is available: 24.0 -> 25.3
[notice] To update, run: C:\Users\bhikr\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


pygame 2.6.1 (SDL 2.28.4, Python 3.11.9)
Hello from the pygame community. https://www.pygame.org/contribute.html
‚úî Packages Loaded
CUDA Available: True
üé¨ Source: sample3.mp4
Initializing Audio...
‚úî Pygame Audio Ready
[KDownloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8m.pt to 'yolov8m.pt': 100% ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ 49.7MB 6.4MB/s 7.8s7.5s<0.1ss8s
‚úî YOLOv8 Loaded on CUDA
[4] ALERT ‚Üí DANGER_PROBABLE: Too close!
[577] ALERT ‚Üí DANGER_IMMINENT: Collision imminent!
[629] ALERT ‚Üí DANGER_PROBABLE: Too close!
‚úÖ Finished
