In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import pearsonr
from kneed import KneeLocator

from biorsp.analysis.find_points import find_foreground_background_points
from biorsp.analysis.rsp_calculations import (
    calculate_rsp_area,
    calculate_differences,
    calculate_rmsd,
    calculate_deviation_score,
)
from biorsp.preprocessing.filtering import filter_dge_matrix
from biorsp.preprocessing.dimensionality_reduction import compute_tsne
from biorsp.preprocessing.clustering import compute_dbscan
from biorsp.visualization.embedding import plot_embedding
from biorsp.visualization.rsp import (
    plot_foreground_background,
    plot_rsp_polar,
    plot_rsp_comparison,
)

In [2]:
def find_knee_point(
    rsp_results_df, column_name, curve="convex", direction="decreasing"
):
    """
    Find the knee point of the specified column in the dataframe using the Kneedle algorithm.
    Parameters:
    - rsp_results_df: DataFrame containing the RSP results.
    - column_name: The name of the column for which to find the knee point.
    - curve: Shape of the curve ('concave' or 'convex'). Adjust based on the plot shape.
    - direction: Direction of the curve ('increasing' or 'decreasing').
    Returns:
    - knee_point: The value of the knee point in the specified column.
    """
    sorted_values = np.sort(rsp_results_df[column_name].values)
    x = np.arange(len(sorted_values))
    y = sorted_values
    knee_locator = KneeLocator(x, y, curve=curve, direction=direction, online=True)
    knee_value = knee_locator.knee_y if knee_locator.knee_y else sorted_values[-1]
    return knee_value

In [3]:
def preprocess_data(dge_matrix_path):
    """
    Preprocess the DGE matrix by filtering low-expression genes and cells.
    """
    dge_matrix = pd.read_csv(dge_matrix_path, sep="\t", index_col=0)
    dge_matrix_filtered = filter_dge_matrix(
        dge_matrix, threshold_umi=500, threshold_gene=1
    )
    print(f"Filtered DGE matrix with shape: {dge_matrix_filtered.shape}")
    return dge_matrix_filtered

In [4]:
def compute_embeddings_and_clusters(dge_matrix_filtered, tsne_path, dbscan_path):
    """
    Compute t-SNE embeddings and DBSCAN clustering.
    """
    tsne_results = compute_tsne(dge_matrix_filtered, perplexity=30, random_state=42)
    dbscan_results = compute_dbscan(tsne_results, eps=4, min_samples=50)

    pd.DataFrame(tsne_results, columns=["x", "y"]).to_csv(tsne_path, index=False)
    dbscan_df = pd.DataFrame(dbscan_results, columns=["cluster"])
    dbscan_df.to_csv(dbscan_path, index=False)

    print(f"Computed t-SNE results with shape: {tsne_results.shape}")
    print(f"Computed DBSCAN results with unique clusters: {set(dbscan_results)}")

    return tsne_results, dbscan_df

In [5]:
def run_biorsp_analysis(dge_matrix_filtered, tsne_results, dbscan_df):
    """
    Run bioRSP analysis for all genes in the filtered DGE matrix.
    """
    rsp_results = []
    for gene in dge_matrix_filtered.index:
        fg_points, bg_points = find_foreground_background_points(
            gene_name=gene,
            dge_matrix=dge_matrix_filtered,
            tsne_results=tsne_results,
            dbscan_df=dbscan_df,
            threshold=1,
        )
        if len(fg_points) / len(bg_points) < 0.05:
            continue

        differences = calculate_differences(
            fg_points,
            bg_points,
            scanning_window=np.pi / 2,
            resolution=1000,
            vantage_point=bg_points.mean(axis=0),
            angle_range=np.array([0, 2 * np.pi]),
            mode="absolute",
        )
        rsp_area = calculate_rsp_area(
            differences, angle_range=[0, 2 * np.pi], resolution=1000
        )
        rmsd = calculate_rmsd(differences)
        deviation_score = calculate_deviation_score(
            rsp_area, differences, 1000, [0, 2 * np.pi]
        )

        rsp_results.append(
            {
                "Gene": gene,
                "RSP_Area": rsp_area,
                "RMSD": rmsd,
                "Deviation_Score": deviation_score,
            }
        )

    return pd.DataFrame(rsp_results)

In [6]:
def filter_significant_genes(
    rsp_results_df, area_threshold=0.1, deviation_threshold=0.5
):
    """
    Filter genes with significant spatial differences based on RSP area and deviation scores.
    """
    significant_genes = rsp_results_df[
        (rsp_results_df["RSP_Area"] < area_threshold)
        & (rsp_results_df["Deviation_Score"] > deviation_threshold)
    ]["Gene"].tolist()
    print(
        f"Identified {len(significant_genes)} candidate genes with significant spatial differences."
    )
    return significant_genes

In [7]:
def gene_to_gene_correlation(dge_matrix_filtered, candidate_genes):
    """
    Perform gene-to-gene correlation analysis for selected candidate genes.
    """
    candidate_expression = dge_matrix_filtered.loc[candidate_genes].T
    correlation_matrix = candidate_expression.corr(method="pearson")
    plt.figure(figsize=(12, 10))
    sns.heatmap(correlation_matrix, cmap="coolwarm", center=0, annot=False)
    plt.title("Gene-to-Gene Correlation among Candidate Genes")
    plt.show()
    return correlation_matrix

In [8]:
def run_full_analysis_workflow(dge_matrix_path, tsne_path, dbscan_path):
    """
    Execute the full analysis workflow for all genes.
    """
    dge_matrix_filtered = preprocess_data(dge_matrix_path)
    tsne_results, dbscan_df = compute_embeddings_and_clusters(
        dge_matrix_filtered, tsne_path, dbscan_path
    )
    rsp_results_df = run_biorsp_analysis(dge_matrix_filtered, tsne_results, dbscan_df)
    print("Completed bioRSP analysis.")

    # area_threshold = find_knee_point(rsp_results_df, "RSP_Area", curve="convex", direction="increasing")
    # deviation_threshold = find_knee_point(rsp_results_df, "Deviation_Score", curve="concave", direction="increasing")
    rsp_areas = rsp_results_df["RSP_Area"].tolist()
    top_20_rsp_area = sorted(rsp_areas)[:20]
    area_threshold = max(top_20_rsp_area)

    deviation_scores = rsp_results_df["Deviation_Score"].tolist()
    top_20_deviation = sorted(deviation_scores, reverse=True)[:20]
    deviation_threshold = min(top_20_deviation)

    candidate_genes = filter_significant_genes(
        rsp_results_df, area_threshold, deviation_threshold
    )

    correlation_matrix = gene_to_gene_correlation(dge_matrix_filtered, candidate_genes)
    print("Completed gene-to-gene correlation analysis.")

    rsp_results_df.to_csv("results/biorsp_results.csv", index=False)
    correlation_matrix.to_csv("results/gene_correlation_matrix.csv")
    print("Saved analysis results.")

In [None]:
if __name__ == "__main__":
    dge_matrix_path = "../../data/dge/GSM4647185_FetalHeart_1_dge.txt"
    tsne_path = "embeddings/tsne_results.csv"
    dbscan_path = "embeddings/tsne_dbscan_results.csv"

    os.makedirs("embeddings", exist_ok=True)
    os.makedirs("results", exist_ok=True)

    run_full_analysis_workflow(dge_matrix_path, tsne_path, dbscan_path)