# In [17]:
import cv2
import torch
import pyttsx3
import threading
import queue
import time
import numpy as np
from torchvision.transforms import Compose, ToTensor, Normalize
from transformers import DetrImageProcessor, DetrForObjectDetection
from PIL import Image

# In [None]:

# Camera and scheduling settings.
# CAMERA_FOV is the field of view that calculate_angle() spreads linearly
# across the frame width (presumably horizontal degrees -- TODO confirm
# against the camera actually used).
CAMERA_FOV = 90
# Minimum wall-clock gap, in seconds, between two processed frames in main().
PROCESS_INTERVAL = 0.25  # Process frames every 0.25 seconds

# Shared state.
# NOTE(review): tracked_objects is not referenced by any function visible in
# this file; the layout below is the intended one per the original comment.
tracked_objects = {}  # {ID: (class_name, x_center, y_center, depth)}
# Last announcement per class; speak() uses the timestamp to avoid repeating
# the same class within 5 seconds.
spoken_objects = {}  # {class_name: (last_spoken_time, min_distance, direction)}
# Hand-off from speak() (producer) to speech_worker() (consumer). Items are
# lists of phrases, or the string "STOP" to shut the worker down.
speech_queue = queue.Queue()

# Held by speech_worker() while a sentence is being spoken.
speech_lock = threading.Lock()

def load_depth_model(device):
    """Fetch the MiDaS_small network via torch.hub and build its preprocessing.

    Args:
        device: torch device the network is moved to.

    Returns:
        A ``(model, transform)`` pair: the network in eval mode, and a
        torchvision pipeline that converts an image to a tensor and
        normalizes its channels.
    """
    midas = torch.hub.load("intel-isl/MiDaS", "MiDaS_small")
    midas = midas.to(device)
    midas.eval()

    # Per-channel statistics; these match the commonly used ImageNet values.
    channel_norm = Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    )
    preprocess = Compose([ToTensor(), channel_norm])
    return midas, preprocess

def load_dino_model(device):
    """Load the object detector and its image processor.

    Despite the function name, the checkpoint loaded here is DETR
    (``facebook/detr-resnet-50``), not DINO.

    Args:
        device: torch device the detector is moved to.

    Returns:
        A ``(processor, model)`` pair.
    """
    checkpoint = "facebook/detr-resnet-50"
    image_processor = DetrImageProcessor.from_pretrained(checkpoint)
    detector = DetrForObjectDetection.from_pretrained(checkpoint)
    detector = detector.to(device)
    return image_processor, detector

def speech_worker():
    """Consume ``speech_queue`` and speak each batch of phrases.

    Runs until the string ``"STOP"`` is received. Every other item is
    expected to be a list of phrases, which are joined into one sentence
    and spoken synchronously with pyttsx3.

    Every dequeued item -- including the ``"STOP"`` sentinel and items whose
    playback raises -- is acknowledged with ``task_done()`` so that
    ``speech_queue.join()`` can never block on an unacknowledged item.
    """
    engine = pyttsx3.init()
    engine.setProperty("rate", 150)
    engine.setProperty("volume", 1.0)

    while True:
        objects_to_speak = speech_queue.get()
        try:
            if objects_to_speak == "STOP":
                engine.stop()
                break

            with speech_lock:
                sentence = ". ".join(objects_to_speak) + "."
                engine.say(sentence)
                engine.runAndWait()
        finally:
            # Runs on normal completion, on ``break`` and on exceptions.
            speech_queue.task_done()

# Start the background speech consumer as soon as this module/cell runs.
# daemon=True means the thread does not keep the interpreter alive on exit.
speech_thread = threading.Thread(target=speech_worker, daemon=True)
speech_thread.start()

def speak(detected_objects):
    """Queue spoken announcements for classes not announced recently.

    A class is announced if it has never been spoken, or if more than
    5 seconds have passed since it was last spoken. All phrases produced by
    one call are queued together as a single list.

    Args:
        detected_objects: mapping ``{class_name: (timestamp, distance,
            direction)}``; the timestamp element is not used here.
    """
    now = time.time()
    announcements = []

    for label, entry in detected_objects.items():
        _, distance, heading = entry
        previous = spoken_objects.get(label)
        if previous is not None and not (now - previous[0] > 5):
            # Spoken within the cooldown window; stay quiet.
            continue
        announcements.append(f"{label} at {distance:.2f} meters, {heading}")
        spoken_objects[label] = (now, distance, heading)

    if announcements:
        speech_queue.put(announcements)

def estimate_depth(frame, depth_model, transform, device):
    """Estimate a relative distance map for ``frame`` using MiDaS.

    Args:
        frame: H x W x 3 image array (main() passes RGB).
        depth_model: MiDaS network as returned by load_depth_model().
        transform: preprocessing callable applied to the resized frame.
        device: torch device the input tensor is moved to.

    Returns:
        2-D float array with the same height and width as ``frame``.
        Values lie in [0, 1] with 0 = nearest and 1 = farthest point in this
        frame. This is a per-frame relative score that is monotonic in
        distance; it is not a metric distance.
    """
    frame_height, frame_width = frame.shape[:2]

    # The network input is rounded down to a multiple of 32 in each dimension
    # (never below 32, so tiny frames do not collapse to a zero-sized image).
    net_width = max(32, frame_width // 32 * 32)
    net_height = max(32, frame_height // 32 * 32)
    net_input = cv2.resize(frame, (net_width, net_height))
    frame_tensor = transform(net_input).unsqueeze(0).to(device)

    with torch.no_grad():
        prediction = depth_model(frame_tensor)

    # MiDaS predicts relative *inverse* depth: larger values are closer.
    inverse_depth = prediction.squeeze().cpu().numpy()
    nearest = inverse_depth.max()
    farthest = inverse_depth.min()
    span = nearest - farthest
    if span > 0:
        # Flip while normalizing so that smaller means closer, which is what
        # callers comparing with ``<`` to find the closest object expect.
        relative_distance = (nearest - inverse_depth) / span
    else:
        # Flat (or invalid) prediction: avoid dividing by zero.
        relative_distance = np.zeros_like(inverse_depth)

    # Resize back to the ORIGINAL frame size so pixel coordinates computed on
    # the caller's frame index the matching location in the map.
    return cv2.resize(relative_distance, (frame_width, frame_height))

def calculate_angle(frame_width, x_center, fov=None):
    """Map a horizontal pixel position to a signed viewing angle.

    The mapping is linear: the frame centre is 0, the left edge is
    ``-fov / 2`` and the right edge is ``+fov / 2``.

    Args:
        frame_width: width of the frame in pixels.
        x_center: horizontal pixel coordinate of the object centre.
        fov: total field of view spanned by the frame width. Defaults to the
            module-level ``CAMERA_FOV`` when omitted.

    Returns:
        The angle in the same unit as ``fov``; negative values are left of
        centre.
    """
    if fov is None:
        fov = CAMERA_FOV
    half_width = frame_width / 2
    relative_position = (x_center - half_width) / half_width
    return relative_position * (fov / 2)

def describe_direction(angle):
    """Translate a signed angle into a spoken direction label.

    Bands (upper bounds are exclusive): below -30 "far left", below -15
    "left", below 15 "center", below 30 "right", anything else "far right".
    """
    bands = (
        (-30, "far left"),
        (-15, "left"),
        (15, "center"),
        (30, "right"),
    )
    for upper_bound, label in bands:
        if angle < upper_bound:
            return label
    return "far right"

def detect_objects(frame, processor, dino_model, depth_map, device):
    """Detect objects in ``frame`` and announce one instance per class.

    For every class detected with a score of at least 0.7, the instance with
    the smallest ``depth_map`` value at its box centre is kept, and the
    resulting summary is handed to speak().

    Args:
        frame: H x W x 3 RGB image array.
        processor: image processor paired with the detector.
        dino_model: the detection model. Despite the parameter name,
            load_dino_model() supplies a DETR checkpoint.
        depth_map: 2-D array covering the same field of view as ``frame``;
            it may have a different resolution.
        device: torch device the model inputs are moved to.
    """
    min_confidence = 0.7

    image = Image.fromarray(frame)
    inputs = processor(images=image, return_tensors="pt").to(device)

    with torch.no_grad():
        outputs = dino_model(**inputs)

    # DETR's last logit is the "no object" class. It has no entry in
    # id2label, so it must be excluded before taking the arg-max; otherwise
    # confident empty queries are reported as "Unknown Object".
    class_probs = outputs.logits.softmax(-1)[0, :, :-1]
    best_scores, best_ids = class_probs.max(dim=-1)
    best_scores = best_scores.tolist()
    best_ids = best_ids.tolist()
    # Boxes are (x_center, y_center, w, h), normalized to the image size.
    boxes = outputs.pred_boxes[0].detach().cpu().numpy()

    frame_width = frame.shape[1]
    depth_height, depth_width = depth_map.shape[:2]
    id2label = dino_model.config.id2label
    detected_objects = {}

    for score, class_id, box in zip(best_scores, best_ids, boxes):
        if score < min_confidence:
            continue

        class_name = id2label.get(class_id, "Unknown Object")
        x_norm, y_norm = float(box[0]), float(box[1])
        x_center = x_norm * frame_width

        # Index the depth map in its own resolution and clamp to its bounds,
        # so a centre on the image border still yields a valid sample.
        col = min(max(int(x_norm * depth_width), 0), depth_width - 1)
        row = min(max(int(y_norm * depth_height), 0), depth_height - 1)
        depth = float(depth_map[row, col])

        direction = describe_direction(calculate_angle(frame_width, x_center))

        previous = detected_objects.get(class_name)
        # A NaN entry compares False against everything, so replace it
        # explicitly instead of letting it block later valid readings.
        if previous is None or np.isnan(previous[1]) or depth < previous[1]:
            detected_objects[class_name] = (time.time(), depth, direction)

    speak(detected_objects)

def main():
    """Run the detect -> estimate depth -> announce loop over a video source.

    Frames are read continuously, but only processed when at least
    ``PROCESS_INTERVAL`` seconds of wall-clock time have passed since the
    previously processed frame. When the loop ends, the capture is released,
    the speech worker is told to stop, and this function waits for the
    worker to finish the announcements that are already queued.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"🎯 Using device: {device}")

    processor, dino_model = load_dino_model(device)
    depth_model, transform = load_depth_model(device)

    video_source = "AddVideoFilePath"
    cap = cv2.VideoCapture(video_source)
    if not cap.isOpened():
        # Fail loudly: a silent return looks the same as an empty video.
        # The speech worker is left running so a retry can still speak.
        print(f"Could not open video source: {video_source}")
        cap.release()
        return

    last_process_time = 0
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            current_time = time.time()
            if current_time - last_process_time >= PROCESS_INTERVAL:
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                depth_map = estimate_depth(frame_rgb, depth_model, transform, device)
                detect_objects(frame_rgb, processor, dino_model, depth_map, device)
                last_process_time = current_time

            # NOTE(review): no window is created in this file, so this key
            # check is unlikely to ever see 'q' -- confirm whether a preview
            # window was intended.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always free the capture and shut the worker down, even if
        # processing raised part-way through the video.
        cap.release()
        speech_queue.put("STOP")

    # The worker is a daemon thread; without waiting here a script run would
    # exit immediately and drop every announcement still in the queue.
    speech_thread.join()

# Run the pipeline only when this file is executed directly, not on import.
if __name__ == "__main__":
    main()