In [1]:
import os
import re

# Change directory to the root of the SpatialDeconBenchmark repository
os.chdir('/'.join(re.match(r'^(.*SpatialDeconBenchmark)', os.getcwd()).group(0).split('/')))

In [21]:
import pandas as pd 
from libpysal.weights import W
from scipy.spatial import KDTree
from esda.moran import Moran, Moran_Local
from scipy.stats import pearsonr


class SpotMetrics:
    def __init__(self, deconvolved_mtx, st_counts_mtx, st_locations_mtx, _marker_genes_dict):
        """
        Initialize the SpotMetrics class with matrices and marker gene data.

        Parameters:
        - deconvolved_mtx: DataFrame, spot by celltype matrix.
        - st_counts_mtx: DataFrame, gene by spot matrix.
        - st_locations_mtx: DataFrame, spot by coordinate matrix.
        - marker_genes_dict: dict, mapping cell types to marker genes.
        """
        self.deconvolved_mtx = deconvolved_mtx
        self.st_counts_mtx = st_counts_mtx
        self.st_locations_mtx = st_locations_mtx
        self.marker_genes_dict = _marker_genes_dict
        self.pcc = None
        self.spatial_weights = None
        self.global_moran = None
        self.local_moran = None

    def check_mtx_format(self):
        """
        Check if matrix formats align correctly across provided dataframes.
        Raises ValueError if mismatches are found.
        """
        if set(self.st_counts_mtx.columns) != set(self.st_locations_mtx.index):
            raise ValueError("Mismatch in spot labels between counts and location matrices.")

        if set(self.st_counts_mtx.columns) != set(self.deconvolved_mtx.index):
            print(set(self.st_counts_mtx.columns[:10]))
            print(set(self.deconvolved_mtx.index[:10]))
            raise ValueError("Mismatch in spot labels between counts and deconvolved matrices.")

        # Subset matrices to ensure alignment
        self.deconvolved_mtx = self.deconvolved_mtx.loc[self.st_counts_mtx.columns]
        self.st_locations_mtx = self.st_locations_mtx.loc[self.st_counts_mtx.columns]

    def calculate_pcc(self):
        """
        Calculate Pearson's Correlation Coefficient between estimated cell type abundances
        and expressions of canonical marker genes. Results are stored in a dictionary.
        """
        cell_types_pcc = {}

        for cell_type, markers in self.marker_genes_dict.items():
            marker_pccs = {}
            for marker in markers:
                # Extract gene expression and cell type abundance
                gene_expression = self.st_counts_mtx.loc[marker]
                cell_abundance = self.deconvolved_mtx[cell_type]
                
                # Calculate Pearson correlation
                pcc, _ = pearsonr(gene_expression, cell_abundance)
                marker_pccs[marker] = pcc

            cell_types_pcc[cell_type] = marker_pccs
        
        self.pcc = cell_types_pcc

    def compute_spatial_weights(self, k):
        """
        Compute spatial weights for each spot based on k nearest neighbors.
        """
        # Extract coordinates and build KDTree
        xy_coords = self.st_locations_mtx[['x', 'y']].values
        tree = KDTree(xy_coords)

        weights = {}
        for i, coord in enumerate(xy_coords):
            distances, indices = tree.query(coord, k=k+1)  # Include self in query
            neighbors = indices[1:]  # Exclude the spot itself
            weights[i] = {n: 1 for n in neighbors}  # Uniform weight of 1 for each neighbor

        self.spatial_weights = W(weights)

    def calculate_moran(self):
        """
        Compute Global and Local Moran's I statistics for cell type distributions.
        Requires spatial weights to be pre-computed.
        """
        if not hasattr(self, 'spatial_weights'):
            raise ValueError("Spatial weights must be calculated before Moran's I.")

        global_moran = {}
        local_moran = {}

        for cell_type in self.deconvolved_mtx.columns:
            values = self.deconvolved_mtx[cell_type]
            w = self.spatial_weights

            # Calculate global and local Moran's I
            moran_global = Moran(values, w)
            moran_local = Moran_Local(values, w)

            global_moran[cell_type] = (moran_global.I, moran_global.p_sim)
            local_moran[cell_type] = (moran_local.Is, moran_local.p_sim)

        self.global_moran = global_moran
        self.local_moran = local_moran

In [22]:
# Cell types and markers selected for visualization in benchmark paper
marker_genes_dict = {
    "Oligo": ["Mog", "Plp1"],
    "Ext_Thal": ["Synpo2", "Ptpn3", "Slc17a6"],
    "Ext_ClauPyr": ["Nr4a2", "Synpr"]
}

In [25]:
# Load the Visium deconvolved matrix
visium_spatial = pd.read_csv(os.getcwd() + "/data/Visium/filtered_st_counts.csv", index_col=0)
visium_locations = pd.read_csv(os.getcwd() + "/data/Visium/filtered_st_locations.csv", index_col=0)
visium_locations.index = list(map(lambda x: ".".join(x.split("-")), visium_locations.index))

results_dir = os.getcwd() + "/results/Visium/"
benchmark = []

In [26]:
def run_benchmarking():
    for result in os.listdir(results_dir):
        method = result.split("_")[1].split(".")[0]
        
        visium_deconvolved = pd.read_csv(results_dir + result, index_col=0)
        visium_deconvolved.index = list(map(lambda x: ".".join(x.split("-")), visium_deconvolved.index))
        
        visium_metrics = SpotMetrics(visium_deconvolved, visium_spatial, visium_locations, marker_genes_dict)
        visium_metrics.check_mtx_format()
        visium_metrics.calculate_pcc()
        visium_metrics.compute_spatial_weights(6)
        visium_metrics.calculate_moran()
        
        # Collecting data for benchmarking
        for cell_type, pcc_values in visium_metrics.pcc.items():
            for marker, pcc in pcc_values.items():
                global_moran, moran_p_value = visium_metrics.global_moran[cell_type]
                data = {
                    'method': method,
                    'cell_type': cell_type,
                    'marker': marker,
                    'PCC': pcc,
                    'MI': global_moran,
                    'MI_p_value': moran_p_value
                }
                benchmark.append(pd.DataFrame([data]))
        
        # Prepare dataframes for Local Moran's I and p-values
        local_moran_data = []
        local_moran_p_values = []

        for cell_type in visium_metrics.local_moran:
            local_is, p_sim = visium_metrics.local_moran[cell_type]
            local_moran_df = pd.DataFrame(local_is, columns=[cell_type], index=visium_metrics.deconvolved_mtx.index)
            local_moran_p_df = pd.DataFrame(p_sim, columns=[cell_type], index=visium_metrics.deconvolved_mtx.index)
            local_moran_data.append(local_moran_df)
            local_moran_p_values.append(local_moran_p_df)

        # Concatenate dataframes horizontally (cell types as columns)
        all_local_moran = pd.concat(local_moran_data, axis=1)
        all_local_p_values = pd.concat(local_moran_p_values, axis=1)

        # Save to CSV
        all_local_moran.to_csv(f"{os.getcwd()}/results/Visium_LMI/{method}_LMI.csv")
        all_local_p_values.to_csv(f"{os.getcwd()}/results/Visium_LMI/{method}_LMI_p_values.csv")


# Run benchmarking
run_benchmarking()
benchmark = pd.concat(benchmark, ignore_index=True)
benchmark.to_csv(os.getcwd() + "/results/Visium_Benchmarking_Results.csv", index=False)

  k = k_num / k_den
  return self.n / s0 * inum / self.z2ss
  self.z /= sy
  k = k_num / k_den
  return self.n / s0 * inum / self.z2ss
  self.z /= sy
