In [None]:
# import random
# import sys
# import warnings
from pathlib import Path

# import DataSets_contigs as ds
# import kaplanmeier as km
import matplotlib.pyplot as plt

# import numpy as np
# import optuna
import pandas as pd
import polars as pl
import _config as cfg
# import pymysql
# from imblearn.over_sampling import SMOTE
# from sklearn import svm
# from sklearn.metrics import (
#     accuracy_score,
#     f1_score,
#     precision_score,
#     recall_score,
#     roc_auc_score,
# )
# from sklearnex import patch_sklearn

# patch_sklearn()
# warnings.filterwarnings("ignore")
# random.seed(1024)
# np.random.seed(1024)

results_dir = Path("Results")
results_dir.mkdir(parents=True, exist_ok=True)

perf_col = "aroc"

scaffold_old_col = "Id"
var_class_col = "Variant_Classification"
prot_change_col = "Protein_Change"
gene_name_col = "gene_name"

perf_delta_col = "Performance_delta"
scaffold_col = "scaffold"

promisc_new_col = "Promiscuity"
bin_id_col = "Bin_idx"

list_separator = ";"

# Unique colors for variants
variants = [
    "Missense_Mutation",
    "Intron",
    "Silent",
    "Splice_Site",
    "Nonsense_Mutation",
    "Frame_Shift_Del",
    "IGR",
    "3'UTR",
    "5'Flank",
    "5'UTR",
    "RNA",
    "Frame_Shift_Ins",
    "In_Frame_Del",
    "lincRNA",
    "In_Frame_Ins",
    "Start_Codon_SNP",
    "Nonstop_Mutation",
    "De_novo_Start_OutOfFrame",
    "De_novo_Start_InFrame",
    "Stop_Codon_Del",
    "Start_Codon_Del",
    "Start_Codon_Ins",
]
variants_n = len(variants)
variants_colors = {}
if variants_n <= 10:
    c = {variants[i]: plt.color_sequences["tab10"][i] for i in range(variants_n)}
    variants_colors.update(c)
else:
    c = {variants[i]: plt.color_sequences["tab10"][i] for i in range(10)}
    variants_colors.update(c)
    vn_rest = variants_n - 10
    if vn_rest <= 10:
        c = {
            variants[i]: plt.color_sequences["tab20"][j]
            for i, j in zip(range(10, 10 + vn_rest), range(1, 2 * vn_rest, 2))
        }
        variants_colors.update(c)
    else:
        c = {
            variants[i]: plt.color_sequences["tab20"][j]
            for i, j in zip(range(10, 20), range(1, 20, 2))
        }
        variants_colors.update(c)
        vn_rest -= 10
        if vn_rest <= 9:
            c = {
                variants[i]: plt.color_sequences["Pastel1"][j]
                for i, j in zip(range(20, 20 + vn_rest), range(vn_rest))
            }
            variants_colors.update(c)
        else:
            raise ValueError("No colors left, yet")
# Add color for `Other`
variants_colors_other = "0.7"

# Matplotlib config

plt.rcdefaults()

main_font_size = 6
label_font_size = 10

axis_color = "0.15"

plt.rcParams["text.color"] = axis_color
plt.rcParams["axes.labelcolor"] = axis_color
plt.rcParams["xtick.color"] = axis_color
plt.rcParams["ytick.color"] = axis_color
plt.rcParams["axes.edgecolor"] = axis_color

plt.rcParams["font.size"] = main_font_size
plt.rcParams["axes.labelsize"] = main_font_size
plt.rcParams["axes.titlesize"] = main_font_size
plt.rcParams["xtick.labelsize"] = main_font_size
plt.rcParams["ytick.labelsize"] = main_font_size
plt.rcParams["legend.fontsize"] = main_font_size
plt.rcParams["figure.labelsize"] = main_font_size
plt.rcParams["figure.titlesize"] = main_font_size

plt.rcParams["axes.spines.top"] = False
plt.rcParams["axes.spines.right"] = False

plt.rcParams["savefig.transparent"] = True

plt.rcParams["pdf.fonttype"] = 42
plt.rcParams["ps.fonttype"] = 42
plt.rcParams["svg.fonttype"] = "none"

#


def figsize_in_mm(width, height):
    mm = 1 / 25.4  # millimeters in inches
    return (width * mm, height * mm)

In [None]:
# Scaffold level

# Config
config = cfg.configurations[3]
mut_remove_results_file = (
    results_dir / "TEST-mutation-remove-experiments-Scaffold_level.csv"
)
mut_join_col = scaffold_col
promisc_col = "popcov_but_sqrt"
pd_int_cols = {"unique_peptides": "Int64"}
unique_cols = [
    "Chromosome",
    "Start_position",
    "End_position",
    "unique_peptides",
    "popcov_but_sqrt",
]
best_genes_cols = [
    gene_name_col,
    prot_change_col,
    scaffold_col,
    promisc_col,
    perf_delta_col,
]
prot_change_extra_col = prot_change_col + "_extra"
#

model_name = config["plot_label"]

mutset = pd.read_csv(
    config["contig_file"], sep="\t", low_memory=False, dtype=pd_int_cols
)
# Filter the file like in 'DataSets_validation.py' first:
if config["hotspots"]:
    mutset = mutset.loc[mutset["contig"].notna()]
mutset = mutset.dropna(
    axis="index", how="any", subset=unique_cols, ignore_index=True
).drop_duplicates(unique_cols, ignore_index=True)
mutset = pl.DataFrame(mutset)
# Renaming for scaffolds only:
mutset = mutset.rename({scaffold_old_col: scaffold_col})

contigmut = pl.read_csv(mut_remove_results_file)

aroccontigmut = contigmut[perf_col][0]

contig_mutations_with_delta = (
    contigmut.slice(1)
    .with_columns((aroccontigmut - pl.col(perf_col)).alias(perf_delta_col))
    .sort(perf_delta_col, descending=True)
)

# Add model performance info to mutations
#  The `filter` construction might be scaffold/contig-only
mutset = (
    mutset.join(
        contig_mutations_with_delta,
        on=mut_join_col,
        suffix="_dup",
        how="left",
        coalesce=False,
    )
    .filter(pl.col(mut_join_col).is_not_null())
    .sort(perf_delta_col, descending=True)
)

# Deal with some list columns (scaffold/contig-only)
contig_mutations_with_delta = contig_mutations_with_delta.with_columns(
    pl.col(prot_change_col).str.split(list_separator),
    pl.col(gene_name_col).str.split(list_separator),
)
contig_mutations_with_delta = contig_mutations_with_delta.with_columns(
    pl.col(prot_change_col)
    .list.unique()
    .list.set_difference([""])
    .alias(prot_change_extra_col)
).with_columns(
    pl.when(pl.col(prot_change_extra_col).list.len() > 0).then(
        pl.col(prot_change_extra_col)
    )
)

print(
    "Performance delta limits: ("
    f"{contig_mutations_with_delta[perf_delta_col].min()}"
    ", "
    f"{contig_mutations_with_delta[perf_delta_col].max()}"
    ")"
)

In [None]:
mutset = mutset.filter(pl.col(perf_delta_col).is_not_null())

In [None]:
contig_mutations_with_delta[gene_name_col].str.find(";;").is_not_null().any()

In [None]:
# Performance delta distribution

# Config
promisc_boundary = 13
perf_delta_x_range = (0.0, 0.32)

fig_size = figsize_in_mm(94, 72)

top_genes_n = 100
#

base_df = contig_mutations_with_delta
prot_change_test_col = prot_change_extra_col

canonical = base_df.filter(pl.col(prot_change_test_col).is_not_null())
non_canonical = base_df.filter(pl.col(prot_change_test_col).is_null())

canonical_high = canonical.filter(pl.col(promisc_col) > promisc_boundary)
canonical_low = canonical.filter(pl.col(promisc_col) <= promisc_boundary)
non_canonical_high = non_canonical.filter(pl.col(promisc_col) > promisc_boundary)
non_canonical_low = non_canonical.filter(pl.col(promisc_col) <= promisc_boundary)

data_subgroups = {
    "Canonical (High Promiscuity)": canonical_high,
    "Canonical (Low Promiscuity)": canonical_low,
    "Non-Canonical (High Promiscuity)": non_canonical_high,
    "Non-Canonical (Low Promiscuity)": non_canonical_low,
}

# Fig

fig, axes = plt.subplots(2, 2, figsize=fig_size, layout="constrained")
for ax, (title, series) in zip(axes.flatten(), data_subgroups.items()):
    h_out = ax.hist(
        series[perf_delta_col], bins=20, density=False, range=perf_delta_x_range
    )
    ax.set_title(title)
    ax.set_xlabel("Performance Delta")
    ax.set_ylabel("Frequency")
    ax.set_yticks([])
    ax.set_yticklabels([])
    ax.grid(axis="y", alpha=0.5)

    # Save top N genes from each bin:

    bin_n = h_out[0][::-1]
    bin_starts = h_out[1]
    bin_ranges = [
        (bin_starts[i], bin_starts[i + 1]) for i in range(len(bin_starts) - 1)
    ][::-1]
    bin_ids = [i for i in range(len(bin_n))][::-1]

    top_genes = []

    for b_n, (b_s, b_e), b_id in zip(bin_n, bin_ranges, bin_ids):
        if b_n == 0:
            continue
        b_series = series.filter(
            (pl.col(perf_delta_col) >= b_s) & (pl.col(perf_delta_col) < b_e)
        ).select(pl.col(best_genes_cols))
        # Might work only with scaffolds/contigs
        b_series = (
            b_series.explode([gene_name_col, prot_change_col])
            .unique()
            .sort(
                [promisc_col, gene_name_col, prot_change_col, mut_join_col],
                descending=[True, False, False, False],
            )
            .with_columns(pl.lit(b_id).alias(bin_id_col))
        )
        #
        # Take first top N promiscuity values (dense row selection)
        b_series = b_series.with_columns(
            pl.col(promisc_col).rank(method="dense", descending=True).alias("_rank")
        )
        b_series = b_series.filter(
            pl.col(promisc_col).rank(method="dense", descending=True) <= top_genes_n
        )
        #
        top_genes.append(b_series)

    top_genes = pl.concat(top_genes).sort(
        [promisc_col, perf_delta_col, gene_name_col, prot_change_col, mut_join_col],
        descending=[True, True, False, False, False],
    )
    top_genes = top_genes.rename({promisc_col: promisc_new_col})
    print(top_genes)

    title_mod = title.lower().replace("(", "").replace(")", "").replace(" ", "_")
    f_name = f"top-{top_genes_n}-genes-{title_mod}-{model_name.replace(' ', '_')}.csv"
    # top_genes.write_csv(f_name)
for fmt in ("pdf", "png"):
    fig.savefig(
        results_dir
        / f"performance-delta-histograms-{model_name.replace(' ', '_')}.{fmt}",
        dpi=600,
    )

In [None]:
# Variant classifications distribution

# Config
base_df = mutset
prot_change_test_col = prot_change_col

fig_size = figsize_in_mm(70.3, 154.3)
# n_other_groups = [5, 9]
n_other_groups = [3, 0]
#

canonical = base_df.filter(pl.col(prot_change_test_col).is_not_null())
non_canonical = base_df.filter(pl.col(prot_change_test_col).is_null())

groups = [
    ("Canonical Mutations", canonical),
    ("Non-Canonical Mutations", non_canonical),
]

# Fig

fig, axes_pie = plt.subplots(2, 1, figsize=fig_size, layout="constrained")
for ax, (title, group_df), n_other in zip(axes_pie.flatten(), groups, n_other_groups):
    counts = group_df.group_by(var_class_col).len().sort("len", descending=True)
    labels = counts[var_class_col]
    sizes = counts["len"]
    n_labels = len(labels)
    if 0 < n_other < n_labels:
        other_sum = sizes.slice(-n_other).sum()
        sizes = sizes.slice(0, n_labels - n_other).append(
            pl.Series("a", [other_sum], dtype=pl.UInt32)
        )
        other_label = "Other: " + ",\n".join(labels.slice(-n_other).to_list())
        labels = labels.slice(0, n_labels - n_other).append(
            pl.Series("a", [other_label])
        )
        n_labels = len(labels)
    elif n_other >= n_labels:
        raise ValueError("Too much labels compressed into `n_other_groups`")
    colors = [
        variants_colors_other if lab.startswith("Other: ") else variants_colors[lab]
        for lab in labels
    ]
    wedges, _ = ax.pie(
        sizes,
        labels=labels,
        colors=colors,
        autopct=None,
        startangle=90,
        labeldistance=None,
    )
    ax.set_title(title)
    percents = (sizes / sizes.sum() * 100).round(2)
    legend_labels = [f"{lab}--{p}%" for lab, p in zip(labels, percents)]
    ax.legend(wedges, legend_labels, loc="center left", bbox_to_anchor=(1, 0, 0.5, 1))
    ax.axis("equal")
for fmt in ("pdf", "png"):
    fig.savefig(
        results_dir
        / f"variant-classifications-pie-charts-{model_name.replace(' ', '_')}.{fmt}",
        dpi=600,
    )