In [None]:
from fcsparser import parse
import os
import re
import shutil
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
import umap
import time
from scipy.stats import norm
from transliterate import translit
import warnings
import pickle
from matplotlib.patches import Polygon
from openpyxl import load_workbook
from matplotlib.path import Path
from matplotlib.backends.backend_pdf import PdfPages
from scipy.interpolate import griddata

In [None]:
"""

Верисия 10, стабильная
Обновлён гейт для гранулоцитов.
Добавил возможность настроить количество эпох при обучении классификатора.
Больше эпох - дальше расходятся кластеры, но нужно адаптировать гейты.
Добавил возможность переносить данные из .csv в файл заключения.
Координаты для переноса данных тоже ставятся вручную.

Версия 9, стабильна
Исправлен гейт для мусора. Добавлены гейты для CD117, но нужно доработать
Неудачно проведён поиск гейтов для каппа и лямбда.

Версия 8 - решены проблемы с совместимостью. Необходимо добавить актуальные 
версии библиотек.С данными библиотеками популяции распознаются одинаково.

Версия 7 - Готов скрипт для процессинга данных, обучения и сохранения 
классификаторов,загрузки валидационных данных, разметки популяций на юмап, 
и применения юмап к тестовым данным

Необходимо:Завершить разметкуprocess_test_data функцию доработать до выдачи 
количественной экспрессии антигенов за вычетом мусораСоставить функцию для 
формирования отчёта и занесения данных в excel-бланкиПроверить, будет ли применять 
process_test_data нужный классификатор к пробирке с соответствующим названием. 
По идее, долженВынести функции для работы с тестом в отдельный файл
Упаковать в докер или хотя бы .py-скрипт для быстрого запуска
"""

In [None]:
# path = r"C:\Users\vsemis\files\Flow_cyt_robot" # внимание комп
path = r"D:\NovoExpress Data"  # очередной комп

# Используемые функции

In [None]:
def log_transp_znorm_data(data):
    """
    Выполняет логарифмирование всего датасета с событиями
    После логарифмирования происходит транспонирование и z-нормализация,
    после чего - повторное транспонирование для последующего анализа
    Эффективность этих действий подтверждена предшествующими наблюдениями
    """
    data_clipped = data.clip(lower=1)  # Клипуем данные, чтобы избежать log(0)
    data_log = data_clipped.apply(np.log2)  # Логарифмирование
    data_log = data_log.T  # Транспонирование
    data_log_transp_znorm = (data_log - data_log.mean()) / data_log.std()  # z-нормализация
    data_log_transp_znorm = data_log_transp_znorm.T  # Транспонирование
    
    return data_log_transp_znorm

In [None]:
def remove_outliers_percentile(df, columns=None, lower_quantile=0.0025, upper_quantile=0.99):
    """
    Удаляет выбросы в датафрейме df, обрезая значения за заданными перцентилями.
    
    Параметры:
    ----------
    df : pd.DataFrame
        Исходный датафрейм.
    columns : list or None
        Список столбцов, по которым ищем выбросы.
        Если None, то берём все числовые столбцы.
    lower_quantile : float
        Нижняя граница (от 0 до 1), по умолчанию 0.0025 (0.25%).
    upper_quantile : float
        Верхняя граница (от 0 до 1), по умолчанию 0.99 (99%).
    
    Возвращает:
    -----------
    pd.DataFrame
        Датафрейм без выбросов.
    """
    
    # Если столбцы не указаны, берём все числовые
    if columns is None:
        columns = df.select_dtypes(include='number').columns.tolist()
    
    # Создадим копию, чтобы не менять исходный датафрейм
    df_clean = df.copy()
    
    for col in columns:
        if col not in df_clean.columns:
            # Если в списке оказался столбец, которого нет в датафрейме
            continue
        
        # Вычислим перцентили
        low_val = df_clean[col].quantile(lower_quantile)
        high_val = df_clean[col].quantile(upper_quantile)
        
        # Фильтруем выбросы
        df_clean = df_clean[(df_clean[col] >= low_val) & (df_clean[col] <= high_val)]
    
    return df_clean

In [None]:
def log_arcsinh_transform_data(data, cofactor_mapping=None):  # Исправить название!!!
    """
    Применяет преобразования к датафрейму.
    
    Для столбцов, в названии которых встречается одно из ключевых слов 
    ('CD'), применяется arcsinh-преобразование с заданным ко-фактором.
    Для остальных столбцов выполняется клиппинг (чтобы избежать log(0)) и логарифмирование.
    
    Параметры:
    ----------
    data : pd.DataFrame
        Исходный датафрейм.
    cofactor_mapping : dict, optional
        Словарь, сопоставляющий ключевые слова и их ко-факторы.
        По умолчанию: {'CD': 5}
    
    Возвращает:
    -----------
    pd.DataFrame
        Датафрейм после преобразований.
    """
    if cofactor_mapping is None:
        cofactor_mapping = {'CD': 3, 'Ig': 3}  # хорошие: 5
    
    data_transformed = data.copy()
    
    for col in data_transformed.columns:
        # Если имя столбца содержит одно из ключевых слов,
        # применяем arcsinh-преобразование с соответствующим ко-фактором.
        applied_arcsinh = False
        for key, cofactor in cofactor_mapping.items():
            if key in col:
                data_transformed[col] = np.arcsinh(data_transformed[col] / cofactor)
                applied_arcsinh = True
                break  # Если нашли соответствие, дальнейшие проверки не требуются.
                
        # Если столбец не содержит заданных ключевых слов, применяем стандартное преобразование:
        if not applied_arcsinh:
            # Клиппинг для предотвращения log(0)
            data_transformed[col] = data_transformed[col].clip(lower=1)
            # Логарифмирование по основанию 2
            data_transformed[col] = np.log2(data_transformed[col])
            
    return data_transformed

In [None]:
def train_umap_data_organiser(dirs, tube_list, size=10000, random_state=42):
    """
    Данная функция проходит по указанным директориям, ищет файлы, соответствующие
    именам из tube_list, загружает их, обрабатывает и сохраняет отдельные файлы 
    с названием train_data_<имя файла>.csv без расширения .fcs.
    """
    for tube in tube_list:  # Проходим по каждому файлу из списка
        combined_data = pd.DataFrame()  # Создаём пустой датафрейм

        for folder in dirs:
            print(f"\n📂 Обрабатываем главную папку: {folder}")

            for subfolder in os.listdir(folder):  # Проходим по подпапкам
                subfolder_path = os.path.join(folder, subfolder)
                if os.path.isdir(subfolder_path):
                    print(f"   📁 Внутренняя папка: {subfolder}")

                    for file in os.listdir(subfolder_path):
                        if file == tube:  # Проверяем, соответствует ли имя файла текущему tube
                            file_path = os.path.join(subfolder_path, file)
                            print(f"      📄 Найден файл: {file}")

                            meta, data = parse(file_path)
                            del meta  # Не нужно, удаляем

                            if len(data) > size:
                                data = data.sample(n=size, random_state=random_state)  # Берём нужное количество строк
                            
                            # Удаляем ненужные столбцы
                            data = data.drop(columns=[col for col in data.columns if col.endswith(('-A', 'Width', 'Time'))])

                            # Удаляем выбросы
                            data = remove_outliers_percentile(data, columns=['FSC-H', 'SSC-H'])
                            
                            # Применяем нормализацию
                            data = log_arcsinh_transform_data(data)  # Цитометрически-ориентированный подход
                            # data = (data - data.mean()) / data.std()  # z-нормализация
                            
                            # Добавляем информацию о типе образца
                            data["sample_type"] = os.path.basename(folder)

                            # Добавляем колонку 'sample_name' с транслитерированным названием subfolder
                            data["sample_name"] = translit(subfolder, reversed=True)

                            # Если датафрейм пуст, добавляем колонки
                            if combined_data.empty:
                                combined_data = pd.DataFrame(columns=data.columns)

                            # Присоединяем данные
                            combined_data = pd.concat([combined_data, data], ignore_index=True)

        if not combined_data.empty:
            # Определяем папку для сохранения (на уровень выше первой директории в dirs)
            base_output_dir = os.path.dirname(dirs[0])
            output_dir = os.path.join(base_output_dir, "train_data")  # Добавляем папку train_data
            if not os.path.exists(output_dir):
                os.makedirs(output_dir)
                print(f"✅ Папка {output_dir} успешно создана!")
            else:
                print(f"⚠️ Папка {output_dir} уже существует.")
            # Формируем имя файла (убираем расширение .fcs)
            output_filename = f"train_data_{os.path.splitext(tube)[0]}.csv"
            output_path = os.path.join(output_dir, output_filename)

            # Сохраняем результат
            combined_data.to_csv(output_path, index=False)
            print(f"\n✅ Данные сохранены в {output_path}")
        else:
            print(f"\n⚠️ Файл {tube} не найден в указанных директориях.")

In [None]:
# Функция для построения и обучения UMAP
def plot_umap_from_dataframe(data, n_neighbors=20, min_dist=0.1, spread=5, metric="manhattan", n_epochs=None):
    """
    Выполняет UMAP для переданного DataFrame и строит UMAP-мап.

    Parameters:
    - data (pd.DataFrame): Исходный DataFrame (может содержать нечисловые данные).
    - n_neighbors (int): Количество соседей для UMAP.
    - min_dist (float): Минимальное расстояние между точками на карте.
    - spread (float): Параметр, влияющий на расстояние между кластерами.
    - metric (str): Метрика расстояния (по умолчанию "manhattan").
    - n_epochs (int, optional): Количество эпох для UMAP (по умолчанию авто).

    Returns:
    - umap_df (pd.DataFrame): DataFrame с координатами UMAP.
    - reducer (umap.UMAP): Обученный UMAP-классификатор.
    """

    # Оставляем только числовые столбцы
    numeric_data = data.select_dtypes(include=[np.number])

    # Проверяем, что после фильтрации остались числовые данные
    if numeric_data.empty:
        raise ValueError("После удаления нечисловых колонок DataFrame оказался пустым!")

    # Подавляем предупреждения
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", message=".*n_jobs value 1 overridden.*")

        # Инициализация и обучение UMAP
        reducer = umap.UMAP(
            n_components=2,
            n_neighbors=n_neighbors,
            random_state=42,
            min_dist=min_dist,
            spread=spread,
            metric=metric,
            n_epochs=n_epochs,  # <-- Добавляем настройку эпох
            verbose=True  # <-- Для логов о процессе обучения
        )
        umap_result = reducer.fit_transform(numeric_data)

    # Создаём DataFrame с результатами UMAP
    umap_df = pd.DataFrame(umap_result, columns=['UMAP1', 'UMAP2'])

    # Визуализация
    plt.figure(figsize=(8, 6))
    plt.scatter(umap_df['UMAP1'], umap_df['UMAP2'], alpha=0.6, s=10)
    plt.title(f'UMAP Map (metric={metric}, n_epochs={n_epochs})')
    plt.xlabel('UMAP Component 1')
    plt.ylabel('UMAP Component 2')
    plt.grid(True)
    plt.show()

    return umap_df, reducer

In [None]:
# Функция для обучения UMAP и построения чарта, а также сохранения результатов UMAP и классификатора
# Обращается к предшествующей функции
def process_and_save_umap(dirs, tube, n_neighbors=25, min_dist=0.001, spread=1, metric="manhattan", n_epochs=None):

    """
    Автоматически проходит по файлам в train_data, строит UMAP и сохраняет результаты в Saved_UMAP_n_csv.

    Parameters:
    - dirs (list): Список директорий (используем для поиска Examples).
    - tube (list): Список файлов, с которыми работали (используем для фильтрации).
    - n_neighbors, min_dist, spread, metric: параметры для UMAP.
    - n_epochs (int, optional): Количество эпох для UMAP (по умолчанию авто).
    - parallel (bool): Использовать ли параллельные вычисления.
    """

    # Определяем базовую папку "Examples"
    examples_folder = os.path.dirname(dirs[0])  # Поднимаемся на уровень вверх
    train_data_folder = os.path.join(examples_folder, "train_data")
    save_folder = os.path.join(examples_folder, "Saved_UMAP_n_csv")

    # Проверяем существование train_data
    if not os.path.exists(train_data_folder):
        print(f"Ошибка: Папка {train_data_folder} не найдена!")
        return  

    # Создаём папку для сохранения, если её нет
    os.makedirs(save_folder, exist_ok=True)

    # Получаем список CSV-файлов в train_data
    files = [f for f in os.listdir(train_data_folder) if f.endswith(".csv")]
    if not files:
        print("В папке train_data нет CSV-файлов!")
        return

    # Проходим только по файлам, которые соответствуют tube
    for file in files:
        # Проверяем, относится ли файл к tube (по названию без train_data_)
        file_base_name = file.replace("train_data_", "").replace(".csv", "")
        if not any(tube_name.startswith(file_base_name) for tube_name in tube):
            print(f"Пропускаем файл {file}, так как он не в tube.")
            continue  

        file_path = os.path.join(train_data_folder, file)
        print(f"\nОбрабатываем файл: {file}")

        # Загружаем данные
        df = pd.read_csv(file_path)

        # Строим UMAP
        umap_df, reducer = plot_umap_from_dataframe(
            df,
            n_neighbors=n_neighbors,
            min_dist=min_dist,
            spread=spread,
            metric=metric,
            n_epochs=n_epochs,  # <-- Передаём epochs
        )

        # Создаём имена файлов для сохранения
        base_name = os.path.splitext(file)[0]  # Убираем расширение .csv
        df_umap_filename = f"{base_name}_umap.csv"
        reducer_filename = f"reducer_{base_name}.pkl"

        # Полные пути
        df_umap_path = os.path.join(save_folder, df_umap_filename)
        reducer_path = os.path.join(save_folder, reducer_filename)

        # Объединяем df и umap_df
        df_umap = pd.concat([df, umap_df], axis=1)

        # Сохраняем данные
        df_umap.to_csv(df_umap_path, index=False)  

        with open(reducer_path, "wb") as f:
            pickle.dump(reducer, f)

        print(f"Данные сохранены:\n - {df_umap_path}\n - {reducer_path}")

In [None]:
# Функция для использования обученного UMAP
def fit_trained_umap(reducer, new_data):
    """
    Использует обученный UMAP для применения к новому набору данных.
    
    Parameters:
    - reducer (umap.UMAP): Обученный UMAP-классификатор.
    - new_data (pd.DataFrame): Новый набор данных для применения UMAP.
    
    Returns:
    - umap_df (pd.DataFrame): DataFrame с результатами UMAP для нового набора данных.
    """
    # Проверяем, что данные являются числовыми
    if not all(new_data.dtypes.apply(lambda x: np.issubdtype(x, np.number))):
        raise ValueError("Все столбцы в DataFrame должны быть числовыми.")
    
    # Применение обученного UMAP
    umap_result = reducer.transform(new_data)

    # Создание DataFrame с результатами
    umap_df = pd.DataFrame(umap_result, columns=[f'UMAP{i+1}' for i in range(reducer.n_components)])

    # Построение UMAP-мапа
    plt.figure(figsize=(8, 6))
    if reducer.n_components >= 2:
        plt.scatter(umap_df['UMAP1'], umap_df['UMAP2'], alpha=0.6, s=10)
        plt.title('UMAP Map (New Data)')
        plt.xlabel('UMAP Component 1')
        plt.ylabel('UMAP Component 2')
    else:
        plt.scatter(range(len(umap_df)), umap_df['UMAP1'], alpha=0.6, s=10)
        plt.title('UMAP Map (1D Projection)')
        plt.xlabel('Samples')
        plt.ylabel('UMAP Component 1')
    plt.grid(True)
    plt.show()

    return umap_df

In [None]:
def process_validation_data(dirs, tube):
    """
    Обрабатывает новые данные из .fcs файлов, применяет к ним функции предварительной обработки,
    загружает соответствующий UMAP-классификатор и строит UMAP-чарт с подписью – названием файла.
    
    Parameters:
    - dirs (list of str): Список директорий, в которых находятся папки с данными (например, папка MM_subpopulations).
    - tube (list of str): Список строк для поиска в названиях файлов (используем префикс до первой точки).
    
    Результат:
    Для каждого найденного .fcs файла, удовлетворяющего условию, строится UMAP-чарт,
    подписанный названием данного файла.
    """
    # Для каждого указанного пути в списке dirs
    for base_dir in dirs:
        # Получаем список папок внутри base_dir
        for folder in os.listdir(base_dir):
            folder_path = os.path.join(base_dir, folder)
            if not os.path.isdir(folder_path):
                continue
            # Сохраним название текущей папки (если нужно для логирования или дальнейшей обработки)
            current_folder_name = folder  
            # Проходим по всем файлам в текущей папке
            for filename in os.listdir(folder_path):
                if not filename.endswith(".fcs"):
                    continue
                # Для каждого элемента из tube, сравниваем префиксы (до первой точки)
                for tube_item in tube:
                    tube_prefix = tube_item.split('.')[0]
                    if tube_prefix in filename:
                        # Если найдено совпадение, формируем полный путь к файлу
                        file_path = os.path.join(folder_path, filename)
                        try:
                            # Открываем .fcs файл с помощью fcsparser
                            meta, data = parse(file_path)
                            del meta  # метаданные не нужны, удаляем
                        except Exception as e:
                            print(f"Ошибка при парсинге файла {file_path}: {e}")
                            continue
                        
                        # Применяем функцию удаления выбросов
                        try:
                            data_no_outliers = remove_outliers_percentile(data, 
                                                                          columns=None, 
                                                                          lower_quantile=0.0025, 
                                                                          upper_quantile=0.99)
                        except Exception as e:
                            print(f"Ошибка при удалении выбросов для файла {filename}: {e}")
                            continue
                        
                        # Применяем логарифмическое преобразование (arcsinh)
                        try:
                            data_normalized = log_arcsinh_transform_data(data_no_outliers, 
                                                                           cofactor_mapping=None)
                        except Exception as e:
                            print(f"Ошибка при нормализации данных для файла {filename}: {e}")
                            continue
                        
                        # Формируем путь к классификаторам:
                        # Из base_dir (например, ...\Examples\MM_subpopulations) поднимаемся на уровень выше (...\Examples)
                        # и заходим в папку Saved_UMAP_n_csv
                        examples_dir = os.path.dirname(base_dir)
                        classifier_dir = os.path.join(examples_dir, "Saved_UMAP_n_csv")
                        
                        # Поиск файла классификатора (.pkl), имя которого содержит tube_prefix
                        classifier_file = None
                        if os.path.isdir(classifier_dir):
                            for clf in os.listdir(classifier_dir):
                                if clf.endswith(".pkl") and tube_prefix in clf:
                                    classifier_file = os.path.join(classifier_dir, clf)
                                    break
                        else:
                            print(f"Папка с классификаторами не найдена: {classifier_dir}")
                            continue
                        
                        if classifier_file is None:
                            print(f"Классификатор для префикса '{tube_prefix}' не найден. Пропуск файла {filename}.")
                            continue
                        
                        # Загружаем UMAP-классификатор
                        try:
                            with open(classifier_file, "rb") as f:
                                reducer = pickle.load(f)
                        except Exception as e:
                            print(f"Ошибка при загрузке классификатора {classifier_file}: {e}")
                            continue
                        
                        # Применяем UMAP-классификатор к нормализованным данным
                        try:
                            umap_result = reducer.transform(data_normalized)
                        except Exception as e:
                            print(f"Ошибка при применении UMAP к данным файла {filename}: {e}")
                            continue
                        
                        # Формируем DataFrame с результатами UMAP
                        umap_df = pd.DataFrame(umap_result, 
                                               columns=[f'UMAP{i+1}' for i in range(reducer.n_components)])
                        
                        # Строим UMAP-чарт
                        plt.figure(figsize=(8, 6))
                        if reducer.n_components >= 2:
                            plt.scatter(umap_df['UMAP1'], umap_df['UMAP2'], alpha=0.6, s=10)
                            plt.xlabel('UMAP Component 1')
                            plt.ylabel('UMAP Component 2')
                        else:
                            plt.scatter(range(len(umap_df)), umap_df['UMAP1'], alpha=0.6, s=10)
                            plt.xlabel('Samples')
                            plt.ylabel('UMAP Component 1')
                        
                        # Подписываем график названием файла
                        plt.title(filename)
                        plt.grid(True)
                        plt.show()
                        
                        # Если файл совпал с одним из tube, то не нужно проверять оставшиеся элементы списка
                        break

In [None]:
# Процессинг тестовых данных
def process_test_data(dataframes, min_events=10000):
    """
    Обрабатывает данные:
    1. Обрезает до min_events строк (если возможно).
    2. Удаляет ненужные столбцы.

    Parameters:
    - dataframes (pd.DataFrame or list of pd.DataFrame): Один или список датафреймов.
    - min_events (int): Минимальное количество событий для обрезки (по умолчанию 100 000).

    Returns:
    - pd.DataFrame или list из DataFrame (если вход был списком).
    """
    single_input = isinstance(dataframes, pd.DataFrame)  # Проверяем, передан ли один DataFrame
    if single_input:
        dataframes = [dataframes]  # Оборачиваем в список для единообразной обработки

    print(f"⚙️  Начинаем процессинг данных...")

    # Обрезаем до нужного количества строк (если возможно)
    processed_data = [
        df.sample(n=min(min_events, len(df)), random_state=42).reset_index(drop=True)
        for df in dataframes
    ]
    print(f"✅ Данные обрезаны (или оставлены без изменений, если строк меньше {min_events})")

    # Удаляем столбцы, оканчивающиеся на '-A', 'Width', 'Time'
    processed_data = [
        df.drop(columns=[col for col in df.columns if col.endswith(('-A', 'Width', 'Time'))], errors="ignore") 
        for df in processed_data
    ]
    print(f"✅ Удалены столбцы: '-A', 'Width', 'Time' (если были)")

    return processed_data[0] if single_input else processed_data  # Возвращаем DataFrame, если был один вход

In [None]:
# Отображаем и колоризуем молекулы CD и проводим фильтрацию
def cd_color_gradients(data, cd='none', sample_type='', sample_name='', transparency=0.5, subsample=0.1):
    """
    Строит scatter plot UMAP1 vs UMAP2. 
    Если указан cd, то градиентно раскрашивает точки.
    Позволяет фильтровать данные по sample_type и sample_name.

    Parameters:
    - data (pd.DataFrame): Датафрейм с колонками 'UMAP1' и 'UMAP2'.
    - cd (str): Название колонки для градиента (по умолчанию 'none' — без окрашивания).
    - sample_type (str): Фильтрация по типу образца (по умолчанию без фильтрации).
    - sample_name (str): Фильтрация по имени образца (по умолчанию без фильтрации).
    - transparency (float): Прозрачность точек (по умолчанию 0.5).
    - subsample (float): Доля точек для отображения (от 0 до 1, по умолчанию 0.1).

    Returns:
    - None (строит график)
    """
    if not (0 < subsample <= 1):
        raise ValueError("subsample должен быть от 0 до 1")

    # Фильтруем по sample_type, если передан параметр
    if sample_type and sample_type in data['sample_type'].unique():
        data = data[data['sample_type'] == sample_type]

    # Фильтруем по sample_name, если передан параметр
    if sample_name and sample_name in data['sample_name'].unique():
        data = data[data['sample_name'] == sample_name]

    # Проверяем, осталось ли что-то после фильтрации
    if data.empty:
        print("❌ После фильтрации данных не осталось. Проверьте параметры sample_type и sample_name.")
        return

    # Выбираем подмножество данных
    n_samples = max(int(len(data) * subsample), 1000)  # Гарантируем минимум 1000 точек
    sampled_data = data.sample(n=n_samples, random_state=42)

    plt.figure(figsize=(8, 6))

    if cd in data.columns:
        # Создаем пользовательскую цветовую карту
        colors = [(0, 'blue'), (0.5, 'yellow'), (1, 'red')]  # Синий -> Желтый -> Красный
        cmap = LinearSegmentedColormap.from_list('CustomMap', colors, N=256)

        plt.scatter(sampled_data['UMAP1'], sampled_data['UMAP2'], 
                    c=sampled_data[cd], cmap=cmap, alpha=transparency)
        plt.colorbar(label=cd)  # Добавляем цветовую шкалу
    else:
        plt.scatter(sampled_data['UMAP1'], sampled_data['UMAP2'], 
                    color='gray', alpha=transparency)

    plt.xlabel("UMAP1")
    plt.ylabel("UMAP2")
    title = f"UMAP Projection {('with ' + cd) if cd in data.columns else ''}"
    if sample_type:
        title += f" | sample_type={sample_type}"
    if sample_name:
        title += f" | sample_name={sample_name}"
    plt.title(title)
    plt.grid(True)
    plt.show()

In [None]:
# Проходим по .csv-файла с сохранёнными данными UMAP, и рассматриваем, как распределена величины флуоресценции в UMAP-кластерах
# Возможна фильтрация
def process_umap_csv_and_plot(dirs, tube, sample_type=False, sample_name=False):
    # Проходим по списку tube
    for sample_name_in_tube in tube:
        # Строим путь к .csv файлу на основе sample_name
        # Заходим на один уровень выше папки dirs[0] и в \Saved_UMAP_n_csv
        saved_umap_dir = os.path.join(os.path.dirname(dirs[0]), 'Saved_UMAP_n_csv')

        # Ищем .csv файл с нужным именем и содержащим слово "umap" в названии
        csv_file = None
        for file in os.listdir(saved_umap_dir):
            # Проверка на наличие префикса и "umap" в названии файла
            if sample_name_in_tube.split('.')[0] in file and 'umap' in file.lower() and file.endswith('.csv'):
                csv_file = os.path.join(saved_umap_dir, file)
                break

        if csv_file is None:
            print(f"Не найден .csv файл для {sample_name_in_tube} в папке {saved_umap_dir}")
            continue

        # Загружаем данные из найденного файла
        data = pd.read_csv(csv_file)

        # Сохраняем список всех столбцов, кроме столбцов для типа образца и 'UMAP1' и 'UMAP2'
        columns_to_plot = [col for col in data.columns if col not in ['sample_type', 'sample_name', 'UMAP1', 'UMAP2']]

        # Если sample_type=True, то перебираем уникальные значения в столбце sample_type
        if sample_type:
            unique_sample_types = data['sample_type'].unique()
            for sample in unique_sample_types:
                print(f"Построение графиков для sample_type: {sample}")
                # Фильтруем данные по текущему значению sample_type
                filtered_data = data[data['sample_type'] == sample]

                # Для каждого столбца строим график
                for col in columns_to_plot:
                    cd_color_gradients(filtered_data, cd=col, sample_type=sample, sample_name=sample_name_in_tube)
        else:
            # Если sample_type=False, строим графики для всех данных без фильтрации
            for col in columns_to_plot:
                cd_color_gradients(data, cd=col, sample_type=sample_type, sample_name=sample_name_in_tube)

In [None]:
# Функция для настройки полигонов
def umap_populations_plot(path, dirs, tube, polygons, subsample=0.1, sample_type=None, sample_name=None):
    # Извлекаем фрагмент из tube до первой точки
    tube_prefix = tube.split('.')[0]
    
    # 1. Строим путь к папке Saved_UMAP_n_csv через первый элемент из dirs
    saved_path = os.path.join(os.path.dirname(dirs[0]), "Saved_UMAP_n_csv")
    
    # 2. Ищем нужный CSV-файл
    matching_files = [f for f in os.listdir(saved_path) if tube_prefix in f and "umap" in f.lower() and f.endswith(".csv")]

    if not matching_files:
        raise FileNotFoundError(f"Файл с фрагментом '{tube_prefix}' не найден в {saved_path}")
    
    csv_file = os.path.join(saved_path, matching_files[0])
    df = pd.read_csv(csv_file)

    # 3. Фильтрация данных, если заданы sample_type и sample_name
    if sample_type is not None:
        df = df[df["sample_type"] == sample_type]
    if sample_name is not None:
        df = df[df["sample_name"] == sample_name]

    # Если после фильтрации данных нет, выбрасываем предупреждение
    if df.empty:
        print(f"⚠️ После фильтрации данных ({sample_type=}, {sample_name=}) не осталось! Возвращаю пустой DataFrame.")
        return pd.DataFrame()

    # 4. Выбираем случайную подвыборку
    df_sampled = df.sample(frac=subsample, random_state=42) if subsample < 1.0 else df
    
    # 5. Строим график
    fig, ax = plt.subplots(figsize=(8, 6))
    ax.scatter(df_sampled["UMAP1"], df_sampled["UMAP2"], s=5, alpha=0.5, label="Data")
    
    # 6. Добавляем полигоны и метки
    res = df.copy()
    legend_entries = set()  # Храним добавленные (имя, цвет)

    for poly in polygons:
        name, color, *coords = poly
        polygon = np.array(coords)
        poly_patch = Polygon(polygon, edgecolor=color, facecolor=color, alpha=0.3)

        # Добавляем в легенду только если такой (имя, цвет) ещё не был добавлен
        if (name, color) not in legend_entries:
            legend_entries.add((name, color))
            poly_patch.set_label(name)  # Добавляем имя только один раз

        ax.add_patch(poly_patch)

        # Проверяем, какие точки попадают внутрь полигона
        path = Path(polygon)
        inside = path.contains_points(df[["UMAP1", "UMAP2"]].values)
        if name in res:
            res[name] |= inside.astype(np.int8)  # Логическое ИЛИ
        else:
            res[name] = inside.astype(np.int8)  # Добавляем как int8

    ax.legend()
    plt.xlabel("UMAP1")
    plt.ylabel("UMAP2")
    plt.title(f"UMAP Populations: {tube}")
    plt.show()

    # Сохранение координат полигонов с индексом n
    polygon_data = []
    polygon_counts = {}  # Считаем количество одинаковых названий
    
    for i, poly in enumerate(polygons):
        name, color, *coords = poly
        if name not in polygon_counts:
            polygon_counts[name] = 1
        else:
            polygon_counts[name] += 1
    
        poly_index = polygon_counts[name]  # Индексируем повторяющиеся названия
    
        for x, y in coords:
            polygon_data.append([name, color, x, y, poly_index])
    
    polygon_df = pd.DataFrame(polygon_data, columns=["name", "color", "x", "y", "n"])
    polygon_df.to_csv(os.path.join(saved_path, f"polygons_{tube_prefix}.csv"), index=False)

    return res

In [None]:
def process_test_data(path, tube, pdf=False, fcs_del=True):
    reports_dir = os.path.join(path, "Reports")
    os.makedirs(reports_dir, exist_ok=True)  # Создаём папку Reports в path

    saved_path = os.path.join(path, "Examples", "Saved_UMAP_n_csv")  # Исправленный путь к классификатору

    # Обходим все папки в корневом каталоге, исключая служебные папки
    for folder_name in sorted(os.listdir(path)):
        if folder_name in ["Reports", "Examples"]:
            continue

        folder_path = os.path.join(path, folder_name)
        if not os.path.isdir(folder_path):
            continue

        # Собираем файлы по порядку tube
        ordered_files = []
        for tube_item in tube:
            tube_prefix = tube_item.split('.')[0]
            files_for_prefix = [f for f in os.listdir(folder_path) if tube_prefix in f and f.endswith(".fcs")]
            ordered_files.extend(files_for_prefix)

        if not ordered_files:
            continue  # Пропускаем папку, если нет подходящих .fcs файлов

        pdf_pages = None
        if pdf:
            pdf_path = os.path.join(reports_dir, f"Report of {folder_name}.pdf")
            pdf_pages = PdfPages(pdf_path)
            csv_path = os.path.join(reports_dir, f"Report_of_{folder_name}.csv")

        print(f"\nОбрабатываю папку: {folder_name}")
        for filename in ordered_files:
            file_path = os.path.join(folder_path, filename)

            # Определяем tube_prefix
            tube_prefix = None
            for tube_item in tube:
                prefix = tube_item.split('.')[0]
                if prefix in filename:
                    tube_prefix = prefix
                    break
            if tube_prefix is None:
                print(f"Префикс для файла {filename} не найден.")
                continue

            try:
                meta, data = parse(file_path)
                del meta
                data_no_outliers = remove_outliers_percentile(data)
                data_normalized = log_arcsinh_transform_data(data_no_outliers)
            except Exception as e:
                print(f"Ошибка при обработке {filename}: {e}")
                continue

            classifier_file = None
            if os.path.isdir(saved_path):
                for clf in os.listdir(saved_path):
                    if clf.endswith(".pkl") and tube_prefix in clf:
                        classifier_file = os.path.join(saved_path, clf)
                        break
            if classifier_file is None:
                print(f"Классификатор для {tube_prefix} не найден в {saved_path}.")
                continue

            try:
                with open(classifier_file, "rb") as f:
                    reducer = pickle.load(f)
                umap_result = reducer.transform(data_normalized)
            except Exception as e:
                print(f"Ошибка при применении UMAP {filename}: {e}")
                continue

            umap_df = pd.DataFrame(umap_result, columns=[f'UMAP{i+1}' for i in range(reducer.n_components)])

            polygon_file = os.path.join(saved_path, f"polygons_{tube_prefix}.csv")
            if not os.path.exists(polygon_file):
                print(f"Файл с полигонами {polygon_file} не найден.")
                continue

            try:
                polygons_df = pd.read_csv(polygon_file)
            except Exception as e:
                print(f"Ошибка при загрузке полигонов {polygon_file}: {e}")
                continue

            # ---- Визуализация UMAP ----
            fig, ax = plt.subplots(figsize=(10, 8))
            ax.scatter(umap_df["UMAP1"], umap_df["UMAP2"], s=5, alpha=0.5, label="Data")

            umap_df_result = umap_df.copy()
            polygon_groups = {}
            added_legend_items = set()

            for (name, n) in polygons_df[["name", "n"]].drop_duplicates().itertuples(index=False):
                poly_data = polygons_df[(polygons_df["name"] == name) & (polygons_df["n"] == n)]
                polygon = np.array(list(zip(poly_data["x"], poly_data["y"])))
                poly_color = poly_data["color"].iloc[0]
                legend_key = (name, poly_color)
                poly_patch = Polygon(polygon, edgecolor=poly_color, facecolor="none", lw=2)
                ax.add_patch(poly_patch)
                if legend_key not in added_legend_items:
                    ax.plot([], [], color=poly_color, label=name)
                    added_legend_items.add(legend_key)
                path_obj = Path(polygon)
                inside = path_obj.contains_points(umap_df[["UMAP1", "UMAP2"]].values).astype(np.int32)
                if name in polygon_groups:
                    polygon_groups[name] |= inside
                else:
                    polygon_groups[name] = inside

            for name, inside_mask in polygon_groups.items():
                umap_df_result[name] = inside_mask

            ax.legend()
            plt.xlabel("UMAP1")
            plt.ylabel("UMAP2")
            plt.title(f"Тестовые данные: {filename}")

            # ---- Подготовка статистики ----
            stats = umap_df_result.describe().T.iloc[2:, :2]
            if "Trash" in umap_df_result.columns:
                total_cells = len(umap_df_result)
                trash_count = (umap_df_result["Trash"] == 1).sum()
                granulocytes_count = (umap_df_result["Granulocytes"] == 1).sum() if "Granulocytes" in umap_df_result.columns else 0
                mononuclears = total_cells - trash_count - granulocytes_count
                stats.loc["Mononuclears", ["count", "mean"]] = [mononuclears, mononuclears / total_cells * 100]
                if "Granulocytes" in umap_df_result.columns:
                    stats.loc["Granulocytes", ["count", "mean"]] = [granulocytes_count, granulocytes_count / (total_cells - trash_count) * 100]
                for pop in polygon_groups.keys():
                    if pop in umap_df_result.columns and pop not in ["Trash", "Granulocytes"]:
                        count = umap_df_result[pop].sum()
                        stats.loc[pop, ["count", "mean"]] = [count, count / mononuclears * 100]

            stats.columns = ['Абс. количество', 'Содержание, %']
            stats['Абс. количество'] = stats['Абс. количество'].astype(np.int32)
            stats['Содержание, %'] = stats['Содержание, %'].astype(np.float32).round(4)
            stats = stats.drop("Trash", errors="ignore")

            print(f"\nСтатистика для пробирки {filename} (пациент: {folder_name}):")
            plt.show()
            display(stats)
            plt.close(fig)

            if pdf and pdf_pages:
                pdf_pages.savefig(fig)
                plt.close(fig)
                fig_table, ax_table = plt.subplots(figsize=(8, 4))
                ax_table.axis('tight')
                ax_table.axis('off')
                table_data = stats.reset_index().values.tolist()
                table_data.insert(0, ["Группа", "Абс. количество", "Содержание, %"])
                ax_table.table(cellText=table_data, loc="center", cellLoc="center")
                plt.title(f"Статистика для образца {folder_name}.")
                pdf_pages.savefig(fig_table)
                plt.close(fig_table)
            if pdf:
                stats.to_csv(csv_path, mode='a', header=not os.path.exists(csv_path))
                print(f"Отчёт сохранён в {csv_path}")

        if pdf and pdf_pages:
            pdf_pages.close()
            print(f"Отчёт сохранён в {pdf_path}")

        if fcs_del:
            shutil.rmtree(folder_path)
            print(f"Удалена папка: {folder_path}")

In [None]:
# Функция для формирования отчётов
def make_report(path, blanks, coordinates, clean=True):
    reports_folder = os.path.join(path, "Reports")
    
    # Получаем список CSV-файлов, убирая "Report_of_" из начала
    report_files = [f for f in os.listdir(reports_folder) if f.endswith(".csv")]
    report_names = [f.replace("Report_of_", "").replace(".csv", "") for f in report_files]

    # Загружаем файл координат, не превращая первый столбец в индекс
    coordinates_file = os.path.join(coordinates, "Coordinates.xlsx")
    df_coords = pd.read_excel(coordinates_file, index_col=None)

    for report_name in report_names:
        csv_path = os.path.join(reports_folder, f"Report_of_{report_name}.csv")

        # Ищем подходящий XLSX-файл в папке blanks (начинается с report_name, затем любые символы)
        matching_files = [f for f in os.listdir(blanks) if re.match(fr"^{re.escape(report_name)}.*\.xlsx$", f)]
        if not matching_files:
            continue

        # Берём первый найденный файл (если их несколько)
        xlsx_path = os.path.join(blanks, matching_files[0])
        if not os.path.exists(csv_path):
            continue

        # Загружаем CSV-файл с учетом хедера
        df_report = pd.read_csv(csv_path, header=0)
        
        # Получаем значение для фильтрации из первого столбца CSV
        filter_value = df_report.iloc[0, 0]
        
        # Фильтруем координаты по столбцу "Col0str1_in_report"
        df_filtered = df_coords[df_coords["Col0str1_in_report"] == filter_value]
        if df_filtered.empty:
            continue

        # Открываем XLSX и сохраняем размеры ячеек
        wb = load_workbook(xlsx_path)
        ws = wb.active
        column_widths = {col: ws.column_dimensions[col].width for col in ws.column_dimensions}
        row_heights = {row: ws.row_dimensions[row].height for row in ws.row_dimensions}

        # Перенос данных с проверкой на отсутствие данных в Data_in_report_position
        for _, row in df_filtered.iterrows():
            blank_cell = str(row["Data_in_blank_position"]).strip()
            if pd.isna(row["Data_in_report_position"]):
                ws[blank_cell] = 0
                ws[blank_cell].number_format = '0.00'
            else:
                report_index = int(row["Data_in_report_position"])
                if report_index < len(df_report):
                    # Берем данные из столбца "Mean, %" (предполагается, что это третий столбец)
                    value_to_copy = df_report.iloc[report_index, 2]
                    ws[blank_cell] = value_to_copy
                    ws[blank_cell].number_format = '0.00'
                    
        # Восстанавливаем размеры ячеек
        for col, width in column_widths.items():
            ws.column_dimensions[col].width = width
        for row, height in row_heights.items():
            ws.row_dimensions[row].height = height

        # Сохраняем XLSX
        wb.save(xlsx_path)
        wb.close()

        # Удаляем CSV, если clean=True
        if clean:
            os.remove(csv_path)

# Процессинг данных, обучение и сохранение UMAP-классификаторов

In [None]:
# Укажем адреса папок с сырыми данными, пробирку и вызовем функцию для организации данных перед обучением UMAP
dirs = [path + r"\Examples\Normal_MM", 
        path + r"\Examples\MM"]
tube = [r"10_56_45_19_138_38.fcs", r"λ_κ_45_117_138_38.fcs", r"xx_79a_45_Ki67_3_xx.fcs"]

In [None]:
%%time
# Организация данных для обучения UMAP
train_umap_data_organiser(dirs, tube, size=10000)

In [None]:
%%time
# Автоматически обучаем классификаторы UMAP по полученным данным, и сохраняем
process_and_save_umap(dirs, 
                      tube, 
                      n_neighbors=25, 
                      min_dist=0.001, 
                      metric="manhattan", 
                      n_epochs=None,            # n_epochs=None по умолчанию. Для более 10к точек это 200
                     )

# Анализ результатов работы UMAP-классификаторов

In [None]:
dirs

In [None]:
%%time
# Вызов функции с передачей нужных директорий и списка tube
process_umap_csv_and_plot(dirs, tube)

In [None]:
%%time
# Вызов функции с передачей нужных директорий и списка tube, и с фильтрацией по типу образца
process_umap_csv_and_plot(dirs, tube, sample_type=True)

# Визуализация отдельных популяций

In [None]:
dirs = [path + r"\Examples\MM_subpopulations"]  # Валидационные данные находятся в отдельной папке

In [None]:
%%time
process_validation_data(dirs, tube=[r"10_56_45_19_138_38.fcs"])

In [None]:
%%time
process_validation_data(dirs, tube=[r"λ_κ_45_117_138_38.fcs"])

In [None]:
%%time
process_validation_data(dirs, tube=[r"xx_79a_45_Ki67_3.fcs"])

# Анализ реального случая

In [None]:
dirs = [path + r"\Examples\Test_MM"]

In [None]:
%%time
process_validation_data(dirs, tube=[r"10_56_45_19_138_38.fcs"])

In [None]:
%%time
process_validation_data(dirs, tube=[r"λ_κ_45_117_138_38.fcs"])

In [None]:
%%time
process_validation_data(dirs, tube=[r"xx_79a_45_Ki67_3.fcs"])

# Эксперименты по разметке

In [None]:
dirs = [path + r"\Examples\Normal_MM", 
        path + r"\Examples\MM"]

In [None]:
%%time
# Настраиваем полигоны
res = umap_populations_plot(
    path=path, dirs=dirs,
    tube="10_56_45_19_138_38.fcs",
    polygons=[
        ["CD138+CD56+CD38+", "gray", (2, 15), (4, 15), (5, 13), (6.5, 11), (5.5, 9.3), (4, 9)],
        ["CD138+CD38+", "red", (2, 15), (4, 15), (5, 13), (6.5, 11), (5.5, 9.3), (4, 9)],
        ["Plasma cells", "orange", (7.5, 8), (8.5, 9), (9, 8.6), (9.5, 8), (10, 6), (8.5, 6)],
        ["Plasma cells", "orange", (4, 9), (6.8, 9.5), (5, 6)],
        ["CD138+", "red", (2, 15), (4, 15), (5, 13), (6.5, 11), (5.5, 9.3), (4, 9)],
        ["CD138+", "red", (7.5, 8), (8.5, 9), (9, 8.6), (9.5, 8), (10, 6), (8.5, 6)],
        ["CD38+", "brown", (7.5, 8), (8.5, 9), (9, 8.6), (9.5, 8), (10, 6), (8.5, 6)],
        ["CD138+CD38+", "red", (7.5, 8), (8.5, 9), (9, 8.6), (9.5, 8), (10, 6), (8.5, 6)],
        ["CD38+", "brown", (2, 15), (4, 15), (5, 13), (6.5, 11), (5.5, 9.3), (4, 9)],
        ["CD10+", "green", (6.5, 11.5), (7.5, 13), (8, 12), (7.5, 11)],
        ["CD56+", "yellow", (2, 15), (4, 15), (5, 13), (6.5, 11), (5.5, 9.3), (4, 9)],
        ["Lymphocytes", "teal", (5, -2), (9, 0), (12, -5)],
        ["CD56+", "yellow", (6, -2), (8.5,-1), (9, -4)],
        ["CD19+", "blue", (8.5, -1), (9, -0.5), (9.5, -1.5), (8.65, -1.75)],
        ["CD19+", "blue", (4, 9), (6.8, 9.5), (5, 6)],
        ["CD38+", "brown", (7, -2.3), (7, -1.3), (8.5, -1.3), (8.5, -2.3)],
        ["Monocytes", "blue", (10, 5), (10.5, 7.5), (11, 7.5), (11, 5), (11.5, 4), (11, 3), (10.2, 4.2)],
        ["Granulocytes", "orange", (12, 9), (18, 9), (18, 1.5), (16.5, 1.5), (15.5, 5), (13, 7.3), (11.5, 8)],
        ["Granulocytes", "yellow", (11.5, 8), (13, 7.3), (15.5, 5), (16.5, 1.5), (12.5, 2), (11.5, 4), (11, 5)],
        ["Trash", "black", (-7, 15), (0, 15), (3, 7), (5, 6), (7, 10), (8, 12), (9, 16), 
        (12.5, 16), (10, 12.5), (8, 10), (7.5, 8), (8, 7), (9.7, 3), (5, -2), (5, -8), (-7, -7)]
    ],
    subsample=0.1
)

In [None]:
res.columns

In [None]:
res['Trash'].value_counts()

In [None]:
res.describe().T

In [None]:
types = res.sample_type.unique().tolist()
samples = res.sample_name.unique().tolist()

In [None]:
%%time
for i in types:
    for j in samples:
        res = umap_populations_plot(
            path=path, dirs=dirs,
            tube="10_56_45_19_138_38.fcs",
            sample_type=i,
            sample_name=j,
            polygons=[
                ["CD138+CD56+CD38+", "gray", (2, 15), (4, 15), (5, 13), (6.5, 11), (5.5, 9.3), (4, 9)],
                ["CD138+CD38+", "red", (2, 15), (4, 15), (5, 13), (6.5, 11), (5.5, 9.3), (4, 9)],
                ["Plasma cells", "orange", (7.5, 8), (8.5, 9), (9, 8.6), (9.5, 8), (9.7, 7), (10, 6), (8.5, 6)],
                ["Plasma cells", "orange", (4, 9), (6.8, 9.5), (5, 6)],
                ["CD138+", "red", (2, 15), (4, 15), (5, 13), (6.5, 11), (5.5, 9.3), (4, 9)],
                ["CD138+", "red", (7.5, 8), (8.5, 9), (9, 8.6), (9.5, 8), (10, 6), (8.5, 6)],
                ["CD38+", "brown", (7.5, 8), (8.5, 9), (9, 8.6), (9.5, 8), (10, 6), (8.5, 6)],
                ["CD138+CD38+", "red", (7.5, 8), (8.5, 9), (9, 8.6), (9.5, 8), (10, 6), (8.5, 6)],
                ["CD38+", "brown", (2, 15), (4, 15), (5, 13), (6.5, 11), (5.5, 9.3), (4, 9)],
                ["CD10+", "green", (6.5, 11.5), (7.5, 13), (8, 12), (7.5, 11)],
                ["CD56+", "yellow", (2, 15), (4, 15), (5, 13), (6.5, 11), (5.5, 9.3), (4, 9)],
                ["Lymphocytes", "teal", (5, -2), (9, 0), (12, -5)],
                ["CD56+", "yellow", (6, -2), (8.5,-1), (9, -4)],
                ["CD19+", "blue", (8.5, -1), (9, -0.5), (9.5, -1.5), (8.65, -1.75)],
                ["CD19+", "blue", (4, 9), (6.8, 9.5), (5, 6)],
                ["CD38+", "brown", (7, -2.3), (7, -1.3), (8.5, -1.3), (8.5, -2.3)],
                ["Monocytes", "blue", (10, 5), (10.5, 7.5), (11, 7.5), (11, 5), (11.5, 4), (11, 3), (10.2, 4.2)],
                ["Granulocytes", "orange", (12, 9), (18, 9), (18, 1.5), (16.5, 1.5), (15.5, 5), (13, 7.3), (11.5, 8)],
                ["Granulocytes", "yellow", (11.5, 8), (13, 7.3), (15.5, 5), (16.5, 1.5), (12.5, 2), (11.5, 4), (11, 5)],
                ["Trash", "black", (-7, 15), (0, 15), (3, 7), (5, 6), (7, 10), (8, 12), (9, 16), 
                (12.5, 16), (10, 12.5), (8, 10), (7.5, 8), (8, 7), (9.7, 3), (5, -2), (5, -8), (-7, -7)]
            ],
            subsample=1
        )

        print(i)
        print(j)
        if res is not None and not res.empty:
            print(res.describe().T.tail(9).iloc[:, :2])

In [None]:
%%time
# Настраиваем полигоны
res = umap_populations_plot(
    path=path, dirs=dirs,
    tube="λ_κ_45_117_138_38.fcs",
    polygons=[
        ["CD138+CD38+", "gray", (5, 12), (7, 10), (6.8, 8.3), (6, 7.7), (7.3, 5), (7.5, 4.5), (2, 7)],
        ["CD117+", "violet", (7.3, 10.9), (8.8, 9.6), (8.8, 7.5), (7, 10),
        (6.8, 8.3), (6, 7.7), (5.8, 9), (6, 9.8), (7, 10)],
        ["kappa+", "pink", (5, 12), (6, 10), (6, 9), (6, 7.8), (5, 8.25), (4, 10)],
        ["lambda+", "green", (4, 10.2), (5, 8.25), (6, 7.8), (7.5, 4.5), (2, 7)],
        ["Plasma cells", "orange", (5, 12), (7, 10), (6.8, 8.3), (6, 7.7), (7.5, 4.5), (2, 7)],
        ["Lymphocytes", "teal", (11, -2), (16, -1), (10, -8)],
        ["Monocytes", "blue", (6.8, 8.3), (8, 8), (9, 7), (9.3, 5), (7.8, 6), (7, 6.7), (6.3, 6.7), (6, 7.7)],
        ["Granulocytes", "orange", (10, 10), (13.5, 12.5), (18, 8), (18, 1.5), (15.5, 3.5), (12.5, 2.5), (10, 6)],
        ["Trash", "black", (7.5, 4.8), (8, 5.3), (9, 5), (9, -10), (-5, -10), (-5, 15), (6, 15),
        (7.5, 12), (7, 10), (5, 12), (2, 7), (5, 2.5)]
    ],
    subsample=0.1
)

In [None]:
%%time
for i in types:
    for j in samples:
        res = umap_populations_plot(
            path=path, dirs=dirs,
            tube="λ_κ_45_117_138_38.fcs",
            sample_type=i,
            sample_name=j,
            polygons=[
                ["CD138+CD38+", "gray", (5, 12), (7, 10), (6.8, 8.3), (6, 7.7), (7.3, 5), (7.5, 4.5), (2, 7)],
                ["CD117+", "violet", (7.3, 10.9), (8.8, 9.6), (8.8, 7.5), (7, 10),
                (6.8, 8.3), (6, 7.7), (5.8, 9), (6, 9.8), (7, 10)],
                ["kappa+", "pink", (5, 12), (6, 10), (6, 9), (6, 7.8), (5, 8.25), (4, 10)],
                ["lambda+", "green", (4, 10.2), (5, 8.25), (6, 7.8), (7.5, 4.5), (2, 7)],
                ["Plasma cells", "orange", (5, 12), (7, 10), (6.8, 8.3), (6, 7.7), (7.5, 4.5), (2, 7)],
                ["Lymphocytes", "teal", (11, -2), (16, -1), (10, -8)],
                ["Monocytes", "blue", (6.8, 8.3), (8, 8), (9, 7), (9.3, 5), (7.8, 6), (7, 6.7), (6.3, 6.7), (6, 7.7)],
                ["Granulocytes", "orange", (10, 10), (13.5, 12.5), (18, 8), (18, 1.5), (15.5, 3.5), (12.5, 2.5), (10, 6)],
                ["Trash", "black", (7.5, 4.8), (8, 5.3), (9, 5), (9, -10), (-5, -10), (-5, 15), (6, 15),
                (7.5, 12), (7, 10), (5, 12), (2, 7), (5, 2.5)]
            ],
            subsample=1
        )

        print(i)
        print(j)
        if res is not None and not res.empty:
            print(res.describe().T.tail(9).iloc[:, :2])

In [None]:
%%time
# Настраиваем полигоны
res = umap_populations_plot(
    path=path, dirs=dirs,
    tube="xx_79a_45_Ki67_3_xx.fcs",
    polygons=[
        ["Plasma cells", "orange", (11, 7.5), (12, 10), (14, 7.5), (13, 5), (11, 2.6), (10, 6), (10, 7)],
        ["Lymphocytes", "teal", (7, 4), (10, 4), (10, 1), (7.5, -1.5), (7, -1.5)],
        ["Lymphocytes", "teal", (9.5, 6), (10, 6), (10, 5), (9, 4.5)],
        ["CD79a+", "yellow", (9.5, 6), (10, 6), (10, 5), (9, 4.5)],
        ["CD3+", "red", (7, 4), (10, 4), (10, 1), (7.5, -1.5), (7, -1.5)],
        ["Granulocytes", "yellow", (14, 7.5), (18, 10), (18, 0), (7, -15), (4, -5), (7, -1.5), (7.5, -1.5), (10, 1), (13, 5)],
        ["Trash", "black", (-6, 22), (18, 22), (11, 10), (9, 5), (7, 4), (5, 3), 
        (3, -12), (-6, -12)],
        ["Trash", "black", (12, -10), (18, -10), (18, -15), (12, -15)]
    ],
    subsample=0.1
)

In [None]:
%%time
for i in types:
    for j in samples:
        res = umap_populations_plot(
            path=path, dirs=dirs,
            tube="xx_79a_45_Ki67_3_xx.fcs",
            sample_type=i,
            sample_name=j,
            polygons=[
                ["Plasma cells", "orange", (11, 7.5), (12, 10), (14, 7.5), (13, 5), (11, 2.6), (10, 6), (10, 7)],
                ["Lymphocytes", "teal", (7, 4), (10, 4), (10, 1), (7.5, -1.5), (7, -1.5)],
                ["Lymphocytes", "teal", (9.5, 6), (10, 6), (10, 5), (9, 4.5)],
                ["CD79a+", "yellow", (9.5, 6), (10, 6), (10, 5), (9, 4.5)],
                ["CD3+", "red", (7, 4), (10, 4), (10, 1), (7.5, -1.5), (7, -1.5)],
                ["Granulocytes", "yellow", (14, 7.5), (18, 10), (18, 0), (7, -15), (4, -5), (7, -1.5), (7.5, -1.5), (10, 1), (13, 5)],
                ["Trash", "black", (-6, 22), (18, 22), (11, 10), (9, 5), (7, 4), (5, 3), 
                (3, -12), (-6, -12)],
                ["Trash", "black", (12, -10), (18, -10), (18, -15), (12, -15)]
            ],
            subsample=1
        )

        print(i)
        print(j)
        if res is not None and not res.empty:
            print(res.describe().T.tail(9).iloc[:, :2])

# Анализ тестовых файлов

In [None]:
%%time
process_test_data(path, tube, pdf=True)

# Обращение к .csv-файлу отчёта и перекладка данных в бланк

In [None]:
%%time
make_report(path,  # Корневая папка, в которой лежит папка Reports с новыми отчётами.
            blanks=r'C:\Users\vsevo\Downloads',  # Папка с заключениями 
            coordinates=r'D:\NovoExpress Data\Examples\Saved_UMAP_n_csv',  # Координаты для связи репорта и бланка заключения
            clean=False)  # Если True (по умолчанию), отчётные файлы будут уладены