In [1]:
# Import
import itertools
from pathlib import Path
import pandas  as pd
import numpy as np
import shutil
from ast import literal_eval
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import LabelEncoder
from SigProfilerExtractor import sigpro as sig
from SigProfilerAssignment import decomposition as decomp
from GenomeSigInfer.utils.helpers import MUTATION_LIST, alphabet_list
from GenomeSigInfer.data.data_processing import Preprocessing
from GenomeSigInfer.nmf.run_nmf import RunNMF

In [2]:
# Function to read a decomposition log file produced by SigProfilerExtractor
def read_file_decompose(
    file: Path, dataframe: pd.DataFrame, col: str | None = None
) -> None:
    """
    Read the contents of a file and extract signature data.

    Args:
        file (Path): Path to the file to read.
        df (pd.DataFrame): DataFrame to store the signature data.
    """
    sigs = []
    with open(file, "r", encoding="UTF-8") as open_file:
        final_composition = False
        for line in open_file:
            if line.startswith("#################### Final Composition"):
                final_composition = True
            elif final_composition:
                sigs.append(eval(line.strip()))
                final_composition = False
    if col is None:
        dataframe[file.parts[-5]] = sigs
    else:
        dataframe[col] = sigs

In [3]:
# A class for running SigProfilerExtractor to extract genomic signatures.
class RunSig:
    """
    Thin wrapper around SigProfilerExtractor for extracting genomic signatures.

    Attributes:
        None
    Methods:
        run(matrix, out, signatures): Run the extractor for a fixed number of
            signatures and return the decomposed composition as a dataframe.
    """

    @staticmethod
    def run(
        matrix: pd.DataFrame,
        out: str = "output",
        signatures: int = 1,
    ):
        """
        Run SigProfilerExtractor and collect its decomposed solution.

        The extractor writes into ``<out>/result-<signatures>-sig``. Once the
        decomposition log has been read, that directory is deleted again.

        Args:
            matrix (pd.DataFrame): Genomic data matrix.
            out (str): Folder where the results are stored
            signatures (int): Number of extracted signatures (used as both the
                minimum and the maximum).

        Returns:
            pd.DataFrame: Dataframe with a single ``sigprofiler`` column filled
            from the decomposition log.
        """
        result_dir = Path(out) / f"result-{signatures}-sig"
        sig.sigProfilerExtractor(
            "matrix",
            result_dir.as_posix(),
            matrix,
            minimum_signatures=signatures,
            maximum_signatures=signatures,
            make_decomposition_plots=False,
        )
        log_file = result_dir.joinpath(
            "SBS96",
            "Suggested_Solution",
            "COSMIC_SBS96_Decomposed_Solution",
            "Solution_Stats",
            "Cosmic_SBS96_Decomposition_Log.txt",
        )
        composition = pd.DataFrame()
        read_file_decompose(log_file, composition, "sigprofiler")
        # The extractor output is only needed for the log that was just read.
        shutil.rmtree(result_dir)
        return composition

In [4]:
class Decompose:
    """
    Helper that writes a signature matrix to disk and decomposes it with
    SigProfilerAssignment.
    """

    def __init__(
        self, W, mutations, signatures, output, all_genomes, col_names
    ) -> None:
        """
        Store everything needed for a later call to ``decompose``.

        Args:
            W: The W matrix from NMF decomposition.
            mutations: List of mutation types.
            signatures: Number of signatures.
            output: Output directory for results.
            all_genomes: Dataframe containing genomic data.
            col_names: Column names for genomic data; the first entry is
                dropped before the names are stored.
        """
        self._W = W
        self._all_genomes = all_genomes
        self._mutations = mutations
        self._signatures = signatures
        self._output = output
        # Every name after the first one is kept as a genome column label.
        self._col_names = col_names[1:]
        # Signature labels come from the project helper, prefixed "SBS<rows of W>".
        self._columns = alphabet_list(self._signatures, f"SBS{W.shape[0]}")

    def decompose(self, folder: str = "result-nmf") -> None:
        """
        Write the signature table to ``result.txt`` and run the decomposition.

        Args:
            folder (str): Output subfolder name (lower-cased before use).
        """
        outpath = Path(self._output) / folder.lower()
        outpath.mkdir(parents=True, exist_ok=True)

        table = pd.DataFrame(self._W)
        table.columns = self._columns
        # Keep a label-free copy for the decomposition before adding the
        # MutationType column that only the exported file needs.
        signatures = table.copy()
        table.insert(0, "MutationType", self._mutations)
        table.to_csv(
            (outpath / "result.txt").as_posix(),
            sep="\t",
            index=False,
            encoding="utf-8",
        )
        self._decompose(outpath=outpath, signatures=signatures)

    def _decompose(self, outpath: Path, signatures: pd.DataFrame) -> None:
        """
        Call ``spa_analyze`` on the stored genomes with the given signatures.

        Args:
            outpath: Output path.
            signatures: Signature data.
        """
        genomes = pd.DataFrame(self._all_genomes)
        genomes.index = self._mutations
        genomes.columns = self._col_names
        options = {
            "signatures": signatures,
            "connected_sigs": True,
            "decompose_fit_option": True,
            "denovo_refit_option": False,
            "cosmic_fit_option": True,
            "signature_database": None,
            "cosmic_version": 3.3,
            "exome": False,
            "export_probabilities": True,
            "export_probabilities_per_mutation": False,
            "sample_reconstruction_plots": False,
            "make_plots": False,
        }
        decomp.spa_analyze(genomes, outpath.as_posix(), **options)

In [5]:
def combinations() -> list[tuple[str, str]]:
    """
    Build every (initialization, beta loss) pair.

    Returns:
        list[tuple[str, str]]: All pairs, grouped by initialization, with the
        beta losses in a fixed order inside each group.
    """
    init_methods = ("None", "random", "nndsvd", "nndsvda", "nndsvdar")
    loss_functions = ("frobenius", "kullback-leibler", "itakura-saito")
    return [(init, loss) for init in init_methods for loss in loss_functions]

In [6]:
def run_nmfs(nmf_combs, all_genomes, sigs, matrix, out, decompose_file_name) -> pd.DataFrame:
    """
    Run NMF with different combinations of initialization and beta loss.

    The tab-separated file ``decompose_file_name`` acts as a checkpoint: it is
    re-read before every combination, a combination whose ``<init>_<beta_loss>``
    column is already present is skipped, and the file is rewritten after each
    finished combination. The per-combination working directory below ``out``
    is deleted once its decomposition log has been read.

    Args:
        nmf_combs (iterable): ``(init, beta_loss)`` combinations to try.
        all_genomes (numpy.ndarray): Matrix of all genomes.
        sigs (int): Number of signatures.
        matrix (pandas.DataFrame): Data matrix; its first column is passed on
            as the mutation labels and its column names as the genome names.
        out (str): Output directory.
        decompose_file_name (str): Path of the existing tab-separated results
            file that is read and updated.

    Returns:
        Dataframe with the decomposed results, read back from
        ``decompose_file_name``.
    """
    for combination in nmf_combs:
        init, beta_loss = combination[:2]
        folder = f"{init}_{beta_loss}"
        # Re-read every round so the columns already on disk decide what is skipped.
        df_decompose = pd.read_csv(decompose_file_name, sep="\t")
        if folder in df_decompose.columns:
            continue
        preprocessed = Preprocessing(all_genomes)
        print(f"Now performing NMF with init:'{init}' and beta_loss:'{beta_loss}'")
        nmf_model = RunNMF(
            genomes=preprocessed.norm_genomes,
            signatures=sigs,
            init=init,
            beta_loss=beta_loss,
        )
        nmf_model.fit()
        outpath = Path(out) / folder
        dec = Decompose(
            nmf_model.W_norm,
            matrix[matrix.columns[0]],
            sigs,
            outpath,
            all_genomes,
            matrix.columns,
        )
        dec.decompose()
        decom_file = (
            outpath
            / "result-nmf"
            / "Decompose_Solution"
            / "Solution_Stats"
            / "Cosmic_SBS96_Decomposition_Log.txt"
        )
        # Name the column explicitly so it always matches the skip check above
        # instead of depending on how deep the log sits in the directory tree.
        read_file_decompose(decom_file, df_decompose, folder)
        shutil.rmtree(outpath)
        df_decompose.to_csv(decompose_file_name, index=False, sep="\t")
    return pd.read_csv(decompose_file_name, sep="\t")

In [7]:
# Number of signatures requested from both SigProfilerExtractor and NMF.
signatures = 48

# Tab-separated file in which the decomposition results are collected.
decompose_file = "../results/sigprofiler.decompose.txt"

# Output directories, created up front so later cells can write into them.
folder = Path("../results")
figure_folder = Path("../figures")
folder.mkdir(parents=True, exist_ok=True)
figure_folder.mkdir(parents=True, exist_ok=True)

# Load the SBS-96 matrix and order its rows as in MUTATION_LIST.
filename = Path("../SBS/sbs.96.parquet")
df_96 = (
    pd.read_parquet(filename)
    .set_index("MutationType")
    .reindex(MUTATION_LIST)
    .reset_index()
)

# Plain array of every column after the first ("MutationType") one.
all_genomes = np.array(df_96.iloc[:, 1:])
df_96.head()

Unnamed: 0,MutationType,ALL::PD3952a,ALL::PD3954a,ALL::PD3955a,ALL::PD3956a,ALL::PD3957a,ALL::PD3958a,ALL::PD3959a,ALL::PD3960a,ALL::PD3961a,...,Thy-AdenoCa::PTC-28C,Thy-AdenoCa::PTC-46C,Thy-AdenoCa::PTC-50C,Thy-AdenoCa::PTC-515C,Thy-AdenoCa::PTC-53C,Thy-AdenoCa::PTC-54C,Thy-AdenoCa::PTC-70C,Thy-AdenoCa::PTC-73C,Thy-AdenoCa::PTC-7C,Thy-AdenoCa::PTC-88C
0,A[C>A]A,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,15.0,8.0,9.0,11.0,4.0,17.0,12.0,8.0,35.0,6.0
1,A[C>A]C,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,...,15.0,2.0,5.0,4.0,1.0,3.0,2.0,2.0,26.0,2.0
2,A[C>A]G,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,9.0,0.0,0.0,3.0,3.0,1.0,2.0,0.0,19.0,2.0
3,A[C>A]T,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,9.0,1.0,2.0,4.0,3.0,0.0,1.0,2.0,20.0,1.0
4,C[C>A]A,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,30.0,13.0,6.0,8.0,4.0,12.0,8.0,3.0,49.0,4.0


In [8]:
def run_all_decompose(filename, signatures, df_96, all_genomes, folder):
    """
    Collect the SigProfilerExtractor decomposition and all NMF decompositions.

    ``filename`` is a tab-separated results file that doubles as a checkpoint.
    If it is missing or empty, or has no ``sigprofiler`` column yet,
    SigProfilerExtractor is run first and its result is written to the file.
    Afterwards every NMF combination from ``combinations()`` is processed by
    ``run_nmfs``, which adds further columns to the same file.

    Args:
        filename: Path of the tab-separated results file.
        signatures: Number of signatures to extract.
        df_96: Data matrix passed to the extractor and to ``run_nmfs``.
        all_genomes: Matrix of all genomes passed to ``run_nmfs``.
        folder: Directory used for intermediate output.

    Returns:
        The dataframe returned by ``run_nmfs``.
    """
    results_path = Path(filename)
    if results_path.is_file() and results_path.stat().st_size > 0:
        df_decompose = pd.read_csv(results_path, sep="\t")
    else:
        # Fresh run: nothing has been stored yet, so start from an empty
        # table instead of failing on a missing or zero-byte file.
        df_decompose = pd.DataFrame()
    if "sigprofiler" not in df_decompose.columns:
        df_decompose = RunSig.run(
            matrix=df_96, signatures=signatures, out=folder
        )
        # Temp save it (run_nmfs reads this file back in).
        results_path.parent.mkdir(parents=True, exist_ok=True)
        df_decompose.to_csv(results_path, index=False, sep="\t")
    df_decompose = run_nmfs(
        combinations(),
        all_genomes,
        signatures,
        df_96,
        folder,
        filename,
    )
    return df_decompose

# Build the table of decompositions, reusing whatever is already stored in decompose_file.
df_decompose = run_all_decompose(
    filename=decompose_file,
    signatures=signatures,
    df_96=df_96,
    all_genomes=all_genomes,
    folder=folder,
)

In [9]:
# Show the collected decompositions: the "sigprofiler" column plus one column per "<init>_<beta_loss>" NMF setting.
df_decompose

Unnamed: 0,sigprofiler,None_frobenius,None_kullback-leibler,None_itakura-saito,random_frobenius,random_kullback-leibler,random_itakura-saito,nndsvd_frobenius,nndsvd_kullback-leibler,nndsvd_itakura-saito,nndsvda_frobenius,nndsvda_kullback-leibler,nndsvda_itakura-saito,nndsvdar_frobenius,nndsvdar_kullback-leibler,nndsvdar_itakura-saito
0,['SBS5'],['SBS5'],['SBS5'],['SBS17b'],['SBS5'],['SBS5'],['SBS28'],['SBS5'],['SBS5'],['SBS5'],['SBS5'],['SBS5'],"['SBS5', 'SBS17b']",['SBS5'],['SBS5'],['SBS5']
1,['SBS5'],"['SBS17b', 'SBS25']","['SBS39', 'SBS51']",['SBS5'],['SBS17b'],"['SBS5', 'SBS17a']","['SBS7d', 'SBS59']","['SBS17b', 'SBS22']","['SBS17b', 'SBS35']","['SBS1', 'SBS3', 'SBS5']","['SBS17b', 'SBS35']","['SBS39', 'SBS51']",['SBS5'],"['SBS17b', 'SBS22']","['SBS17b', 'SBS25']","['SBS1', 'SBS3']"
2,['SBS17b'],['SBS5'],"['SBS5', 'SBS17b']",['SBS35'],['SBS5'],"['SBS13', 'SBS21', 'SBS60', 'SBS91']",['SBS54'],"['SBS1', 'SBS5', 'SBS51']","['SBS1', 'SBS5', 'SBS39', 'SBS51']","['SBS1', 'SBS5', 'SBS51']","['SBS5', 'SBS17b']","['SBS5', 'SBS17b']",['SBS35'],"['SBS1', 'SBS5', 'SBS51', 'SBS86']","['SBS5', 'SBS51', 'SBS86']",['SBS5']
3,['SBS5'],"['SBS5', 'SBS86']","['SBS1', 'SBS32']","['SBS1', 'SBS5', 'SBS39']","['SBS17b', 'SBS22']",['SBS17b'],['SBS86'],"['SBS5', 'SBS17b']","['SBS5', 'SBS17b']","['SBS5', 'SBS17b']","['SBS5', 'SBS86']","['SBS5', 'SBS32', 'SBS34']","['SBS1', 'SBS5', 'SBS39', 'SBS84']","['SBS5', 'SBS17b']",['SBS5'],"['SBS5', 'SBS17b']"
4,['SBS35'],['SBS7c'],['SBS7c'],['SBS39'],['SBS7c'],"['SBS5', 'SBS43']",['SBS5'],['SBS7c'],"['SBS1', 'SBS7c']",['SBS7c'],['SBS39'],['SBS7c'],['SBS5'],['SBS7c'],['SBS7c'],"['SBS7c', 'SBS28', 'SBS58']"
5,['SBS5'],['SBS5'],"['SBS5', 'SBS39']",['SBS7c'],"['SBS5', 'SBS17a']",['SBS39'],['SBS7c'],"['SBS1', 'SBS26']","['SBS54', 'SBS87']",['SBS87'],['SBS17b'],['SBS39'],['SBS7c'],"['SBS1', 'SBS5', 'SBS54']","['SBS5', 'SBS87']","['SBS54', 'SBS87']"
6,['SBS27'],"['SBS5', 'SBS28']","['SBS1', 'SBS5']",['SBS32'],['SBS5'],['SBS5'],"['SBS22', 'SBS57']",['SBS25'],['SBS39'],['SBS39'],['SBS7c'],"['SBS5', 'SBS34']","['SBS1', 'SBS5', 'SBS32']",['SBS5'],['SBS39'],['SBS5']
7,"['SBS5', 'SBS13']","['SBS54', 'SBS87']","['SBS1', 'SBS5', 'SBS17a']","['SBS5', 'SBS87']","['SBS1', 'SBS5', 'SBS21']","['SBS54', 'SBS87']","['SBS1', 'SBS51', 'SBS52']","['SBS10a', 'SBS17b']","['SBS10a', 'SBS17b']",['SBS10a'],"['SBS5', 'SBS87']","['SBS1', 'SBS46']","['SBS1', 'SBS31']","['SBS10a', 'SBS17b']","['SBS5', 'SBS10a']","['SBS1', 'SBS17b']"
8,['SBS5'],"['SBS5', 'SBS10a', 'SBS59']",['SBS57'],['SBS39'],['SBS5'],['SBS5'],"['SBS1', 'SBS33', 'SBS49']","['SBS33', 'SBS57']","['SBS33', 'SBS57']",['SBS33'],"['SBS10c', 'SBS17b', 'SBS59']","['SBS5', 'SBS57']",['SBS5'],"['SBS33', 'SBS57']","['SBS33', 'SBS57']",['SBS33']
9,['SBS31'],['SBS13'],"['SBS1', 'SBS5']","['SBS5', 'SBS17b', 'SBS59']",['SBS5'],"['SBS5', 'SBS22']",['SBS24'],['SBS5'],"['SBS5', 'SBS22']",['SBS5'],"['SBS1', 'SBS5']","['SBS1', 'SBS5']","['SBS1', 'SBS5', 'SBS39']",['SBS5'],['SBS5'],['SBS5']


In [10]:
def most_similarity_decompose(dataframe: pd.DataFrame) -> pd.DataFrame:
    """
    Rank every column by its cosine similarity to the ``sigprofiler`` column.

    Each cell (a list of signature names, or the string representation of
    such a list) is joined into one label, the labels of every column are
    integer-encoded, and each encoded column is compared with the encoded
    ``sigprofiler`` column. The input dataframe is left untouched, so the
    function can be called repeatedly on the same data.

    Args:
        dataframe (pd.DataFrame): Dataframe containing the data; must have a
            ``sigprofiler`` column.

    Returns:
        pd.DataFrame: Columns ``cosine similarity`` and ``parameter`` (the
        column name with ``_`` replaced by ``, ``), sorted from most to least
        similar, without a row for ``sigprofiler`` itself.

    Raises:
        KeyError: If ``dataframe`` has no ``sigprofiler`` column.
    """
    reference = "sigprofiler"
    if reference not in dataframe.columns:
        raise KeyError(reference)

    def _as_label(cell) -> str:
        # Cells read back from CSV are strings such as "['SBS5', 'SBS17b']";
        # cells that are already sequences are used as they are.
        names = literal_eval(cell) if isinstance(cell, str) else cell
        return " ".join(names)

    # Build a new frame instead of overwriting the caller's columns.
    joined = dataframe.apply(lambda column: column.apply(_as_label))
    # Apply LabelEncoder to transform categorical data into numerical representation.
    # NOTE(review): the encoder is refit for every column, so the integer codes
    # are assigned per column — confirm that this is the intended comparison.
    label_encoder = LabelEncoder()
    df_encoded = joined.apply(label_encoder.fit_transform)
    # Similarity of every encoded column to the encoded reference column.
    similarities = (
        cosine_similarity(
            df_encoded.T, df_encoded[reference].values.reshape(1, -1)
        )
        .ravel()
        .astype(float)
    )
    column_names = np.asarray(dataframe.columns, dtype=object)
    order = np.argsort(similarities)[::-1]
    # Remove the reference by name rather than assuming it is ranked first;
    # a column with an equal or higher score could otherwise be dropped instead.
    order = order[column_names[order] != reference]
    labels = np.array([", ".join(name.split("_")) for name in column_names[order]])
    return pd.DataFrame(
        {"cosine similarity": similarities[order], "parameter": labels}
    )
# Rank the columns of df_decompose by similarity to its "sigprofiler" column.
cosine_sim_df = most_similarity_decompose(dataframe=df_decompose)
cosine_sim_df

Unnamed: 0,cosine similarity,parameter
0,0.835807,"nndsvd, kullback-leibler"
1,0.817534,"None, frobenius"
2,0.809778,"nndsvdar, itakura-saito"
3,0.809591,"nndsvd, itakura-saito"
4,0.794278,"None, kullback-leibler"
5,0.794145,"random, kullback-leibler"
6,0.781797,"nndsvda, frobenius"
7,0.780105,"nndsvda, itakura-saito"
8,0.776159,"nndsvda, kullback-leibler"
9,0.763089,"nndsvd, frobenius"


In [11]:
import seaborn as sns
import matplotlib.pyplot as plt

# Draw the ranked cosine similarities as a one-column heatmap and save it as a PNG.

# Split the ranking table into numeric scores and their row labels
data = cosine_sim_df.values
values, labels = data[:, 0].astype(float), data[:, 1]
df = pd.DataFrame(values, index=labels, columns=["Cosine Similarity"])
# Generate heatmap using Seaborn
heatmap_options = dict(
    linewidth=0.5,
    fmt="",
    annot=values.reshape(-1, 1),
    cmap="crest",
    xticklabels=True,
    yticklabels=True,
    vmin=0,
    vmax=1,
)
ax = sns.heatmap(df, **heatmap_options)
plt.tight_layout()
ax.set_ylabel("Parameters: (Init, Beta loss)")
ax.set_title("Cosine similarity between a NMF init and betaloss combination and the sigprofiler decompose")
image_name = figure_folder.joinpath("cosine.params.similarity.png")
plt.savefig(image_name, bbox_inches="tight", format="png", dpi=300, pad_inches=0.1)

![Cosine similarity between a NMF init and betaloss combination and the sigprofiler decompose](../figures/cosine.params.similarity.png "Cosine similarity between a NMF init and betaloss combination and the sigprofiler decompose")