In [56]:
from ultralytics import YOLO
from collections import defaultdict
import cv2
import numpy as np
import os
import math

In [57]:
# Load the pretrained detector. The checkpoint is yolov9c.pt, i.e. a YOLOv9 model
# (not YOLOv8); the functions below use it to detect class id 0.
model = YOLO('yolov9c.pt')

# Colour palette for drawing. The functions below pick an entry by
# class id modulo the palette length and pass the tuple to OpenCV drawing calls.
colors = [
    (255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (0, 255, 255),
    (255, 0, 255), (192, 192, 192), (128, 128, 128), (128, 0, 0), (128, 128, 0),
    (0, 128, 0), (128, 0, 128), (0, 128, 128), (0, 0, 128), (72, 61, 139),
    (47, 79, 79), (47, 79, 47), (0, 206, 209), (148, 0, 211), (255, 20, 147)
]

# PART 1

## TASK№1

In [58]:
# Detect people in one image, annotate it and export the boxes to a text file
def process_image(image_path):
    """Detect class-0 objects (people) in an image and save annotated outputs.

    The image is run through the module-level ``model``. Every detection with
    class id 0 gets a rectangle and a label, and the total count is written
    onto the image. Two files are created next to the input:

    * ``<name>_yolo<ext>`` - the annotated image;
    * ``<name>_data.txt`` - box coordinates and box centres, one per line.

    Args:
        image_path: Path of the image to process.

    Returns:
        A tuple ``(results, grouped_objects)`` where ``results`` is the first
        result object returned by the model and ``grouped_objects`` maps a
        class name to a list of ``(box, (center_x, center_y))`` tuples.

    Raises:
        FileNotFoundError: If OpenCV cannot read ``image_path``.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"Could not read image: {image_path}")

    results = model(image)[0]

    # Draw on the image held by the result object, as the original code did.
    image = results.orig_img
    classes_names = results.names
    classes = results.boxes.cls.cpu().numpy()
    boxes = results.boxes.xyxy.cpu().numpy().astype(np.int32)

    person_class_id = 0
    # Defined before the loop so the summary overlay still works when
    # nothing was detected (previously an UnboundLocalError).
    color = colors[person_class_id % len(colors)]

    grouped_objects = {}

    for class_id, box in zip(classes, boxes):
        if class_id != person_class_id:
            continue

        class_name = classes_names[int(class_id)]
        x1, y1, x2, y2 = box
        center = ((x1 + x2) // 2, (y1 + y2) // 2)
        grouped_objects.setdefault(class_name, []).append((box, center))

        # Plain ints keep OpenCV's argument parsing independent of NumPy scalar types.
        left, top, right, bottom = int(x1), int(y1), int(x2), int(y2)
        cv2.rectangle(image, (left, top), (right, bottom), color, 2)
        cv2.putText(image, class_name, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    count_of_persons = int(np.count_nonzero(classes == person_class_id))

    # NOTE: the overlay position is a fixed pixel coordinate chosen for the sample image.
    cv2.putText(image, f"Count of persons: {count_of_persons}", (450,370), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    # Save the annotated image next to the source file.
    base_path, extension = os.path.splitext(image_path)
    new_image_path = base_path + '_yolo' + extension
    cv2.imwrite(new_image_path, image)

    # Save the detections to a text file.
    text_file_path = base_path + '_data.txt'
    with open(text_file_path, 'w') as f:
        for class_name, details in grouped_objects.items():
            f.write(f"{class_name}:\n")
            for box, center in details:
                f.write(f"Coordinates: ({box[0]}, {box[1]}, {box[2]}, {box[3]}) Center: ({center[0]}, {center[1]})\n")

    print(f"Count of persons: {count_of_persons}")
    print(f"Processed {image_path}:")
    print(f"Saved bounding-box image to {new_image_path}")
    print(f"Saved data to {text_file_path}")
    return results, grouped_objects

In [None]:
# Run person detection on the sample image; `objects` is consumed by the grouping cell below.
res, objects = process_image('data/test_imgs/720x.png')

In [None]:
# Collect the bounding-box centres of the detected people and build proximity groups.
# Two people belong to the same group when their box centres are closer than
# `max_group_distance` pixels, directly or through a chain of such neighbours.
max_group_distance = 100

persons = objects.get('person', [])  # empty list when nobody was detected
centers = np.array([center for _, center in persons]).reshape(-1, 2)

num_rectangles = centers.shape[0]

# Pairwise Euclidean distances between centres (zero on the diagonal).
points = centers.astype(np.float64)
distance_matrix = np.linalg.norm(points[:, None, :] - points[None, :, :], axis=-1)

# Union-find over person indices. The previous implementation always appended
# to the most recently created group, which put people into the wrong group
# whenever the matching neighbour belonged to an earlier one.
parent = list(range(num_rectangles))


def _find_root(index):
    """Return the representative index of the group containing ``index``."""
    while parent[index] != index:
        parent[index] = parent[parent[index]]  # path halving
        index = parent[index]
    return index


for i in range(num_rectangles):
    for j in range(i + 1, num_rectangles):
        if distance_matrix[i, j] < max_group_distance:
            root_i, root_j = _find_root(i), _find_root(j)
            if root_i != root_j:
                # Keep the smallest index as the root so the group order is stable.
                parent[max(root_i, root_j)] = min(root_i, root_j)

members_by_root = defaultdict(list)
for index in range(num_rectangles):
    members_by_root[_find_root(index)].append(index)

# Only clusters with at least two people count as a group.
groups_of_persons = {}
for members in members_by_root.values():
    if len(members) > 1:
        groups_of_persons[f"Group №{len(groups_of_persons)}"] = members

# People (indices into objects['person']) in every group.
unique_list = [np.unique(np.array(members)) for members in groups_of_persons.values()]

# Bounding box enclosing all member boxes of each group.
list_of_coordinates = []
boundnoxes = []
for members in unique_list:
    member_boxes = [persons[index][0] for index in members]
    list_of_x = [x for box in member_boxes for x in (box[0], box[2])]
    list_of_y = [y for box in member_boxes for y in (box[1], box[3])]
    list_of_coordinates.append([list_of_x, list_of_y])
    boundnoxes.append([min(list_of_x), min(list_of_y), max(list_of_x), max(list_of_y)])

## TASK№2

In [6]:
def drawing_boundboxes(image_path, boundnoxes, unique_list):
    """Draw group bounding boxes on an image and export them to a text file.

    Two files are created next to the input image:

    * ``<name>_grouped<ext>`` - the image with one rectangle per group and a
      summary overlay;
    * ``<name>_data2.txt`` - the coordinates of every group box.

    Args:
        image_path: Path of the image to draw on.
        boundnoxes: Iterable of ``[x1, y1, x2, y2]`` group boxes.
        unique_list: One sequence of member indices per group; only the
            lengths are used, for the "people in group" overlay.

    Returns:
        A dict mapping ``"Group"`` to the list of all boxes that were drawn.
        The dict is empty when ``boundnoxes`` is empty.

    Raises:
        FileNotFoundError: If OpenCV cannot read ``image_path``.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"Could not read image: {image_path}")

    class_name = "Group"
    color = colors[2 % len(colors)]
    grouped_objects = {}

    for box in boundnoxes:
        # setdefault keeps the boxes collected so far; re-creating the list on
        # every iteration used to discard everything except the last group.
        grouped_objects.setdefault(class_name, []).append(box)

        # Plain ints keep OpenCV's argument parsing independent of NumPy scalar types.
        x1, y1, x2, y2 = (int(value) for value in box)
        cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
        cv2.putText(image, class_name, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    count_of_groups = len(boundnoxes)
    count_of_persons = [len(unic) for unic in unique_list]

    # NOTE: the overlay positions are fixed pixel coordinates chosen for the sample image.
    cv2.putText(image, f"Count of Groups: {count_of_groups}", (450,320), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
    cv2.putText(image, f"Count of People in Group: {count_of_persons}", (450,350), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    base_path, extension = os.path.splitext(image_path)
    new_image_path = base_path + '_grouped' + extension
    cv2.imwrite(new_image_path, image)

    text_file_path = base_path + '_data2.txt'
    with open(text_file_path, 'w') as f:
        for group_name, details in grouped_objects.items():
            f.write(f"{group_name}:\n")
            for box in details:
                f.write(f"Coordinates: ({box[0]}, {box[1]}, {box[2]}, {box[3]})\n")

    print(f"Count of groups: {count_of_groups}")
    print(f"Processed {image_path}:")
    print(f"Saved bounding-box image to {new_image_path}")
    print(f"Saved data to {text_file_path}")
    return grouped_objects

In [None]:
# Draw the group boxes on top of the already annotated image
# (the *_yolo.png file written by process_image for the same sample).
grouped_res = drawing_boundboxes('data/test_imgs/720x_yolo.png', boundnoxes, unique_list)

## TASK№3

####  Model for helmets

In [None]:
# Fine-tune a YOLOv9c checkpoint on the dataset described by data.yaml.
current_dir = os.getcwd()

data_path = os.path.join(current_dir, 'data.yaml')
# A dedicated variable is used on purpose: the detection functions in this
# notebook read the global `model` as the pretrained person detector, so
# rebinding `model` to the network being fine-tuned here would silently
# change their results after a Restart & Run All.
helmet_train_model = YOLO(os.path.join(current_dir, 'yolov9c.pt'))
epochs = 250
batch = 10
image_size = 640

# Always true inside a notebook; the guard only matters if this cell is
# exported to a script.
if __name__ == '__main__':
    # NOTE(review): the run is saved under the name 'red', while the helmet
    # weights are later loaded from 'detect/final/weights/best.pt' - confirm
    # which run is the intended one.
    results = helmet_train_model.train(data=data_path,
                                       epochs=epochs,
                                       batch=batch,
                                       imgsz=image_size,
                                       name='red',
                                       device='cuda')

In [74]:
# Second detector, used by the functions below to find helmets (its class id 0).
# NOTE(review): the training cell above saves its run under name='red', while these
# weights come from 'detect/final' - confirm this is the intended checkpoint.
model1 = YOLO('detect/final/weights/best.pt')

In [75]:
def is_inside(box1, box2):
    """Tell whether box2 reaches into box1.

    Both boxes are ``(x1, y1, x2, y2)`` sequences. The result is truthy when
    the first corner ``(x1, y1)`` or the second corner ``(x2, y2)`` of
    ``box2`` lies within ``box1`` (borders included). Note the argument
    order: ``box1`` is the outer box, ``box2`` the candidate inner box.
    """
    left, top, right, bottom = box1
    first_x, first_y, second_x, second_y = box2

    def _corner_within(point_x, point_y):
        # Inclusive bounds: a corner lying on the border counts as inside.
        return left <= point_x <= right and top <= point_y <= bottom

    return _corner_within(first_x, first_y) or _corner_within(second_x, second_y)
    
def process_with_helmets(image_path):
    """Detect people and helmets in an image and save annotated outputs.

    People are class-0 detections of the module-level ``model``; helmets are
    class-0 detections of ``model1``. A helmet is kept only when ``is_inside``
    matches it to a person box, and each helmet box is counted once even if
    it matches several people. Two files are created next to the input:

    * ``<name>_yolo_helmet<ext>`` - the annotated image;
    * ``<name>_data_helmet.txt`` - coordinates and centres of all kept boxes.

    Args:
        image_path: Path of the image to process.

    Returns:
        A tuple ``(results, grouped_objects)`` where ``results`` is the first
        result object of the person model and ``grouped_objects`` maps a
        class name to a list of ``(box, (center_x, center_y))`` tuples.

    Raises:
        FileNotFoundError: If OpenCV cannot read ``image_path``.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"Could not read image: {image_path}")

    results = model(image)[0]

    image = results.orig_img
    classes_names = results.names
    classes = results.boxes.cls.cpu().numpy()
    boxes = results.boxes.xyxy.cpu().numpy().astype(np.int32)

    helmets_results = model1(image)[0]
    image = helmets_results.orig_img
    helmet_classes_names = helmets_results.names
    helmet_classes = helmets_results.boxes.cls.cpu().numpy()
    helmet_boxes = helmets_results.boxes.xyxy.cpu().numpy().astype(np.int32)

    target_class_id = 0
    # Both colours are fixed up front so the summary overlay works even when
    # no person or no helmet is found (previously an UnboundLocalError).
    color = colors[target_class_id % len(colors)]
    # Next palette entry; the modulo is applied after the offset so the index
    # can never run past the end of the palette.
    h_color = colors[(target_class_id + 1) % len(colors)]

    # The helmet candidates do not depend on the person, so filter them once.
    helmet_candidates = [
        h_box
        for h_class_id, h_box in zip(helmet_classes, helmet_boxes)
        if h_class_id == target_class_id
    ]

    grouped_objects = {}
    unique_helmet_boxes = set()
    count_of_helmets = 0

    for class_id, box in zip(classes, boxes):
        if class_id != target_class_id:
            continue

        class_name = classes_names[int(class_id)]
        x1, y1, x2, y2 = box
        center = ((x1 + x2) // 2, (y1 + y2) // 2)
        grouped_objects.setdefault(class_name, []).append((box, center))

        for h_box in helmet_candidates:
            if not is_inside(box, h_box):
                continue
            h_box_tuple = tuple(h_box)
            if h_box_tuple in unique_helmet_boxes:
                continue  # already attributed to an earlier person

            h_class_name = helmet_classes_names[target_class_id]
            h_x1, h_y1, h_x2, h_y2 = h_box
            h_center = ((h_x1 + h_x2) // 2, (h_y1 + h_y2) // 2)
            grouped_objects.setdefault(h_class_name, []).append((h_box, h_center))

            count_of_helmets += 1
            unique_helmet_boxes.add(h_box_tuple)

            h_left, h_top, h_right, h_bottom = int(h_x1), int(h_y1), int(h_x2), int(h_y2)
            cv2.rectangle(image, (h_left, h_top), (h_right, h_bottom), h_color, 2)
            cv2.putText(image, h_class_name, (h_left, h_top - 3), cv2.FONT_HERSHEY_SIMPLEX, 0.4, h_color, 1)

        left, top, right, bottom = int(x1), int(y1), int(x2), int(y2)
        cv2.rectangle(image, (left, top), (right, bottom), color, 2)
        cv2.putText(image, class_name, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)

    count_of_persons = int(np.count_nonzero(classes == target_class_id))

    # NOTE: the overlay positions are fixed pixel coordinates chosen for the sample image.
    cv2.putText(image, f"Count of persons: {count_of_persons}", (440, 365), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
    cv2.putText(image, f"Count of people in helmets: {count_of_helmets}", (440, 380), cv2.FONT_HERSHEY_SIMPLEX, 0.5, h_color, 2)
    cv2.putText(image, f"Count of people without helmets: {count_of_persons - count_of_helmets}", (435, 395), cv2.FONT_HERSHEY_SIMPLEX, 0.5, colors[2], 2)

    base_path, extension = os.path.splitext(image_path)
    new_image_path = base_path + '_yolo_helmet' + extension
    cv2.imwrite(new_image_path, image)

    text_file_path = base_path + '_data_helmet.txt'
    with open(text_file_path, 'w') as f:
        for class_name, details in grouped_objects.items():
            f.write(f"{class_name}:\n")
            for box, center in details:
                f.write(f"Coordinates: ({box[0]}, {box[1]}, {box[2]}, {box[3]}) Center: ({center[0]}, {center[1]})\n")

    print(f"Count of persons: {count_of_persons}")
    print(f"Processed {image_path}:")
    print(f"Saved bounding-box image to {new_image_path}")
    print(f"Saved data to {text_file_path}")
    return results, grouped_objects

# PART 2

## TASK№1

In [81]:
def video_processing(image, person_conf=0.35, helmet_conf=0.45):
    """Track people and helmets on one video frame and annotate it.

    The frame is passed to ``model.track`` (people, class id 0) and to
    ``model1.track`` (helmets, class id 0), both with the ByteTrack
    configuration and ``persist=True``. People above ``person_conf`` are
    drawn with their track id; helmets above ``helmet_conf`` are drawn when
    ``is_inside`` matches them to a drawn person. Each helmet box is used
    once.

    Args:
        image: Frame to process.
        person_conf: Confidence a person detection must exceed to be used.
        helmet_conf: Confidence a helmet detection must exceed to be used.

    Returns:
        A tuple ``(results, grouped_objects, image)``: the first result
        object of the person model, a ``defaultdict(list)`` mapping a class
        name to ``(box, (center_x, center_y))`` tuples, and the annotated
        frame.
    """
    results = model.track(image, persist=True, tracker="bytetrack.yaml")[0]
    image = results.orig_img
    classes_names = results.names
    classes = results.boxes.cls.cpu().numpy()
    boxes = results.boxes.xyxy.cpu().numpy().astype(np.int32)
    confidences = results.boxes.conf.cpu().numpy()
    # `boxes.id` is None when the tracker has no tracks for this frame;
    # zipping over it directly used to raise a TypeError.
    if results.boxes.id is None:
        track_ids = [None] * len(boxes)
    else:
        track_ids = results.boxes.id.int().cpu().tolist()

    helmets_results = model1.track(image, persist=True, tracker="bytetrack.yaml")[0]

    image = helmets_results.orig_img
    helmet_classes_names = helmets_results.names
    helmet_classes = helmets_results.boxes.cls.cpu().numpy()
    helmet_boxes = helmets_results.boxes.xyxy.cpu().numpy().astype(np.int32)
    helmet_confidences = helmets_results.boxes.conf.cpu().numpy()

    target_class_id = 0
    # Fixed up front so the overlay works when nobody passes the filter
    # (previously an UnboundLocalError).
    color = colors[target_class_id % len(colors)]
    # Modulo applied after the offset so the index stays inside the palette.
    h_color = colors[(target_class_id + 1) % len(colors)]

    # Helmet candidates do not depend on the person, so filter them once.
    helmet_candidates = [
        h_box
        for h_class_id, h_box, h_conf in zip(helmet_classes, helmet_boxes, helmet_confidences)
        if h_class_id == target_class_id and h_conf > helmet_conf
    ]

    grouped_objects = defaultdict(list)
    unique_helmet_boxes = set()
    count_of_persons = 0

    for class_id, box, conf, track_id in zip(classes, boxes, confidences, track_ids):
        if class_id != target_class_id or not conf > person_conf:
            continue

        count_of_persons += 1
        class_name = classes_names[int(class_id)]
        x1, y1, x2, y2 = box
        grouped_objects[class_name].append((box, ((x1 + x2) // 2, (y1 + y2) // 2)))

        for h_box in helmet_candidates:
            if not is_inside(box, h_box):
                continue
            h_box_tuple = tuple(h_box)
            if h_box_tuple in unique_helmet_boxes:
                continue  # already attributed to an earlier person

            h_class_name = helmet_classes_names[target_class_id]
            h_x1, h_y1, h_x2, h_y2 = h_box
            # Store the helmet's own centre (the person's centre was stored before).
            h_center = ((h_x1 + h_x2) // 2, (h_y1 + h_y2) // 2)
            grouped_objects[h_class_name].append((h_box, h_center))
            unique_helmet_boxes.add(h_box_tuple)

            h_left, h_top, h_right, h_bottom = int(h_x1), int(h_y1), int(h_x2), int(h_y2)
            cv2.rectangle(image, (h_left, h_top), (h_right, h_bottom), h_color, 2)
            cv2.putText(image, h_class_name, (h_left, h_top - 3), cv2.FONT_HERSHEY_SIMPLEX, 0.4, h_color, 1)

        left, top, right, bottom = int(x1), int(y1), int(x2), int(y2)
        # A plain integer id renders as "id:3" rather than "id:tensor(3.)".
        label = class_name if track_id is None else f"id:{track_id} " + class_name
        cv2.rectangle(image, (left, top), (right, bottom), color, 2)
        cv2.putText(image, label, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)

    # `count_of_persons` counts only the people that passed the confidence
    # filter, so the overlay agrees with the boxes drawn on the frame.
    count_of_helmets = len(grouped_objects.get('helmet', []))

    # NOTE: the overlay positions are fixed pixel coordinates; they assume a
    # frame large enough to contain them.
    cv2.putText(image, f"Count of persons: {count_of_persons}", (1350, 800), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
    cv2.putText(image, f"Count of people in helmets: {count_of_helmets}", (1350, 840), cv2.FONT_HERSHEY_SIMPLEX, 1, colors[1], 2)
    cv2.putText(image, f"Count of people without helmets: {count_of_persons - count_of_helmets}", (1350, 880), cv2.FONT_HERSHEY_SIMPLEX, 1, colors[2], 2)

    return results, grouped_objects, image

In [None]:
# Track people through a video, draw the recent trajectory of everyone who is
# not matched to a helmet, and write the annotated frames to a new file.
video_path = "data/test_imgs/video_tracking.mp4"
cap = cv2.VideoCapture(video_path)

# Fail loudly if the video cannot be opened. Raising (instead of exit())
# stops the cell without shutting down the notebook kernel.
if not cap.isOpened():
    cap.release()
    raise FileNotFoundError(f"Ошибка открытия {video_path}")

# Video properties needed to configure the writer.
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# VideoWriter for the annotated output.
output_video_path = 'data/test_imgs/video_tracking_yolo.mp4'
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

track_history = defaultdict(list)  # track id -> recent box centres
unique_ids = set()                 # every person track id seen so far

person_class_id = 0
max_jump = 100     # a larger jump between frames restarts the trail
max_history = 30   # number of centres kept per track

try:
    while cap.isOpened():
        success, frame = cap.read()

        if not success:
            print("Конец видео")
            break

        # Detect, track and annotate the frame.
        results, objects, annotated_frame = video_processing(frame)
        if results and results.boxes and results.boxes.id is not None:

            boxes = results.boxes.xyxy.cpu().numpy()
            track_ids = results.boxes.id.int().cpu().tolist()
            class_ids = results.boxes.cls.int().cpu().tolist()

            helmet_boxes = {tuple(h_box) for h_box, _ in objects['helmet']}

            for box, track_id, class_id in zip(boxes, track_ids, class_ids):
                # The tracker reports every class of the person model; only
                # people are counted and get a trail.
                if class_id != person_class_id:
                    continue
                unique_ids.add(track_id)

                # Trails are drawn only for people with no matching helmet.
                if any(is_inside(box, h_box) for h_box in helmet_boxes):
                    continue

                x1, y1, x2, y2 = box
                center = ((x1 + x2) // 2, (y1 + y2) // 2)

                history = track_history[track_id]
                if history and math.dist(center, history[-1]) > max_jump:
                    history.clear()

                history.append(center)
                if len(history) > max_history:
                    history.pop(0)

                points = np.array(history, dtype=np.int32).reshape((-1, 1, 2))
                cv2.polylines(annotated_frame, pts=[points], isClosed=False, color=(230, 230, 230), thickness=2)

            cv2.putText(annotated_frame, f"Count of track ids: {len(unique_ids)}", (1350, 920), cv2.FONT_HERSHEY_SIMPLEX, 1, colors[1], 2)

            out.write(annotated_frame)
        else:
            out.write(frame)
finally:
    # Release the capture and the writer even if processing a frame fails,
    # so the output file is finalised and the input is not left locked.
    cap.release()
    out.release()