In [None]:
r""" Protease optimization
 _____  _______  _    _ 
|  __ \|__   __|| |  | |
| |  | |  | |   | |  | |
| |  | |  | |   | |  | |
| |__| |  | |   | |__| |
|_____/   |_|   |______|

__authors__ = Marco Reverenna & Konstantinos Kalogeropoulos
__copyright__ = Copyright 2025-2026
__research-group__ = DTU Biosustain (Multi-omics Network Analytics) and DTU Bioengineering
__date__ = 22 Jun 2025
__maintainer__ = Marco Reverenna
__email__ = marcor@dtu.dk
__status__ = Dev
"""

In [None]:
#bsa_theme = [[0.0, "#fee8c8"], [0.7, "#fdbb84"], [0.8, "#ef6548"], [0.9, "#b30000"], [1.0, "#7f0000"]]

import os
import sys

# Put ../src (relative to the notebook's working directory) on the import path
# so the modules imported in the next cell can be found.
script_dir = os.getcwd()
src_dir = os.path.join(script_dir, '../src')
sys.path.append(src_dir)

In [None]:
# my modules
import dbg
import greedy_method as greedy
import mapping as map
import consensus as cons
import alignment as align
import clustering as clus
import preprocessing as prep
import compute_statistics as comp_stat

# import libraries
from tqdm import tqdm
from tempfile import mkdtemp
from itertools import combinations
from collections import defaultdict, Counter
from scipy.stats import gaussian_kde
from Bio import SeqIO


import json
import re
import Bio
import shutil
import logging
import importlib
import statistics
import subprocess
import numpy as np
import pandas as pd
import seaborn as sns
import networkx as nx
import plotly.express as px
import matplotlib.pyplot as plt
import plotly.graph_objects as go

In [None]:
def get_sample_metadata(run, chain="", json_path="../json/sample_metadata.json"):
    """Look up the metadata entry of one run/chain pair.

    Args:
        run: Key of the run in the top-level JSON object.
        chain: Value the entry's "chain" field must equal (default: empty string).
        json_path: Path of the JSON metadata file.

    Returns:
        The first entry listed under `run` whose "chain" field equals `chain`.

    Raises:
        ValueError: If `run` is not a key of the file, or if none of its
            entries has the requested chain.
    """
    with open(json_path, "r") as handle:
        metadata_by_run = json.load(handle)

    if run not in metadata_by_run:
        raise ValueError(f"Run '{run}' not found in metadata.")

    # First entry with the requested chain, or None when there is no such entry.
    match = next(
        (item for item in metadata_by_run[run] if item["chain"] == chain),
        None,
    )
    if match is None:
        raise ValueError(f"No metadata found for run '{run}' with chain '{chain}'.")
    return match

In [None]:
def get_colors_from_run(cat, is_scaffold=False, json_path="../json/colors.json"):
    """Return the colour configured for a category's contigs or scaffolds.

    The category is the lower-cased part of `cat` before its first underscore.

    Args:
        cat: Category label, e.g. a run name; only the text before the first
            "_" is used.
        is_scaffold: Select the "scaffold" colour instead of the "contig" one.
        json_path: Path of the JSON file mapping category -> {key: colour}.

    Returns:
        The value stored under colors[category]["contig" | "scaffold"].

    Raises:
        FileNotFoundError: If `json_path` does not exist.
        ValueError: If the category or the key is missing from the file.
    """
    if not os.path.exists(json_path):
        raise FileNotFoundError(f"Missing color file: {json_path}")

    with open(json_path, "r") as handle:
        palette = json.load(handle)

    prefix, _, _ = cat.partition("_")
    category = prefix.lower()
    if is_scaffold:
        key = "scaffold"
    else:
        key = "contig"

    try:
        chosen = palette[category][key]
    except KeyError:
        raise ValueError(f"Color not defined for category '{category}' and key '{key}'")
    return chosen


In [None]:
def get_combination_name(ass_method, conf, kmer_size, size_threshold, min_overlap, min_identity, max_mismatches):
    """Build the label that identifies one parameter combination.

    The k-mer size is part of the label only for the "dbg" assembly method;
    for any other method `kmer_size` is ignored.

    Returns:
        A string of the form
        "comb_<method>_c<conf>[_ks<kmer>]_ts<size>_mo<overlap>_mi<identity>_mm<mismatches>".
    """
    if ass_method != "dbg":
        return f"comb_{ass_method}_c{conf}_ts{size_threshold}_mo{min_overlap}_mi{min_identity}_mm{max_mismatches}"
    return f"comb_{ass_method}_c{conf}_ks{kmer_size}_ts{size_threshold}_mo{min_overlap}_mi{min_identity}_mm{max_mismatches}"


In [None]:
# Select the run to analyse and read its reference protein, chain label and
# protease list from the sample metadata (default chain: empty string).
run = "bsa"

meta = get_sample_metadata(run)
protein, chain, proteases = meta["protein"], meta["chain"], meta["proteases"]

print(
    f"Protein: {protein}",
    f"Chain: {chain}",
    f"Proteases: {proteases}",
    sep="\n",
)

In [None]:
# Assembly and filtering settings used by the cells below.
ass_method = 'dbg'  # assembly method label; 'dbg' adds the k-mer size to get_combination_name's output
kmer_size = 7  # k-mer size passed to dbg.get_kmers
conf = 0.92  # PSMs are kept only when their 'conf' value is strictly above this
size_threshold = 5  # contigs must be longer than this to be kept; also passed to dbg.scaffold_iterative
min_overlap = 3  # passed to dbg.scaffold_iterative
min_identity = 0.9  # passed to map.process_protein_contigs_scaffold
max_mismatches = 12  # passed to map.process_protein_contigs_scaffold

In [None]:
# Bundle the run settings so they can be forwarded as keyword arguments
# (protease_opt_statistics(**params) stores them next to the statistics).
# Every value comes from the settings cell above, including the assembly
# method, so changing a setting there is enough to keep this dict in sync.
params = {
    "ass_method": ass_method,
    "conf": conf,
    "kmer_size": kmer_size,
    "min_overlap": min_overlap,
    "min_identity": min_identity,
    "max_mismatches": max_mismatches,
    "size_threshold": size_threshold,
}

### Data cleaning

In [None]:
# Normalised reference sequence; reused below for the "mapped" flag and for
# contig mapping/statistics.
protein_norm = prep.normalize_sequence(protein)

# PSM table of the selected run, read from ../input/<run>.csv.
df = pd.read_csv(f"../input/{run}.csv")

In [None]:
# Label each PSM with a protease derived from its experiment name
# (prep.extract_protease receives the name and the run's protease list).
df['protease'] = df['experiment_name'].apply(lambda name: prep.extract_protease(name, proteases))

# Project-specific clean-up; see prep.clean_dataframe for the exact rules.
df = prep.clean_dataframe(df)

In [None]:
# Run prep.remove_modifications on every prediction; 'cleaned_preds' is the
# peptide column used for all filtering and matching further down.
df['cleaned_preds'] = df['preds'].apply(prep.remove_modifications)

In [None]:
# Plain Python list of the cleaned peptides, handed to the contaminant filter.
cleaned_psms = df['cleaned_preds'].tolist()

In [None]:
# Presumably keeps only peptides that do not match the contaminant FASTA -
# confirm the exact rule (and the role of `run`) in prep.filter_contaminants.
filtered_psms = prep.filter_contaminants(cleaned_psms, run, "../fasta/contaminants.fasta")

In [None]:
# Keep only the PSMs whose cleaned peptide survived contaminant filtering.
# The explicit copy makes `df` an independent frame, so adding columns to it
# in later cells does not raise pandas' SettingWithCopyWarning.
df = df[df['cleaned_preds'].isin(filtered_psms)].copy()

In [None]:
# Flag PSMs found inside the normalised reference (a substring test, assuming
# protein_norm is a string - confirm in prep.normalize_sequence).
# NOTE: the flag is stored as the strings "True"/"False", not as booleans.
df["mapped"] = df["cleaned_preds"].apply(lambda x: "True" if x in protein_norm else "False")

In [None]:
# Inspect the PSM table after contaminant filtering and reference flagging.
df

In [None]:
def plot_ridgeline_log_kde(df, protease_column='protease', conf_column='conf',
                            protease_list=None, vertical_gap=5, figsize=(12, 12),
                            cmap='viridis', custom_colors=None,
                            title="Confidence score distributions of PSMs per protease in BSA",
                            save_svg_path=None):
    """Draw one log-scaled confidence density ridge per protease.

    For each protease a Gaussian KDE of its confidence values is evaluated on
    500 points in [0, 1] and converted to log10 (1e-6 is added first so zero
    densities do not produce -inf). All ridges are shifted by the same global
    minimum so they share a baseline, then stacked `vertical_gap` units apart
    in the order of `protease_list`.

    Args:
        df: Frame holding one row per PSM.
        protease_column: Column with the protease label of each row.
        conf_column: Column with the confidence value of each row.
        protease_list: Proteases to draw, bottom to top. Defaults to the
            sorted unique non-null labels of `protease_column`.
        vertical_gap: Vertical distance between consecutive ridges.
        figsize: Figure size passed to Matplotlib.
        cmap: Colormap name used when `custom_colors` is None.
        custom_colors: Optional mapping protease -> colour; proteases missing
            from it are drawn in gray.
        title: Figure title.
        save_svg_path: If given, the figure is also written there as SVG.

    Raises:
        ValueError: If no protease yields a density (fewer than two values,
            or values without any spread, for every protease).
    """
    if protease_list is None:
        protease_list = sorted(df[protease_column].dropna().unique())

    x_vals = np.linspace(0, 1, 500)

    # Choose colors
    if custom_colors is not None:
        colors = [custom_colors.get(p, 'gray') for p in protease_list]
    else:
        # plt.get_cmap replaces plt.cm.get_cmap, which was removed in Matplotlib 3.9.
        colors = plt.get_cmap(cmap)(np.linspace(0, 1, len(protease_list)))

    # One log-density curve per protease; None marks proteases that cannot be drawn.
    all_log_densities = []
    for p in protease_list:
        subset = df[df[protease_column] == p][conf_column].dropna()
        log_density = None
        if len(subset) >= 2:
            try:
                log_density = np.log10(gaussian_kde(subset)(x_vals) + 1e-6)
            except np.linalg.LinAlgError:
                # Identical values give a singular covariance; skip the ridge
                # exactly like a protease with too few points.
                print(f"Skipping '{p}': confidence values have no spread, KDE undefined.")
                log_density = None
        all_log_densities.append(log_density)

    drawable = [d for d in all_log_densities if d is not None]
    if not drawable:
        raise ValueError(
            "Nothing to plot: no protease has at least two distinct confidence values."
        )
    # Shared baseline so ridge heights are comparable across proteases.
    global_min = min(d.min() for d in drawable)

    fig, ax = plt.subplots(figsize=figsize)
    for i, protease in enumerate(protease_list):
        log_density = all_log_densities[i]
        if log_density is None:
            continue
        log_density_shifted = log_density - global_min
        # Offsets follow the position in protease_list, so a skipped protease leaves its slot empty.
        offset = i * vertical_gap

        ax.plot(x_vals, log_density_shifted + offset, color=colors[i], lw=1.5)
        ax.fill_between(x_vals, offset, log_density_shifted + offset, alpha=0.4, color=colors[i])
        ax.text(1.01, offset + 0.5, protease, va='center', fontsize=10)

    ax.set_xlabel("Confidence", fontsize=12)
    ax.set_ylabel("")
    ax.set_title(title, fontsize=14)
    ax.set_yticks([])
    ax.grid(False)

    ax.spines['bottom'].set_color('black')
    ax.spines['left'].set_color('black')
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)

    fig.tight_layout()

    if save_svg_path:
        fig.savefig(save_svg_path, format='svg')
        print(f"Plot saved to: {save_svg_path}")

    plt.show()


In [None]:
# Per-protease colours for the ridgeline plot.
with open("../json/protease_colors.json", "r") as f:
    colors = json.load(f)

# Confidence distributions before the confidence cut-off is applied; the
# figure is also written to confidence_ridgeline.svg in the working directory.
ridgeline_options = dict(
    protease_column='protease',
    conf_column='conf',
    protease_list=proteases,
    custom_colors=colors,
    save_svg_path="confidence_ridgeline.svg",
)
plot_ridgeline_log_kde(df, **ridgeline_options)


In [None]:
# Keep only PSMs whose confidence is strictly above the `conf` threshold.
df = df[df['conf'] > conf]

In [None]:
# Cleaned peptide strings of all PSMs that passed the confidence filter.
final_psms = df['cleaned_preds'].tolist()

In [None]:
# Inspect the PSM table after the confidence filter.
df

In [None]:
# Independent snapshot of the confidence-filtered PSMs; the per-protease
# subsets used for assembly are taken from this frame.
final_df = df.copy()

In [None]:
# Order proteases by number of PSMs, most frequent first. value_counts sorts
# by descending count and, by default, leaves out rows without a protease label.
ordered_proteases = final_df['protease'].value_counts().index.tolist()

In [None]:
# Show the protease order used by the incremental assembly loop.
ordered_proteases

In [None]:
def _as_builtin(value):
    """Return a NumPy scalar as the matching built-in Python scalar; pass other values through."""
    return value.item() if isinstance(value, np.generic) else value


def protease_opt_statistics(df, sequence_type, output_folder, reference, **params):
    """Summarise mapped sequences and write the summary to <sequence_type>_stats.json.

    Args:
        df: Frame with one row per mapped sequence. Reads the columns 'start'
            and 'end' (1-based, inclusive positions on the reference) and
            'mismatches_pos' (a sized collection of mismatch positions per
            row). A 'sequence_length' column is added to this frame in place.
        sequence_type: Prefix of the output file name.
        output_folder: Directory the JSON file is written to (created if missing).
        reference: Reference sequence; only its length is used.
        **params: Extra key/value pairs copied into the result unchanged
            (NumPy scalars are converted to built-in scalars).

    Returns:
        dict with the supplied params plus: reference_start, reference_end,
        total_sequences, average_length, min_length, max_length, coverage
        (fraction of reference positions covered by at least one sequence),
        perfect_matches (rows without mismatches), total_mismatches (number
        of distinct mismatch positions), N50 and N90 (None for an empty frame).
        All values are built-in Python types so the dict is JSON-serialisable.
    """
    stats = {key: _as_builtin(value) for key, value in params.items()}

    # Side effect kept for existing callers: the length column lands on the caller's frame.
    df['sequence_length'] = df['end'] - df['start'] + 1
    length_column = df['sequence_length']

    reference_length = len(reference)
    stats['reference_start'] = 0
    stats['reference_end'] = reference_length + 1

    # pandas reductions return NumPy scalars, which json cannot serialise; convert them.
    stats['total_sequences'] = len(df)
    stats['average_length'] = _as_builtin(length_column.mean())
    stats['min_length'] = _as_builtin(length_column.min())
    stats['max_length'] = _as_builtin(length_column.max())

    covered_positions = set()
    for start, end in zip(df["start"], df["end"]):
        covered_positions.update(range(start - 1, end))  # Convert 1-based to 0-based indexing
    # Divide by the number of reference positions (not reference_end, which is
    # one larger) so a fully covered reference reports exactly 1.0.
    stats['coverage'] = len(covered_positions) / reference_length if reference_length else 0.0

    # Mismatch statistics
    stats['perfect_matches'] = int((df['mismatches_pos'].apply(len) == 0).sum())  # sequences with no mismatches
    all_mismatches = [pos for mismatches in df['mismatches_pos'] for pos in mismatches]
    stats['total_mismatches'] = len(set(all_mismatches))

    # N50 / N90: length of the sequence at which the running total of lengths
    # (longest first) reaches 50 % / 90 % of the summed length.
    lengths = sorted(length_column.tolist(), reverse=True)
    total_length = sum(lengths)

    cumulative_length = 0
    n50 = None
    n90 = None
    for length in lengths:
        cumulative_length += length
        if n50 is None and cumulative_length >= total_length * 0.5:
            n50 = length
        if n90 is None and cumulative_length >= total_length * 0.9:
            n90 = length
        if n50 is not None and n90 is not None:
            break

    stats['N50'] = n50
    stats['N90'] = n90

    # Serialise before opening the file so a failure cannot leave a truncated JSON behind.
    payload = json.dumps(stats, indent=4)
    if output_folder:
        os.makedirs(output_folder, exist_ok=True)
    output_path = os.path.join(output_folder, f"{sequence_type}_stats.json")
    with open(output_path, "w") as file:
        file.write(payload)

    return stats

In [None]:
# Incremental protease panel: iteration i assembles the PSMs of the i most
# frequent proteases only.
build_results = []
for i in range(1, len(ordered_proteases) + 1):
    selected_proteases = ordered_proteases[:i]
    filtered_df = final_df[final_df['protease'].isin(selected_proteases)]
    # Cleaned peptides of the selected proteases. The k-mers must come from
    # this subset, not from `final_psms` (all proteases), otherwise every
    # iteration assembles identical input.
    seqs = filtered_df['cleaned_preds'].tolist()

    kmers = dbg.get_kmers(seqs, kmer_size=kmer_size)
    edges = dbg.get_debruijn_edges_from_kmers(kmers)
    assembled_contigs = dbg.assemble_contigs(edges)

    # De-duplicate, drop contigs that are not longer than size_threshold and
    # sort longest first; ties are broken by sequence so the order does not
    # depend on set iteration order.
    assembled_contigs = sorted(
        {seq for seq in assembled_contigs if len(seq) > size_threshold},
        key=lambda seq: (-len(seq), seq),
    )

    mapped_contigs = map.process_protein_contigs_scaffold(assembled_contigs, protein_norm, max_mismatches, min_identity)
    df_contigs_mapped = map.create_dataframe_from_mapped_sequences(data = mapped_contigs)
    # NOTE(review): every iteration writes ./contigs_stats.json, so only the
    # last panel's file survives, and the returned statistics are not stored
    # in build_results here.
    protease_opt_statistics(df = df_contigs_mapped, sequence_type='contigs',
                            output_folder = '.', reference = protein_norm, **params)

    assembled_scaffolds = dbg.scaffold_iterative(assembled_contigs, min_overlap, size_threshold)

    #
