# In [2]:
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.mixture import GaussianMixture
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import scanpy as sc
import warnings
import diptest
warnings.filterwarnings('ignore')

# In [None]:
class CellHeterogeneityAnalyzer:
    """
    Quantify expression heterogeneity within annotated cell types.

    Holds a genes x cells expression matrix plus one label per cell and
    offers per-cell metrics (entropy, activation score), per-gene metrics
    (coefficient of variation, dip-test multimodality) and sub-clustering
    of a single cell type.
    """

    def __init__(self, expression_matrix, cell_types, gene_names=None):
        """
        Initialize the analyzer with expression data and cell type labels

        Parameters:
        -----------
        expression_matrix : np.ndarray
            Genes x Cells matrix of expression values
        cell_types : array-like
            Cell type label for each cell (one per column of
            expression_matrix)
        gene_names : array-like, optional
            Gene names corresponding to rows in expression_matrix.
            Defaults to the row indices 0..n_genes-1.
        """
        self.expression = expression_matrix
        # Coerce to arrays: `labels == value` on a plain list yields a single
        # bool instead of a per-cell mask, and lists cannot be fancy-indexed.
        self.cell_types = np.asarray(cell_types)
        if gene_names is not None:
            self.gene_names = np.asarray(gene_names)
        else:
            self.gene_names = np.arange(expression_matrix.shape[0])
        # Filled by analyze_cell_type_heterogeneity, keyed by cell type.
        self.results = {}

    def _cell_mask(self, cell_type):
        """Return a boolean per-cell mask selecting cells labelled `cell_type`."""
        labels = np.asarray(self.cell_types)
        mask = np.asarray(labels == cell_type, dtype=bool)
        if mask.shape != labels.shape:
            # Some NumPy versions collapse a comparison between incomparable
            # dtypes to a single scalar; expand it back to one flag per cell.
            mask = np.full(labels.shape, bool(mask))
        return mask

    def calculate_transcriptional_entropy(self, cell_type=None):
        """
        Calculate Shannon entropy (in bits) of gene expression for each cell
        Higher entropy indicates more diverse gene expression

        Parameters:
        -----------
        cell_type : optional
            If given, only cells with this label are evaluated.

        Returns:
        --------
        np.ndarray
            One entropy value per (selected) cell.
        """
        expression = self.expression
        if cell_type is not None:
            # Entropy is computed column-wise, so subsetting first gives the
            # same values without processing unrelated cells.
            expression = expression[:, self._cell_mask(cell_type)]

        # Normalize expression values to sum to 1 for each cell; the epsilon
        # guards against division by zero for all-zero cells.
        exp_norm = expression / (expression.sum(axis=0) + 1e-10)

        # The epsilon inside the log keeps zero-expression entries finite.
        return -np.sum(exp_norm * np.log2(exp_norm + 1e-10), axis=0)

    def calculate_activation_score(self, activation_genes, cell_type=None):
        """
        Calculate activation score based on mean expression of activation markers

        Parameters:
        -----------
        activation_genes : list
            List of gene names or indices considered as activation markers
        cell_type : optional
            If given, only scores of cells with this label are returned.

        Returns:
        --------
        np.ndarray
            Mean expression of the marker genes for each (selected) cell.

        Raises:
        -------
        IndexError
            If activation_genes is empty or names a gene that is not in
            gene_names.
        """
        if len(activation_genes) == 0:
            raise IndexError("activation_genes must contain at least one gene")

        if isinstance(activation_genes[0], str):
            gene_indices = []
            missing = []
            for gene in activation_genes:
                hits = np.flatnonzero(np.asarray(self.gene_names == gene))
                if hits.size:
                    # First occurrence wins when a name is duplicated.
                    gene_indices.append(hits[0])
                else:
                    missing.append(gene)
            if missing:
                raise IndexError(
                    f"activation gene(s) not found in gene_names: {missing}"
                )
        else:
            # As an array so that a tuple of indices selects rows instead of
            # being interpreted as a multi-dimensional index.
            gene_indices = np.asarray(activation_genes)

        activation_score = np.mean(self.expression[gene_indices], axis=0)

        if cell_type is not None:
            activation_score = activation_score[self._cell_mask(cell_type)]

        return activation_score

    def calculate_gene_variability(self, cell_type=None):
        """
        Calculate coefficient of variation for each gene within cell types

        Parameters:
        -----------
        cell_type : optional
            If given, only cells with this label are used.

        Returns:
        --------
        pd.Series
            CV (standard deviation / mean) per gene, indexed by gene name.
        """
        if cell_type is not None:
            exp_subset = self.expression[:, self._cell_mask(cell_type)]
        else:
            exp_subset = self.expression

        # Epsilon avoids division by zero for genes with zero mean.
        cv = np.std(exp_subset, axis=1) / (np.mean(exp_subset, axis=1) + 1e-10)

        return pd.Series(cv, index=self.gene_names)

    def find_bimodal_genes(self, cell_type=None, significance=0.05):
        """
        Identify genes showing bimodal expression distribution
        Uses Hartigan's dip test for multimodality

        Parameters:
        -----------
        cell_type : optional
            If given, only cells with this label are tested.
        significance : float
            Genes whose dip-test p-value is below this threshold are
            reported. No multiple-testing correction is applied.

        Returns:
        --------
        pd.Series
            Dip-test p-values of the reported genes, indexed by gene name.
        """
        if cell_type is not None:
            exp_subset = self.expression[:, self._cell_mask(cell_type)]
        else:
            exp_subset = self.expression

        bimodal_genes = []
        p_values = []

        for i, gene in enumerate(self.gene_names):
            if np.var(exp_subset[i]) == 0:  # Skip genes with no variance
                continue

            # Dip test of unimodality - rejection suggests multimodality
            _, p_val = diptest.diptest(exp_subset[i])

            if p_val < significance:
                bimodal_genes.append(gene)
                p_values.append(p_val)

        # Explicit dtype keeps an empty result a float Series on every
        # pandas version.
        return pd.Series(p_values, index=bimodal_genes, dtype=float)

    def detect_subpopulations(self, cell_type, n_clusters=2, method='gmm'):
        """
        Detect subpopulations within a cell type using clustering

        Parameters:
        -----------
        cell_type : str
            Cell type to analyze
        n_clusters : int
            Number of subpopulations to look for
        method : str
            Clustering method ('gmm' or 'kmeans'); any value other than
            'gmm' selects k-means.

        Returns:
        --------
        labels : np.ndarray
            Cluster label for each cell of the requested type.
        cluster_stats : dict
            Maps 'Cluster_<i>' to a dict with 'size', 'prop', 'entropy'
            (mean per-cell entropy) and 'top_genes' (marker DataFrame).

        Raises:
        -------
        ValueError
            If the cell type has fewer cells than n_clusters.
        """
        mask = self._cell_mask(cell_type)
        n_cells = int(np.count_nonzero(mask))
        if n_cells < n_clusters:
            raise ValueError(
                f"cell type {cell_type!r} has {n_cells} cell(s); "
                f"cannot form {n_clusters} cluster(s)"
            )

        exp_subset = self.expression[:, mask].T  # Transpose to cells x genes

        # Standardize the data
        exp_scaled = StandardScaler().fit_transform(exp_subset)

        # Reduce dimensionality for clustering. PCA cannot extract more
        # components than min(n_cells, n_genes), so bound by both.
        n_components = min(50, *exp_scaled.shape)
        pca = PCA(n_components=n_components, random_state=42)
        exp_pca = pca.fit_transform(exp_scaled)

        # Perform clustering
        if method == 'gmm':
            clusterer = GaussianMixture(n_components=n_clusters, random_state=42)
        else:
            clusterer = KMeans(n_clusters=n_clusters, random_state=42)

        labels = clusterer.fit_predict(exp_pca)

        # Entropy does not depend on the cluster, so compute it once.
        cell_entropy = self.calculate_transcriptional_entropy(cell_type)

        # Calculate cluster characteristics
        cluster_stats = {}
        for i in range(n_clusters):
            cluster_mask = labels == i
            cluster_stats[f'Cluster_{i}'] = {
                'size': np.sum(cluster_mask),
                'prop': np.mean(cluster_mask),
                'entropy': np.mean(cell_entropy[cluster_mask]),
                'top_genes': self._find_cluster_markers(exp_subset, cluster_mask)
            }

        return labels, cluster_stats

    def _find_cluster_markers(self, expression, cluster_mask, top_n=10):
        """
        Find marker genes for a cluster using differential expression

        Parameters:
        -----------
        expression : np.ndarray
            Cells x Genes matrix (columns aligned with gene_names)
        cluster_mask : array-like of bool
            True for cells inside the cluster, False for all other cells
        top_n : int
            Maximum number of genes to return

        Returns:
        --------
        pd.DataFrame
            Columns 'gene', 't_statistic', 'p_value' for the top_n genes
            by absolute t-statistic, ordered from weakest to strongest.
        """
        cluster_mask = np.asarray(cluster_mask, dtype=bool)
        cluster_exp = expression[cluster_mask]
        other_exp = expression[~cluster_mask]

        # One vectorized t-test across all genes (columns).
        t_stats, p_vals = stats.ttest_ind(cluster_exp, other_exp, axis=0)
        t_stats = np.atleast_1d(np.asarray(t_stats, dtype=float))
        p_vals = np.atleast_1d(np.asarray(p_vals, dtype=float))

        # Genes without variance yield NaN statistics; argsort places NaN
        # last, which would rank them as the strongest markers. Push them
        # to the bottom of the ranking instead.
        ranking_key = np.abs(t_stats)
        ranking_key[np.isnan(ranking_key)] = -np.inf
        order = np.argsort(ranking_key)

        # Explicit count: a plain [-top_n:] slice returns everything for 0.
        n_keep = max(0, min(int(top_n), order.size))
        top_indices = order[order.size - n_keep:]

        return pd.DataFrame({
            'gene': np.asarray(self.gene_names)[top_indices],
            't_statistic': t_stats[top_indices],
            'p_value': p_vals[top_indices]
        })

    def analyze_cell_type_heterogeneity(self, cell_type, activation_genes=None):
        """
        Comprehensive analysis of heterogeneity for a specific cell type

        Parameters:
        -----------
        cell_type : str
            Cell type to analyze
        activation_genes : list, optional
            Gene names or indices used for the activation score; the
            score is skipped when omitted.

        Returns:
        --------
        dict
            Keys: 'cell_count', 'entropy_mean', 'entropy_std',
            'top_variable_genes', 'bimodal_genes', 'subpopulations' and,
            when activation_genes is given, 'activation_score_mean' and
            'activation_score_std'. The same dict is also stored in
            self.results[cell_type].
        """
        results = {}

        # 1. Basic metrics
        results['cell_count'] = np.sum(self._cell_mask(cell_type))

        # 2. Transcriptional heterogeneity
        entropy = self.calculate_transcriptional_entropy(cell_type)
        results['entropy_mean'] = np.mean(entropy)
        results['entropy_std'] = np.std(entropy)

        # 3. Gene variability
        cv = self.calculate_gene_variability(cell_type)
        results['top_variable_genes'] = cv.nlargest(10)

        # 4. Bimodal genes
        results['bimodal_genes'] = self.find_bimodal_genes(cell_type)

        # 5. Activation score if activation genes provided
        if activation_genes is not None:
            act_scores = self.calculate_activation_score(activation_genes, cell_type)
            results['activation_score_mean'] = np.mean(act_scores)
            results['activation_score_std'] = np.std(act_scores)

        # 6. Subpopulation detection (per-cell labels are not reported)
        _, cluster_stats = self.detect_subpopulations(cell_type)
        results['subpopulations'] = cluster_stats

        # Keep the latest analysis of each cell type on the instance.
        self.results[cell_type] = results

        return results