# Autonomous detect
Данный блокнот предоставляет возможность запустить модель автономно, без доступа к библиотеке `headliners_CXRForeignViewer`. Он полностью повторяет аналогичный запуск с использованием библиотеки, но использует обученную командой модель напрямую, если по какой-то причине загрузка библиотеки целиком невозможно или затруднительна (например, при запуске в google.Colab). Это может быть удобно, если блокнот `detect.ipynb` не видит импорт библиотеки

### Установка и импорт необходимых библиотек

In [None]:
!pip install ultralytics
!pip install tqdm --upgrade

In [31]:
import os
import shutil
import random
import ultralytics
import pydicom as dicom
import numpy as np
from PIL import Image
import cv2
import re
import pandas as pd
from pathlib import Path
from tqdm.notebook import tqdm
from ultralytics.models import YOLO

In [3]:
ultralytics.checks()

Ultralytics 8.3.221  Python-3.12.4 torch-2.9.0+cpu CPU (12th Gen Intel Core i7-12650H)
Setup complete  (16 CPUs, 15.7 GB RAM, 429.8/457.5 GB disk)


### Директория тестовых данных

Для того, чтобы запустить модель на наборе тестовых данных, поместите пусть к директории с тестовыми DICOM изображениями в переменную `TEST_FILES_PATH`

In [None]:
TEST_FILES_PATH = "./headliners_CXRForeignViewer/demo_files/"
TEMP_TEST_DATA_PATH = "tmp_test/"
OUTPUT_PATH = "runs/detect/predict/"
OUTPUT_LABELS_PATH = "runs/detect/predict/labels/"
RESULT_PATH = "./result/"
MODEL_PATH = "headliners_CXRForeignViewer/model/weights/best.onnx"
test_path = "./yolo_data/test"

### Препроцессинг
Функции ниже уже включены в состав библиотеки `headliners_CXRForeignViewer`, однако их присутствие здесь необходимо для автономного запуска.

In [23]:
def get_unique_filename(dest_path: str) -> str:
    """
    Генерирует уникальное имя файла, добавляя (2), (3) и т.д. при конфликтах
    """
    if not os.path.exists(dest_path):
        return dest_path
    
    base_dir = os.path.dirname(dest_path)
    filename = os.path.basename(dest_path)
    name, ext = os.path.splitext(filename)
    
    counter = 2
    while True:
        new_filename = f"{name} ({counter}){ext}"
        new_dest_path = os.path.join(base_dir, new_filename)
        if not os.path.exists(new_dest_path):
            return new_dest_path
        counter += 1

def prepare_and_copy_dicom(
        src_base_dir: str, # Папка с данными / Folder containing data
        dest_base_dir: str # Папка, куда нужно скопировать данные / Folder to copy data
        ) -> None:
    
    '''
        Предобработать и копировать файлы
        из исходной директории в целевую

        Preprocess and copy files
    '''

    os.makedirs(dest_base_dir, exist_ok=True)

    for root, dirs, files in os.walk(src_base_dir):
        for file in files:
            if file.lower().endswith('.dcm'):
                src_path = os.path.join(root, file)
                filename_without_ext = os.path.splitext(file)[0]
                dest_path = os.path.join(dest_base_dir, f"{filename_without_ext}.jpg")
                dest_path = get_unique_filename(dest_path)  # Получаем уникальное имя
                convert_dcm_to_jpg(src_path, dest_path, preproc=True)


def copy_dicom(
        src_base_dir: str, # Папка с данными / Folder containing data
        dest_base_dir: str # Папка, куда нужно скопировать данные / Folder to copy data
        ) -> None:
    
    '''
        Копировать файлы
        из исходной директории в целевую

        Copy files without preprocessing
    '''
    
    os.makedirs(dest_base_dir, exist_ok=True)

    for root, dirs, files in os.walk(src_base_dir):
        for file in files:
            if file.lower().endswith('.dcm'):
                src_path = os.path.join(root, file)
                filename_without_ext = os.path.splitext(file)[0]
                dest_path = os.path.join(dest_base_dir, f"{filename_without_ext}.jpg")
                dest_path = get_unique_filename(dest_path)  # Получаем уникальное имя
                convert_dcm_to_jpg(src_path, dest_path)


def convert_dcm_to_jpg(
        src_path: str, # Пусть к файлу dicom (.dcm) / Path to .dcm file
        dest_path: str, # Путь для сохранения файла .jpg / Path to save .jpg file
        preproc: bool=False # Нужна ли предобработка / if preprocessing needed
        ) -> None:

    '''
        Конвертировать изображение dicom (.dcm) 
        в формат .jpg с нормализацией

        Convert dcm image to .jpg format
        with normalization
    '''
        
    try:
        if preproc:
            image = prepare_dicom_image(src_path, method='clahe')
        else:
            image = prepare_dicom_image(src_path)

        # Нормализация
        if image.dtype != np.uint8:

            window_center, window_width = get_dicom_window_attributes(image)
            min_val = window_center - window_width // 2
            max_val = window_center + window_width // 2

            if max_val != min_val:
                image = ((image - min_val) / (max_val - min_val) * 255).astype(np.uint8)
            else:
                image = np.zeros_like(image, dtype=np.uint8)

        img = Image.fromarray(image)
        img.save(dest_path, 'JPEG')
        print(f"Конвертировано {src_path} => {dest_path}")
    except Exception as e:
        print(f"Ошибка при обработке {src_path}: {e}")


def get_dicom_window_attributes(
        ds # Считанное изображение dicom / Dicom image
        ) -> tuple:

    '''
        Вернуть атрибуты WindowCenter и WindowWidth
        для изображения в виде кортежа

        Return WindowCenter and WindowWidth attributes
        of the image as a tuple
    '''

    center = getattr(ds, 'WindowCenter', None)
    if center is None:
        window_center = np.mean(ds.pixel_array)
    elif isinstance(center, dicom.multival.MultiValue):
        window_center = int(center[0]) if 'WindowCenter' in ds else np.mean(ds.pixel_array)
    else:
        window_center = int(center) if 'WindowCenter' in ds else np.mean(ds.pixel_array)

    width = getattr(ds, 'WindowWidth', None)
    if width is None:
        window_width = np.max(ds.pixel_array) - np.min(ds.pixel_array)
    elif isinstance(ds.WindowWidth, dicom.multival.MultiValue):
        window_width = int(ds.WindowWidth[0]) if 'WindowWidth' in ds else np.max(ds.pixel_array) - np.min(ds.pixel_array)
    else:
        window_width = int(ds.WindowWidth) if 'WindowWidth' in ds else np.max(ds.pixel_array) - np.min(ds.pixel_array)
    
    return window_center, window_width


def apply_window_level(
        image: np.ndarray, # Изображение / Image
        window_center, # Параметр WindowCenter изображения dicom (.dcm) / WindowCenter parameter of dicom (.dcm) image
        window_width, # Параметр WindowWidth изображения dicom (.dcm) / WindowWidth parameter of dicom (.dcm) image
        photometric,  # Параметр PhotometricInterpretationя изображени dicom (.dcm) / PhotometricInterpretation parameter of dicom (.dcm) image
        method='simple', # Метод предобработки: simple, CLAHE, DCP или combined / Preprocessing method: simple, CLAHE, SCP or combined
        cL=5.0, # Параметр clicklimit для фильтра clahe / Clicklimit parameter for clahe filter
        tile=(8,8), # Параметр tileGridSize для фильтра clahe / TileGridSize parameter for clahe filter
        patch=15, # Параметр размерности для фильтра DCP / Size parameter for DCP filter
        mode='gray' # Тип фильтра: gray или rgb / Filter type: gray or rgb
        ) -> np.ndarray:
    
    '''
        Нормализовать изображение по оконным параметрам,
        применить фильтры,
        вернуть изображение как numpy.ndarray

        Normalize image with window attributes
        apply filters,
        return image as numpy.ndarray    
    '''
        
    img_min = window_center - window_width // 2
    img_max = window_center + window_width // 2
    windowed = image.copy()
    windowed[windowed < img_min] = img_min
    windowed[windowed > img_max] = img_max
    # Нормализация к 0-255 для отображения
    windowed = ((windowed - img_min) / (img_max - img_min) * 255).astype(np.uint8)
    if photometric == "MONOCHROME1":
        windowed = cv2.bitwise_not(windowed)

    if method == 'clahe':
        windowed = clahe_window(windowed,cL,tile,mode)
    elif method == 'clahe':
        windowed = clahe_window(windowed,cL,tile,mode)
    elif method == 'dcp':
        windowed = dcp_window(windowed,patch)
    elif method == 'combined':
        windowed = combined_window(windowed, cL, patch, tile, mode)
    else:
        pass

    return windowed


def prepare_dicom_image(
        image_path: str, # Путь к изображению dicom (.dcm) / Dicom image path
        method: str='simple', # Метод предобработки: simple, CLAHE, DCP или combined / Preprocessing method: simple, CLAHE, SCP or combined
        cL: float=5.0, # Параметр clicklimit для фильтра clahe / Clicklimit parameter for clahe filter
        tile=(8,8), # Параметр tileGridSize для фильтра clahe / TileGridSize parameter for clahe filter
        patch: int=5, # Параметр размерности для фильтра DCP / Size parameter for DCP filter
        mode: str="gray" # Тип фильтра: gray или rgb / Filter type: gray or rgb
        ) -> np.ndarray:
    
    '''
        Предобработать изображение dicom (.dcm)
        для передачи в модель,
        вернуть изображение как numpy.ndarray

        Preprocess dicom (.dcm) image
        before starting the model,
        return image as numpy.ndarray    
    '''
    # Читаем .dcm
    ds=dicom.dcmread(image_path)
    # Переводим в ndarray для обработки с помощью cv2
    dcm_sample = ds.pixel_array.astype(np.float32)

    window_center, window_width = get_dicom_window_attributes(ds)
    dcm_sample = apply_window_level(dcm_sample, window_center, window_width, ds.PhotometricInterpretation, method, cL, tile, patch, mode)

    return dcm_sample

def clahe_window(
        windowed: np.ndarray, # Нормализованное по оконным параметрам изображение / Image normalized with window  attributes
        cL: float, # Параметр clicklimit для фильтра clahe / Clicklimit parameter for clahe filter
        tile, # Параметр tileGridSize для фильтра clahe / TileGridSize parameter for clahe filter
        mode: str="gray" # Тип фильтра: gray или rgb / Filter type: gray or rgb
        ) -> np.ndarray:
    
    '''
        Применить фильтр CLAHE на изображении,
        вернуть изображение в формате numpy.adarray

        Apply CLAHE filter to the image, 
        return image as numpy.ndarray 
    '''

    if mode == "gray":
      clahe = cv2.createCLAHE(clipLimit=cL, tileGridSize=tile)
      windowed = clahe.apply(windowed)
    else:
      if len(windowed.shape) == 2:
          windowed_rgb = cv2.cvtColor(windowed, cv2.COLOR_GRAY2BGR)
      else:
          windowed_rgb = windowed.copy()

      clahe = cv2.createCLAHE(clipLimit=cL, tileGridSize=tile)
      lab = cv2.cvtColor(windowed_rgb, cv2.COLOR_BGR2LAB)
      l, a, b = cv2.split(lab)  # разделяем на каналы
      l2 = clahe.apply(l)
      lab = cv2.merge((l2, a, b))
      windowed = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
      windowed = cv2.cvtColor(windowed, cv2.COLOR_BGR2GRAY)

    return windowed


def dcp_window(
        windowed: np.ndarray, # Нормализованное по оконным параметрам изображение / Image normalized with window  attributes
        patch_size: int # Параметр размерности для фильтра DCP / Size parameter for DCP filter
        ) -> np.ndarray:

    '''
        Применить фильтр DCP на изображении,
        вернуть изображение в формате numpy.adarray

        Apply DCP filter to the image, 
        return image as numpy.ndarray 
    '''
        
    if len(windowed.shape) == 2:
          windowed_rgb = cv2.cvtColor(windowed, cv2.COLOR_GRAY2BGR)
    else:
          windowed_rgb = windowed.copy()

    b, g, r = cv2.split(windowed_rgb)
    min_bg = cv2.min(b, g)
    dark_channel = cv2.min(min_bg, r)
    kernel = np.ones((patch_size, patch_size), np.uint8)
    dark_channel = cv2.erode(dark_channel, kernel, iterations=1)

    return dark_channel


def combined_window(
        windowed_rgb: np.ndarray, # Нормализованное по оконным параметрам изображение в формате RGB / RGB image normalized with window attributes
        cL: float, # Параметр clicklimit для фильтра clahe / Clicklimit parameter for clahe filter
        tile, # Параметр tileGridSize для фильтра clahe / TileGridSize parameter for clahe filter
        patch: int, # Параметр размерности для фильтра DCP / Size parameter for DCP filter
        mode: str="gray" # Тип фильтра: gray или rgb / Filter type: gray or rgb
        ):

    '''
        Применить комбинированный фильтр (CLAHE + DCP) на изображении,
        вернуть изображение в формате numpy.adarray

        Apply combines filter (CLAHE + DCP) to the image, 
        return image as numpy.ndarray 
    '''
        
    if mode == "rgb":
      clahed = clahe_window(windowed_rgb,cL,tile,mode)
    elif mode == "gray":
      clahed = clahe_window(windowed_rgb,cL,tile,mode)
      clahed = cv2.cvtColor(windowed_rgb, cv2.COLOR_GRAY2BGR)
    final = dcp_window(clahed, patch)

    return final

In [24]:
prepare_and_copy_dicom(TEST_FILES_PATH, TEMP_TEST_DATA_PATH)
copy_dicom(TEST_FILES_PATH, TEMP_TEST_DATA_PATH)

Конвертировано ./headliners_CXRForeignViewer/demo_files/1.2.643.5.1.13.13.12.2.77.8252.02100604101413030912070905030707.dcm => tmp_test/1.2.643.5.1.13.13.12.2.77.8252.02100604101413030912070905030707.jpg
Конвертировано ./headliners_CXRForeignViewer/demo_files/1.2.643.5.1.13.13.12.2.77.8252.05080402101215020701121506120008.dcm => tmp_test/1.2.643.5.1.13.13.12.2.77.8252.05080402101215020701121506120008.jpg
Конвертировано ./headliners_CXRForeignViewer/demo_files/1.2.643.5.1.13.13.12.2.77.8252.06090706020808100215070307051202.dcm => tmp_test/1.2.643.5.1.13.13.12.2.77.8252.06090706020808100215070307051202.jpg
Конвертировано ./headliners_CXRForeignViewer/demo_files/1.2.643.5.1.13.13.12.2.77.8252.06140401090500021201021503151502.dcm => tmp_test/1.2.643.5.1.13.13.12.2.77.8252.06140401090500021201021503151502.jpg
Конвертировано ./headliners_CXRForeignViewer/demo_files/1.2.643.5.1.13.13.12.2.77.8252.02100604101413030912070905030707.dcm => tmp_test\1.2.643.5.1.13.13.12.2.77.8252.02100604101413030

### Детекция

In [None]:
model = YOLO(MODEL_PATH)

results = model.predict(
    task='detect',
    mode='predict',
    source=TEMP_TEST_DATA_PATH,
    conf=0.25,
    save_txt=True
)

Loading headliners_CXRForeignViewer/model/weights/best.onnx for ONNX Runtime inference...
Using ONNX Runtime 1.23.2 CPUExecutionProvider

image 1/8 c:\Users\vtsab\Kittens-Xray\tmp_test\1.2.643.5.1.13.13.12.2.77.8252.02100604101413030912070905030707 (2).jpg: 640x640 (no detections), 100.3ms
image 2/8 c:\Users\vtsab\Kittens-Xray\tmp_test\1.2.643.5.1.13.13.12.2.77.8252.02100604101413030912070905030707.jpg: 640x640 (no detections), 105.1ms
image 3/8 c:\Users\vtsab\Kittens-Xray\tmp_test\1.2.643.5.1.13.13.12.2.77.8252.05080402101215020701121506120008 (2).jpg: 640x640 2 foreign items, 98.4ms
image 4/8 c:\Users\vtsab\Kittens-Xray\tmp_test\1.2.643.5.1.13.13.12.2.77.8252.05080402101215020701121506120008.jpg: 640x640 2 foreign items, 99.6ms
image 5/8 c:\Users\vtsab\Kittens-Xray\tmp_test\1.2.643.5.1.13.13.12.2.77.8252.06090706020808100215070307051202 (2).jpg: 640x640 (no detections), 101.0ms
image 6/8 c:\Users\vtsab\Kittens-Xray\tmp_test\1.2.643.5.1.13.13.12.2.77.8252.06090706020808100215070307051

In [58]:
def merge_txt_files(
        source_dir:str # Директория с выводом модели
):
    # Создаем директорию, если она не существует
    dir = Path(RESULT_PATH)
    dir.mkdir(exist_ok=True)
    lib_dir = Path(''.join([RESULT_PATH, 'labels/']))
    lib_dir.mkdir(exist_ok=True)
    
    # Получаем список txt-файлов в текущей директории
    files = list(Path(source_dir).glob("*.txt"))
    
    # Словарь для группировки файлов по базовому имени
    file_groups = {}
    
    # Шаблон для поиска файлов с суффиксом (2)
    pattern = re.compile(r"^(.*?)\s*\(\d+\)\s*$")
    
    for file in files:
        stem = file.stem  # Имя файла без расширения
        
        # Проверяем, является ли файл версией с номером
        match = pattern.match(stem)
        if match:
            base_name = match.group(1)
        else:
            base_name = stem
        
        # Добавляем файл в соответствующую группу
        if base_name not in file_groups:
            file_groups[base_name] = []
        file_groups[base_name].append(file)
    
    # Обрабатываем группы файлов
    for base_name, group in file_groups.items():
        if len(group) > 1:
            # Сортируем: оригинал первый, затем версии с номерами
            group.sort(key=lambda x: x.stem)
            
            # Читаем содержимое всех файлов группы
            content = []
            for file in group:
                with open(file, 'r', encoding='utf-8') as f:
                    content.append(f.read())
            
            # Создаем имя результирующего файла
            output_file = lib_dir / f"{base_name}.txt"
            
            # Записываем объединенное содержимое
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write('\n'.join(content))
            
            # print(f"Объединен файл: {output_file}")

def draw_yolo_boxes(image_path, label_path, output_dir):
    # Создаем директорию для результатов если её нет
    os.makedirs(output_dir, exist_ok=True)
    
    # Загружаем изображение
    image = cv2.imread(image_path)
    if image is None:
        # print(f"Ошибка загрузки изображения: {image_path}")
        return
        
    img_height, img_width = image.shape[:2]
    
    # Читаем файл разметки
    try:
        with open(label_path, 'r') as f:
            lines = f.readlines()
    except FileNotFoundError:
        # print(f"Файл разметки не найден: {label_path}")
        return

    # Обрабатываем каждую метку
    for line in lines:
        data = line.strip().split()
        if len(data) != 5:
            continue
            
        class_id, x_center, y_center, width, height = map(float, data)
        
        # Конвертируем нормализованные координаты в абсолютные
        x_center_abs = x_center * img_width
        y_center_abs = y_center * img_height
        width_abs = width * img_width
        height_abs = height * img_height
        
        # Рассчитываем координаты углов прямоугольника
        x1 = int(x_center_abs - width_abs / 2)
        y1 = int(y_center_abs - height_abs / 2)
        x2 = int(x_center_abs + width_abs / 2)
        y2 = int(y_center_abs + height_abs / 2)
        
        # Рисуем прямоугольник
        color = (0, 255, 0)  # Зеленый цвет
        thickness = 2
        cv2.rectangle(image, (x1, y1), (x2, y2), color, thickness)
        
        # Добавляем подпись класса (опционально)
        label = f"Class {int(class_id)}"
        cv2.putText(image, label, (x1, y1 - 10),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, thickness)

    # Сохраняем результат
    output_path = os.path.join(output_dir, os.path.basename(image_path))
    cv2.imwrite(output_path, image)
    # print(f"Обработано: {output_path}")

def draw_boxes():
    # Укажите пути к вашим данным
    images_dir = TEMP_TEST_DATA_PATH  # Папка с изображениями
    labels_dir = OUTPUT_LABELS_PATH  # Папка с разметкой YOLO
    output_dir = RESULT_PATH  # Папка для результатов
    
    # Обрабатываем каждый файл
    for filename in os.listdir(images_dir):
        if filename.endswith(".jpg") and "(2)" not in filename:
            # Формируем пути к файлам
            image_path = os.path.join(images_dir, filename)
            label_name = os.path.splitext(filename)[0] + ".txt"
            label_path = os.path.join(labels_dir, label_name)
            
            # Обрабатываем изображение
            draw_yolo_boxes(image_path, label_path, output_dir)


def create_file_mapping_table(images_dir: str, labels_dir: str, output_excel: str = "file_mapping.xlsx") -> None:
    """
    Создает Excel таблицу с mapping файлов изображений и соответствующих txt файлов
    
    Args:
        images_dir: Папка с изображениями
        labels_dir: Папка с txt файлами (лейблами)
        output_excel: Путь для сохранения Excel файла
    """
    
    # Получаем списки файлов
    image_files = [f for f in os.listdir(images_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.dcm'))]
    label_files = [f for f in os.listdir(labels_dir) if f.lower().endswith('.txt')]
    
    # Создаем множества имен без расширений для быстрого поиска
    image_names = {os.path.splitext(f)[0] for f in image_files}
    label_names = {os.path.splitext(f)[0] for f in label_files}
    
    # Создаем данные для таблицы
    data = []
    for image_name in sorted(image_names):
        has_label = 1 if image_name in label_names else 0
        data.append({
            'file_id': image_name,
            'has_label': has_label
        })
    
    # Создаем DataFrame
    df = pd.DataFrame(data)
    
    # Сохраняем в Excel
    df.to_excel(output_excel, index=False)
    

# Пример использования
def make_binary():
    images_dir = TEST_FILES_PATH
    labels_dir = ''.join([RESULT_PATH, 'labels/'])
    
    create_file_mapping_table(images_dir, labels_dir, 'result.xlsx')

In [49]:
merge_txt_files(OUTPUT_LABELS_PATH)

In [50]:
draw_boxes()

Обработано: ./result/1.2.643.5.1.13.13.12.2.77.8252.05080402101215020701121506120008.jpg
Обработано: ./result/1.2.643.5.1.13.13.12.2.77.8252.06140401090500021201021503151502.jpg


In [59]:
make_binary()

### Удаление временных папок и файлов

In [None]:
def cleanup_directories(directories: list) -> None:
    """
    Удаляет список директорий
    
    Args:
        directories: Список путей к директориям для удаления
    """
    for dir_path in directories:
        if os.path.exists(dir_path):
            try:
                shutil.rmtree(dir_path)
            except Exception as e:
                print(f"❌ Ошибка при удалении {dir_path}: {e}")
        else:
            print(f"ℹ️ Директория не существует: {dir_path}")

In [None]:
temp_directories = ['runs', TEMP_TEST_DATA_PATH]
cleanup_directories(temp_directories)

✅ Удалено: runs
✅ Удалено: tmp_test
