## Importing libraries

In [19]:
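# First, run this command in a terminal to install the required libraries.
# Note: the real-time cell opens a window with cv2.namedWindow / cv2.imshow, which needs the
# GUI-enabled opencv-python build (opencv-python-headless ships without window support).
# pip install opencv-python numpy torch torchvision tensorflow pygame ultralytics depth-anything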

import cv2
import numpy as np
import torch
import torch.nn.functional as F
from torchvision.transforms import Compose
from depth_anything.dpt import DepthAnything
from depth_anything.util.transform import Resize, NormalizeImage, PrepareForNet
import tensorflow as tf
import time
import pygame
import threading
from ultralytics import YOLO

## Configuring the GPU and loading the pretrained depth model

In [20]:

# Ask TensorFlow for the GPUs it can see and, if there are any, switch each one to
# on-demand memory growth, then report the physical/logical GPU counts.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # The error is only printed, so the notebook keeps running with TensorFlow's
        # default memory behaviour.
        # NOTE(review): presumably raised when the GPUs were already initialised before
        # this cell ran - confirm against the TensorFlow docs.
        print(e)

def parse_arguments(image_path='./assets/examples/demo1.png', depth_vis='./depth_vis', vitl='vits'):
    """Return the notebook's settings as a plain dictionary.

    Args:
        image_path: Value stored under the 'image_path' key.
        depth_vis: Value stored under the 'depth_vis' key.
        vitl: Encoder name; note that it is stored under the 'vits' key, which is the
            key the model-loading code reads.

    Returns:
        dict with the keys 'image_path', 'depth_vis', 'vits', 'grayscale' and
        'pred_only' (the last two are always False).
    """
    settings = dict(image_path=image_path, depth_vis=depth_vis)
    settings['vits'] = vitl
    settings.update(grayscale=False, pred_only=False)
    return settings

# Default settings; only args['vits'] (the encoder name) is read in this cell.
args = parse_arguments()

# Layout and caption-font constants.
# NOTE(review): none of these five values is referenced by the other visible cells.
margin_width = 50
caption_height = 60

font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 1
font_thickness = 2

# Run the depth model on the GPU when PyTorch can see one, otherwise on the CPU.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
# Load the pretrained weights named after the chosen encoder (e.g. 'LiheYoung/depth_anything_vits14'),
# move the model to DEVICE and put it in eval mode (inference only, no training happens here).
depth_anything = DepthAnything.from_pretrained('LiheYoung/depth_anything_{}14'.format(args['vits'])).to(DEVICE).eval()
# Report the model size in millions of parameters.
total_params = sum(param.numel() for param in depth_anything.parameters())
print('Total parameters: {:.2f}M'.format(total_params / 1e6))
# Preprocessing pipeline applied by depth_img to a {'image': array} sample before inference.
# NOTE(review): the exact behaviour of Resize / NormalizeImage / PrepareForNet is defined in
# depth_anything.util.transform; the mean/std look like the usual ImageNet statistics - confirm.
transform = Compose([
    Resize(
        width=518,
        height=518,
        resize_target=False,
        keep_aspect_ratio=True,
        ensure_multiple_of=14,
        resize_method='lower_bound',
        image_interpolation_method=cv2.INTER_CUBIC,
    ),
    NormalizeImage(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    PrepareForNet(),
])

Total parameters: 24.79M


## Applying the depth model to a frame (function)

In [21]:
def depth_img(frame):
    """Run the depth model on one BGR frame and return an 8-bit depth map.

    Args:
        frame: Image array in OpenCV BGR channel order (it is converted with
            cv2.COLOR_BGR2RGB before inference).

    Returns:
        2-D numpy uint8 array with the same height and width as `frame`. The model
        output is min-max normalised so the smallest prediction maps to 0 and the
        largest to 255. If the prediction is constant (zero range) an all-zero map is
        returned instead of dividing by zero.
        NOTE(review): presumably larger values mean nearer objects (relative inverse
        depth) - confirm against the Depth Anything documentation.
    """
    # Scale pixel values to [0, 1] as expected by the preprocessing pipeline.
    image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) / 255.0

    h, w = image.shape[:2]

    image = transform({'image': image})['image']
    image = torch.from_numpy(image).unsqueeze(0).to(DEVICE)
    with torch.no_grad():
        depth = depth_anything(image)

    # Resize the prediction back to the input resolution; depth[None] adds the extra
    # leading dimension that bilinear interpolation needs, [0, 0] strips both again.
    depth = F.interpolate(depth[None], (h, w), mode='bilinear', align_corners=False)[0, 0]

    depth_min = depth.min()
    depth_range = depth.max() - depth_min
    if depth_range > 0:
        depth = (depth - depth_min) / depth_range * 255.0
    else:
        # Constant (or NaN) prediction: 0/0 would produce NaN and an undefined uint8
        # cast, so fall back to an all-zero map.
        depth = torch.zeros_like(depth)
    return depth.cpu().numpy().astype(np.uint8)


## Loading the YOLO pose model

In [22]:
# Load the YOLOv8-nano pose checkpoint. Only the bounding boxes of its results are
# read by the real-time cell.
model = YOLO("yolov8n-pose.pt")

## Real-time processing

In [None]:
# Initialize Pygame modules for audio
pygame.init()
pygame.mixer.init()

# Alert state and sound lookup tables.
# res: per-frame alert flags (0/1) for each zone; reset by draw_rect and set by traitement_box.
# index_color: zone name -> index into min_delay_between_sounds.
# sounds: zone name -> sound clip loaded from the local "sounds" directory.
res = {"green": 0, "red": 0, "yellow": 0}
index_color = {"green": 2, "red": 0, "yellow": 1}
sounds = {
    "red": pygame.mixer.Sound("sounds/red.wav"),
    "yellow": pygame.mixer.Sound("sounds/yellow.wav"),
    "green": pygame.mixer.Sound("sounds/green.wav")
}

# Variables to manage sound play timing.
# last_time_played: time.time() value of the last played sound (0 = nothing played yet).
# min_delay_between_sounds: minimum gap in seconds, ordered [red, yellow, green] per index_color.
last_time_played = 0
min_delay_between_sounds = [0.7, 0.5, 0.5]

# Frame currently being annotated; process_frame assigns it and the draw helpers modify it.
image = None

# Lock held by play_audio while it plays a sound and updates last_time_played.
lock = threading.Lock()

# Parameters for the trapezoidal region scanned by process_frame (all in pixels except width_division):
# width_division: the top edge is centred on w // width_division.
# height_plus: the top edge sits this far below the vertical middle of the frame.
# width_diff: horizontal inset of the bottom corners from the left/right image borders.
# top_trapew_dist: half-width of the top edge.
width_division = 2
height_plus = 100
width_diff = 80
top_trapew_dist = 50

# Thresholds on the 8-bit depth-map values inside the region:
# values strictly between the two start the yellow zone, values >= max_val_yellow start the red zone.
min_val_yellow = 100
max_val_yellow = 170

def play_audio(s):
    """Play the sound for zone `s` unless another sound was played too recently.

    Args:
        s: Zone name; must be a key of both `index_color` and `sounds`
            ("red", "yellow" or "green"), otherwise a KeyError is raised.

    Side effects:
        Plays `sounds[s]` and stores the current time in the global
        `last_time_played` when the cooldown for `s` has elapsed.
    """
    global last_time_played
    min_delay = min_delay_between_sounds[index_color[s]]
    # The cooldown check and the update of last_time_played happen under the same
    # lock, so two concurrent callers cannot both pass the check and play at once.
    with lock:
        current_time = time.time()
        if current_time - last_time_played > min_delay:
            sounds[s].play()
            last_time_played = current_time

def draw_trapez(color, coord):
    """Blend a filled polygon into the global `image` at 30% opacity.

    Args:
        color: Fill colour passed to cv2.fillPoly.
        coord: Sequence of (x, y) vertices describing the polygon.

    Side effects:
        Rebinds the global `image` to the blended result.
    """
    global image
    opacity = 0.3
    polygon = np.array([coord], dtype=np.int32)
    # Paint the polygon on a copy so the untouched frame can be mixed back in.
    tinted = image.copy()
    cv2.fillPoly(tinted, polygon, color=color)
    image = cv2.addWeighted(tinted, opacity, image, 1 - opacity, 0)

def process_frame(frame):
    """Split the region in front of the camera into depth-based zones, draw them, then run detection.

    The frame is copied into the global `image`, converted to an 8-bit depth map with
    depth_img, and a trapezoidal region in the lower part of the map is scanned row by
    row from its top edge downwards. The first row containing a depth value strictly
    between min_val_yellow and max_val_yellow starts the yellow zone; the first row
    containing a value >= max_val_yellow starts the red zone and ends the scan. The
    zones are drawn with trapez2draw and finally draw_rect is called with the zone
    coordinates.

    Args:
        frame: Colour frame as read from the camera.

    Side effects:
        Rebinds and modifies the global `image`; draw_rect may trigger a sound.
    """
    global image
    # Keep a copy of the colour frame in the global `image`; the draw helpers overlay onto it.
    image = frame.copy()
    # From here on `frame` is the depth map returned by depth_img, not the colour picture.
    frame = depth_img(frame)
    h, w = frame.shape[:2]
    # x-coordinates of the top-left (w4) and top-right (w3) corners of the region.
    w4 = (w // width_division) - top_trapew_dist
    w3 = (w // width_division) + top_trapew_dist
    # y-coordinate of the region's top edge.
    h1 = (h // 2) + height_plus
    # One vertex list per zone, indexed [green, yellow, red]; each list is ordered
    # [top-left, top-right, bottom-right, bottom-left]. Green starts as the whole region,
    # yellow and red stay empty until they are detected.
    coord_trapezs = [[(w4, h1), (w3, h1), (w - width_diff, h), (width_diff, h)], [], []]
    is_red = False
    is_yellow = False
    # Scan the region one row at a time, from its top edge to the bottom of the image.
    for y in range(h1, h):
        if is_red:
            # Red was found on the previous row: nothing below it needs scanning.
            trapez2draw(is_yellow, is_red, coord_trapezs)
            break
        # Right edge of the region on this row, linearly interpolated between the top-right
        # corner (w3 at y == h1) and the bottom-right corner (w - width_diff at y == h).
        x2 = int((y - h1) / (h - h1) * ((w - width_diff) - w3) + w3)
        # Left edge is the mirror image of the right edge about the vertical centre line.
        x1 = w - x2
        for x in range(x1, x2):
            # Depth-map value at this pixel.
            white = frame[y, x]

            if white > min_val_yellow:
                if not is_yellow and white < max_val_yellow:
                    # First yellow-range value: green now ends at this row and yellow
                    # runs from this row down to the bottom of the region.
                    is_yellow = True
                    coord_trapezs[0][2], coord_trapezs[0][3] = (x2, y), (x1, y)
                    coord_trapezs[1] = [(x1, y), (x2, y), (w - width_diff, h), (width_diff, h)]

                if white >= max_val_yellow:
                    # First red-range value: the zone above (yellow if present, otherwise
                    # green) ends at this row and red runs down to the bottom of the region.
                    is_red = True
                    if is_yellow:
                        coord_trapezs[1][2], coord_trapezs[1][3] = (x2, y), (x1, y)
                    else:
                        coord_trapezs[0][2], coord_trapezs[0][3] = (x2, y), (x1, y)
                    coord_trapezs[2] = [(x1, y), (x2, y), (w - width_diff, h), (width_diff, h)]
                    break
        if y == h - 1:
            # Last row reached (with or without red on it): draw whatever zones were found.
            trapez2draw(is_yellow, is_red, coord_trapezs)

    # Detect objects on the annotated image and compare their boxes with the zones.
    draw_rect(model, coord_trapezs)

def trapez2draw(is_yellow, is_red, coord_trapezs):
    """Overlay the detected zones on the image via draw_trapez.

    Args:
        is_yellow: Draw the yellow zone (coord_trapezs[1]) when true.
        is_red: Draw the red zone (coord_trapezs[2]) when true.
        coord_trapezs: Vertex lists indexed [green, yellow, red].

    The green zone (coord_trapezs[0]) is always drawn, and always last.
    """
    # (enabled, colour, index into coord_trapezs), in drawing order.
    layers = (
        (is_yellow, (0, 247, 255), 1),
        (is_red, (0, 0, 255), 2),
        (True, (0, 255, 0), 0),
    )
    for enabled, colour, zone in layers:
        if enabled:
            draw_trapez(colour, coord_trapezs[zone])

def check_intersection(rect, trapez):
    """Tell whether a rectangle overlaps the bounding extent of a trapezoid.

    Args:
        rect: [(x_min, y_min), (x_max, y_max)] corners of the rectangle.
        trapez: Vertex list ordered [top-left, top-right, bottom-right, bottom-left],
            or an empty list when the zone does not exist.

    Returns:
        False for an empty `trapez`; otherwise True when the rectangle overlaps the
        span between the trapezoid's bottom corners horizontally and between its top
        and bottom edges vertically (touching edges count as overlapping).
    """
    if not trapez:
        return False
    left_x, right_x = trapez[3][0], trapez[2][0]
    top_y, bottom_y = trapez[0][1], trapez[3][1]
    misses_horizontally = rect[1][0] < left_x or rect[0][0] > right_x
    misses_vertically = rect[1][1] < top_y or rect[0][1] > bottom_y
    return not (misses_horizontally or misses_vertically)

def traitement_box(box, coord_trapezs):
    """Draw one detection box and raise the alert level of the zone it touches.

    The zones are tested from most to least severe (red, yellow, green) and only the
    first one that intersects the box is taken into account:

    * red    -> `res` becomes red only;
    * yellow -> green is cleared and yellow is set unless red is already set;
    * green  -> green is set only when no flag is set yet; otherwise `res` is left
                untouched, so a box in the green zone can never escalate the alert.

    Args:
        box: Detection whose `xywh` holds (centre x, centre y, width, height).
        coord_trapezs: Vertex lists indexed [green, yellow, red].

    Side effects:
        Draws the box on the global `image` and updates the global `res`. draw_rect
        runs this function in one thread per box.
    """
    global res, image
    x, y, width, height = box.xywh.tolist()[0]
    # Convert centre/size to integer corner coordinates.
    a, b, c, d = int(x - width / 2), int(y - height / 2), int(x + width / 2), int(y + height / 2)
    image = cv2.rectangle(image, (a, b), (c, d), (0, 255, 0), 2)
    for i in range(2, -1, -1):
        if check_intersection([(a, b), (c, d)], coord_trapezs[i]):
            if i == 2:
                res = {"green": 0, "red": 1, "yellow": 0}
            elif i == 1:
                res["green"] = 0
                if res["red"] == 0:
                    res["yellow"] = 1
            elif not any(res.values()):
                res["green"] = 1
            break

def draw_rect(model, coord_trapezs):
    """Run the detector on the global `image` and play the sound of the worst zone hit.

    Args:
        model: Callable detector; `model(image)[0].boxes` must support truthiness,
            `len()` and integer indexing.
        coord_trapezs: Vertex lists indexed [green, yellow, red], forwarded to
            traitement_box.

    Side effects:
        When at least one box is detected, resets the global `res`, lets
        traitement_box process every box in its own thread, and calls play_audio
        with the most severe flagged zone (red before yellow before green). Nothing
        happens when there are no boxes.
    """
    global res, image
    info_yolo = model(image)
    boxes = info_yolo[0].boxes
    if not boxes:
        return

    res = {"green": 0, "red": 0, "yellow": 0}
    threads = [
        threading.Thread(target=traitement_box, args=(boxes[l], coord_trapezs))
        for l in range(len(boxes))
    ]
    for thread in threads:
        thread.start()
    # Wait for every box to be processed before reading the alert flags.
    for thread in threads:
        thread.join()

    # Zones in decreasing order of severity; only the first flagged one is announced.
    for color in ("red", "yellow", "green"):
        if res[color]:
            play_audio(color)
            break

# Webcam and display setup.
# Reads frames from camera 0, annotates each one with process_frame and shows the
# result until the stream ends or the user presses 'q'.
# The capture is created before the `try` so that `cam` is always bound when the
# `finally` clause releases it.
cam = cv2.VideoCapture(0)
try:
    if not cam.isOpened():
        # Say why nothing is displayed instead of leaving the loop silently.
        print("Could not open webcam (device 0)")
    else:
        cv2.namedWindow("resultat")
        while True:
            ret, frame = cam.read()
            if not ret:
                break
            process_frame(frame)
            cv2.imshow("resultat", image)
            # Keep only the low byte of the key code: some backends set extra high
            # bits, which would make a plain comparison with ord('q') fail.
            if (cv2.waitKey(33) & 0xFF) == ord('q'):
                break
finally:
    cam.release()
    cv2.destroyAllWindows()
