In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import re
import seaborn as sns
from tqdm import tqdm
from statannotations.Annotator import Annotator
from scipy.stats import mannwhitneyu
from plottable import ColumnDefinition, Table
from plottable.plots import bar
from plottable.cmap import normed_cmap, centered_cmap
from matplotlib import pyplot as plt
from matplotlib_venn import venn2, venn2_circles
import matplotlib as mpl
import seaborn as sns
import gseapy as gp
from plottable import ColumnDefinition, Table
from plottable.cmap import normed_cmap
from plottable.formatters import decimal_to_percent, tickcross
from plottable.plots import circled_image
import matplotlib
import matplotlib.colors as mcolors
import pathlib

# Setup path and params

In [None]:
path_genetics_1 = "E:/YandexDisk/Work/bbd/fmba/exome"
path_genetics_2 = "E:/YandexDisk/Work/bbd/fmba/lesnoy_2025/results/tapes_annotations_2025"
path_main = 'E:/YandexDisk/Work/pydnameth/draft/13_fmba_cvd_dnam/data/120_1'

sheet_name_1 = "top200"
sheet_name_2 = "Top 300"
prob_thld = 0.8

path_save = f"{path_main}/genetics/{prob_thld}"
pathlib.Path(f"{path_save}/gsea_libs").mkdir(parents=True, exist_ok=True)

# Generate and save mutations dataframe from first batch

In [None]:
df_gen_1 = pd.read_excel(f"{path_genetics_1}/Образцы экзомы Лесной секвенирование 100 шт.xlsx", index_col='Sample#')
df_gen_1['ID образца крови'] = df_gen_1['ID образца крови'].astype(str)


all_results = []
for file in (pbar := tqdm(os.listdir(path_genetics_1))):
    pbar.set_description(f"{file}")
    
    if file.endswith(".xlsx"):
        file_path = os.path.join(path_genetics_1, file)

        try:
            df = pd.read_excel(file_path, sheet_name=sheet_name_1)
            
            # Убедимся, что нужные колонки присутствуют
            required_cols = [
                "Ген", "Вероятность патогенности", "ACMG_класс", "Ассоциация с заболеванием"
            ]
            if not all(col in df.columns for col in required_cols):
                print(f"Пропущен файл {file_path} — нет нужных колонок.")
                continue
            
            # Фильтрация
            filtered = df[df["Вероятность патогенности"] >= prob_thld].copy()
            
            sample_number_raw = file.split("-")[0]
            sample_number = df_gen_1.at[sample_number_raw, 'ID образца крови']
            if not filtered.empty:
                
                filtered["Номер образца"] = sample_number

                all_results.append(filtered[required_cols + ["Номер образца"]])
            else:
                print(sample_number)

        except Exception as e:
            print(f"Ошибка при обработке файла {file_path}: {e}")

# Объединяем в один датафрейм
if all_results:
    mutations_df = pd.concat(all_results, ignore_index=True)

    # Переименование колонок
    mutations_df.rename(columns={
        "Ген": "Gene",
        "Вероятность патогенности": "Pathogenicity Probability",
        "ACMG_класс": "ACMG Classification",
        "Ассоциация с заболеванием": "Disease Description",
        "Номер образца": "Sample"
    }, inplace=True)
else:
    print("Подходящих данных не найдено.")

mutations_df["number"] = mutations_df["Sample"].astype(str)
mutations_df.to_excel(f"{path_save}/filtered_mutations_1.xlsx", index=False)

# Generate and save mutations dataframe from second batch

In [None]:
all_results = []
for file in (pbar := tqdm(os.listdir(path_genetics_2))):
    pbar.set_description(f"{file}")
    
    if file.endswith(".vcf.xlsx"):
        file_path = os.path.join(path_genetics_2, file)

        try:
            df = pd.read_excel(file_path, sheet_name=sheet_name_2)
            
            # Убедимся, что нужные колонки присутствуют
            required_cols = [
                "Ген", "Вероятность патогенности", "Классификация ACMG", "Описание заболевания"
            ]
            if not all(col in df.columns for col in required_cols):
                print(f"Пропущен файл {file_path} — нет нужных колонок.")
                continue
            
            # Фильтрация
            filtered = df[df["Вероятность патогенности"] >= prob_thld].copy()
            
            sample_number = file.split(".")[0]
            if not filtered.empty:
                
                filtered["Номер образца"] = sample_number

                all_results.append(filtered[required_cols + ["Номер образца"]])
            else:
                print(sample_number)

        except Exception as e:
            print(f"Ошибка при обработке файла {file_path}: {e}")

# Объединяем в один датафрейм
if all_results:
    mutations_df = pd.concat(all_results, ignore_index=True)

    # Переименование колонок
    mutations_df.rename(columns={
        "Ген": "Gene",
        "Вероятность патогенности": "Pathogenicity Probability",
        "Классификация ACMG": "ACMG Classification",
        "Описание заболевания": "Disease Description",
        "Номер образца": "Sample"
    }, inplace=True)
else:
    print("Подходящих данных не найдено.")

mutations_df["number"] = mutations_df["Sample"].astype(str)
mutations_df.to_excel(f"{path_save}/filtered_mutations_2.xlsx", index=False)

# Load generated mutations dataframe

In [None]:
mutations_df_1 = pd.read_excel(f"{path_save}/filtered_mutations_1.xlsx")
mutations_df_2 = pd.read_excel(f"{path_save}/filtered_mutations_2.xlsx")
samples_1 = mutations_df_1["Sample"].unique()
samples_2 = mutations_df_2["Sample"].unique()
print(set(samples_1) & set(samples_2))
df_samples = pd.DataFrame(index=list(set(samples_1) | set(samples_2)))
df_samples.loc[samples_1, 'Batch'] = 1
df_samples.loc[samples_2, 'Batch'] = 2
df_samples.to_excel(f"{path_save}/df_samples.xlsx")

mutations_df = pd.concat([mutations_df_1, mutations_df_2], ignore_index=True)

mutations_df["number"] = mutations_df["Sample"].astype(str)

# Merge genetics and pheno dataframes

In [None]:
patient_groups_df = pd.read_excel(f"{path_main}/pheno_funnorm.xlsx", index_col=0)
patient_groups_df["number"] = patient_groups_df.index.astype(str)
patient_groups_df["Status"] = patient_groups_df["Special Status"]
display(patient_groups_df["Status"].value_counts())

samples = list(set(mutations_df['number'].unique()).intersection(set(patient_groups_df['number'].unique())))
patient_groups_df = patient_groups_df[patient_groups_df["number"].isin(samples)]
display(patient_groups_df["Status"].value_counts())

# Объединяем по колонке number
merged_df = mutations_df.merge(patient_groups_df, on="number")

# Среднее число мутаций на пациента в каждой группе
display(merged_df.groupby("Status")["Gene"].count() / merged_df.groupby("Status")["Sample"].nunique())


# Average mutations count in groups

In [None]:
df_grouped = merged_df.groupby(["Status", "Sample"])["Gene"].count().reset_index(name="Mutation count")

sns.set_theme(style='ticks')
fig, ax = plt.subplots(figsize=(4, 4), layout="constrained")
violin = sns.violinplot(
    data=df_grouped,
    x='Status',
    y='Mutation count',
    palette={'Control': 'chartreuse', 'Case': 'red'},
    scale='width',
    order=['Control', 'Case'],
    saturation=0.75,
)
violin.set_xlabel(f"")
violin.set_ylabel("Average number of mutations per sample")
mw_pval = mannwhitneyu(
    df_grouped.loc[df_grouped['Status'] == 'Control', f'Mutation count'].values,
    df_grouped.loc[df_grouped['Status'] == 'Case', f'Mutation count'].values,
    alternative='two-sided').pvalue
pval_formatted = [f'{mw_pval:.2e}']
annotator = Annotator(
    violin,
    pairs=[('Control', 'Case')],
    data=df_grouped,
    x='Status',
    y=f'Mutation count',
    order=['Control', 'Case']
)
annotator.set_custom_annotations(pval_formatted)
annotator.configure(loc='outside')
annotator.annotate()
fig.savefig(f"{path_save}/violin_mutation_count.png", bbox_inches='tight', dpi=200)
fig.savefig(f"{path_save}/violin_mutation_count.pdf", bbox_inches='tight')
plt.close()

# Average mutations count in groups by ACMG Classification

In [None]:
df_grouped_acmg = merged_df.groupby(["Status", "Sample", "ACMG Classification"])["Gene"].count().reset_index(name="Mutation count")

acmg_colors = {
    'Likely Benign': 'dodgerblue',
    'VUS': 'gold',
    'Likely Pathogenic': 'orangered',
    'Pathogenic': 'firebrick'
}

sns.set_theme(style='ticks')
fig = plt.figure(
    figsize=(6, 5),
    layout="constrained"
)
axs = fig.subplot_mosaic(
    [
        ['table'],
        ['violin'],
    ],
    height_ratios=[1, 5],
    # width_ratios=[3, 1.5],
    gridspec_kw={
        # "bottom": 0.0,
        # "top": 1.00,
        # "left": 0.1,
        # "right": 0.5,
        #"wspace": 0.33,
        #"hspace": 0.01,
    },
)

ds_table = pd.DataFrame(index=['Control VS Case'], columns=list(acmg_colors.keys()))
for acmg_group in acmg_colors:
    mw_pval = mannwhitneyu(
        df_grouped_acmg.loc[(df_grouped_acmg['Status'] == 'Control') & (df_grouped_acmg['ACMG Classification'] == acmg_group), f'Mutation count'].values,
        df_grouped_acmg.loc[(df_grouped_acmg['Status'] == 'Case') & (df_grouped_acmg['ACMG Classification'] == acmg_group), f'Mutation count'].values,
        alternative='two-sided'
    ).pvalue
    ds_table.at['Control VS Case', acmg_group] = f'{mw_pval:.2e}'
col_defs = [
    ColumnDefinition(
        name="index",
        title='Mann-Whitney p-value',
        textprops={"ha": "left"},
        width=4.5,
    ),
]
for acmg_group in acmg_colors:
    col_defs.append(
            ColumnDefinition(
            name=acmg_group,
            title=acmg_group,
            textprops={"ha": "center"},
            width=2.0,
        )
    )
table = Table(
    ds_table,
    column_definitions=col_defs,
    row_dividers=True,
    footer_divider=False,
    ax=axs['table'],
    textprops={"fontsize": 7},
    row_divider_kw={"linewidth": 1, "linestyle": (0, (1, 1))},
    col_label_divider_kw={"linewidth": 1, "linestyle": "-"},
    column_border_kw={"linewidth": 1, "linestyle": "-"},
)

violin = sns.violinplot(
    data=df_grouped_acmg,
    x='Status',
    y='Mutation count',
    hue="ACMG Classification",
    palette=acmg_colors,
    hue_order=list(acmg_colors.keys()),
    order=['Control', 'Case'],
    scale='width',
    saturation=0.75,
    ax=axs['violin']
)
axs['violin'].set_xlabel(f"")
axs['violin'].set_ylabel("Average number of mutations per sample")
fig.savefig(f"{path_save}/violin_mutation_acmg_count.png", bbox_inches='tight', dpi=200)
fig.savefig(f"{path_save}/violin_mutation_acmg_count.pdf", bbox_inches='tight')
plt.close()

# Frequency of genomic variants in the 50 most frequently mutated genes

In [None]:
# Подсчитываем количество мутаций для каждого гена в обеих группах
mutation_counts = merged_df.groupby(["Status", "Gene"]).size().unstack(fill_value=0)

# Считаем общее количество мутаций для каждого гена (по обеим группам)
total_mutation_counts = mutation_counts.sum(axis=0)

# Выбираем топ-50 генов с наибольшим количеством мутаций
top_50_genes = total_mutation_counts.nlargest(50).index

# Отбираем данные для этих топ-50 генов
mutation_counts_filtered = mutation_counts.loc[:, top_50_genes]


df_heatmap = mutation_counts_filtered.T[['Control', 'Case']]

sns.set_theme(style='ticks')
fig, ax = plt.subplots(figsize=(4, 15) , layout='constrained')
heatmap = sns.heatmap(
    df_heatmap,
    annot=True,
    fmt="d",
    # cmap='coolwarm',
    linewidth=0.1,
    linecolor='black',
    # annot_kws={"fontsize": 15},
    cbar_kws={
        'orientation': 'horizontal',
        'location': 'top',
        'pad': 0.025,
        'aspect': 30
    },
    ax=ax
)
heatmap_pos = heatmap.get_position()
ax.figure.axes[-1].set_title('Frequency of genomic variants in\nthe 50 most frequently mutated genes', fontsize=10)
ax.figure.axes[-1].tick_params()
for spine in ax.figure.axes[-1].spines.values():
    spine.set_linewidth(1)
ax.set_xlabel('')
ax.set_ylabel('')
plt.savefig(f"{path_save}/heatmap_top50.png", bbox_inches='tight', dpi=200)
plt.savefig(f"{path_save}/heatmap_top50.pdf", bbox_inches='tight')
plt.close(fig)

# GSEA

## Prepare gene lists and libs

In [None]:
# Получаем множества генов для каждой группы
genes_ctrl = set(merged_df[merged_df['Status'] == 'Control']['Gene'].dropna())
genes_case = set(merged_df[merged_df['Status'] == 'Case']['Gene'].dropna())

genes_ctrl_only = genes_ctrl - genes_case
genes_case_only = genes_case - genes_ctrl
genes_cmn = genes_ctrl & genes_case

gsea_libs_trgt = [
    'GO_Biological_Process_2025',
    'GO_Molecular_Function_2025',
    'GO_Cellular_Component_2025',
    'KEGG_2021_Human',
    'Reactome_2022',
    'Reactome_Pathways_2024',
    'Jensen_DISEASES',
    'Jensen_DISEASES_Curated_2025',
    'Jensen_DISEASES_Experimental_2025',
]

genes_set = {
    'ctrl_only': genes_ctrl_only,
    'case_only': genes_case_only,
    'cmn': genes_cmn
}

fig, ax = plt.subplots(figsize=(6, 6))
venn = venn2(
    subsets=(genes_ctrl, genes_case),
    set_labels = ('Control', 'Case'),
    set_colors=('chartreuse', 'red'),
    alpha = 0.8
)
venn2_circles(
    subsets=(genes_ctrl, genes_case),
)
plt.tight_layout()
plt.savefig(f"{path_save}/venn_genes_intersection.png", bbox_inches='tight', dpi=400)
plt.savefig(f"{path_save}/venn_genes_intersection.pdf", bbox_inches='tight', dpi=400)
plt.clf()

# Печать статистики
print(f"Гены только у Control: {len(genes_ctrl_only)}")
print(f"Гены только у Case: {len(genes_case_only)}")
print(f"Общие гены: {len(genes_cmn)}")

## Download enrichment

In [None]:
for gene_set_name, gene_set in genes_set.items():
    dfs_enrichr = []
    for gsea_lib in (pbar := tqdm(gsea_libs_trgt)):
        pbar.set_description(f"Processing {gsea_lib}")
        enr = gp.enrichr(
            gene_list=list(gene_set),
            gene_sets=gsea_lib,
            organism='Human',
            outdir=None,
            cutoff=1.00,
            verbose=False,
            no_plot=True
        )
        dfs_enrichr.append(enr.results)
    df_enrichr = pd.concat(dfs_enrichr, ignore_index=True)
    df_enrichr.to_excel(f"{path_save}/enrichr_{gene_set_name}.xlsx", index=True)

## Read enrichment from file

In [None]:
dfs_enr = {}
for gene_set_name in genes_set:
    dfs_enr[gene_set_name] = pd.read_excel(f"{path_save}/enrichr_{gene_set_name}.xlsx", index_col=0)

## Terms analysis

In [None]:
for gsea_lib in (pbar := tqdm(gsea_libs_trgt)):
    pbar.set_description(f"Processing {gsea_lib}")
    
    terms_ctrl_only = set(dfs_enr['ctrl_only'].loc[(dfs_enr['ctrl_only']['Adjusted P-value'] < 0.05) & (dfs_enr['ctrl_only']['Gene_set'] == gsea_lib), 'Term'])
    terms_case_only = set(dfs_enr['case_only'].loc[(dfs_enr['case_only']['Adjusted P-value'] < 0.05) & (dfs_enr['case_only']['Gene_set'] == gsea_lib), 'Term'])
    terms_cmn = set(dfs_enr['cmn'].loc[(dfs_enr['cmn']['Adjusted P-value'] < 0.05) & (dfs_enr['cmn']['Gene_set'] == gsea_lib), 'Term'])
    
    terms_all = list(terms_ctrl_only.union(terms_case_only, terms_cmn))
    max_letters_in_terms = max([len(x) for x in terms_all])
    
    term_presence = {
        'Control only': [1 if term in terms_ctrl_only else 0 for term in terms_all],
        'Case only': [1 if term in terms_case_only else 0 for term in terms_all],
        'Common': [1 if term in terms_cmn else 0 for term in terms_all]
    }
    
    df_term_presence = pd.DataFrame(term_presence, index=terms_all).sort_index()
    
    f_cmap = sns.color_palette("Spectral_r", as_cmap=True)
    f_norm = mcolors.Normalize(vmin=0, vmax=1)
    sm = plt.cm.ScalarMappable(cmap=f_cmap, norm=f_norm).to_rgba
    
    col_defs = [
        ColumnDefinition(
            name="index",
            title='Terms',
            textprops={"ha": "left"},
            width=3.0 + 0.08 * max_letters_in_terms,
        ),
        ColumnDefinition(
            name='Control only',
            title='Control only',
            formatter=tickcross,
            textprops={"ha": "center"},
            group=gsea_lib,
            cmap=sm,
            width=1.7,
        ),
        ColumnDefinition(
            name='Case only',
            title='Case only',
            formatter=tickcross,
            textprops={"ha": "center"},
            group=gsea_lib,
            cmap=sm,
            width=1.7,
        ),
        ColumnDefinition(
            name='Common',
            title='Common',
            formatter=tickcross,
            textprops={"ha": "center"},
            group=gsea_lib,
            cmap=sm,
            width=1.7,
        ),
    ]
    
    fig, ax = plt.subplots(figsize=(6.0 + 0.08 * max_letters_in_terms, 4 + 0.2 * df_term_presence.shape[0]))
    table = Table(
        df_term_presence,
        column_definitions=col_defs,
        row_dividers=True,
        footer_divider=False,
        odd_row_color="#ffffff",
        even_row_color="#f0f0f0",
        ax=ax,
        # textprops={"fontsize": 10},
        row_divider_kw={"linewidth": 1, "linestyle": (0, (1, 1))},
        col_label_divider_kw={"linewidth": 1, "linestyle": "-"},
        column_border_kw={"linewidth": 1, "linestyle": "-"},
    ).autoset_fontcolors(colnames=['Control only', 'Case only', 'Common'])
    fig.savefig(f"{path_save}/gsea_libs/{gsea_lib}.png", bbox_inches='tight', dpi=200)
    fig.savefig(f"{path_save}/gsea_libs/{gsea_lib}.pdf", bbox_inches='tight')
    plt.close(fig)