In [2]:
import random
import shutil
import torch
import yaml
import os

from pathlib import Path

from ultralytics import YOLO

from sklearn.model_selection import train_test_split

In [136]:
def create_yaml(output_yaml_path, train_image_dir, val_image_dir, test_image_dir=' ', nc=11):
    """Write a dataset description YAML file.

    Args:
        output_yaml_path: Path of the YAML file to create (opened in 'w' mode,
            so an existing file is overwritten).
        train_image_dir: Value stored under the 'train' key.
        val_image_dir: Value stored under the 'val' key.
        test_image_dir: Value stored under the 'test' key; defaults to a
            single space.
        nc: Value stored under the 'nc' key; the default of 11 equals the
            number of class names listed below.
    """
    # Experiment: merge all classes into a single network.
    class_names = [
        'heart',
        'lungs',
        'trachea',
        'bronchitis',
        'pneumonia/bronchopneumonia',
        'pulmonary edema',
        'hydrothorax',
        'pneumothorax',
        'tracheal collapse',
        'neoplasm',
        'atelectasis',
    ]

    dataset_config = dict(
        names=class_names,
        nc=nc,
        train=train_image_dir,
        val=val_image_dir,
        test=test_image_dir,
    )

    with open(output_yaml_path, 'w') as stream:
        yaml.dump(dataset_config, stream, default_flow_style=False)

In [None]:
def get_random_subset_from_two_datasets(dataset1, dataset2, percentage_or_count):
    """
    Draw a random subset from two equally long datasets using shared indices.

    Args:
        dataset1: First dataset (its length determines the index range).
        dataset2: Second dataset.
        percentage_or_count: Percentage (0 to 100) or number of elements,
            interpreted by get_random_indices.

    Returns:
        A tuple of two lists: the elements of dataset1 and of dataset2 found
        at the same randomly chosen indices.
        Returns None when the datasets differ in length or when
        get_random_indices rejects the input.
    """
    total = len(dataset1)
    if total != len(dataset2):
        print("Наборы данных должны иметь одинаковую длину.")
        return None

    chosen_indices = get_random_indices(total, percentage_or_count)
    if chosen_indices is None:
        return None

    # Both lists are indexed with the same positions so the pairs stay aligned.
    picked_first = []
    picked_second = []
    for position in chosen_indices:
        picked_first.append(dataset1[position])
        picked_second.append(dataset2[position])

    return picked_first, picked_second


def get_random_indices(data_len, percentage_or_count):
    """
    Pick random, non-repeating indices from range(data_len). Helper function.

    Args:
        data_len: Number of elements to choose indices from.
        percentage_or_count: Any number in [0, 100] is treated as a
            percentage of data_len (at least one index is returned when
            data_len > 0). An int above 100 and not exceeding data_len is
            treated as an absolute count. Note that a count of 100 or less
            cannot be requested: such values are always read as percentages.

    Returns:
        A list of indices, an empty list when data_len is not positive, or
        None (after printing a message) when the input is not valid.
    """
    try:
        is_percentage = 0 <= percentage_or_count <= 100
    except TypeError:
        # Non-numeric input (None, str, ...) is reported as invalid below
        # instead of escaping as a TypeError.
        is_percentage = False

    if is_percentage:
        if data_len <= 0:
            return []
        # Multiply before dividing: data_len * (p / 100) can fall just below
        # an exact result because of float rounding (100 * 0.57 == 56.99...),
        # which silently dropped one element.
        sample_size = max(int(data_len * percentage_or_count / 100), 1)
        return random.sample(range(data_len), sample_size)

    if isinstance(percentage_or_count, int) and 0 <= percentage_or_count <= data_len:
        return random.sample(range(data_len), percentage_or_count)

    print("Некорректный ввод. Введите процент (от 0 до 100) или количество элементов.")
    return None

In [None]:
def _copy_image_label_pairs(image_paths, label_paths, images_dir, labels_dir):
    """Copy image/label pairs into the target folders, keeping the file names.

    A pair that cannot be copied is reported and skipped so a single bad file
    does not abort the whole split. Returns the number of pairs copied.
    """
    copied = 0
    for image_path, label_path in zip(image_paths, label_paths):
        try:
            # The label goes first: if it is missing, the image is not copied
            # either, so no half-copied pair is left behind.
            shutil.copy2(label_path, os.path.join(labels_dir, os.path.basename(label_path)))
            shutil.copy2(image_path, os.path.join(images_dir, os.path.basename(image_path)))
        except OSError as error:
            print(f'Could not copy pair for {image_path}: {error}')
        else:
            copied += 1
    return copied


def split_dataset(dataset_dir: str, experiment_path: str, percentage: int):
    """Split a dataset into train/val folders and write its data.yaml.

    Every '.jpg' file found under a directory of dataset_dir whose path
    contains 'images' (and neither 'train' nor 'val') is paired with a '.txt'
    label from the matching 'labels' directory. The pairs are split 80/20 with
    a fixed random_state, the train part is then randomly subsampled, and the
    files are copied to data/<experiment_path>/{train,val}/{images,labels}
    relative to the current working directory.

    Args:
        dataset_dir: Root directory that is searched for images.
        experiment_path: Sub-path under 'data' that receives the split.
        percentage: Share of the train part to keep, passed to
            get_random_subset_from_two_datasets.

    Raises:
        FileNotFoundError: If no '.jpg' image is found.
        ValueError: If the subset helper rejects `percentage`.
    """
    # Accept Windows-style separators in experiment_path on any platform.
    experiment_path = experiment_path.replace('\\', os.sep)

    image_paths = []
    label_paths = []

    for root, _, files in os.walk(dataset_dir):
        if 'images' in root and 'train' not in root and 'val' not in root:
            for file in files:
                if file.endswith('.jpg'):
                    image_paths.append(os.path.join(root, file))
                    label_paths.append(os.path.join(root.replace('images', 'labels'), file.replace('.jpg', '.txt')))

    if not image_paths:
        raise FileNotFoundError(f'No .jpg images found under {dataset_dir!r}')

    train_img_paths, val_img_paths, train_label_paths, val_label_paths = train_test_split(
        image_paths,
        label_paths,
        test_size=0.2,
        random_state=42)

    subset = get_random_subset_from_two_datasets(train_img_paths, train_label_paths, percentage)
    if subset is None:
        raise ValueError(f'Cannot take a training subset for percentage={percentage!r}')
    train_img_paths, train_label_paths = subset

    experiment_root = os.path.join('data', experiment_path)
    train_images_dir = os.path.join(experiment_root, 'train', 'images')
    val_images_dir = os.path.join(experiment_root, 'val', 'images')
    train_labels_dir = os.path.join(experiment_root, 'train', 'labels')
    val_labels_dir = os.path.join(experiment_root, 'val', 'labels')

    for directory in (train_images_dir, val_images_dir, train_labels_dir, val_labels_dir):
        os.makedirs(directory, exist_ok=True)

    # Destinations are built from the directories created above rather than by
    # rewriting the source path, which only worked with backslash separators.
    _copy_image_label_pairs(train_img_paths, train_label_paths, train_images_dir, train_labels_dir)
    _copy_image_label_pairs(val_img_paths, val_label_paths, val_images_dir, val_labels_dir)

    yaml_path = os.path.join(experiment_root, 'data.yaml')
    train_path = os.path.join('train', 'images')
    val_path = os.path.join('val', 'images')
    create_yaml(yaml_path, train_path, val_path)

In [None]:
# Source dataset root; os.path.join keeps the path valid on every platform.
dataset_dir = os.path.join(os.getcwd(), 'data', 'dataset')

In [None]:
# Build the full dataset (100% of the training split).
path_to_full_train = os.path.join('train_set', 'full_train')
split_dataset(
    dataset_dir=dataset_dir,
    experiment_path=path_to_full_train,
    percentage=100
)

In [None]:
# Build the dataset that keeps 20% of the training split.
path_to_full_train = os.path.join('train_set', '20_train')
split_dataset(
    dataset_dir=dataset_dir,
    experiment_path=path_to_full_train,
    percentage=20
)

In [None]:
# Build the dataset that keeps 10% of the training split.
path_to_full_train = os.path.join('train_set', '10_train')
split_dataset(
    dataset_dir=dataset_dir,
    experiment_path=path_to_full_train,
    percentage=10
)

In [None]:
# Build the dataset that keeps 1% of the training split.
path_to_full_train = os.path.join('train_set', '1_train')
split_dataset(
    dataset_dir=dataset_dir,
    experiment_path=path_to_full_train,
    percentage=1
)

Обучение модели YOLO (в коде ниже загружаются веса yolov9e-seg.pt, а не YOLOv10-M)

In [None]:
# Report whether PyTorch can see a CUDA device before training starts.
print('CUDA available ->', torch.cuda.is_available())

Обучение модели без активного обучения

In [None]:
def train_yolo_model(main_experiment: str, name: str, seed: int,
                     weights: str = "yolov9e-seg.pt", epochs: int = 25,
                     batch: int = 5, imgsz: int = 640):
    """Train a YOLO model on one prepared experiment dataset.

    The dataset description is read from
    <cwd>/data/<main_experiment>/<name>/data.yaml and the run is stored under
    the 'results' project with the run name `name`.

    Args:
        main_experiment: Folder under 'data' that groups the experiments.
        name: Experiment folder name; also used as the run name.
        seed: Random seed passed to the trainer.
        weights: Checkpoint handed to YOLO(); defaults to the previously
            hard-coded "yolov9e-seg.pt".
        epochs: Number of training epochs.
        batch: Batch size.
        imgsz: Training image size.

    Returns:
        Whatever `model.train` returns (it was previously discarded).
    """
    model = YOLO(weights)

    data_yaml = os.path.join(os.getcwd(), 'data', main_experiment, name, 'data.yaml')
    # NOTE(review): patience=0 presumably disables early stopping - confirm
    # against the Ultralytics training settings documentation.
    return model.train(
        data=data_yaml,
        project='results',
        name=name,
        epochs=epochs,
        patience=0,
        batch=batch,
        imgsz=imgsz,
        seed=seed,
    )

In [None]:
# Single run on the complete training split.
train_yolo_model(
    main_experiment='train_set',
    name='full_train',
    seed=0,
)

In [None]:
# Five runs on the 1% subset, seeded 1, 2, 3, 4, 5.
for i in range(5):
    train_yolo_model(
        main_experiment='train_set',
        name='1_train',
        seed=i + 1,
    )

In [None]:
# Five runs on the 10% subset, seeded 2, 4, 6, 8, 10.
for i in range(5):
    train_yolo_model(
        main_experiment='train_set',
        name='10_train',
        seed=2 * i + 2,
    )

In [None]:
# Five runs on the 20% subset, seeded 3, 6, 9, 12, 15.
for i in range(5):
    train_yolo_model(
        main_experiment='train_set',
        name='20_train',
        seed=3 * i + 3,
    )

1 алгоритм Least Confidence:

In [176]:
def create_least_confidence_dataset(
        paths: list,
        percentage_from_top: int,
        percentage_from_bottom: int,
        alg_name: str,
        experiment_name: str,
        project_path=None):
    """Build train/val folders from an ordered list of image paths.

    The first `percentage_from_top` percent of `paths` become the train split
    and the last `percentage_from_bottom` percent plus one extra entry become
    the val split (the two can overlap). Only the file name of each listed
    path is used: the image is taken from <project>/data/dataset/images and
    its '.txt' label from <project>/data/dataset/labels, and both are copied
    to <project>/data/<alg_name>/<experiment_name>/{train,val}/{images,labels}.

    Args:
        paths: Image paths, one string per image, in selection order.
        percentage_from_top: Percent of `paths` taken from the start for train.
        percentage_from_bottom: Percent of `paths` taken from the end for val.
        alg_name: Folder name of the sampling algorithm under 'data'.
        experiment_name: Folder name of the experiment under `alg_name`.
        project_path: Project root that contains 'data'. Defaults to the
            current working directory, replacing a hard-coded absolute path.

    Raises:
        OSError: If a listed image or its label cannot be copied.
    """
    project_root = Path(project_path) if project_path is not None else Path.cwd()
    source_images = project_root / 'data' / 'dataset' / 'images'
    source_labels = project_root / 'data' / 'dataset' / 'labels'
    experiment_root = project_root / 'data' / alg_name / experiment_name

    def copy_split(split_paths, split_name):
        """Copy the images and labels named in split_paths into one split."""
        images_dir = experiment_root / split_name / 'images'
        labels_dir = experiment_root / split_name / 'labels'
        images_dir.mkdir(parents=True, exist_ok=True)
        labels_dir.mkdir(parents=True, exist_ok=True)

        for listed_path in split_paths:
            # Keep only the file name, whichever separator the list uses.
            image_name = listed_path.replace('\\', '/').rsplit('/', 1)[-1]
            if not image_name:
                continue  # blank entry in the list
            label_name = os.path.splitext(image_name)[0] + '.txt'
            shutil.copy2(source_images / image_name, images_dir / image_name)
            shutil.copy2(source_labels / label_name, labels_dir / label_name)

    # Multiply before dividing so float rounding cannot drop an element.
    train_count = int(len(paths) * percentage_from_top / 100)
    # The extra "+ 1" is kept from the original selection rule; it also keeps
    # the slice below from becoming paths[-0:], which would be the whole list.
    val_count = int(len(paths) * percentage_from_bottom / 100) + 1

    copy_split(paths[:train_count], 'train')
    copy_split(paths[-val_count:], 'val')


In [177]:
# Load the image paths for the least-confidence experiments, one per line.
# NOTE(review): presumably the file is already ordered by the sampling
# criterion - confirm where it is produced.
paths_to_images_least_confidence = []
with open(Path('data/least_confidence/least_confidence.txt'), 'r', encoding='utf-8') as file:
    # Blank lines are skipped so they do not turn into empty path entries.
    paths_to_images_least_confidence = [line.strip() for line in file if line.strip()]

In [178]:
# Least-confidence datasets: 1%, 10% and 20% from the top of the list for
# training, 20% from the bottom for validation.
create_least_confidence_dataset(
    paths=paths_to_images_least_confidence,
    percentage_from_top=1,
    percentage_from_bottom=20,
    alg_name='least_confidence',
    experiment_name='1_train',
)
create_least_confidence_dataset(
    paths=paths_to_images_least_confidence,
    percentage_from_top=10,
    percentage_from_bottom=20,
    alg_name='least_confidence',
    experiment_name='10_train',
)
create_least_confidence_dataset(
    paths=paths_to_images_least_confidence,
    percentage_from_top=20,
    percentage_from_bottom=20,
    alg_name='least_confidence',
    experiment_name='20_train',
)

In [179]:
# Write the dataset description next to each least-confidence dataset.
create_yaml(
    output_yaml_path='data/least_confidence/1_train/data.yaml',
    train_image_dir='train/images',
    val_image_dir='val/images',
)
create_yaml(
    output_yaml_path='data/least_confidence/10_train/data.yaml',
    train_image_dir='train/images',
    val_image_dir='val/images',
)
create_yaml(
    output_yaml_path='data/least_confidence/20_train/data.yaml',
    train_image_dir='train/images',
    val_image_dir='val/images',
)

2 алгоритм Margin Sampling

Алгоритм используется для повышения качества распознавания таких классов, как pneumonia/bronchopneumonia и pulmonary edema, так как эти заболевания довольно похожи друг на друга.

In [None]:
def create_margin_sampling_dataset(
        paths: list,
        percentage_from_top: int,
        percentage_from_bottom: int,
        alg_name: str,
        experiment_name: str,
        project_path=None):
    """Build train/val folders from an ordered list of image paths.

    The train split is a fixed number of entries from the start of `paths`
    (13, 138 or 276 for `percentage_from_top` of 1, 10 or 20) and the val
    split is always the last 277 entries. Only the file name of each listed
    path is used: the image is taken from <project>/data/dataset/images and
    its '.txt' label from <project>/data/dataset/labels, and both are copied
    to <project>/data/<alg_name>/<experiment_name>/{train,val}/{images,labels}.

    Args:
        paths: Image paths, one string per image, in selection order.
        percentage_from_top: Must be 1, 10 or 20; selects the train size.
        percentage_from_bottom: Accepted for signature parity but not used;
            the val size is fixed at 277.
        alg_name: Folder name of the sampling algorithm under 'data'.
        experiment_name: Folder name of the experiment under `alg_name`.
        project_path: Project root that contains 'data'. Defaults to the
            current working directory, replacing a hard-coded absolute path.

    Returns:
        None. For an unsupported `percentage_from_top` a message is printed
        and nothing is created.

    Raises:
        OSError: If a listed image or its label cannot be copied.
    """
    # NOTE(review): these counts look like 1%, 10% and 20% of a 1380-image
    # list, and 277 like 20% plus one - confirm before changing them.
    train_counts = {1: 13, 10: 138, 20: 276}
    val_count = 277

    if percentage_from_top not in train_counts:
        print('Выберите один из следующих вариантов: 1, 10, 20!')
        return None
    num_samples = train_counts[percentage_from_top]

    project_root = Path(project_path) if project_path is not None else Path.cwd()
    source_images = project_root / 'data' / 'dataset' / 'images'
    source_labels = project_root / 'data' / 'dataset' / 'labels'
    experiment_root = project_root / 'data' / alg_name / experiment_name

    def copy_split(split_paths, split_name):
        """Copy the images and labels named in split_paths into one split."""
        images_dir = experiment_root / split_name / 'images'
        labels_dir = experiment_root / split_name / 'labels'
        images_dir.mkdir(parents=True, exist_ok=True)
        labels_dir.mkdir(parents=True, exist_ok=True)

        for listed_path in split_paths:
            # Keep only the file name, whichever separator the list uses.
            image_name = listed_path.replace('\\', '/').rsplit('/', 1)[-1]
            if not image_name:
                continue  # blank entry in the list
            label_name = os.path.splitext(image_name)[0] + '.txt'
            shutil.copy2(source_images / image_name, images_dir / image_name)
            shutil.copy2(source_labels / label_name, labels_dir / label_name)

    copy_split(paths[:num_samples], 'train')
    copy_split(paths[-val_count:], 'val')

In [181]:
# Load the image paths for the margin-sampling experiments, one per line.
# NOTE(review): presumably the file is already ordered by the sampling
# criterion - confirm where it is produced.
paths_to_images_margin_sampling = []
with open(Path('data/margin_sampling/margin_sampling.txt'), 'r', encoding='utf-8') as file:
    # Blank lines are skipped so they do not turn into empty path entries.
    paths_to_images_margin_sampling = [line.strip() for line in file if line.strip()]

In [182]:
# Margin-sampling datasets for the 1%, 10% and 20% training budgets.
create_margin_sampling_dataset(
    paths=paths_to_images_margin_sampling,
    percentage_from_top=1,
    percentage_from_bottom=20,
    alg_name='margin_sampling',
    experiment_name='1_train',
)
create_margin_sampling_dataset(
    paths=paths_to_images_margin_sampling,
    percentage_from_top=10,
    percentage_from_bottom=20,
    alg_name='margin_sampling',
    experiment_name='10_train',
)
create_margin_sampling_dataset(
    paths=paths_to_images_margin_sampling,
    percentage_from_top=20,
    percentage_from_bottom=20,
    alg_name='margin_sampling',
    experiment_name='20_train',
)

['/kaggle/input/itas-2024/data/dataset/images/1.1.2.1.1008.jpg', '/kaggle/input/itas-2024/data/dataset/images/1.1.2.1.1063.jpg', '/kaggle/input/itas-2024/data/dataset/images/2.1.9.1.1064.jpg', '/kaggle/input/itas-2024/data/dataset/images/1.1.6.1.1022.jpg', '/kaggle/input/itas-2024/data/dataset/images/2.1.9.1.1068.jpg', '/kaggle/input/itas-2024/data/dataset/images/4.2.7.1.1126.jpg', '/kaggle/input/itas-2024/data/dataset/images/1.2.8.1.1055.jpg', '/kaggle/input/itas-2024/data/dataset/images/2.1.2.1.1141.jpg', '/kaggle/input/itas-2024/data/dataset/images/3.1.1.1.33.jpg', '/kaggle/input/itas-2024/data/dataset/images/1.2.8.1.1013.jpg', '/kaggle/input/itas-2024/data/dataset/images/2.1.3.1.1017.jpg', '/kaggle/input/itas-2024/data/dataset/images/1.2.2.1.1086.jpg', '/kaggle/input/itas-2024/data/dataset/images/1.1.2.1.1017.jpg']
['/kaggle/input/itas-2024/data/dataset/images/1.1.2.1.1008.jpg', '/kaggle/input/itas-2024/data/dataset/images/1.1.2.1.1063.jpg', '/kaggle/input/itas-2024/data/dataset/im

In [None]:
# Write the dataset description next to each margin-sampling dataset.
# This cell previously rewrote the least_confidence YAML files, so the
# margin_sampling datasets were left without a data.yaml.
create_yaml('data/margin_sampling/1_train/data.yaml', 'train/images', 'val/images')
create_yaml('data/margin_sampling/10_train/data.yaml', 'train/images', 'val/images')
create_yaml('data/margin_sampling/20_train/data.yaml', 'train/images', 'val/images')

In [None]:

def margin_sampling(model, unlabeled_data, num_samples):
    """Select the samples whose two most probable classes are closest.

    This is the margin-sampling outline of this cell written as runnable
    Python; the cell was previously pseudocode and not valid syntax.

    Args:
        model: Trained model exposing ``predict(image)``.
        unlabeled_data: Iterable of paths to unlabeled images.
        num_samples: Number of samples to select.

    Returns:
        A list of at most ``num_samples`` dicts of the form
        ``{'image_path': ..., 'margin': ...}``, ordered by ascending margin.
    """
    # NOTE(review): `load_image` is not defined in the visible part of this
    # notebook - provide it before calling this function.
    # NOTE(review): assumes `model.predict` returns an iterable of class
    # probabilities with at least two entries - TODO confirm for the model
    # actually used.
    selected_samples = []  # candidates with their margin

    for image_path in unlabeled_data:
        # Load the image.
        image = load_image(image_path)

        # Get the model predictions (e.g. class probabilities).
        predictions = model.predict(image)

        # Sort the probabilities in descending order.
        sorted_probabilities = sorted(predictions, reverse=True)

        # Difference between the two most probable classes.
        margin = sorted_probabilities[0] - sorted_probabilities[1]

        selected_samples.append({'image_path': image_path, 'margin': margin})

    # Keep the num_samples candidates with the smallest margin.
    selected_samples.sort(key=lambda sample: sample['margin'])
    return selected_samples[:num_samples]

In [142]:
# Sanity check of the trained weights on one image from the dataset.
model = YOLO('best.pt')

result = model.predict('data/least_confidence/1_train/train/images/1.1.2.1.1018.jpg')
# True only when both class id 1 and class id 2 are among the detected boxes
# (presumably 'lungs' and 'trachea', going by the class-name list in create_yaml).
print(all(class_id in result[0].boxes.cls for class_id in (1., 2.)))


image 1/1 c:\Users\neutron\Desktop\ITaS\data\least_confidence\1_train\train\images\1.1.2.1.1018.jpg: 736x928 1 lungs, 1 trachea, 83.6ms
Speed: 1.6ms preprocess, 83.6ms inference, 10.0ms postprocess per image at shape (1, 3, 736, 928)
True
