In [None]:
pip install numpy pandas scikit-learn anndata scipy

In [None]:
pip install scanpy

In [None]:
import numpy as np
import pandas as pd
import sklearn.metrics
import scipy.sparse as sparse
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
import scanpy as sc
import matplotlib.pyplot as plt

In [None]:
# Load the normalized focal-cortical dataset with scanpy.
# NOTE(review): this is an absolute, machine-specific path; consider a configurable data directory.
adata = sc.read("/work/linear model/New_datasets_fixed/normalized/focal_cortical_processed_normalized.h5ad")
# Show the object summary as the cell output.
adata

In [None]:
# Split the data set into train and test.
# 10% of the cells are held out; the assignment is recorded in adata.obs["split"]
# so that later cells can subset on it.
from sklearn.model_selection import train_test_split


split_key = "split"
idx_train, idx_test = train_test_split(adata.obs_names, test_size=0.1, random_state=42)
# Every cell defaults to "train"; only the held-out cells need to be relabelled.
adata.obs[split_key] = "train"
adata.obs.loc[idx_test, split_key] = "test"

In [None]:
# Materialize independent copies of the two partitions using the "split" column of adata.obs.
adata_train = adata[adata.obs["split"] == "train"].copy()
adata_test = adata[adata.obs["split"] == "test"].copy()

In [None]:
# Display the full object again; adata.obs now carries the "split" column.
adata

In [None]:
class PCA_recon():
    """Linear reconstruction baseline: mean-centering followed by PCA.

    The model is a scikit-learn ``Pipeline`` made of a ``StandardScaler`` that only
    removes the per-feature mean (``with_std=False``) and a ``PCA`` with
    ``random_state=42``. Every public method takes an object that exposes its data
    matrix as ``.X`` (a dense array or a scipy sparse matrix).
    """

    def __init__(self, n_components):
        """Create the unfitted centering + PCA pipeline.

        Args:
            n_components: Number of principal components passed to ``PCA``.
        """
        self.pipeline = Pipeline([('scaling', StandardScaler(with_mean=True, with_std=False)),
                                  ('pca', PCA(n_components=n_components, random_state=42))])

    @staticmethod
    def _dense_matrix(adata):
        """Return ``adata.X`` unchanged when dense, or as a float32 ndarray when sparse."""
        X = adata.X
        if sparse.issparse(X):
            # toarray() yields an ndarray directly (no np.matrix intermediate).
            X = X.toarray().astype(np.float32)
        return X

    def fit(self, adata):
        """Fit the centering step and the PCA on ``adata.X``.

        Returns:
            ``self``, so that calls can be chained (scikit-learn convention).
        """
        self.pipeline.fit(self._dense_matrix(adata))
        return self

    def transform(self, adata):
        """Project ``adata.X`` onto the fitted principal components.

        Returns:
            The array produced by the fitted pipeline's ``transform``.
        """
        return self.pipeline.transform(self._dense_matrix(adata))

    def predict(self, adata):
        """Reconstruct ``adata.X`` from its PCA projection.

        Returns:
            A copy of ``adata`` whose ``.X`` is replaced by the reconstruction.
        """
        adata_out = adata.copy()

        scaler = self.pipeline.named_steps['scaling']
        pca = self.pipeline.named_steps['pca']

        X_transformed = self.transform(adata)

        # Undo the PCA projection (still in the mean-centered space).
        X_reconstructed_centered = np.dot(X_transformed, pca.components_)

        # Add back the per-feature mean learned by the scaler.
        X_reconstructed = X_reconstructed_centered + scaler.mean_
        adata_out.X = X_reconstructed
        return adata_out

In [None]:
# Step 1: Initialize the PCA reconstruction model with 128 components.
model = PCA_recon(n_components=128)

# Step 2: Fit the model ONLY on the training set.
model.fit(adata_train)

# Step 3: Project the training cells into the PCA latent space.
X_pca_train = model.transform(adata_train)

# Step 4: Reconstruct the held-out test cells. predict() returns a copy of
# adata_test whose .X holds the reconstruction, so its shape matches the input.
adata_test_reconstructed = model.predict(adata_test)


In [None]:
# Latent (PCA) representation of ALL cells (train and test), used by the disentanglement metrics below.
z = model.transform(adata)

In [None]:
# Baseline latent representation: mean of z over its first axis, kept 2-D via keepdims.
z_basal = np.mean(z, axis=0, keepdims=True)

In [None]:
# If adata_test_reconstructed is still an AnnData object, replace it by its data matrix (.X).
# Note that this rebinds the name from an AnnData to an array.
import anndata
if isinstance(adata_test_reconstructed, anndata.AnnData):
    adata_test_reconstructed = adata_test_reconstructed.X

# adata_test_reconstructed is now the plain matrix; store it alongside the test cells.
adata_test.obsm["X_reconstructed"] = adata_test_reconstructed

# NOTE(review): the file name mentions "latent", but this cell only stores the
# reconstruction in obsm; confirm whether the latent codes should be saved too.
adata_test.write("adata_post_with_latent_and_reconstructed_Focal_cortical_Linear_model.h5ad")

In [None]:
# Fraction of variance captured by each fitted principal component, and their total.
explained_variance = model.pipeline.named_steps['pca'].explained_variance_ratio_
print(f"Variance explained by each component: {explained_variance}")
print(f"Total variance explained: {explained_variance.sum()}")


In [None]:
from sklearn.metrics import mean_squared_error, root_mean_squared_error, mean_absolute_error, r2_score

# Original and reconstructed data for the held-out test cells.
# `adata_reconstructed` is never assigned in this notebook, so the metrics are
# computed from the test-set reconstruction stored in adata_test.obsm instead.
X_original = adata_test.X
if sparse.issparse(X_original):
    # The sklearn regression metrics require dense inputs.
    X_original = X_original.toarray()
X_reconstructed = adata_test.obsm["X_reconstructed"]

# Compute MSE as a baseline reconstruction error
mse_baseline = mean_squared_error(X_original, X_reconstructed)
print(f"PCA Reconstruction MSE: {mse_baseline}")

mae_baseline = mean_absolute_error(X_original, X_reconstructed)
print(f"PCA Reconstruction MAE: {mae_baseline}")


rmse_baseline = root_mean_squared_error(X_original, X_reconstructed)
print(f"PCA Reconstruction RMSE: {rmse_baseline}")

r2_baseline = r2_score(X_original, X_reconstructed)
print(f"PCA R2_score: {r2_baseline}")


In [None]:
from sklearn.metrics import mean_squared_error, root_mean_squared_error, mean_absolute_error, r2_score

In [None]:
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import KBinsDiscretizer
from sklearn.cluster import KMeans
from sklearn.metrics import mutual_info_score
from sklearn.model_selection import train_test_split

import numpy as np
import anndata
import pandas as pd

def encode_categorical(data):
    """Label-encode each column of a 2-D array independently.

    Args:
        data: 2-D array; every column is encoded with its own ``LabelEncoder``.

    Returns:
        Tuple ``(encoded_data, encoders)``: an integer array with the shape of
        ``data`` and the list of fitted encoders, one per column.
    """
    encoded_data = np.zeros_like(data, dtype=int)
    encoders = [LabelEncoder() for _ in range(data.shape[1])]
    for col, encoder in enumerate(encoders):
        encoded_data[:, col] = encoder.fit_transform(data[:, col])
    return encoded_data, encoders

def prep_data(adata, embedding, covriate_keys=None, continuous_covriate_keys=None, test_size=0.25):
    """Split an embedding and its label-encoded covariates into train/test parts.

    Args:
        adata: Object exposing ``obs`` (covariate table) and ``obs_names``.
        embedding: Array indexed by cell on its first axis, or an ``AnnData``
            whose ``.X`` is used.
        covriate_keys: Column name or list of column names of ``adata.obs`` to
            encode as factors. At least one key is required.
        continuous_covriate_keys: Accepted for signature compatibility; not used.
        test_size: Fraction of cells assigned to the test part.

    Returns:
        ``(mus_train, ys_train, mus_test, ys_test)``, each transposed so that the
        sample axis is last.

    Raises:
        ValueError: If ``covriate_keys`` is ``None`` or empty.

    Note:
        A later notebook cell defines another ``prep_data`` with a different
        signature; whichever cell ran last is the one in effect.
    """
    if isinstance(covriate_keys, str):
        # A single column name would otherwise yield a 1-D array the encoder cannot handle.
        covriate_keys = [covriate_keys]
    if covriate_keys is None or len(covriate_keys) == 0:
        raise ValueError("covriate_keys must name at least one column of adata.obs.")

    idx_train, idx_test = train_test_split(
        range(len(adata.obs_names)), test_size=test_size, random_state=42
    )
    print("Splitting complete.")

    encoded_factors_of_variation, _ = encode_categorical(adata.obs[covriate_keys].values)

    if isinstance(embedding, anndata.AnnData):
        embedding_data = embedding.X
    else:
        embedding_data = embedding

    mus_train = np.array(embedding_data[idx_train])
    ys_train = np.array(encoded_factors_of_variation[idx_train])
    mus_test = np.array(embedding_data[idx_test])
    ys_test = np.array(encoded_factors_of_variation[idx_test])

    return mus_train.T.copy(), ys_train.T.copy(), mus_test.T.copy(), ys_test.T.copy()

def compute_mig(mus_train, ys_train, covariate_names=None):
    """Public entry point for the mutual information gap; delegates to ``_compute_mig``."""
    scores = _compute_mig(mus_train, ys_train, covariate_names)
    return scores

def _compute_mig(mus_train, ys_train, covariate_names=None):
    """Computes the MIG score from latent codes and covariates.

    The latent codes are histogram-discretized, the mutual information between
    every latent dimension and every covariate is computed, and the score is the
    mean over covariates of (largest MI - second largest MI) / covariate entropy.

    Args:
        mus_train: Array of latent codes, one row per latent dimension.
        ys_train: Array of encoded covariates, one row per covariate.
        covariate_names: Optional names used in the printed report; defaults to
            ``"Covariate j"``.

    Returns:
        Dictionary with the key ``"discrete_mig"``.
    """
    score_dict = {}
    discretized_mus = make_discretizer(mus_train, discretizer_fn=_histogram_discretize)

    m = discrete_mutual_info(discretized_mus, ys_train)

    # Validate the MI matrix before it is indexed below.
    assert m.shape[0] == mus_train.shape[0]
    assert m.shape[1] == ys_train.shape[0]

    if covariate_names is None:
        covariate_names = [f"Covariate {j}" for j in range(m.shape[1])]

    for j in range(m.shape[1]):
        top_indices = np.argsort(m[:, j])[::-1][:3]
        top_scores = m[top_indices, j]
        print(f"Top 3 MI scores for covariate '{covariate_names[j]}':")
        for idx, score in zip(top_indices, top_scores):
            print(f"  Latent dim {idx}: MI = {score:.4f}")

    factor_entropy = discrete_entropy(ys_train)
    sorted_m = np.sort(m, axis=0)[::-1]
    gaps = sorted_m[0, :] - sorted_m[1, :]

    # A covariate with a single observed value has zero entropy; dividing by it
    # would give 0/0 and turn the whole score into NaN, so such covariates are
    # left out of the average.
    informative = factor_entropy > 0
    if not np.all(informative):
        skipped = [covariate_names[j] for j in np.flatnonzero(~informative)]
        print("Skipping zero-entropy covariates in MIG:", skipped)
    if np.any(informative):
        score_dict["discrete_mig"] = np.mean(gaps[informative] / factor_entropy[informative])
    else:
        score_dict["discrete_mig"] = 0.0

    print("Þetta er score:", score_dict)
    print("Entropy values:", factor_entropy)
    return score_dict

def discrete_mutual_info(mus, ys):
    """Mutual information between every row of ``mus`` and every row of ``ys``.

    Returns:
        Array of shape ``(mus.shape[0], ys.shape[0])`` where entry ``[i, j]`` is
        ``mutual_info_score(ys[j, :], mus[i, :])``.
    """
    n_codes, n_factors = mus.shape[0], ys.shape[0]
    mi_matrix = np.zeros([n_codes, n_factors])

    for code, factor in np.ndindex(n_codes, n_factors):
        mi_matrix[code, factor] = mutual_info_score(ys[factor, :], mus[code, :])

    return mi_matrix

def discrete_entropy(ys):
    """Entropy of every row of ``ys``, computed as the row's mutual information with itself.

    Returns:
        1-D array with one value per row of ``ys``.
    """
    entropies = np.zeros(ys.shape[0])

    for factor in range(entropies.shape[0]):
        labels = ys[factor, :]
        entropies[factor] = mutual_info_score(labels, labels)

    return entropies

def _identity_discretizer(target, num_bins):
    """No-op discretizer: returns ``target`` itself and ignores ``num_bins``."""
    return target

def make_discretizer(target, num_bins=10, discretizer_fn=_identity_discretizer):
    """Apply ``discretizer_fn(target, num_bins)`` and return its result.

    With the default ``discretizer_fn`` the input is returned unchanged.
    """
    discretized = discretizer_fn(target, num_bins)
    return discretized

def _histogram_discretize(target, num_bins=10):
    discretized = np.zeros_like(target)
    for i in range(target.shape[0]):
        discretized[i, :] = np.digitize(target[i, :], np.histogram(
            target[i, :], num_bins)[1][:-1])
    return discretized

def k_means_discretize(target, num_clusters=10):
    """Discretize every row of ``target`` with a 1-D k-means clustering.

    Each row is clustered on its own (``random_state=0``) and replaced by the
    cluster label of each entry. The result has the shape and dtype of ``target``.
    """
    discretized = np.zeros_like(target)
    for row_idx in range(target.shape[0]):
        column_vector = target[row_idx, :].reshape(-1, 1)
        clustering = KMeans(n_clusters=num_clusters, random_state=0)
        clustering.fit(column_vector)
        discretized[row_idx, :] = clustering.labels_
    return discretized

def score_disentanglement(adata, embedding_data, embedding_basal, covriate_keys=None, continuous_covriate_keys=None, test_size=0.25):
    """Compute the MIG score of an embedding with respect to covariates in ``adata.obs``.

    Args:
        adata: Object passed to ``prep_data`` to supply the covariates.
        embedding_data: Latent representation to evaluate.
        embedding_basal: Accepted for signature compatibility; not used.
        covriate_keys: Columns of ``adata.obs`` treated as factors of variation.
        continuous_covriate_keys: Accepted for signature compatibility; not used.
        test_size: Fraction of cells held out by ``prep_data``; only the training
            part is used for MIG.

    Returns:
        The dictionary returned by ``compute_mig``.
    """
    # Forward test_size so the caller's value is honoured rather than prep_data's default.
    mus_train, ys_train, _, _ = prep_data(
        adata, embedding_data, covriate_keys=covriate_keys, test_size=test_size
    )
    print('Computing MIG')
    return compute_mig(mus_train, ys_train, covariate_names=covriate_keys)

# Run the MIG score on the PCA latent codes of all cells, using six columns of
# adata.obs as factors of variation. z_basal is passed but not used by
# score_disentanglement.
mig_1 = score_disentanglement(
    adata,
    z,
    z_basal,
    covriate_keys=["cell_type", "tissue", "development_stage", "donor_id", "development_stage_ontology_term_id", "lateralization"]
)

print("MIG Score:", mig_1)


In [None]:
# Finalized DCI computation based on disentanglement_lib

import numpy as np
import pandas as pd
import anndata
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from scipy.stats import entropy

# === Encoding and Preprocessing ===
def encode_categorical(data):
    """Label-encode each column of a 2-D array independently.

    NOTE(review): this redefines the ``encode_categorical`` declared earlier in the
    notebook, which returns ``(encoded_data, encoders)``; this version returns
    only the encoded array.

    Returns:
        Integer array with the same shape as ``data``.
    """
    n_columns = data.shape[1]
    encoded_data = np.zeros_like(data, dtype=int)
    for col in range(n_columns):
        encoded_data[:, col] = LabelEncoder().fit_transform(data[:, col])
    return encoded_data

def remove_duplicate_columns(df):
    """Drop columns whose values duplicate an earlier column (the first occurrence is kept).

    Implemented by transposing, dropping duplicate rows and transposing back.
    """
    transposed = df.transpose()
    deduplicated = transposed.drop_duplicates()
    return deduplicated.transpose()

def prep_data(adata, embedding, covariate_keys, test_size=0.25):
    """Split an embedding and its encoded covariates into train/test parts for DCI.

    NOTE(review): this redefines the ``prep_data`` declared earlier in the notebook
    (whose keyword is spelled ``covriate_keys``).

    Args:
        adata: Object exposing ``obs``; ``len(adata)`` gives the number of samples.
        embedding: Array indexed by sample on its first axis, or an ``AnnData``
            whose ``.X`` is used.
        covariate_keys: Columns of ``adata.obs`` used as factors; duplicate
            columns are removed before encoding.
        test_size: Fraction of samples assigned to the test part.

    Returns:
        ``(mus_train, ys_train, mus_test, ys_test)``, each transposed so that the
        sample axis is last.
    """
    train_idx, test_idx = train_test_split(
        range(len(adata)), test_size=test_size, random_state=42
    )

    covariates = remove_duplicate_columns(adata.obs[covariate_keys].copy())
    factors = encode_categorical(covariates.values)

    if isinstance(embedding, anndata.AnnData):
        latent = embedding.X
    else:
        latent = embedding

    return (
        latent[train_idx].T,
        factors[train_idx].T,
        latent[test_idx].T,
        factors[test_idx].T,
    )

# === Importance Matrix ===
def compute_importance_rf(x_train, y_train, x_test, y_test):
    """Fit one random forest per factor and collect feature importances.

    Args:
        x_train, x_test: Latent codes, one row per latent dimension.
        y_train, y_test: Encoded factors, one row per factor.

    Returns:
        ``(importance_matrix, mean_train_accuracy, mean_test_accuracy)`` where
        ``importance_matrix[:, i]`` holds the absolute feature importances of the
        forest trained to predict factor ``i``.
    """
    n_codes, n_factors = x_train.shape[0], y_train.shape[0]
    importance_matrix = np.zeros((n_codes, n_factors))
    train_acc, test_acc = [], []
    for factor in range(n_factors):
        forest = RandomForestClassifier(random_state=42, max_depth=5)
        forest.fit(x_train.T, y_train[factor])
        importance_matrix[:, factor] = np.abs(forest.feature_importances_)
        train_hits = forest.predict(x_train.T) == y_train[factor]
        train_acc.append(np.mean(train_hits))
        test_hits = forest.predict(x_test.T) == y_test[factor]
        test_acc.append(np.mean(test_hits))
    return importance_matrix, np.mean(train_acc), np.mean(test_acc)

# === Disentanglement ===
def disentanglement_per_code(importance_matrix):
    """Per-code disentanglement: one minus the entropy of each row's importance distribution.

    Rows are normalized to sum to one (all-zero rows are divided by 1e-11
    instead), and the entropy is taken in base ``number of factors``.
    """
    n_factors = importance_matrix.shape[1]
    row_totals = importance_matrix.sum(axis=1, keepdims=True)
    denominators = np.where(row_totals == 0, 1e-11, row_totals)
    row_distributions = importance_matrix / denominators
    code_entropy = entropy(row_distributions.T + 1e-11, base=n_factors)
    return 1. - code_entropy

def disentanglement(importance_matrix):
    """Importance-weighted average of the per-code disentanglement scores.

    Returns 0.0 when the importance matrix sums to zero.
    """
    per_code = disentanglement_per_code(importance_matrix)
    overall = importance_matrix.sum()
    if overall == 0.:
        return 0.0
    code_weights = importance_matrix.sum(axis=1) / overall
    weighted = per_code * code_weights
    return np.sum(weighted)

# === Completeness ===
def completeness_per_factor(importance_matrix):
    """Per-factor completeness: one minus the entropy of each column, in base ``number of codes``."""
    n_codes = importance_matrix.shape[0]
    factor_entropy = entropy(importance_matrix + 1e-11, base=n_codes)
    return 1. - factor_entropy

def completeness(importance_matrix):
    """Importance-weighted average of the per-factor completeness scores.

    Returns 0.0 when the importance matrix sums to zero.
    """
    per_factor = completeness_per_factor(importance_matrix)
    overall = importance_matrix.sum()
    if overall == 0.:
        return 0.0
    factor_weights = importance_matrix.sum(axis=0) / overall
    weighted = per_factor * factor_weights
    return np.sum(weighted)

# === DCI Master Function ===
def compute_dci(mus_train, ys_train, mus_test, ys_test):
    """Compute the DCI scores (disentanglement, completeness, informativeness).

    Random-forest importances below 1e-11 are set to zero before the
    disentanglement and completeness scores are derived; informativeness is the
    mean train/test accuracy of the forests.

    Returns:
        Dictionary with keys ``disentanglement``, ``completeness``,
        ``informativeness_train`` and ``informativeness_test``.
    """
    threshold = 1e-11
    raw_importance, train_acc, test_acc = compute_importance_rf(
        mus_train, ys_train, mus_test, ys_test
    )
    importance_matrix = np.where(raw_importance < threshold, 0, raw_importance)

    scores = {}
    scores["disentanglement"] = disentanglement(importance_matrix)
    scores["completeness"] = completeness(importance_matrix)
    scores["informativeness_train"] = train_acc
    scores["informativeness_test"] = test_acc
    return scores

In [None]:
# Build the train/test codes and factors with the DCI version of prep_data, then
# compute the DCI scores. mus_train / ys_train / mus_test / ys_test are reused by
# the SAP and IRS cells below.
covariate_keys = ["cell_type", "tissue", "development_stage", "donor_id", "development_stage_ontology_term_id", "lateralization"]
mus_train, ys_train, mus_test, ys_test = prep_data(
    adata, z,covariate_keys=covariate_keys )
dci_scores = compute_dci(mus_train, ys_train, mus_test, ys_test)
# Show the score dictionary as the cell output.
dci_scores

In [None]:
#SAP score
from sklearn import svm

def compute_sap(mus, ys, mus_test, ys_test, continuous_factors):
    """Public entry point for the SAP score; delegates to ``_compute_sap``.

    Args:
        mus, ys: Training codes and factors.
        mus_test, ys_test: Test codes and factors.
        continuous_factors: True when the factors are continuous variables,
            False when they are discrete.

    Returns:
        Dictionary with SAP score.
    """
    result = _compute_sap(mus, ys, mus_test, ys_test, continuous_factors)
    return result

def _compute_sap(mus, ys, mus_test, ys_test, continuous_factors):
    """Compute the SAP score from training and testing codes and factors.

    Returns:
        Dictionary with the key ``"SAP_score"``.
    """
    score_matrix = compute_score_matrix(mus, ys, mus_test, ys_test, continuous_factors)
    # The score matrix must be laid out as [num_latents, num_factors].
    n_latents, n_factors = mus.shape[0], ys.shape[0]
    assert score_matrix.shape[0] == n_latents
    assert score_matrix.shape[1] == n_factors
    return {"SAP_score": compute_avg_diff_top_two(score_matrix)}

def compute_score_matrix(mus, ys, mus_test, ys_test, continuous_factors):
    """Compute the SAP score matrix of shape ``[num_latents, num_factors]``.

    Args:
        mus: Training codes, one row per latent dimension.
        ys: Training factors, one row per factor.
        mus_test: Test codes (only read when ``continuous_factors`` is False).
        ys_test: Test factors (only read when ``continuous_factors`` is False).
        continuous_factors: If True, entry ``[i, j]`` is the squared correlation
            between latent ``i`` and factor ``j``. If False, it is the test
            accuracy of a linear SVM predicting factor ``j`` from latent ``i``.

    Returns:
        The score matrix as a float array.
    """
    num_latents = mus.shape[0]
    num_factors = ys.shape[0]
    score_matrix = np.zeros([num_latents, num_factors])
    for i in range(num_latents):
        mu_i = mus[i, :]  # invariant over the factor loop
        for j in range(num_factors):
            y_j = ys[j, :]
            if continuous_factors:
                # Attribute is considered continuous.
                cov_mu_i_y_j = np.cov(mu_i, y_j, ddof=1)
                cov_mu_y = cov_mu_i_y_j[0, 1]**2
                var_mu = cov_mu_i_y_j[0, 0]
                var_y = cov_mu_i_y_j[1, 1]
                # Guard both variances: a constant latent OR a constant factor
                # would otherwise give a 0/0 division and a NaN score.
                if var_mu > 1e-12 and var_y > 1e-12:
                    score_matrix[i, j] = cov_mu_y * 1. / (var_mu * var_y)
                else:
                    score_matrix[i, j] = 0.
            else:
                # Attribute is considered discrete.
                mu_i_test = mus_test[i, :]
                y_j_test = ys_test[j, :]
                # Fixed seed so the score is reproducible when the solver shuffles the data.
                classifier = svm.LinearSVC(C=0.01, class_weight="balanced", random_state=42)
                classifier.fit(mu_i[:, np.newaxis], y_j)
                pred = classifier.predict(mu_i_test[:, np.newaxis])
                score_matrix[i, j] = np.mean(pred == y_j_test)
    return score_matrix

def compute_avg_diff_top_two(matrix):
    """Mean over columns of the gap between the largest and second-largest entry."""
    ascending = np.sort(matrix, axis=0)
    largest = ascending[-1, :]
    runner_up = ascending[-2, :]
    return np.mean(largest - runner_up)

# SAP with discrete factors, reusing the train/test codes and factors produced by the DCI cell.
sap = compute_sap(mus_train, ys_train, mus_test, ys_test, continuous_factors=False)
# Show the score dictionary as the cell output.
sap

In [None]:
# IRS 


def compute_irs(mus, ys, diff_quantile=0.99):
    """Compute the IRS score over the latent dimensions that have non-zero variance.

    Args:
        mus: Latent codes, one row per latent dimension.
        ys: Factors, one row per factor (passed through ``make_discretizer``
            with its default arguments).
        diff_quantile: Quantile forwarded to ``scalable_disentanglement_score``.

    Returns:
        Dictionary with ``"IRS"`` (0.0 when no latent dimension is active) and
        ``"num_active_dims"``.
    """
    ys_discrete = make_discretizer(ys)

    # Only latent dimensions that actually vary take part in the score.
    active_mask = (mus.var(axis=1) > 0)
    active_mus = mus[active_mask, :]

    if active_mus.size != 0:
        details = scalable_disentanglement_score(ys_discrete.T, active_mus.T, diff_quantile)
        irs_score = details["avg_score"]
    else:
        irs_score = 0.0

    return {
        "IRS": irs_score,
        "num_active_dims": int(np.sum(active_mask)),
    }


def _drop_constant_dims(ys):
    """Returns a view of the matrix `ys` with dropped constant rows."""
    ys = np.asarray(ys)
    if ys.ndim != 2:
        raise ValueError("Expecting a matrix.")

    variances = ys.var(axis=1)
    active_mask = variances > 0.
    return ys[active_mask, :]


def scalable_disentanglement_score(gen_factors, latents, diff_quantile=0.99):
    """Computes IRS scores of a dataset.

    Assumes no noise in X and crossed generative factors (i.e. one sample per
    combination of gen_factors). Assumes each g_i is an equally probable
    realization of g_i and all g_i are independent.

    Args:
        gen_factors: Numpy array of shape (num samples, num generative factors),
            matrix of ground truth generative factors.
        latents: Numpy array of shape (num samples, num latent dimensions), matrix
            of latent variables.
        diff_quantile: Float value between 0 and 1 to decide what quantile of diffs
            to select (use 1.0 for the version in the paper).

    Returns:
        Dictionary with the keys ``disentanglement_scores``, ``avg_score``,
        ``parents``, ``IRS_matrix`` and ``max_deviations``.
    """
    n_factors = gen_factors.shape[1]
    n_latents = latents.shape[1]

    # Normalizer: largest absolute deviation of every latent from its global mean.
    centered = np.abs(latents - latents.mean(axis=0))
    max_deviations = np.max(centered, axis=0)

    cum_deviations = np.zeros([n_latents, n_factors])
    percentile = diff_quantile * 100
    for factor_idx in range(n_factors):
        factor_column = gen_factors[:, factor_idx]
        distinct_values = np.unique(factor_column, axis=0)
        assert distinct_values.ndim == 1
        for value in distinct_values:
            # Samples sharing this value of the factor, and their mean: E[Z | g_i].
            in_group = factor_column == value
            group_latents = latents[in_group, :]
            group_mean = np.mean(group_latents, axis=0)

            # Spread of the group around its own mean, summarized by the chosen quantile.
            spread = np.abs(group_latents - group_mean)
            cum_deviations[:, factor_idx] += np.percentile(spread, q=percentile, axis=0)
        cum_deviations[:, factor_idx] /= distinct_values.shape[0]

    # Scale every latent dimension by its maximal deviation.
    irs_matrix = 1.0 - cum_deviations / max_deviations[:, np.newaxis]
    disentanglement_scores = irs_matrix.max(axis=1)
    if np.sum(max_deviations) > 0.0:
        avg_score = np.average(disentanglement_scores, weights=max_deviations)
    else:
        avg_score = np.mean(disentanglement_scores)

    return {
        "disentanglement_scores": disentanglement_scores,
        "avg_score": avg_score,
        "parents": irs_matrix.argmax(axis=1),
        "IRS_matrix": irs_matrix,
        "max_deviations": max_deviations,
    }



# IRS on the training codes and factors produced by the DCI cell.
irs = compute_irs(mus_train, ys_train, diff_quantile=0.99)
# Show the score dictionary as the cell output.
irs