# Установка библиотек

In [None]:
import os
import time
import random
import shutil
from pathlib import Path

import numpy as np

import matplotlib.pyplot as plt
import matplotlib.image as mpimg
%matplotlib inline

from IPython import display as IPdisplay
from PIL import Image
import cv2

#import pickle
import zipfile
import yaml

In [None]:
!pip install ultralytics

In [None]:
import ultralytics
from ultralytics import YOLO, settings, SAM
from ultralytics.models.sam import Predictor as SAMPredictor

ultralytics.checks()

# Подготовка датасета и выборок

## Скачивание данных и разметки

In [None]:
# Скачиваем датасеты и аннотации по расшаренным ссылкам с Google-диска
!gdown 1YSV*************************ZsxI
!gdown 1CsT*************************_rbd
!gdown 18gN*************************K_8Y

In [None]:
# Распаковываем все скачанные архивы, удаляем служебные файлы и папки
TEMP_PATH = "/content/temp"
!rm -fr {TEMP_PATH}
!mkdir -p {TEMP_PATH}
!unzip -qo All_annotations.zip -d {TEMP_PATH}/labels
!unzip -qo Correct_image.zip -d {TEMP_PATH}/images
!unzip -qo correct_airport.zip -d {TEMP_PATH}/images
!rm -fr {TEMP_PATH}/images/__MACOSX
!find $TEMP_PATH -type f -name '.*' -exec rm -f {} +

In [None]:
# Сливаем все метки в одну папку
!mkdir -p /content/temp/labels/all
!cp /content/temp/labels/base_dataset/* /content/temp/labels/all
!cp /content/temp/labels/airport_dataset/* /content/temp/labels/all
!echo "Всего меток: $(find /content/temp/labels/all -maxdepth 1 -type f | wc -l)"

In [None]:
# Сливаем все изображения в одну папку
!mkdir -p /content/temp/images/all
!find /content/temp/images/Correct_image -type f -exec cp {} /content/temp/images/all \;
!find /content/temp/images/correct_airport -type f -exec cp {} /content/temp/images/all \;
!echo "Всего изображений: $(find /content/temp/images/all -maxdepth 1 -type f | wc -l)"

## Константы, формирование датасета

In [None]:
# Пути для датасета и разметки
BASE_PATH = '/content'                                      # /content
DATASET_PATH = os.path.join(BASE_PATH, 'dataset')           # /content/dataset

# Временные пути, данные "в кучу", используются только при подготовке датасета
TEMP_PATH = os.path.join(BASE_PATH, 'temp')                 # /content/temp
SRC_DATA_PATH = os.path.join(TEMP_PATH, 'images', 'all')    # /content/temp/images/all
SRC_LABELS_PATH = os.path.join(TEMP_PATH, 'labels', 'all')  # /content/temp/labels/all

# Основные префиксы путей к данным и разметке
OUT_DATA_PATH = os.path.join(DATASET_PATH, 'images')        # /content/dataset/images
OUT_LABELS_PATH = os.path.join(DATASET_PATH, 'labels')      # /content/dataset/labels

# Пути к папкам изображений
TRAIN_IMAGES_PATH = os.path.join(OUT_DATA_PATH, 'train')
TEST_IMAGES_PATH = os.path.join(OUT_DATA_PATH, 'test')
VAL_IMAGES_PATH = os.path.join(OUT_DATA_PATH, 'val')

# Пути к папкам разметки
TRAIN_LABELS_PATH = os.path.join(OUT_LABELS_PATH, 'train')
TEST_LABELS_PATH = os.path.join(OUT_LABELS_PATH, 'test')
VAL_LABELS_PATH = os.path.join(OUT_LABELS_PATH, 'val')

# Некоторые гиперпараметры
VAL_SIZE = 0.15            # Размер проверочной выборки, доля от размера датасета
TEST_SIZE = 0.1            # Размер тестовой выборки, доля от размера датасета
RANDOM_STATE = 5           # Значение для повторения случайной выборки

# YOLO параметры и пути
YOLO_PROJECT = 'xray_yolov8s'
YOLO_DETECT = 'detect'
YOLO_VAL = 'val'
YOLO_TEST = 'test'
YOLO_PREDICT = 'predict'

# Пути для записи результатов
YOLO_PATH_DETECT = os.path.join(BASE_PATH, YOLO_PROJECT, YOLO_DETECT)
YOLO_PATH_VAL = os.path.join(BASE_PATH, YOLO_PROJECT, YOLO_VAL)
YOLO_PATH_TEST = os.path.join(BASE_PATH, YOLO_PROJECT, YOLO_TEST)
YOLO_PATH_PREDICT = os.path.join(BASE_PATH, YOLO_PROJECT, YOLO_PREDICT)

YOLO_YAML_FILE = os.path.join(BASE_PATH, f'{YOLO_PROJECT}.yaml')    # Файл с данными для обучения
YOLO_PATH_WEIGHT = os.path.join(YOLO_PATH_DETECT, 'weights')        # Путь к весам частично обученной модели

# Путь для сохранения частично обученной модели на Google Disk
GDRIVE_SAVE_PATH = '/content/drive/My Drive/УИИ Стажировка Рентген/YOLO8 Results'

In [None]:
# Опционально: Удаление дублирующих меток на разметке, предотвращает YOLO Warning: Duplicate labels removed
# !rm -fr /content/temp/labels/all_cleaned

# Локальные константы: не используются нигде, кроме этой ячейки
CLEANED_LABELS_PATH = os.path.join(TEMP_PATH, 'labels', 'all_cleaned')
ORIG_LABELS_PATH = os.path.join(TEMP_PATH, 'labels', 'all_orig')

updated_files = []

if not os.path.exists(CLEANED_LABELS_PATH):
    os.makedirs(CLEANED_LABELS_PATH)

for filename in os.listdir(SRC_LABELS_PATH):
    src_file_path = os.path.join(SRC_LABELS_PATH, filename)

    if os.path.isfile(src_file_path):
        unique_lines = set()
        # new_lines = []

        with open(src_file_path, 'r') as src_file:
            lines = src_file.readlines()

        # Удаляем дублирующиеся строки внутри файла разметки
        new_lines = [unique_lines.add(line.strip()) or line for line in lines if line.strip() not in unique_lines]

        with open(os.path.join(CLEANED_LABELS_PATH, filename), 'w') as dest_file:
            dest_file.writelines(new_lines)

        if len(lines) != len(new_lines):
            updated_files.append(filename)


if updated_files:
    print('Исправленных файлов разметки (удалены дублирующие метки):', len(updated_files))
    #print(updated_files)


# Переименовываем (меняем местами) папки, теперь:
# /content/temp/labels/all - новые файлы без дублей меток
# /content/temp/labels/all_orig - исходные файлы разметки
!mv {SRC_LABELS_PATH} {ORIG_LABELS_PATH}
!mv {CLEANED_LABELS_PATH} {SRC_LABELS_PATH}

In [None]:
#!zip -rq labels_all.zip /content/temp/labels/all
#!zip -rq labels_all_cleaned.zip /content/temp/labels/all_cleaned

In [None]:
# Создаем структуру папок и меток датасета
!rm -fr {DATASET_PATH}
!mkdir -p {TRAIN_IMAGES_PATH} {TEST_IMAGES_PATH} {VAL_IMAGES_PATH}
!mkdir -p {TRAIN_LABELS_PATH} {TEST_LABELS_PATH} {VAL_LABELS_PATH}

In [None]:
# Подготовка обучающей, проверочной и тестовой выборок для всех существующих меток/аннотаций

# Получаем список всех меток (без расширения файла)
all_labels = sorted([file[:-4] for file in os.listdir(SRC_LABELS_PATH)])

# Размеры выборок в абсолютных числах
val_size_qty = int(len(all_labels) * VAL_SIZE)
test_size_qty = int(len(all_labels) * TEST_SIZE)

# Устанавливаем seed для генерации повторяющейся случайной выборки
random.seed(RANDOM_STATE)

# Создаем случайную выборку валидационных данных и копируем пары данные/метки в выходные папки
for _ in range(val_size_qty):
    randlabel = all_labels.pop(random.randint(0, len(all_labels) - 1))
    shutil.copy(f'{SRC_LABELS_PATH}/{randlabel}.txt', f'{OUT_LABELS_PATH}/val')
    shutil.copy(f'{SRC_DATA_PATH}/{randlabel}.tif', f'{OUT_DATA_PATH}/val')

# Создаем случайную выборку тестовых данных и копируем пары данные/метки в выходные папки
for _ in range(test_size_qty):
    randlabel = all_labels.pop(random.randint(0, len(all_labels) - 1))
    shutil.copy(f'{SRC_LABELS_PATH}/{randlabel}.txt', f'{OUT_LABELS_PATH}/test')
    shutil.copy(f'{SRC_DATA_PATH}/{randlabel}.tif', f'{OUT_DATA_PATH}/test')

# В списке осталась обучающая выборка, копируем пары данные/метки в выходные папки
for label in all_labels:
    shutil.copy(f'{SRC_LABELS_PATH}/{label}.txt', f'{OUT_LABELS_PATH}/train')
    shutil.copy(f'{SRC_DATA_PATH}/{label}.tif', f'{OUT_DATA_PATH}/train')


# Вывод проверочных значений
print(f'Размеры обучающей/проверочной/тестовой выборок: {len(all_labels)} / {val_size_qty} / {test_size_qty}')
print('Общий размер датасета:', len(all_labels) + val_size_qty + test_size_qty, '\n')

# Чтение файлов в папках и сверка соответствия изображений и меток
for folder, name in {'train':'обучающей', 'val':'проверочной', 'test':'тестовой'}.items():
    data = sorted([file[:-4] for file in os.listdir(f'{OUT_DATA_PATH}/{folder}')])
    labels = sorted([file[:-4] for file in os.listdir(f'{OUT_LABELS_PATH}/{folder}')])
    print(f'Данные и метки {name} выборки', 'совпали!' if data == labels else 'РАЗЛИЧАЮТСЯ, ОШИБКА!')

# Визуальная проверка датасета и разметки

## Функции отображения и наложения разметки

In [None]:
# Читает файл разметки и возвращает список словарей вида:
# labels = [{'cls':1, 'x':0.632161, 'y':0.609871, 'w':0.111198, 'h':0.030485}, ...]
def get_labels(label_file):
    labels = []

    with open(label_file, 'r') as file:
        for line in file:
            values = [int(line.split()[0])] + [float(val) for val in line.split()[1:]]
            labels.append(dict(zip(['cls', 'x', 'y', 'w', 'h'], values)))

    return labels

In [None]:
# Накладывает рамки разметки на изображение
def impose_labels(image, labels):
    # Конвертируем изображение в цветное для видимости рамок разметки
    labeled_image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)

    img_height, img_width = image.shape

    for label in labels:
        x, y, w, h = label['x'], label['y'], label['w'], label['h']

        # Заменяем координаты центра рамки на левый верхний угол
        x -= w/2
        y -= h/2

        # Денормализуем float в координаты пикселей
        x, y = int(x * img_width), int(y * img_height)
        w, h = int(w * img_width), int(h * img_height)

        # Цвет рамки в зависимости от класса - красный, жёлтый
        color = (255, 0, 0) if label['cls'] == 0 else (255, 255, 0)

        # Рисуем прямоугольную рамку на изображении
        cv2.rectangle(labeled_image, (x, y), (x + w, y + h), color, thickness=2)

    return labeled_image

In [None]:
# Выводит рядом пару изображений: оригинальное и с наложенной разметкой
def view_pair_images(image_file, label_file, figsize=(10, 10)):
    # Загрузка исходного изображения
    original_image = mpimg.imread(image_file)

    # Создание фигуры, заголовок
    plt.figure(figsize=figsize)
    title = os.path.basename(image_file)

    # Исходное изображения слева
    plt.subplot(1, 2, 1)
    plt.imshow(original_image, cmap='gray')
    plt.title(title)
    plt.axis('off')

    # Накладываем рамки разметки на изображение
    labeled_image = impose_labels(original_image, get_labels(label_file))

    # Изображение с разметкой справа
    plt.subplot(1, 2, 2)
    plt.imshow(labeled_image)
    plt.title(f'{title} (labeled)')
    plt.axis('off')

    # Отображение обеих подграфиков
    plt.tight_layout()
    plt.show()

## Просмотр изображений датасета с разметкой

In [None]:
IMAGES_TO_VIEW = 4      # Сколько пар изображений вывести

# Выбираем случайные имена изображений из тренировочной выборки
all_names = sorted([file[:-4] for file in os.listdir(TRAIN_IMAGES_PATH)])
names = random.sample(all_names, IMAGES_TO_VIEW)

for name in names:
    view_pair_images(os.path.join(TRAIN_IMAGES_PATH, f'{name}.tif'), os.path.join(TRAIN_LABELS_PATH, f'{name}.txt'))

## Проверка размерностей изображений датасета

In [None]:
# Проверка размерностей изображений датасета
widths, heights, colors = [], [], []

for folder_path in [TRAIN_IMAGES_PATH, TEST_IMAGES_PATH, VAL_IMAGES_PATH]:
    tif_files = [f for f in os.listdir(folder_path) if f.endswith('.tif') or f.endswith('.tiff')]

    # Обходим каждый файл в текущей папке
    for filename in tif_files:
        img = Image.open(os.path.join(folder_path, filename))

        # Получаем информацию о изображении и добавляем ее в списки
        width, height = img.size
        widths.append(width)
        heights.append(height)
        colors.append(img.mode)

print(f'Разброс ширины изображений: {min(widths)}...{max(widths)}')
print(f'Разброс высоты изображений: {min(heights)}...{max(heights)}')
print(f'Все уникальные значения глубины цвета: {list(set(colors))} \n(д.б. "L", т.е. градации серого)')

# Обучение модели

## Подготовка к обучению и функции промежуточного сохранения

In [None]:
# Подключаем Google-диск для сохранения частично обученной модели
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Сохраняет частично обученную модель last.pt и архив с результатами обучения на гугл-диск (для запуска с параметром resume=True позднее)
def save_models_and_results():
    suffix = time.strftime('%Y-%m-%d_%H-%M')

    # Копируем автоматически сохраненную Yolo модель с весами на свой гугл-диск
    shutil.copy(os.path.join(YOLO_PATH_WEIGHT, 'best.pt'), os.path.join(GDRIVE_SAVE_PATH, f'best_{suffix}.pt'))
    shutil.copy(os.path.join(YOLO_PATH_WEIGHT, 'last.pt'), os.path.join(GDRIVE_SAVE_PATH, f'last_{suffix}.pt'))

    # Сохраняем результаты xray_yolov8s/detect/*.* (без подпапки /weight) в zip-архив на гугл-диск
    with zipfile.ZipFile(os.path.join(GDRIVE_SAVE_PATH, f'results_{suffix}.zip'), 'w', zipfile.ZIP_DEFLATED) as zipf:
        # Проходим по всем файлам в папке и добавляем их в архив
        for root, _, files in os.walk(YOLO_PATH_DETECT):
            for file in files:
                file_path = os.path.join(root, file)
                # Без подпапок! - в подпапке /weight "тяжелые" модели с весами, мы ее скопировали выше отдельно
                if root == YOLO_PATH_DETECT:
                    arcname = os.path.relpath(file_path, YOLO_PATH_DETECT)
                    zipf.write(file_path, arcname=arcname)


# Копирует самую новую частично обученную модель best.pt с гугл-диска в текущую среду и возвращает полный путь к файлу
def get_best_model_file():
    # Получает имя файла best_*.pt в папке с самыми новыми датой/временем
    modelfile = max((f for f in os.listdir(GDRIVE_SAVE_PATH) if f.startswith('best_') and f.endswith('.pt')),
                    key=lambda f: os.path.getctime(os.path.join(GDRIVE_SAVE_PATH, f)), default=None)

    # Копируем файл частично обученной модели с гугл-диска в текущую среду
    shutil.copy(os.path.join(GDRIVE_SAVE_PATH, modelfile), modelpath := os.path.join(BASE_PATH, modelfile))

    return modelpath

# Копирует самую новую частично обученную модель last.pt с гугл-диска в текущую среду и возвращает полный путь к файлу
def get_last_model_file():
    # Получает имя файла last_*.pt в папке с самыми новыми датой/временем
    modelfile = max((f for f in os.listdir(GDRIVE_SAVE_PATH) if f.startswith('last_') and f.endswith('.pt')),
                    key=lambda f: os.path.getctime(os.path.join(GDRIVE_SAVE_PATH, f)), default=None)

    # Копируем файл частично обученной модели с гугл-диска в текущую среду
    shutil.copy(os.path.join(GDRIVE_SAVE_PATH, modelfile), modelpath := os.path.join(BASE_PATH, modelfile))

    return modelpath

## Гиперпараметры и данные для YOLO

In [None]:
# Гиперпараметры обучения
IMGSZ = 640
BATCH_SIZE = 30
EPOCHS = 100

In [None]:
# Данные для YOLO
yaml_data = {
    'path': DATASET_PATH,                                       # /content/dataset
    'train': os.path.relpath(TRAIN_IMAGES_PATH, DATASET_PATH),  # images/train
    'val': os.path.relpath(VAL_IMAGES_PATH, DATASET_PATH),      # images/val
    'test': os.path.relpath(TEST_IMAGES_PATH, DATASET_PATH),    # images/test
    'nc': 2,                                                    # Кол-во классов
    'names': ['Опасно', 'Внимание']
    }

with open(YOLO_YAML_FILE, 'w+') as f:
    yaml.dump(yaml_data, f, default_flow_style=None, sort_keys=False)

In [None]:
# Удаление рассчетов YOLO с предыдущих тренировок (опционально, если нужно перезапустить обучение)
# (строки с установкой локали раскомментировать, если команда rm выдает ошибку)
# import locale
# locale.getpreferredencoding = lambda: "UTF-8"
#!rm -fr /content/xray_yolov8s

## Этап 1 (первичный)
*Этап был прерван вручную после 40 эпох для дообучения позднее*

In [None]:
# Загружаем предобученную модель
model = YOLO('yolov8s.pt')
model.info()

YOLOv8s summary: 225 layers, 11166560 parameters, 0 gradients


(225, 11166560, 0, 0.0)

In [None]:
results = model.train(data=YOLO_YAML_FILE,
                      batch=BATCH_SIZE, epochs=EPOCHS, imgsz=IMGSZ,
                      project=YOLO_PROJECT, name=YOLO_DETECT)

In [None]:
# Сохраняем частично обученную модель и результаты на Google-диск
save_models_and_results()

## Этап 2 (дообучение с весами предыдущего этапа)

In [None]:
# Загружаем частично обученную модель с гугл-диска
model = YOLO(get_last_model_file())
model.info()

Model summary: 225 layers, 11136374 parameters, 0 gradients


(225, 11136374, 0, 0.0)

In [None]:
results = model.train(data=YOLO_YAML_FILE,
                      batch=BATCH_SIZE, epochs=EPOCHS, imgsz=IMGSZ,
                      project=YOLO_PROJECT, name=YOLO_DETECT,
                      resume=True)                # Возобновляем обучение с прерванного места

In [None]:
# Сохраняем последнюю обученную модель и результаты на Google-диск
save_models_and_results()

# Проверка обученной модели

## На валидационном наборе

In [None]:
# Загружаем частично обученную модель с гугл-диска
trained_model = YOLO(model_file := get_best_model_file())
print('Загружен файл модели:', model_file)

results_val = trained_model.val(data=YOLO_YAML_FILE, project=YOLO_PROJECT, name=YOLO_VAL)

In [None]:
# Выводим метрики
# dir(results_val.box)
print('Метрики на валидационном наборе:')
print(f"{'map50':<21} {results_val.box.map50}")
print(f"{'map75':<21} {results_val.box.map75}")
print(f"{'map':<21} {results_val.box.map}")
print(f"{'map по классам':<21} {results_val.box.maps}")
print(*[f"{key+':':<21} {val}" for key, val in results_val.results_dict.items()], sep='\n')

In [None]:
# Выводим YOLO изображения с метриками
for image_file in sorted([f for f in os.listdir(YOLO_PATH_VAL) if f.endswith(('.png', '.jpg'))]):
    IPdisplay.display(IPdisplay.Image(filename=os.path.join(YOLO_PATH_VAL, image_file), width=900))

## На тестовом наборе

In [None]:
# Загружаем частично обученную модель с гугл-диска
trained_model = YOLO(model_file := get_best_model_file())
print('Загружен файл модели:', model_file)

results_test = trained_model.val(data=YOLO_YAML_FILE, project=YOLO_PROJECT, name=YOLO_TEST)

In [None]:
# Выводим метрики
print('Метрики на тестовом наборе:')
print(f"{'map50':<21} {results_test.box.map50}")
print(f"{'map75':<21} {results_test.box.map75}")
print(f"{'map':<21} {results_test.box.map}")
print(f"{'map по классам':<21} {results_test.box.maps}")
print(*[f"{key+':':<21} {val}" for key, val in results_test.results_dict.items()], sep='\n')

In [None]:
# Выводим YOLO изображения с метриками
for image_file in sorted([f for f in os.listdir(YOLO_PATH_TEST) if f.endswith(('.png', '.jpg'))]):
    IPdisplay.display(IPdisplay.Image(filename=os.path.join(YOLO_PATH_TEST, image_file), width=900))

# Предсказание на тестовом наборе

In [None]:
# Загружаем частично обученную модель с гугл-диска
trained_model = YOLO(model_file := get_best_model_file())
print('Загружен файл модели:', model_file)

results_predict = trained_model.predict(source=TEST_IMAGES_PATH,
                                        project=YOLO_PROJECT,
                                        name=YOLO_PREDICT,
                                        save=True,                # Сохранение изображений с результатами
                                        save_conf=True,           # Сохранение результатов с показателями достоверности
                                        save_txt=True,            # Сохранить результаты .txt файл
                                        conf=0.25,                # Порог достоверности объекта для обнаружения
                                        iou=0.7                   # Пересечение над объединением
                                        )

In [None]:
# Выводим случайную выборку изображений с результатами предсказания

ROW_SIZE = 3            # Количество изображений в каждом ряду
IMAGES_TO_VIEW = 18     # Сколько изображений вывести (рекомендуется кратное ROW_SIZE)

# Выбираем IMAGES_TO_VIEW случайных имен изображений из папки с результатами предсказания
image_files = [f for f in os.listdir(YOLO_PATH_PREDICT) if f.endswith('.tif')]
image_files = random.sample(image_files, IMAGES_TO_VIEW)

# Делим список изображений на порции по ROW_SIZE элементов
image_groups = [image_files[i:i + ROW_SIZE] for i in range(0, len(image_files), ROW_SIZE)]

# Выведите изображения
for image_group in image_groups:
    plt.figure(figsize=(15, 12))  # Размер графика
    for i, image_file in enumerate(image_group):
        img = Image.open(os.path.join(YOLO_PATH_PREDICT, image_file))
        plt.subplot(1, ROW_SIZE, i + 1)
        plt.imshow(img)
        plt.axis('off')         # Отключаем оси
        plt.title(image_file)   # Заголовок с именем файла
    plt.show()
