# bioRSP module

In [1]:
import numpy as np

In [2]:
def find_foreground_background_points(
    gene_name, dge_matrix, tsne_results, dbscan_df, threshold=1, selected_clusters=None
):
    """
    Split cells into foreground and background embedding coordinates for BioRSP.

    Cells are matched by position: the i-th column of ``dge_matrix`` is looked
    up at row i of ``tsne_results`` and at entry i of ``dbscan_df["Cluster"]``.

    Parameters:
    - gene_name: The gene of interest; must be a label in ``dge_matrix.index``.
    - dge_matrix: A dataframe containing the gene expression data (rows = genes, columns = cells).
    - tsne_results: A 2D numpy array with the t-SNE (or UMAP) coordinates for each cell.
    - dbscan_df: DataFrame with a "Cluster" column holding the DBSCAN label of each cell.
    - threshold: Cells whose expression is strictly greater than this value are foreground. Default is 1.
    - selected_clusters: An iterable of cluster labels to focus on, or None to include all cells.

    Returns:
    - foreground_points: A numpy array of coordinates for cells with expression above the
      threshold (restricted to the selected clusters when given). When no cell qualifies the
      array has zero rows but keeps the column count of ``tsne_results``.
    - background_points: A numpy array of coordinates for all cells (or cells in selected clusters).

    Raises:
    - ValueError: If the number of cluster labels differs from the number of rows in
      ``tsne_results``, or if ``gene_name`` is not present in ``dge_matrix.index``.
    """
    dbscan_clusters = dbscan_df["Cluster"].values

    if len(dbscan_clusters) != tsne_results.shape[0]:
        raise ValueError(
            "DBSCAN cluster labels do not match the number of t-SNE results."
        )

    if gene_name not in dge_matrix.index:
        raise ValueError(f"Gene {gene_name} not found in the dataset.")

    # Positional indices (column order) of the cells expressing the gene.
    is_expressed = np.asarray(dge_matrix.loc[gene_name] > threshold, dtype=bool)
    foreground_positions = np.flatnonzero(is_expressed)

    if selected_clusters is None:
        background_points = tsne_results
    else:
        # One vectorised membership test shared by foreground and background.
        in_selection = np.isin(dbscan_clusters, list(selected_clusters))
        foreground_positions = foreground_positions[
            in_selection[foreground_positions]
        ]
        background_points = tsne_results[in_selection]

    # Integer-array indexing keeps the trailing dimension even when nothing matches.
    foreground_points = tsne_results[foreground_positions]

    return foreground_points, np.array(background_points)

In [3]:
def convert_to_polar(coords, vantage_point):
    """
    Express points in polar form relative to a vantage point, ordered by angle.

    Parameters:
    - coords: 2D array whose first two columns are the x and y coordinates.
    - vantage_point: Origin subtracted from every row of ``coords``.

    Returns:
    - sorted_r: Distances from the vantage point, ordered by ascending angle.
    - sorted_theta: Angles in radians wrapped into [0, 2*pi), ascending.
      Two empty arrays are returned when ``coords`` has no rows.
    """
    if coords.shape[0] == 0:
        return np.array([]), np.array([])

    offsets = coords - vantage_point
    dx, dy = offsets[:, 0], offsets[:, 1]

    full_turn = 2 * np.pi
    radii = np.sqrt(dx ** 2 + dy ** 2)
    # arctan2 yields (-pi, pi]; shift and wrap so every angle is non-negative.
    angles = np.mod(np.arctan2(dy, dx) + full_turn, full_turn)

    order = np.argsort(angles)
    return radii[order], angles[order]

In [4]:
def in_scanning_range(pt_theta, angle, window):
    """
    Test whether angles fall inside a scanning window centred on ``angle``.

    Parameters:
    - pt_theta: Angle(s) in radians to test.
    - angle: Centre of the scanning window in radians.
    - window: Full angular width of the window in radians.

    Returns:
    - Boolean (array) that is True where the wrapped angular distance to
      ``angle`` is at most ``window / 2``.
    """
    full_turn = 2 * np.pi
    # Signed angular offset folded into [-pi, pi) so the comparison is wrap-safe.
    signed_offset = (pt_theta - angle + np.pi) % full_turn - np.pi
    half_window = window / 2
    return np.abs(signed_offset) <= half_window

In [5]:
def compute_histogram(projection, resolution, angle, window):
    """
    Histogram the angles that fall inside a scanning window.

    Angles are re-expressed as offsets from the window's starting edge
    (``angle - window / 2``), wrapped modulo 2*pi, so one code path handles
    windows that straddle the 0 / 2*pi seam and a window spanning the full
    circle (``window == 2*pi``), where the start and end edges coincide.

    Parameters:
    - projection: Array-like of angles in radians.
    - resolution: Number of equal-width bins across the window.
    - angle: Centre of the scanning window in radians.
    - window: Full angular width of the scanning window in radians.

    Returns:
    - histogram: Integer array of length ``resolution`` with the count of
      angles per bin, bins running from the window start to the window end.
    """
    full_turn = 2 * np.pi
    start_angle = (angle - window / 2) % full_turn
    bin_edges = np.linspace(0, window, resolution + 1)

    # Offset of every angle from the window start, wrapped into [0, 2*pi).
    offsets = (np.asarray(projection) - start_angle) % full_turn
    # Anything further than `window` from the start lies outside the window.
    offsets = offsets[offsets <= window]

    histogram, _ = np.histogram(offsets, bins=bin_edges)
    return histogram

In [6]:
def compute_cdf(histogram):
    """
    Turn bin counts into a cumulative distribution normalised to end at 1.

    Parameters:
    - histogram: Array of per-bin counts.

    Returns:
    - cdf: float64 array with the running fraction of the total count, or
      all zeros when the total count is not positive.
    """
    count_sum = np.sum(histogram)
    if not count_sum > 0:
        # Nothing to normalise by; report a flat zero curve.
        return np.zeros_like(histogram, dtype=np.float64)

    running = np.cumsum(histogram).astype(np.float64)
    return running / count_sum

In [7]:
def compute_cdfs(
    fg_projection, bg_projection, angle, scanning_window, resolution, mode
):
    """
    Build the foreground and background CDFs for one scanning window.

    Parameters:
    - fg_projection: Foreground angles in radians.
    - bg_projection: Background angles in radians.
    - angle: Centre of the scanning window in radians.
    - scanning_window: Full angular width of the window in radians.
    - resolution: Number of histogram bins across the window.
    - mode: When equal to "absolute", the foreground CDF is multiplied by the
      ratio of foreground to background counts in the window; any other
      value leaves both CDFs normalised independently.

    Returns:
    - fg_cdf, bg_cdf: The foreground and background CDF arrays.
    """
    fg_counts, bg_counts = (
        compute_histogram(theta, resolution, angle, scanning_window)
        for theta in (fg_projection, bg_projection)
    )
    fg_cdf = compute_cdf(fg_counts)
    bg_cdf = compute_cdf(bg_counts)

    if mode != "absolute":
        return fg_cdf, bg_cdf

    n_background = np.sum(bg_counts)
    n_foreground = np.sum(fg_counts)
    # With no background points there is nothing to scale against, so the
    # foreground CDF is left untouched.
    if n_background > 0:
        fg_cdf *= n_foreground / n_background

    return fg_cdf, bg_cdf

In [8]:
def compute_area(fg_cdf, bg_cdf, window):
    """
    Compute the area under the absolute difference between the foreground and background CDFs.

    Parameters:
    - fg_cdf: Foreground CDF samples.
    - bg_cdf: Background CDF samples, same length as ``fg_cdf``.
    - window: Angular width covered by the CDFs; the sample spacing used for
      integration is ``window / len(fg_cdf)``.

    Returns:
    - area_diff: Trapezoidal integral of ``|bg_cdf - fg_cdf|``; 0.0 when the
      CDFs are empty.
    """
    n_samples = fg_cdf.shape[0]
    if n_samples == 0:
        # No samples: avoid dividing by zero when deriving the spacing.
        return 0.0

    # `np.trapz` is deprecated since NumPy 2.0 in favour of `np.trapezoid`;
    # prefer the new name and fall back on older NumPy releases.
    integrate = getattr(np, "trapezoid", None) or np.trapz

    dx = window / n_samples
    return integrate(np.abs(bg_cdf - fg_cdf), dx=dx)

In [9]:
def calculate_differences(
    foreground_points,
    background_points,
    scanning_window,
    resolution,
    vantage_point,
    angle_range,
    mode,
):
    """
    Sweep a scanning window around the vantage point and measure, at each
    direction, the area between the foreground and background CDFs.

    Parameters:
    - foreground_points: Coordinates of the foreground cells.
    - background_points: Coordinates of the background cells.
    - scanning_window: Full angular width of the scanning window in radians.
    - resolution: Number of scan directions; the same value is also passed
      on as the number of histogram bins inside each window.
    - vantage_point: Origin used for the polar conversion.
    - angle_range: Pair (start, end) of the angular range swept; the end
      point itself is not sampled.
    - mode: Forwarded to ``compute_cdfs`` (e.g. "absolute").

    Returns:
    - differences: Array of length ``resolution`` holding the CDF-difference
      area for every scan direction.
    """
    # Only the angular coordinate is needed; the radii are discarded.
    _, fg_theta = convert_to_polar(foreground_points, vantage_point)
    _, bg_theta = convert_to_polar(background_points, vantage_point)

    angles = np.linspace(angle_range[0], angle_range[1], resolution, endpoint=False)
    differences = np.empty(resolution)

    for i, angle in enumerate(angles):
        fg_in_range = in_scanning_range(fg_theta, angle, scanning_window)
        bg_in_range = in_scanning_range(bg_theta, angle, scanning_window)

        fg_projection = fg_theta[fg_in_range]
        bg_projection = bg_theta[bg_in_range]

        fg_cdf, bg_cdf = compute_cdfs(
            fg_projection, bg_projection, angle, scanning_window, resolution, mode
        )
        differences[i] = compute_area(fg_cdf, bg_cdf, scanning_window)

    return differences

In [10]:
def calculate_rsp_area(
    foreground_points,
    background_points,
    vantage_point,
    scanning_window=np.pi,
    resolution=1000,
    angle_range=np.array([0, 2 * np.pi]),
    mode="absolute",
):
    """
    Compute the RSP area and the RMSD of the per-direction differences.

    Parameters:
        - foreground_points: Coordinates of the foreground cells.
        - background_points: Coordinates of the background cells.
        - vantage_point: Origin used for the polar conversion.
        - scanning_window (float): Angular width of the scanning window.
        - resolution (int): Number of scan directions.
        - angle_range (array): Pair (start, end) of the angular range swept.
        - mode (str): Forwarded to ``calculate_differences``.

    Returns:
        - rsp_area (float): Sum over directions of 0.5 * d_theta * difference**2.
        - differences (ndarray): Differences calculated during the process.
        - rmsd (float): Root Mean Square Deviation of ``differences``.
    """
    differences = calculate_differences(
        foreground_points,
        background_points,
        scanning_window,
        resolution,
        vantage_point,
        angle_range,
        mode,
    )

    # Each direction contributes a thin polar sector of radius `difference`.
    angular_step = (angle_range[1] - angle_range[0]) / resolution
    sector_areas = 0.5 * angular_step * np.power(differences, 2)
    rsp_area = np.sum(sector_areas)

    mean_square = np.mean(np.square(differences))
    rmsd = np.sqrt(mean_square)

    return rsp_area, differences, rmsd

In [11]:
def calculate_deviation_score(rsp_area, differences, resolution, angle_range):
    """
    Score how the RSP curve compares with a circle of equal area.

    Parameters:
        - rsp_area (float): Calculated RSP area.
        - differences (ndarray): Differences calculated during the process.
        - resolution (int): Number of angular steps the range is divided into.
        - angle_range (array): Pair (start, end) of the angular range scanned.

    Returns:
        - deviation_score (float): Overlap term divided by ``rsp_area``, or 0
          when ``rsp_area`` is zero.
    """
    angular_step = (angle_range[1] - angle_range[0]) / resolution

    # Radius of the circle whose area equals rsp_area.
    equivalent_radius = np.sqrt(rsp_area / np.pi)

    # NOTE(review): the overlap is accumulated as min(r, R) * d_theta, whereas
    # rsp_area is built from 0.5 * r**2 * d_theta sectors; confirm this
    # linear form is the intended definition.
    clipped = np.minimum(differences, equivalent_radius)
    overlap = np.sum(clipped) * angular_step

    if rsp_area == 0:
        return 0  # undefined ratio; reported as zero
    return overlap / rsp_area

In [12]:
def rsp(
    foreground_points,
    background_points,
    vantage_point,
    scanning_window=np.pi,
    resolution=1000,
    angle_range=np.array([0, 2 * np.pi]),
    mode="absolute",
):
    """
    Run the complete RSP analysis: area, RMSD and deviation score.

    Parameters:
        - foreground_points: Coordinates of the foreground cells.
        - background_points: Coordinates of the background cells.
        - vantage_point: Origin used for the polar conversion.
        - scanning_window (float): Angular width of the scanning window.
        - resolution (int): Number of scan directions.
        - angle_range (array): Pair (start, end) of the angular range swept.
        - mode (str): Forwarded to ``calculate_rsp_area``.

    Returns:
        - rsp_area (float): Calculated RSP area.
        - rmsd (float): Root Mean Square Deviation.
        - deviation_score (float): Deviation score.
        - differences (ndarray): Differences calculated during the process.
    """
    area, per_angle_diffs, rmsd = calculate_rsp_area(
        foreground_points=foreground_points,
        background_points=background_points,
        vantage_point=vantage_point,
        scanning_window=scanning_window,
        resolution=resolution,
        angle_range=angle_range,
        mode=mode,
    )

    score = calculate_deviation_score(
        area, per_angle_diffs, resolution, angle_range
    )

    return area, rmsd, score, per_angle_diffs