<a href="https://colab.research.google.com/github/alecseiterr/safe_city/blob/main/Anton_Shalin/Grounding_Dino_Labeling_for_YOLO8.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Модуль разбора видео на кадры

В этом модуле автоматически разбираются видеофайлы на отдельные кадры и складываются в папку на примонтированном Google Drive. Эти изображения далее идут в следующий модуль детекции сущностей для формирования датасета в формате Yolo8.

__Параметры:__

step_ms = 250 # Интервал извлечения кадров в мсек

source = 'local' или 'YouTube' # Читаем файл из примонтированной папки Google Drive или из YouTube

capture_path = '/content/drive/MyDrive/UII/Capture/' # Куда сохраняются нарезанные изображения кадров

video_path = '/content/drive/MyDrive/UII/Video/' # Где хранятся разбираемые видео (разберутся все видео, какие лежат в папке, внимательно следите за ее содержимым)

video_url = "https://www.youtube.com/watch?v=5wwH7Hll0xE&pp=ygUOWW9nYSB0aW1lbGFwc2U%3D"  # Пример ссылки на ролик в Youtube, который хотим разобрать (по окончанию работы видеофайл .mp4 будет тоже лежать в video_path)


In [None]:
!pip install pytube

Collecting pytube
  Downloading pytube-15.0.0-py3-none-any.whl (57 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.6/57.6 kB[0m [31m993.7 kB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pytube
Successfully installed pytube-15.0.0


In [None]:
!rm -r sample_data

rm: cannot remove 'sample_data': No such file or directory


In [None]:
import cv2
import os
import shutil
from google.colab import drive
from pytube import YouTube

In [None]:
step_ms = 250 # Интервал извлечения кадров в мсек

# Параметр source определяет откуда будем парсить: из YouTube или сохраненное на Google Disk

#source = 'local' # Читаем файл из Google Drive, иначе из YouTube
source = 'YouTube'

# Монтирование Google Drive
drive.mount('/content/drive')

capture_path = '/content/drive/MyDrive/UII/Capture/'
video_path = '/content/drive/MyDrive/UII/Video/'

#video_url = "https://www.youtube.com/watch?v=5wwH7Hll0xE&pp=ygUOWW9nYSB0aW1lbGFwc2U%3D"  # YouTube Yoga Timelapse
video_url = "https://www.youtube.com/watch?v=0pKQ8E5cFIE"  # Подскальзывания
#video_url = "https://www.youtube.com/watch?v=GZcghScPe6Y" # Подборка разных падений
#video_url = "https://www.youtube.com/watch?v=o-N3ImykVuI" # Падения разные

# Функция очистки папки от предыдущих результатов
def cleaner_folder(path):
    for file_name in os.listdir(path):
        file_path = os.path.join(path, file_name)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            print('Не удалось удалить %s. Причина: %s' % (file_path, e))

# очистим папку Capture от предыдущих результатов
cleaner_folder(capture_path)

# Проверка существования пути для видео, если нет, то создаем
if not os.path.exists(video_path):
    os.makedirs(video_path)

# Проверка существования пути для кадров, если нет, то создаем
if not os.path.exists(capture_path):
    os.makedirs(capture_path)

# Функция для скачивания видео с YouTube
def download_youtube_video(url, path):
    yt = YouTube(url)
    # Выбор потока с наивысшим разрешением
    video_stream = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()
    video_stream.download(path)
    return os.path.join(path, video_stream.default_filename)

# Функция выделения кадров из видео
def extract_frames(video_file, step_ms):
    vidcap = cv2.VideoCapture(video_file)
    success, image = vidcap.read()
    count = 0

    print(f"Начало обработки видео: {video_file}")

    while success:
        # Сохранение кадра каждые 'step_ms' миллисекунд
        vidcap.set(cv2.CAP_PROP_POS_MSEC, (count * step_ms))
        success, image = vidcap.read()
        if success:
            # Формирование имени файла кадра
            frame_filename = f"{os.path.splitext(os.path.basename(video_file))[0]}_frame{count}.png"
            cv2.imwrite(os.path.join(capture_path, frame_filename), image)
            count += 1

    print(f"Обработка видео завершена: {video_file}")

if source == 'local':
    # Чтение и обработка каждого файла в папке
    for filename in os.listdir(video_path):
        if filename.endswith(".mp4"): # Проверка, что файл является видео
            extract_frames(os.path.join(video_path, filename), step_ms)
else:
    downloaded_video_path = download_youtube_video(video_url, video_path)
    extract_frames(downloaded_video_path, step_ms)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Начало обработки видео: /content/drive/MyDrive/UII/Video/Люди поскальзываются на неубранном льду.mp4
Обработка видео завершена: /content/drive/MyDrive/UII/Video/Люди поскальзываются на неубранном льду.mp4


## Модуль детекции объектов по текстовому описанию на картинках. Используется Grounding Dino

В этом модуле производится автоматическая детекция сущностей по текстовому промту целиком во всей папке с сырыми изображениями. Результат сохраняется в виде размеченного датасета по структуре Yolo8.

__Параметры:__

BOX_TRESHOLD = 0.43 # Порог вероятности обнаружения искомой сущности

TEXT_TRESHOLD = 0.45 # Тоже параметрический порого grounding dino. Влияния на результат детекции не обнаружил, можно не менять.

TEXT_PROMPT = 'fall' # Текстовое название сущности, которую хотим детектить на изображениях.

capture_path = '/content/drive/MyDrive/UII/Capture/' # Откуда берем картинки на анализ (результат работы Молуля 1 или можно залить по этому пути любые свои картинки)

annotated_path = '/content/drive/MyDrive/UII/Capture/Annotated/' # Куда складываются изображения с обнаруженными сущностями. На картинках наложены прямоугольноки bbox/. На Шаге 2 именно в этой папке вручную удаляем лишние изображения.

yolo_path = '/content/drive/MyDrive/UII/Capture/Yolo8/' # Куда сохранится итоговый датасет в формате Yolo8.

### Шаг подготовки окружения, модели, весов, путей, рабочих функций.

In [None]:
!pip install wget

Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: wget
  Building wheel for wget (setup.py) ... [?25l[?25hdone
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9655 sha256=aa9a098f5f1eaf9471f5e8e66229f23f93f6de9a0c448f400a1dfa7109b2fdfc
  Stored in directory: /root/.cache/pip/wheels/8b/f1/7f/5c94f0a7a505ca1c81cd1d9208ae2064675d97582078e6c769
Successfully built wget
Installing collected packages: wget
Successfully installed wget-3.2


In [None]:
import torch
import numpy as np
import random
import os
import shutil
import wget
from pathlib import Path
from PIL import Image
import matplotlib.pyplot as plt
import cv2
from google.colab import drive
drive.mount('/content/drive')
from google.colab.patches import cv2_imshow
from PIL import Image, ImageDraw
%matplotlib inline
import math
import warnings
warnings.filterwarnings("ignore")

Mounted at /content/drive


In [None]:
HOME = os.getcwd()
print(HOME)

/content


In [None]:
%cd {HOME}
!git clone https://github.com/IDEA-Research/GroundingDINO.git

/content
Cloning into 'GroundingDINO'...
remote: Enumerating objects: 421, done.[K
remote: Counting objects: 100% (189/189), done.[K
remote: Compressing objects: 100% (63/63), done.[K
remote: Total 421 (delta 144), reused 126 (delta 126), pack-reused 232[K
Receiving objects: 100% (421/421), 12.85 MiB | 25.41 MiB/s, done.
Resolving deltas: 100% (216/216), done.


In [None]:
%cd {HOME}/GroundingDINO
!pip install -e .

/content/GroundingDINO
Obtaining file:///content/GroundingDINO
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting addict (from groundingdino==0.1.0)
  Downloading addict-2.4.0-py3-none-any.whl (3.8 kB)
Collecting yapf (from groundingdino==0.1.0)
  Downloading yapf-0.40.2-py3-none-any.whl (254 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m254.7/254.7 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting timm (from groundingdino==0.1.0)
  Downloading timm-0.9.12-py3-none-any.whl (2.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m20.0 MB/s[0m eta [36m0:00:00[0m
Collecting supervision (from groundingdino==0.1.0)
  Downloading supervision-0.18.0-py3-none-any.whl (86 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m86.7/86.7 kB[0m [31m12.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: addict, yapf, supervision, timm, groundingdino
  Running setup.py develop for groundi

In [None]:
# загрузка весов GroundingDINO

%cd {HOME}
os.makedirs(os.path.join(HOME, 'weights'), exist_ok=True)
%cd {HOME}/weights
url = 'https://github.com/IDEA-Research/GroundingDINO/releases/download/v0.1.0-alpha/groundingdino_swint_ogc.pth'
wget.download(url, os.getcwd())

/content
/content/weights


'/content/weights/groundingdino_swint_ogc.pth'

In [None]:
# Загрузка модели GroundingDINO

%cd {HOME}/GroundingDINO

from groundingdino.util.inference import load_model, load_image, predict, annotate
import supervision as sv

/content/GroundingDINO


In [None]:
# глобальные параметры и переменные

CONFIG_PATH = os.path.join(HOME, "GroundingDINO/groundingdino/config/GroundingDINO_SwinT_OGC.py")
print(CONFIG_PATH, "; exist:", os.path.isfile(CONFIG_PATH))
WEIGHTS_NAME = "groundingdino_swint_ogc.pth"
WEIGHTS_PATH = os.path.join(HOME, "weights", WEIGHTS_NAME)
print(WEIGHTS_PATH, "; exist:", os.path.isfile(WEIGHTS_PATH))
BOX_TRESHOLD = 0.43
TEXT_TRESHOLD = 0.45
TEXT_PROMPT = 'fall' # убедитесь, что дальше в функции create_yolo_directory_structure указан этот же промт
id_class = 0

/content/GroundingDINO/groundingdino/config/GroundingDINO_SwinT_OGC.py ; exist: True
/content/weights/groundingdino_swint_ogc.pth ; exist: True


Функции для распознавания через grounding dino

In [None]:
def transform_bbox_coords(bbox):
    '''
    Трансформация координат ббокса из формата х_центра, у_центра, ширина, высота в x_min, y_min, x_max, y_max.
    '''
    x_cent, y_cent, width, height = bbox
    x_min = int(x_cent - width/2)
    y_min = int(y_cent - height/2)
    x_max = int(x_cent + width/2)
    y_max = int(y_cent + height/2)
    return [x_min, y_min, x_max, y_max]

In [None]:
def gdino_inference(image_path, text_prompt, bbox_lines_whidth=3, different_bbox_colors_on_image=False, return_image_only=False):
    image_source, image = load_image(image_path)
    boxes, logits, phrases = predict(model=model, image=image, caption=text_prompt, box_threshold=BOX_TRESHOLD, text_threshold=TEXT_TRESHOLD)

    # Проверяем, обнаружен ли объект
    object_detected = len(boxes) > 0
    bbox_data_for_txt = []  # Для сохранения данных bbox для YOLO
    imagePIL = Image.fromarray(image_source)  # Инициализация imagePIL

    if object_detected:
        width, height = imagePIL.size
        bboxes = (boxes * torch.Tensor(imagePIL.size).tile((boxes.size()[0], int(boxes.size()[1]/2)))).to(dtype=torch.int16).tolist()
        color = tuple(np.random.randint((255, 255, 255)))

        if not return_image_only:
            draw = ImageDraw.Draw(imagePIL)

        for bbox in bboxes:
            transformed_bbox = transform_bbox_coords(bbox)
            if not return_image_only:
                if different_bbox_colors_on_image:
                    color = tuple(np.random.randint((255, 255, 255)))
                draw.rectangle(transformed_bbox, outline=color, width=bbox_lines_whidth)

            # Конвертация и сохранение данных bbox для YOLO
            x_cent, y_cent, w, h = bbox
            x_cent, y_cent, w, h = x_cent / width, y_cent / height, w / width, h / height
            bbox_data_for_txt.append(f'{id_class} {x_cent} {y_cent} {w} {h}')

    return imagePIL, object_detected, bbox_data_for_txt

Вспомогательные функции для манипуляций с файлами и структурой yolo

In [None]:
# Функция сохранения данных bbox
def save_bbox_data(filename, bbox_data, save_path):
    txt_filename = filename.replace('.png', '.txt')
    with open(os.path.join(save_path, txt_filename), 'w') as file:
        for line in bbox_data:
            file.write(line + '\n')

# Функция для копирования и создания .txt файлов в yolo_path
def prepare_yolo_dataset():
    for filename in os.listdir(capture_path):
        if filename.endswith(".png") and os.path.exists(os.path.join(annotated_path, filename)):
            # Копирование изображения
            shutil.copy(os.path.join(capture_path, filename), os.path.join(yolo_path, 'train/images', filename))
            # Чтение и копирование данных bbox
            txt_filename = filename.replace('.png', '.txt')
            shutil.copy(os.path.join(capture_path, txt_filename), os.path.join(yolo_path, 'train/labels', txt_filename))

# Функция для создания структуры Yolo8: изображения, метки и файл структуры данных data.yaml
def create_yolo_directory_structure():
    train_path = os.path.join(yolo_path, 'train')
    images_path = os.path.join(train_path, 'images')
    labels_path = os.path.join(train_path, 'labels')

    os.makedirs(images_path, exist_ok=True)
    os.makedirs(labels_path, exist_ok=True)

    data_yaml_content = """
train: ../train/images
val: ../valid/images
test: ../test/images

nc: 1
names: ['fall']
"""
    with open(os.path.join(yolo_path, 'data.yaml'), 'w') as file:
        file.write(data_yaml_content)

# Функция очистки папки от предыдущих результатов
def cleaner_folder(path):
    for file_name in os.listdir(path):
        file_path = os.path.join(path, file_name)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            print('Не удалось удалить %s. Причина: %s' % (file_path, e))

In [None]:
# Создание экземпляра модели
model = load_model(CONFIG_PATH, WEIGHTS_PATH)

final text_encoder_type: bert-base-uncased


tokenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

### Шаг 1: Обработка исходных изображений и аннотация с наложенными прямоугольниками bbox. Плюс сохранение параметров bbox.

На этом шаге обрабатываются изображения с наложением на них bbox. Сохраняются аннотированные изображения в annotated_path для визуальной оценки на Шаге 2. Одновременно координаты bbox сохраняются в памяти для использования на Шаге 3.

In [None]:
# Задаем рабочие пути
capture_path = '/content/drive/MyDrive/UII/Capture/'
annotated_path = '/content/drive/MyDrive/UII/Capture/Annotated/'
yolo_path = '/content/drive/MyDrive/UII/Capture/Yolo8/'

# Проверка и создание путей для аннотированных изображений
if not os.path.exists(annotated_path):
    os.makedirs(annotated_path)
if not os.path.exists(yolo_path):
    os.makedirs(yolo_path)

total_files = len(os.listdir(capture_path))
processed = 0

cleaner_folder(annotated_path)

# Словарь для хранения данных bbox всех обработанных изображений
bbox_data_dict = {}

# Основной рабочий цикл для аннотации изображений
for filename in os.listdir(capture_path):
    if filename.endswith(".png"):
        image_path = os.path.join(capture_path, filename)
        print(f"Обрабатывается файл {filename}")

        result_image, object_detected, bbox_data = gdino_inference(image_path, TEXT_PROMPT, return_image_only=False)

        if object_detected:
            annotated_image_path = os.path.join(annotated_path, filename)
            # Сохранение аннотированного изображения
            result_image.save(annotated_image_path)
            print(f"Объект обнаружен и сохранен в {annotated_image_path}")

            # Сохранение данных bbox в словарь
            bbox_data_dict[filename] = bbox_data

Обрабатывается файл Люди поскальзываются на неубранном льду_frame0.png
Обрабатывается файл Люди поскальзываются на неубранном льду_frame1.png
Обрабатывается файл Люди поскальзываются на неубранном льду_frame2.png
Обрабатывается файл Люди поскальзываются на неубранном льду_frame3.png
Обрабатывается файл Люди поскальзываются на неубранном льду_frame4.png
Обрабатывается файл Люди поскальзываются на неубранном льду_frame5.png
Объект обнаружен и сохранен в /content/drive/MyDrive/UII/Capture/Annotated/Люди поскальзываются на неубранном льду_frame5.png
Обрабатывается файл Люди поскальзываются на неубранном льду_frame6.png
Объект обнаружен и сохранен в /content/drive/MyDrive/UII/Capture/Annotated/Люди поскальзываются на неубранном льду_frame6.png
Обрабатывается файл Люди поскальзываются на неубранном льду_frame7.png
Объект обнаружен и сохранен в /content/drive/MyDrive/UII/Capture/Annotated/Люди поскальзываются на неубранном льду_frame7.png
Обрабатывается файл Люди поскальзываются на неубранном

### Шаг 2: Ручная очистка аннотированных изображений

Вы вручную удаляете неподходящие изображения из папки Annotated.

### Шаг 3: Создание структуры YOLO и копирование подходящих изображений и параметров bbox.

На этом шаге в /Yolo8/train/images копируются только те изображения из capture_path, которые совпадают по названию с изображениями в annotated_path, и в /Yolo8/train/labels создаются соответствующие .txt файлы с координатами bbox.

In [None]:
# Создание структуры папок YOLO
create_yolo_directory_structure()

# Копирование изображений и создание .txt файлов в папки YOLO
for filename in os.listdir(annotated_path):
    if filename.endswith(".png"):
        original_image_path = os.path.join(capture_path, filename)

        if os.path.exists(original_image_path):
            # Копирование изображения
            shutil.copy(original_image_path, os.path.join(yolo_path, 'train/images', filename))
            print(f"Изображение скопировано для {filename}")

            # Создание и сохранение данных bbox в .txt файл
            if filename in bbox_data_dict:
                save_bbox_data(filename, bbox_data_dict[filename], os.path.join(yolo_path, 'train/labels'))
                print(f"Данные bbox сохранены для {filename}")


Изображение скопировано для Люди поскальзываются на неубранном льду_frame9.png
Данные bbox сохранены для Люди поскальзываются на неубранном льду_frame9.png
Изображение скопировано для Люди поскальзываются на неубранном льду_frame11.png
Данные bbox сохранены для Люди поскальзываются на неубранном льду_frame11.png
Изображение скопировано для Люди поскальзываются на неубранном льду_frame12.png
Данные bbox сохранены для Люди поскальзываются на неубранном льду_frame12.png
Изображение скопировано для Люди поскальзываются на неубранном льду_frame14.png
Данные bbox сохранены для Люди поскальзываются на неубранном льду_frame14.png
Изображение скопировано для Люди поскальзываются на неубранном льду_frame58.png
Данные bbox сохранены для Люди поскальзываются на неубранном льду_frame58.png
Изображение скопировано для Люди поскальзываются на неубранном льду_frame59.png
Данные bbox сохранены для Люди поскальзываются на неубранном льду_frame59.png
Изображение скопировано для Люди поскальзываются на не