In [5]:
import matplotlib.pyplot as plt
import numpy as np
import scanpy as sc
import scipy.sparse as sp
from scipy.stats import genpareto, ks_2samp

In [6]:
# Read the sample dataset from an .h5ad file into `adata`.
# NOTE(review): the path is absolute and machine-specific, so this cell only
# runs where that exact file exists — consider a configurable data directory.
adata = sc.read_h5ad("/Users/chen_yiru/Desktop/simulation/data/raw/Sample_data_151676.h5ad")

In [7]:
def calc_zero_proportion(X):
    """Return the fraction of entries equal to zero along axis 0 of ``X``.

    ``X`` is compared element-wise against 0 and the resulting boolean mask
    is averaged over axis 0, which yields one proportion per column for a
    2-D array.
    """
    is_zero = X == 0
    return np.mean(is_zero, axis=0)


In [8]:
def evaluate_fit(original, generated, quantiles=(0.25, 0.5, 0.75, 0.9, 0.95, 0.99)):
    """Score how closely ``generated`` reproduces the distribution of ``original``.

    Parameters
    ----------
    original, generated : array-like of numbers
        Reference sample and synthetic sample. The correlation metric pairs
        the two sorted samples element by element, so both must contain the
        same number of values.
    quantiles : sequence of float, optional
        Probabilities at which the quantiles of both samples are compared.

    Returns
    -------
    dict
        ``"Cohen's d"``: standardized mean difference (pooled, ddof=1).
        ``"Relative Error"``: ``|mean(original) - mean(generated)| / |mean(original)|``.
        ``"KS Statistic"``: two-sample Kolmogorov-Smirnov statistic.
        ``"Correlation"``: Pearson correlation of the two sorted samples.
        ``"Quantile Relative Errors"``: dict keyed ``f"{q*100}th"`` holding
        ``|q_orig - q_gen| / |q_orig|`` per requested quantile.
        ``"Verdict"``: one of "Excellent fit", "Good fit", "Fair fit", "Poor fit".

    Notes
    -----
    Relative errors are normalized by absolute values so they are never
    negative; a signed denominator would turn the error negative for a
    negative-mean sample and let it pass every ``<`` threshold below.
    A quantile where both samples are exactly equal reports 0 even when the
    original quantile is 0 (instead of NaN from 0/0). A zero original mean
    still yields inf/NaN for "Relative Error", which results in "Poor fit".
    """
    original = np.asarray(original, dtype=float)
    generated = np.asarray(generated, dtype=float)

    def cohens_d(x1, x2):
        n1, n2 = len(x1), len(x2)
        var1, var2 = np.var(x1, ddof=1), np.var(x2, ddof=1)
        pooled_se = np.sqrt(((n1 - 1) * var1 + (n2 - 1) * var2) / (n1 + n2 - 2))
        return (np.mean(x1) - np.mean(x2)) / pooled_se

    def relative_error(x1, x2):
        reference_mean = np.mean(x1)
        return np.abs(reference_mean - np.mean(x2)) / np.abs(reference_mean)

    # Headline metrics.
    effect_size = cohens_d(original, generated)
    rel_error = relative_error(original, generated)
    ks_stat, _ = ks_2samp(original, generated)
    correlation = np.corrcoef(np.sort(original), np.sort(generated))[0, 1]

    # Relative error at each requested quantile.
    orig_quant = np.quantile(original, quantiles)
    gen_quant = np.quantile(generated, quantiles)
    abs_diff = np.abs(orig_quant - gen_quant)
    with np.errstate(divide="ignore", invalid="ignore"):
        quant_rel_errors = abs_diff / np.abs(orig_quant)
    # Identical quantiles have zero error even when the original quantile is 0.
    quant_rel_errors = np.where(abs_diff == 0, 0.0, quant_rel_errors)

    results = {
        "Cohen's d": effect_size,
        "Relative Error": rel_error,
        "KS Statistic": ks_stat,
        "Correlation": correlation,
        "Quantile Relative Errors": dict(zip([f"{q*100}th" for q in quantiles], quant_rel_errors))
    }

    # Grading tiers, strictest first:
    # (label, max |Cohen's d|, max relative error, max KS statistic, min correlation)
    tiers = (
        ("Excellent fit", 0.05, 0.05, 0.1, 0.95),
        ("Good fit", 0.1, 0.15, 0.15, 0.9),
        ("Fair fit", 0.2, 0.2, 0.2, 0.8),
    )
    verdict = "Poor fit"
    for label, max_d, max_rel, max_ks, min_corr in tiers:
        if (abs(effect_size) < max_d and rel_error < max_rel
                and ks_stat < max_ks and correlation > min_corr):
            verdict = label
            break

    results["Verdict"] = verdict

    return results

In [12]:
import numpy as np
import scipy.sparse as sp
from scipy.stats import genpareto, ks_2samp

def genpareto_fit_var(filtered_adata, mode="strict", threshold=0.99, random_state=None):
    """Simulate per-column variances with a generalized Pareto body plus an empirical tail.

    The variances (ddof=1, taken along axis 0 of ``filtered_adata.X``) are
    split at a percentile cutoff. Values at or below the cutoff are fitted
    with ``scipy.stats.genpareto`` and redrawn from the fit; values above it
    are resampled with replacement. The result is clipped to the observed
    [min, max] range. Output order is generated body first, then tail.

    Parameters
    ----------
    filtered_adata : object exposing a ``.X`` matrix
        If ``.X`` is sparse it is replaced **in place** by its dense form
        (side effect on the object that was passed in).
    mode : str, optional
        ``"strict"`` tries the percentiles 99.5, 99, 98.5, 98, 97.5 and keeps
        the candidate with the lowest score from ``evaluate_fit``. Any other
        value uses the single cutoff given by ``threshold``.
    threshold : float, optional
        Cutoff as a fraction (0.99 means the 99th percentile). Only used when
        ``mode`` is not ``"strict"``.
    random_state : None, int, numpy Generator or RandomState, optional
        Seed or generator for reproducible sampling. ``None`` (default) keeps
        using NumPy's global random state.

    Returns
    -------
    strict mode : tuple ``(samples, percentile, evaluation)``
        The best candidate, the percentile that produced it, and its
        ``evaluate_fit`` result. All three are ``None`` if no candidate had a
        comparable (non-NaN) score.
    otherwise : numpy.ndarray
        The simulated variances.
    """
    if sp.issparse(filtered_adata.X):
        # .toarray() is supported by both sparse matrices and sparse arrays,
        # whereas the `.A` shortcut is not available on sparse arrays in
        # recent SciPy releases.
        filtered_adata.X = filtered_adata.X.toarray()

    # Per-column variance, unbiased estimator.
    gene_vars = np.var(filtered_adata.X, axis=0, ddof=1)
    lower, upper = np.min(gene_vars), np.max(gene_vars)

    # Normalize random_state; None keeps the global NumPy random state.
    if random_state is None:
        rng = None
    elif isinstance(random_state, (np.random.Generator, np.random.RandomState)):
        rng = random_state
    else:
        rng = np.random.default_rng(random_state)
    choose = np.random.choice if rng is None else rng.choice

    def resample(percentile):
        """Fit the body below ``percentile``, redraw it and bootstrap the tail."""
        cutoff = np.percentile(gene_vars, percentile)
        main_data = gene_vars[gene_vars <= cutoff]
        tail_data = gene_vars[gene_vars > cutoff]

        # Fit the generalized Pareto distribution to the body only.
        shape, loc, scale = genpareto.fit(main_data)

        new_main = genpareto.rvs(shape, loc, scale, size=len(main_data), random_state=rng)
        new_tail = choose(tail_data, size=len(tail_data), replace=True)

        return np.clip(np.concatenate([new_main, new_tail]), lower, upper)

    if mode != "strict":
        return resample(threshold * 100)

    best_score = float('inf')
    best_threshold = None
    best_samples = None
    best_evaluation = None

    for percentile in (99.5, 99, 98.5, 98, 97.5):
        new_samples = resample(percentile)

        # Assess the candidate against the observed variances.
        evaluation = evaluate_fit(gene_vars, new_samples)

        # Combined score: lower is better.
        score = (abs(evaluation["Cohen's d"]) +
                 evaluation["Relative Error"] +
                 evaluation["KS Statistic"] +
                 (1 - evaluation["Correlation"]))

        if score < best_score:
            best_score = score
            best_threshold = percentile
            best_samples = new_samples
            best_evaluation = evaluation

    return best_samples, best_threshold, best_evaluation


{"Cohen's d": -0.0006803168468556671, 'Relative Error': 0.019774767652747598, 'KS Statistic': 0.06002969010948228, 'Correlation': 0.9911327255874208, 'Quantile Relative Errors': {'25.0th': 0.04388205566562302, '50.0th': 0.005639424308480667, '75.0th': 0.007241990035466662, '90.0th': 0.1356027875422833, '95.0th': 0.224480100243722, '99.0th': 0.008687105731989187}, 'Verdict': 'Excellent fit'}
97.5


In [None]:
from scipy.stats import ks_2samp

def genpareto_fit_mean(filtered_adata, mode="strict", threshold=None, random_state=None):
    """Simulate per-column means with a generalized Pareto body plus an empirical tail.

    The means (taken along axis 0 of ``filtered_adata.X``) are split at a
    percentile cutoff. Values at or below the cutoff are fitted with
    ``scipy.stats.genpareto`` and redrawn from the fit; values above it are
    resampled with replacement. The result is clipped to the observed
    [min, max] range. Output order is generated body first, then tail.

    Parameters
    ----------
    filtered_adata : object exposing a ``.X`` matrix
        If ``.X`` is sparse it is replaced **in place** by its dense form
        (side effect on the object that was passed in).
    mode : str, optional
        ``"strict"`` tries the percentiles 99.5, 99, 98.5, 98, 97.5, 97 and
        keeps the candidate with the lowest score from ``evaluate_fit``. Any
        other value uses a single cutoff.
    threshold : float or None, optional
        Cutoff as a fraction (0.99 means the 99th percentile) for the
        non-strict mode. ``None`` (default) means 0.99. Previously this
        argument was accepted but ignored and the 99th percentile was always
        used.
    random_state : None, int, numpy Generator or RandomState, optional
        Seed or generator for reproducible sampling. ``None`` (default) keeps
        using NumPy's global random state.

    Returns
    -------
    strict mode : tuple ``(samples, cutoff_value, evaluation)``
        The best candidate, the cutoff *value* of the means at the winning
        percentile, and its ``evaluate_fit`` result. All three are ``None``
        if no candidate had a comparable (non-NaN) score.
    otherwise : numpy.ndarray
        The simulated means.
    """
    if sp.issparse(filtered_adata.X):
        # .toarray() is supported by both sparse matrices and sparse arrays,
        # whereas the `.A` shortcut is not available on sparse arrays in
        # recent SciPy releases.
        filtered_adata.X = filtered_adata.X.toarray()

    gene_means = np.mean(filtered_adata.X, axis=0)
    lower, upper = np.min(gene_means), np.max(gene_means)

    # Normalize random_state; None keeps the global NumPy random state.
    if random_state is None:
        rng = None
    elif isinstance(random_state, (np.random.Generator, np.random.RandomState)):
        rng = random_state
    else:
        rng = np.random.default_rng(random_state)
    choose = np.random.choice if rng is None else rng.choice

    def resample(percentile):
        """Fit the body below ``percentile``; return (samples, cutoff value)."""
        cutoff = np.percentile(gene_means, percentile)
        main_data = gene_means[gene_means <= cutoff]
        tail_data = gene_means[gene_means > cutoff]

        shape, loc, scale = genpareto.fit(main_data)

        new_main = genpareto.rvs(shape, loc, scale, size=len(main_data), random_state=rng)
        new_tail = choose(tail_data, size=len(tail_data), replace=True)

        samples = np.clip(np.concatenate([new_main, new_tail]), lower, upper)
        return samples, cutoff

    if mode != "strict":
        # Honor the caller's cutoff; fall back to the 99th percentile.
        fraction = 0.99 if threshold is None else threshold
        new_samples, _ = resample(fraction * 100)
        return new_samples

    best_score = float('inf')
    best_threshold = None
    best_samples = None
    best_evaluation = None

    for percentile in (99.5, 99, 98.5, 98, 97.5, 97):
        new_samples, current_threshold = resample(percentile)

        # Assess the candidate against the observed means.
        evaluation = evaluate_fit(gene_means, new_samples)

        # Combined score: lower is better.
        score = (abs(evaluation["Cohen's d"]) +
                 evaluation["Relative Error"] +
                 evaluation["KS Statistic"] +
                 (1 - evaluation["Correlation"]))

        if score < best_score:
            best_score = score
            # NOTE(review): this stores the cutoff value, while
            # genpareto_fit_var stores the percentile itself. Kept as-is so
            # existing callers see the same return value — confirm which one
            # is intended.
            best_threshold = current_threshold
            best_samples = new_samples
            best_evaluation = evaluation

    return best_samples, best_threshold, best_evaluation
    


In [None]:
def genpareto_fit_zero_p(filtered_adata, random_state=None):
    """Simulate per-column zero proportions via a generalized Pareto fit.

    The zero proportions from ``calc_zero_proportion(filtered_adata.X)`` are
    mapped from [0, 1] to [0, +inf) with ``-log(1 - p)``, padded with evenly
    spaced "virtual" points reaching up to 10, and fitted with
    ``scipy.stats.genpareto``. New values are drawn from the fit, mapped back
    with ``1 - exp(-x)`` and clipped to [0, 1].

    Parameters
    ----------
    filtered_adata : object exposing a ``.X`` matrix
        If ``.X`` is sparse it is replaced **in place** by its dense form
        (side effect on the object that was passed in).
    random_state : None, int, numpy Generator or RandomState, optional
        Seed or generator for reproducible sampling. ``None`` (default) keeps
        using NumPy's global random state.

    Returns
    -------
    numpy.ndarray
        One simulated zero proportion per entry of the observed proportions.
    """
    if sp.issparse(filtered_adata.X):
        # .toarray() is supported by both sparse matrices and sparse arrays,
        # whereas the `.A` shortcut is not available on sparse arrays in
        # recent SciPy releases.
        filtered_adata.X = filtered_adata.X.toarray()

    zero_proportion = calc_zero_proportion(filtered_adata.X)

    def transform_data(data, epsilon=1e-10):
        # Map [0, 1] onto [0, +inf); epsilon keeps log() away from 0 when p == 1.
        return -np.log(1 - np.clip(data, 0, 1-epsilon))

    def inverse_transform(data):
        # Map [0, +inf) back onto [0, 1].
        return 1 - np.exp(-data)

    def fit_extended_gpd(data, n_virtual_points=200, max_virtual_value=10):
        transformed_data = transform_data(data)

        # Pad with virtual points between the largest observation and
        # max_virtual_value.
        # NOTE(review): when the largest transformed value already exceeds
        # max_virtual_value the padding runs downward — confirm this is intended.
        virtual_points = np.linspace(np.max(transformed_data), max_virtual_value, n_virtual_points)
        extended_data = np.concatenate([transformed_data, virtual_points])

        # Fit the generalized Pareto distribution.
        shape, loc, scale = genpareto.fit(extended_data)

        return shape, loc, scale

    def sample_extended_gpd(shape, loc, scale, size):
        samples = genpareto.rvs(shape, loc, scale, size=size, random_state=random_state)

        # Bring the samples back to the [0, 1] interval.
        return inverse_transform(samples)

    # np.linspace requires an integer point count; 0.1 * len(...) is a float
    # and raises TypeError on current NumPy, so use integer division instead.
    n_virtual = len(zero_proportion) // 10

    shape, loc, scale = fit_extended_gpd(zero_proportion, n_virtual_points=n_virtual, max_virtual_value=10)
    new_samples = sample_extended_gpd(shape, loc, scale, size=len(zero_proportion))
    new_samples = np.clip(new_samples, 0, 1)

    return new_samples
