# Установка зависимостей

In [None]:
! pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu128
! pip install opencv-python wandb ultralytics 

# Я использую weights and biases для мониторинга экспериментов
! yolo settings wandb=True

# Извлечение изображений

In [None]:
import cv2
from pathlib import Path
import numpy as np
import yaml
import os
from tqdm import tqdm

train = ["1.MOV", "2_1.MOV", "4.MOV"]
val = ["3_1.MOV"]
test = ["3_2.MOV"]

path_to_videos = Path("data/videos_raw")
dataset_path = Path("data/dataset")

with open(dataset_path/"data.yaml", 'r') as f:
    dataset_meta = yaml.safe_load(f)
    # CVAT все еще не может нормально экспортировать yolo формат(
with open(dataset_path/"data.yaml", 'w') as f:
    dataset_meta["train"] = "images/Train"
    dataset_meta["val"] = "images/Validation"
    dataset_meta["test"] = "images/Test"
    dataset_meta["path"] = "data/dataset"
    yaml.safe_dump(dataset_meta, f)

classes = dataset_meta["names"]
print(classes)

In [None]:

def read_yolo_labels(label_path):
    labels = []
    with open(label_path, 'r') as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) != 5:
                continue  # пропускаем неправильные строки
            class_id = int(parts[0])
            x_center = float(parts[1])
            y_center = float(parts[2])
            width = float(parts[3])
            height = float(parts[4])
            labels.append({
                'class_id': class_id,
                'x_center': x_center,
                'y_center': y_center,
                'width': width,
                'height': height
            })
    return labels

def save_yolo_labels(label_path, labels):
    with open(label_path, 'w') as f:
        for label in labels:
            line = f"{label['class_id']} {label['x_center']:.6f} {label['y_center']:.6f} {label['width']:.6f} {label['height']:.6f}\n"
            f.write(line)

def flip_frame_and_labels(frame, labels, horizontal=False, vertical=False):
    h, w = frame.shape[:2]

    # Отразим изображение
    if horizontal and vertical:
        frame = cv2.flip(frame, -1)
    elif horizontal:
        frame = cv2.flip(frame, 1)
    elif vertical:
        frame = cv2.flip(frame, 0)

    # Отразим лейблы
    flipped_labels = []
    for label in labels:
        x = label['x_center']
        y = label['y_center']

        if horizontal:
            x = 1.0 - x
        if vertical:
            y = 1.0 - y

        flipped_labels.append({
            'class_id': label['class_id'],
            'x_center': x,
            'y_center': y,
            'width': label['width'],
            'height': label['height']
        })

    return frame, flipped_labels

def draw_boxes(img, labels, class_names):
    img_h, img_w = img.shape[:2]
    def get_color(class_id):
        return (30*class_id % 255, 70*class_id % 255, abs(255 - 50 * class_id) % 255)

    for label in labels:
        # Преобразуем нормализованные координаты в пиксели
        x_center = label['x_center'] * img_w
        y_center = label['y_center'] * img_h
        width = label['width'] * img_w
        height = label['height'] * img_h

        # Координаты бокса (левая верхняя и правая нижняя точки)
        x1 = int(x_center - width / 2)
        y1 = int(y_center - height / 2)
        x2 = int(x_center + width / 2)
        y2 = int(y_center + height / 2)
        class_id = label['class_id']
        color = get_color(class_id)
        # Рисуем прямоугольник
        cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)

        # Подпись с названием класса
        label_text = class_names[class_id] if class_names else str(class_id)
        cv2.putText(img, label_text, (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    return img

for name, videos in tqdm([("Train", train), ("Validation", val), ("Test", test)]):
    frame_counter = 0
    for video in videos:
        cap = cv2.VideoCapture(path_to_videos / video)
        while True:
            res, frame = cap.read()
            if not res:
                break
            
            # уменьшаем размер в 16 раз, 4к в данный момент нам не нужно
            frame = cv2.resize(frame, None,fx=0.25, fy=0.25) 
            #Opencv зачем то повернуло видео на 90 градусов, разворачиваем обратно
            #print(frame.shape)
            frame = frame.transpose(1,0,2)[:,::-1,:].copy()

            label_path = dataset_path / "labels" / name / f"frame_{frame_counter:06d}.txt"
            labels = read_yolo_labels(label_path)

            # Т.к у нас очень сильный data leakage перевернем изображения из тестового набора вертикально,
            # чтобы иммитировать другой сценарий и убедится что модель не тупо заучила все возможные положения меток
            if name in ("Test"):
                frame, labels = flip_frame_and_labels(frame, labels, vertical=True)
                save_yolo_labels(label_path, labels)

            # используем каждый 5 кадр для ускорения экспериментов
            if frame_counter % 5 == 0:
                if not os.path.exists(dataset_path / "images" / name):
                    os.makedirs(dataset_path / "images" / name)

            cv2.imwrite(dataset_path / "images" / name / f"frame_{frame_counter:06d}.png", frame)

            #Сразу отображаем экспортированные аннотации, что бы убедиться в корректности данных
            draw_boxes(frame, labels, classes)

            frame_counter +=1

            cv2.imshow("frame", frame)
            key = cv2.waitKey(1)
            if ord('q') == key:
                break


cv2.destroyAllWindows()

# Baseline тренировка

In [None]:
from ultralytics import YOLO

In [None]:
model = YOLO("yolo11n.pt") #nano модель что бы не переобучить

# Запустим тренировку на 50 эпох что бы посмотреть как быстро модель переобучится
model.train(
    data="data/dataset/data.yaml",  
    epochs=50,                 
    imgsz=640,                  
    batch=32,               
    project="Zebra-test",      
    name="experiments/baseline",     
)

### Проверяем на отложенной выборке (видео перевернуто, для имитации другого сценария)

In [None]:
model = YOLO("Zebra-test/experiments/baseline/weights/best.pt")

metrics = model.val(data="data/dataset/data.yaml", split="test")
print(metrics.box.map50) 

### Визуализируем предсказания baseline на тестовом видео

In [None]:
os.makedirs("results", exist_ok=True)

video_path = "data/videos_raw/3_2.MOV"
output_path = "results/baseline.mp4"

def save_video(model, video_path, output_path):
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    # В обратном порядке, что бы перевернуть видео
    width  = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    height = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame = frame.transpose(1,0,2)[:,::-1,:].copy()

        results = model.predict(source=frame, stream=False, show=False, conf=0.25)[0]
        annotated_frame = results.plot()
        out.write(annotated_frame)

    cap.release()
    out.release()
    print(f"✅ Saved: {output_path}")

save_video(model, video_path, output_path)

# Добавляем аугментаций

In [None]:
# Все еще nano модель
model = YOLO("yolo11n.pt")

# Так же я позволил себе явно указать используемый оптимайзер, и изменить learning rate
# но забыл указать что это сделал в отчете
model.train(
    data="data/dataset/data.yaml",  
    epochs=50,                 
    imgsz=640,                  
    batch=32,
    optimizer = "AdamW", 
    lr0 = 3e-4, #Karpatov magic constant for AdamW             
    project="Zebra-test",      
    name="experiments/augs_tuned", 
    warmup_epochs=3,
    translate = 0.1,
    mosaic= 0.3,
    #close_mosaic= 3,
    mixup= 0.2,
    # Не используем отражение по вертикали, потому что это будет читерство в данном контексте
    #flipud= 0.5, 
    fliplr= 0.5,
    scale= 0.5,
    copy_paste = 0.4,
    erasing = 0.2,
    shear = 15,
    degrees= 90, 
    hsv_h = 0.1,
    hsv_s = 0.1,
    hsv_v = 0.5,
    cos_lr = True, 
)

### Проверяем на отложенной выборке

In [None]:
model = YOLO("Zebra-test/experiments/augs_tuned/weights/best.pt")

metrics = model.val(data="data/dataset/data.yaml", split="test")
print(metrics.box.map50) 

### Сохраняем предсказания улучшенной модели

In [None]:
save_video(model, video_path, "results/tuned.mp4")

# Бонус - как получить наивысшие метрики если вы вообще ничего не понимаете в ML
### Создаем новый датасет из вообще всех размеченных изображений, 20% перекладываем в валидацию, 20% в тест

In [None]:
import shutil
import random
from pathlib import Path


original_dataset = Path("data/dataset")  # Путь к оригинальному датасету
silly_dataset = Path("data/silly_dataset")  # Куда складываем новый датасет
silly_dataset.mkdir(parents=True, exist_ok=True)

# Собираем все изображения
all_images = list(original_dataset.glob("images/*/*.png")) + \
             list(original_dataset.glob("images/*/*.jpg")) + \
             list(original_dataset.glob("images/*/*.jpeg"))

random.shuffle(all_images)

# Разделение
n = len(all_images)
val_split = int(n * 0.2)
test_split = int(n * 0.2)

val_images = all_images[:val_split]
test_images = all_images[val_split:val_split + test_split]
train_images = all_images[val_split + test_split:]

splits = {
    "Train": train_images,
    "Validation": val_images,
    "Test": test_images
}

# Копируем изображения и соответствующие метки
for split_name, image_list in splits.items():
    image_dir = silly_dataset / "images" / split_name
    label_dir = silly_dataset / "labels" / split_name
    image_dir.mkdir(parents=True, exist_ok=True)
    label_dir.mkdir(parents=True, exist_ok=True)

    for img_path in image_list:
        # Копируем изображение
        new_img_path = image_dir / img_path.name
        shutil.copy(img_path, new_img_path)

        # Соответствующий .txt файл
        label_path = original_dataset / "labels" / img_path.parent.name / (img_path.stem + ".txt")
        if label_path.exists():
            shutil.copy(label_path, label_dir / label_path.name)

# Записываем новый data.yaml
yaml_content = f"""path: {silly_dataset.resolve()}
train: images/Train
val: images/Validation
test: images/Test

names:
  0: dish
  1: drink
  2: silverware
  3: garbage/other
"""

with open(silly_dataset / "data.yaml", "w") as f:
    f.write(yaml_content)

print("✅ Silly dataset создан в:", silly_dataset.resolve())

### Берем модель побольше, потому что больше - лучше)))

In [None]:
model = YOLO("yolo11l.pt")

# Train the model
model.train(
    data="data/silly_dataset/data.yaml",  
    epochs=50,                 
    imgsz=640,                  
    batch=32,
    optimizer = "AdamW",
    lr0 = 3e-4, #Karpatov magic constant for AdamW             
    project="Zebra-test",      
    name="experiments/silly", 
    warmup_epochs=3,
    translate = 0.1,
    mosaic= 0.3,
    #close_mosaic= 3,
    mixup= 0.2,
    #flipud= 0.5, # Не используем отражение по вертикали, потому что это будет читерство в данном контексте
    fliplr= 0.5,
    scale= 0.5,
    copy_paste = 0.4,
    erasing = 0.2,
    shear = 15,
    degrees= 90, 
    hsv_h = 0.1,
    hsv_s = 0.1,
    hsv_v = 0.5,
    cos_lr = True, 
)

### Тестируем на отложенной выборке, которая пришла из того же распределения что и тренировочная

In [None]:
model = YOLO("Zebra-test/experiments/silly/weights/best.pt")

metrics = model.val(data="data/dataset/data.yaml", split="test")
print(metrics.box.map50) 

### Наслаждаемся качеством ~~пере~~обучения

In [None]:
save_video(model, video_path, "results/overfitted.mp4")