In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, explained_variance_score
from sklearn.model_selection import KFold
from sklearn.preprocessing import MinMaxScaler
from itertools import product, combinations
from scipy.sparse import coo_matrix
from scipy.stats import spearmanr, pearsonr
import networkx as nx
from utils_plot import *

In [None]:
# 1. Data Normalization
def preprocessing(gamma, M, ampl_threshold=0.1):
    """
    Build two replicate-averaged, double-centred rate matrices and the matching
    binding-site matrix.

    Replicate rows of every gene are randomly split into two groups (the first
    five permuted replicates vs. the rest), averaged per gene, filtered on
    amplitude and double-centred. The split uses NumPy's global random state.

    Parameters
    ----------
    gamma : pandas.DataFrame
        Replicate-level rates. Rows are selected by index label and grouped by
        'gene', so 'gene' must be usable as a ``groupby`` key. Every column
        other than 'gene' is treated as an expression column.
    M : pandas.DataFrame
        Binding-site matrix; rows are selected by gene label, columns are the
        binding-protein names.
    ampl_threshold : float, default 0.1
        A gene is kept only if its half peak-to-peak amplitude exceeds this
        value in both replicate groups.

    Returns
    -------
    gamma1_norm, gamma2_norm : numpy.ndarray
        Double-centred group means (kept genes x expression columns).
    M : numpy.ndarray
        Binding-site matrix restricted to the kept genes and to the columns
        whose sum over those genes is non-zero.
    targetnames_filtered : numpy.ndarray
        Labels of the kept genes, in row order.
    tf_names_filtered : numpy.ndarray
        Column names of ``M`` that survived the filtering.
    """
    helper_cols = ('gene', 'rep', 'perm_rep')
    n_first_group = 5  # replicates (after permutation) assigned to group 1

    targetnames = np.array(sorted(set(M.index) & set(gamma.index)))
    print("Genes in common :", len(targetnames))

    # Explicit copy: helper columns are added below and must neither touch the
    # caller's frame nor trigger SettingWithCopyWarning.
    gamma = gamma.loc[targetnames].copy()

    # Column names are taken from M itself rather than from a notebook-level
    # variable, so the function does not depend on hidden global state.
    tf_names_all = np.asarray(M.columns)

    # Expression columns in their ORIGINAL order. Index.difference() sorts its
    # result, which would reorder the bins whenever the labels do not sort
    # lexicographically in bin order.
    expr_cols = [col for col in gamma.columns if col not in helper_cols]

    # Step 1: replicate index within each gene
    gamma['rep'] = gamma.groupby('gene').cumcount()

    # Step 2: random permutation of the replicate indices of each gene
    gamma['perm_rep'] = gamma.groupby('gene')['rep'].transform(lambda x: np.random.permutation(len(x)))

    # Step 3: split the replicates into two disjoint groups
    gamma_group1 = gamma[gamma['perm_rep'] < n_first_group].copy()
    gamma_group2 = gamma[gamma['perm_rep'] >= n_first_group].copy()

    # Step 4: per-gene mean of each group (groupby returns genes in sorted
    # order, which is also the order of `targetnames`)
    gamma1 = gamma_group1.groupby('gene')[expr_cols].mean().to_numpy()
    gamma2 = gamma_group2.groupby('gene')[expr_cols].mean().to_numpy()

    # Keep genes whose half peak-to-peak amplitude passes in BOTH groups
    ampl1 = (gamma1.max(axis=1)-gamma1.min(axis=1))/2
    ampl2 = (gamma2.max(axis=1)-gamma2.min(axis=1))/2
    ind = (ampl1 > ampl_threshold) & (ampl2 > ampl_threshold)
    gamma1, gamma2 = gamma1[ind,:], gamma2[ind,:]
    targetnames_filtered = targetnames[ind]

    M = M.loc[targetnames_filtered].to_numpy()

    # Drop binding proteins whose column sums to zero over the kept genes
    inactive_tfs = np.where(M.sum(axis=0) == 0)[0]
    print(f"Number of inactive TFs: {len(inactive_tfs)}")
    M = np.delete(M, inactive_tfs, axis=1)
    tf_names_filtered = np.delete(tf_names_all, inactive_tfs)

    print(f"Kept genes: {M.shape[0]} (ampl > {ampl_threshold})")
    # Double centring: remove row means and column means, add back the grand mean
    gamma1_norm = gamma1 - np.mean(gamma1, axis=1, keepdims=True) - np.mean(gamma1, axis=0, keepdims=True) + np.mean(gamma1)
    gamma2_norm = gamma2 - np.mean(gamma2, axis=1, keepdims=True) - np.mean(gamma2, axis=0, keepdims=True) + np.mean(gamma2)
    # M is deliberately NOT centred: its exact zeros define the sparsity
    # pattern that is optimised later.

    return gamma1_norm, gamma2_norm, M, targetnames_filtered, tf_names_filtered

In [None]:
#Model 1
def svd_regression_with_lambda_CV(gamma, M_BSM, lambdas, n_splits=5, seed=42):
    """
    Ridge-type regression of ``gamma`` on ``M_BSM`` through a single SVD, with
    the regularisation strength chosen by k-fold cross-validation over rows.

    Parameters
    ----------
    gamma : numpy.ndarray
        Target matrix; its rows are indexed with the same indices as the rows
        of ``M_BSM``.
    M_BSM : numpy.ndarray
        Two-dimensional design matrix.
    lambdas : sequence of float
        Candidate regularisation strengths.
    n_splits : int
        Number of folds.
    seed : int
        Random state of the shuffled KFold splitter.

    Returns
    -------
    A_star : numpy.ndarray
        Coefficients obtained from all rows with the selected lambda.
    lambda_opt : float
        The candidate with the lowest mean validation MSE.

    Side effect: shows a figure of validation MSE and train/validation
    explained variance against lambda.
    """
    n_rows, _ = M_BSM.shape
    n_lambdas = len(lambdas)

    # Thin SVD of the full matrix, computed once and shared by all folds
    U, s, VT = np.linalg.svd(M_BSM, full_matrices=False)
    V = VT.T

    splitter = KFold(n_splits=n_splits, shuffle=True, random_state=seed)

    # One row per lambda, one column per fold
    mse_val = np.zeros((n_lambdas, n_splits))
    ev_val = np.zeros((n_lambdas, n_splits))
    ev_train = np.zeros((n_lambdas, n_splits))

    for fold_no, (rows_train, rows_val) in enumerate(splitter.split(np.arange(n_rows))):
        target_train, target_val = gamma[rows_train, :], gamma[rows_val, :]
        design_train, design_val = M_BSM[rows_train, :], M_BSM[rows_val, :]
        # Training targets projected on the training rows of U
        projected_train = U[rows_train, :].T @ target_train

        for lam_no, lam in enumerate(lambdas):
            shrinkage = s / (s**2 + lam)
            coeffs = V @ (shrinkage[:, None] * projected_train)

            fit_train = design_train @ coeffs
            fit_val = design_val @ coeffs

            mse_val[lam_no, fold_no] = mean_squared_error(target_val.T, fit_val.T)
            ev_val[lam_no, fold_no] = explained_variance_score(target_val, fit_val)
            ev_train[lam_no, fold_no] = explained_variance_score(target_train, fit_train)

    # Fold averages
    mean_val_errors = mse_val.mean(axis=1)
    mean_val_explained_variances = ev_val.mean(axis=1)
    mean_train_explained_variances = ev_train.mean(axis=1)

    # Lowest mean validation MSE wins
    lambda_opt = lambdas[np.argmin(mean_val_errors)]

    # Final coefficients from all rows
    shrink_opt = s / (s**2 + lambda_opt)
    A_star = V @ (shrink_opt[:, None] * (U.T @ gamma))

    # Left axis: validation MSE; right axis: explained variances
    fig, ax1 = plt.subplots(figsize=(4,3))
    mse_color = 'tab:blue'
    ax1.set_xlabel('Lambda')
    ax1.set_ylabel('Validation MSE', color=mse_color)
    ax1.plot(lambdas, mean_val_errors, color=mse_color, label='Validation MSE')
    ax1.tick_params(axis='y', labelcolor=mse_color)
    ax1.set_xscale('log')
    ax1.grid(True)

    ax2 = ax1.twinx()
    ax2.set_ylabel('Explained Variance (%)')
    ax2.plot(lambdas, mean_val_explained_variances * 100, color='tab:green', linestyle='--', label='Validation EV')
    ax2.plot(lambdas, mean_train_explained_variances * 100, color='tab:orange', linestyle='--', label='Training EV')
    ax2.tick_params(axis='y')

    # Single legend covering both axes
    handles_left, labels_left = ax1.get_legend_handles_labels()
    handles_right, labels_right = ax2.get_legend_handles_labels()
    ax2.legend(handles_left + handles_right, labels_left + labels_right, loc='best')

    fig.suptitle('Cross-Validation: MSE and EV vs Lambda')
    fig.tight_layout()
    plt.show()

    return A_star, lambda_opt

In [None]:
# Model 2: OscilloTF
# 2 Define Ridge Regression Model with Trainable Sparse W
class TrainableModel(nn.Module):
    """
    Reconstruction model ``gamma ~ M_w @ A`` in which both factors are trained.

    ``M_w`` has the sparsity pattern of the input matrix ``M``: only its
    non-zero entries are trainable, parameterised as ``exp(W)`` so that they
    stay strictly positive. ``A`` is a dense trainable matrix initialised from
    ``A_split``.

    Parameters
    ----------
    M : array-like, 2-D
        Initial matrix; its non-zero entries must be positive.
    gamma, num_tfs, num_thetas
        Accepted for interface compatibility; not used by the constructor.
    A_split : array-like
        Initial value of ``A``.
    lambdaW : float
        Weight of the L1 penalty on ``W`` (the log-values).
    lambdaA : float
        Weight of the squared-L2 penalty on ``A``.

    Raises
    ------
    ValueError
        If ``M`` contains negative entries (their logarithm is undefined).
    """

    def __init__(self, M, gamma, A_split, num_tfs, num_thetas, lambdaW=0.01, lambdaA=0.01):
        super().__init__()

        self.lambdaW = lambdaW  # L1 regularization for W
        self.lambdaA = lambdaA  # L2 regularization for A

        # COO form gives the coordinates and values of the non-zero entries
        sparse_matrix = coo_matrix(M)
        values = sparse_matrix.data
        if np.any(values < 0):
            # np.log would silently turn these into NaN parameters
            raise ValueError("M must not contain negative entries (W = log(M) is undefined).")

        # Row/column indices are registered as buffers so that model.to(device)
        # moves them with the parameters; persistent=False keeps them out of
        # state_dict, so saved checkpoints have the same keys as before.
        self.register_buffer("i", torch.tensor(sparse_matrix.row, dtype=torch.long), persistent=False)
        self.register_buffer("j", torch.tensor(sparse_matrix.col, dtype=torch.long), persistent=False)

        self.A = nn.Parameter(torch.tensor(A_split, dtype=torch.float32))
        # Log-parameterisation of the non-zero entries: M = exp(W) <=> W = log(M)
        self.W = nn.Parameter(torch.tensor(np.log(values), dtype=torch.float32))

        self.num_genes, self.num_tfs = M.shape

    def forward(self):
        """Return the reconstruction ``exp(W) (scattered on the sparsity pattern) @ A``."""
        M_sparse = torch.sparse_coo_tensor(
            indices=torch.stack([self.i, self.j]),
            values=torch.exp(self.W),
            size=(self.num_genes, self.num_tfs)
        )
        M_dense_tensor = M_sparse.to_dense()

        return torch.matmul(M_dense_tensor, self.A)

    def loss(self, gamma_true):
        """
        Sum of squared reconstruction errors plus ``lambdaW * sum|W|`` and
        ``lambdaA * sum(A**2)``. Note that the L1 term acts on the log-values
        ``W``, not on ``exp(W)``.
        """
        gamma_pred = self.forward()
        main_loss = torch.sum((gamma_true - gamma_pred) ** 2)
        l1_loss = torch.sum(torch.abs(self.W))  # L1 on W
        l2_loss = torch.sum(self.A ** 2)         # L2 on A

        total_loss = main_loss + self.lambdaW * l1_loss + self.lambdaA * l2_loss
        return total_loss

In [None]:
def train_model(M, gamma, gamma_test, lambdaW, lambdaA, A_split, tol=(0.04, 150), patience=20, num_epochs=400, lr=0.01):
    """
    Train a TrainableModel on ``gamma`` with Adam, early-stopped on ``gamma_test``.

    After each optimisation step the regularised loss and the explained
    variance are evaluated on ``gamma_test``. An epoch counts as an improvement
    only if the test loss beats the best one so far by BOTH the relative
    fraction ``tol[0]`` and the absolute amount ``tol[1]``; training stops after
    ``patience`` consecutive epochs without improvement.

    Parameters
    ----------
    M : 2-D array-like/tensor
        Initial sparse weight matrix passed to TrainableModel.
    gamma, gamma_test : torch.Tensor
        Training and held-out targets (``gamma_test.numpy()`` is called).
    lambdaW, lambdaA : float
        Regularisation weights forwarded to the model and used in the test loss.
    A_split : array-like
        Initial value of ``A``.
    tol : tuple (relative, absolute)
        Improvement thresholds described above.
    patience : int
    num_epochs : int
        Maximum number of epochs; must be at least 1.
    lr : float
        Adam learning rate.

    Returns
    -------
    (W_dense, A, train_loss) : (numpy.ndarray, numpy.ndarray, float)
        Dense weights, activities and training loss of the LAST epoch run
        (not of the best epoch).

    Raises
    ------
    ValueError
        If ``num_epochs`` is smaller than 1.

    Side effect: prints progress and shows a loss / EV figure.
    """
    if num_epochs < 1:
        # The returned values and the plot need at least one completed epoch.
        raise ValueError("num_epochs must be >= 1")

    num_genes, num_tfs = M.shape
    num_thetas = gamma.shape[1]

    model = TrainableModel(M, gamma, A_split, num_tfs, num_thetas, lambdaW, lambdaA)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    indices = torch.stack([model.i, model.j])  # loop-invariant sparsity pattern

    best_EV = -float("inf")
    best_test_loss = float("inf")
    patience_counter = 0

    train_loss_history = []
    test_loss_history = []
    test_EV_history = []

    for epoch in range(num_epochs):
        optimizer.zero_grad()
        loss = model.loss(gamma)
        loss.backward()
        optimizer.step()

        # Validation step on the updated parameters (no autograd graph needed)
        with torch.no_grad():
            W_sparse_vector = model.W.detach()
            W_dense = torch.sparse_coo_tensor(
                indices=indices,
                values=torch.exp(W_sparse_vector),
                size=(num_genes, num_tfs)
            ).to_dense()
            A = model.A.detach()
            R_test = torch.matmul(W_dense, A)

            main_loss = torch.sum((gamma_test - R_test) ** 2)
            l1_loss = torch.sum(torch.abs(W_sparse_vector))  # L1 on W
            l2_loss = torch.sum(A ** 2)                      # L2 on A
            # Plain float: histories and comparisons below never hold tensors
            total_test_loss = (main_loss + lambdaW * l1_loss + lambdaA * l2_loss).item()

        EV_test = explained_variance_score(gamma_test.numpy(), R_test.detach().numpy())

        train_loss_history.append(loss.item())
        test_loss_history.append(total_test_loss)
        test_EV_history.append(EV_test)

        # Early stopping: both criteria must hold for an epoch to count
        is_relative_loss_better = total_test_loss < best_test_loss * (1 - tol[0])
        is_absolute_loss_better = total_test_loss < best_test_loss - tol[1]
        if is_relative_loss_better and is_absolute_loss_better:
            best_EV = EV_test
            best_test_loss = total_test_loss
            patience_counter = 0  # Reset patience
        else:
            patience_counter += 1

        if epoch % 100 == 0:
            print(f"Epoch {epoch}, Loss: {loss.item():.0f}, Loss test: {total_test_loss:.0f}, EV_test: {EV_test*100:.2f}%")

        if patience_counter >= patience:
            # Report which criterion failed at the stopping epoch
            if not is_relative_loss_better:
                print(f"Early stopping (relative={tol[0]}) at epoch {epoch+1}. Best Loss test: {best_test_loss:.0f}. Best EV_test: {best_EV*100:.2f}%")
            if not is_absolute_loss_better:
                print(f"Early stopping (absolute={tol[1]}) at epoch {epoch+1}. Best Loss test: {best_test_loss:.0f}. Best EV_test: {best_EV*100:.2f}%")
            break

    # x-axis derived from the recorded history, not from the loop variable
    epochs_run = range(len(train_loss_history))

    fig, ax1 = plt.subplots(figsize=(4, 3))
    ax1.set_xlabel('Epochs')
    ax1.set_ylabel('Loss value')
    ax1.plot(epochs_run, train_loss_history, color='tab:red', label="Train Loss")
    ax1.plot(epochs_run, test_loss_history, color='tab:green', label="Test Loss")
    ax1.tick_params(axis='y')
    ax1.grid(True)
    ax1.legend(loc='center right')

    ax2 = ax1.twinx()
    ax2.set_ylabel('EV Test', color='tab:blue')
    ax2.plot(epochs_run, test_EV_history, color='tab:blue', linestyle='--')
    ax2.tick_params(axis='y', labelcolor='tab:blue')

    fig.suptitle('Loss and EV Over Epochs')
    fig.tight_layout()
    plt.show()

    return W_dense.numpy(), A.numpy(), loss.item()

In [None]:
# 4. Cross-Validation for Lambda Optimization
def cross_val_lambda(M, gamma1, gamma2, lambdaW_values, lambdaA_values, A_split, A_split_2, tol=(0.04, 150), patience=20):
    """
    Grid search over (lambdaW, lambdaA) with a two-way swap validation.

    For every pair, a model is trained on ``gamma1`` and scored on ``gamma2``,
    then trained on ``gamma2`` and scored on ``gamma1``; the pair with the
    highest average explained variance is returned. A filled contour plot of
    the average EV over the grid is shown.

    Returns
    -------
    (best_lambdaW, best_lambdaA)
    """
    def fit_and_score(train, test, A_init, lambdaW, lambdaA):
        # Train on `train`; return (final training loss, EV of the fit on `test`)
        W_fit, A_fit, final_loss = train_model(M, train, test, lambdaW, lambdaA, A_init, tol, patience)
        return final_loss, explained_variance_score(test, W_fit @ A_fit)

    best_lambdaW = best_lambdaA = None
    best_EV = -np.inf
    losses1, losses2, EVs_avg = [], [], []

    for lambdaW, lambdaA in product(lambdaW_values, lambdaA_values):
        print(f"Testing lambdaW = {lambdaW:.2f}, lambdaA = {lambdaA:.2f}")

        # Direction 1: gamma1 -> gamma2
        loss1, EV1 = fit_and_score(gamma1, gamma2, A_split, lambdaW, lambdaA)
        losses1.append(loss1)

        # Direction 2: gamma2 -> gamma1
        loss2, EV2 = fit_and_score(gamma2, gamma1, A_split_2, lambdaW, lambdaA)
        losses2.append(loss2)

        avg_EV = (EV1 + EV2) / 2
        EVs_avg.append(avg_EV)
        print(f"lambdaW={lambdaW:.2f}, lambdaA={lambdaA:.2f}, EV={avg_EV*100:.2f}%\n")

        if avg_EV > best_EV:
            best_lambdaW, best_lambdaA, best_EV = lambdaW, lambdaA, avg_EV

    # product() iterates lambdaA fastest, so rows are lambdaW and columns lambdaA
    grid_shape = (len(lambdaW_values), len(lambdaA_values))
    EV_surface = np.array(EVs_avg).reshape(grid_shape)
    LambdaW, LambdaA = np.meshgrid(lambdaW_values, lambdaA_values, indexing='ij')

    # Grid position of the maximum, used for the marker on the plot
    row_opt, col_opt = np.unravel_index(np.argmax(EV_surface), EV_surface.shape)
    opt_lambdaW = lambdaW_values[row_opt]
    opt_lambdaA = lambdaA_values[col_opt]

    plt.figure(figsize=(7, 5))
    cp = plt.contourf(LambdaW, LambdaA, EV_surface, levels=30, cmap='viridis')
    plt.colorbar(cp, label='Average Explained Variance')
    plt.xscale('log')
    plt.yscale('log')
    plt.xlabel('λ_W (W regularization)')
    plt.ylabel('λ_A (A regularization)')
    plt.title('EV Surface over λ_W, λ_A')
    plt.scatter([opt_lambdaW], [opt_lambdaA], color='red', label='Optimum')
    plt.legend()
    plt.grid(True, which='both', ls='--', lw=0.3)
    plt.tight_layout()
    plt.show()

    print(f"Best λ_W = {best_lambdaW:.4f}, Best λ_A = {best_lambdaA:.2f}, Best EV = {best_EV*100:.2f}%\n")
    return best_lambdaW, best_lambdaA

In [None]:
# 5. Cross train for best model
def cross_train(M, gamma1, gamma2, best_lambdaW, best_lambdaA, A_split, A_split_2, tol=(0.04, 150), patience=20):
    """
    Train the model in both directions with fixed regularisation weights.

    First gamma1 -> gamma2 (initialised with ``A_split``), then
    gamma2 -> gamma1 (initialised with ``A_split_2``). The train and test
    explained variances, averaged over the two directions, are printed.

    Returns
    -------
    (W1, A1, W2, A2) : the fitted factors of the first and second direction.
    """
    directions = (
        ("Training on α1, testing on α2...", gamma1, gamma2, A_split),
        ("Training on α2, testing on α1...", gamma2, gamma1, A_split_2),
    )

    fits, ev_train, ev_test = [], [], []
    for banner, train, test, A_init in directions:
        print(banner)
        W_fit, A_fit, _ = train_model(M, train, test, best_lambdaW, best_lambdaA, A_init, tol, patience)
        reconstruction = W_fit @ A_fit
        ev_train.append(explained_variance_score(train.numpy(), reconstruction))
        ev_test.append(explained_variance_score(test.numpy(), reconstruction))
        fits.append((W_fit, A_fit))

    avg_EV_train = (ev_train[0] + ev_train[1]) / 2
    avg_EV_test = (ev_test[0] + ev_test[1]) / 2
    print(f"Average EV_train: {avg_EV_train*100:.2f}%")
    print(f"Average EV_test: {avg_EV_test*100:.2f}%")

    (W1, A1), (W2, A2) = fits
    return W1, A1, W2, A2

In [None]:
def standardize_amplitudes(matrices_from, matrices_to = False, target_amp=0.2):
    """
    Rescale every row of each matrix to a common half peak-to-peak amplitude.

    Parameters
    ----------
    matrices_from : sequence of 2-D arrays
        Each row is rescaled so that ``(max - min) / 2 == target_amp``.
        Rows with zero amplitude cannot be rescaled and are left unchanged.
    matrices_to : sequence of 2-D arrays, optional
        Companion matrices. Column ``k`` of ``matrices_to[i]`` is divided by
        the scale applied to row ``k`` of ``matrices_from[i]``, so the product
        ``matrices_to[i] @ matrices_from[i]`` is preserved. ``False`` (default)
        or ``None`` means no companions.
    target_amp : float
        Amplitude every rescaled row ends up with.

    Returns
    -------
    (standardized_matrices_from, standardized_matrices_to) : (list, list)
        The second list is empty when no companions were given.
    """
    # Identity checks instead of `!= False`: comparing an ndarray with False is
    # element-wise and cannot be used as an `if` condition.
    has_companions = matrices_to is not False and matrices_to is not None

    standardized_matrices_from = []
    standardized_matrices_to = []
    for i, matrix_from in enumerate(matrices_from):
        matrix_from = np.asarray(matrix_from)
        amp = (np.max(matrix_from, axis=1) - np.min(matrix_from, axis=1)) / 2

        # Flat rows keep a scale of 1 instead of producing inf/NaN
        scale = np.ones_like(amp, dtype=float)
        nonflat = amp > 0
        scale[nonflat] = target_amp / amp[nonflat]

        standardized_matrices_from.append(matrix_from * scale[:, np.newaxis])  # TF x theta
        if has_companions:
            standardized_matrices_to.append(matrices_to[i] / scale[np.newaxis, :])  # genes x TF

    return standardized_matrices_from, standardized_matrices_to

In [None]:
# Load Data & Run
# Reads the binding-site matrix and the bootstrap rates, preprocesses them, then
# repeats the two-model fit `n_runs` times and collects the fitted factors.
fileBSM = 'ENCORI_mm10_RBPTarget_matrix.txt' # text file with the binding-site matrix that informs the interactions BP -> Target
fileGamma = 'bootstrap_gamma_scrna_100_1_1.csv'
process = ["degradation", "\u03B3"]
theta_smooth = np.round(np.linspace(0.01, 1.00, 100), 2)  # 100 bins from 0.01 to 1.00
ampl_threshold=0.1

# Define Lambda Values (grid used only when the search below is enabled)
lambdaW_values = np.logspace(-3, -2, 5)
lambdaA_values = np.logspace(1, 3, 15)
# Set both values to 0 to re-run the grid search inside the loop:
#best_lambdaW, best_lambdaA = 0, 0
best_lambdaW, best_lambdaA = 0.0056, 372.76

M = pd.read_csv(fileBSM, sep="\t",index_col=0)
# The last column of the file is dropped; NOTE(review): the reason is not visible here - confirm.
M = M.drop(M.columns[-1], axis=1)
tf_names = M.columns
gamma = pd.read_csv(fileGamma, sep=",",index_col=0)

# Select common genes and normalize
print(gamma.shape, M.shape)
gamma1_norm, gamma2_norm, M_norm, targetnames, tf_names = preprocessing(gamma, M, ampl_threshold=ampl_threshold)
print(gamma1_norm.shape, gamma2_norm.shape, M_norm.shape, "\n")

# Per-run results: trained factors (A*, W*) and the Model 1 initialisations
n_runs = 5
A1_list = []
W1_list = []
A2_list = []
W2_list = []
A1_split_list = []
A2_split_list = []

# NOTE(review): the seeds themselves come from the unseeded global RNG, so a
# fresh run of the notebook does not reproduce the same seeds.
for seed in np.random.randint(1,100, n_runs):
    torch.manual_seed(seed) # no effect on initialisation now that it is not random anymore
    np.random.seed(seed)
    # Model 1: SVD regression on each replicate group; the seed sets the CV folds
    lambdas = np.logspace(0, 6, 40)
    A_split, lambda_opt1 = svd_regression_with_lambda_CV(gamma1_norm, M_norm, lambdas, seed=seed)
    A_split_2, lambda_opt2 = svd_regression_with_lambda_CV(gamma2_norm, M_norm, lambdas, seed=seed)
    A1_split_list.append(A_split)
    A2_split_list.append(A_split_2)
    #print("Best lambda:", lambda_opt1)
    #print("Best lambda:", lambda_opt2)

    M_tensor = torch.tensor(M_norm, dtype=torch.float32)  # (genes, TFs)
    gamma1_tensor = torch.tensor(gamma1_norm, dtype=torch.float32)  # (genes, thetas)
    gamma2_tensor = torch.tensor(gamma2_norm, dtype=torch.float32)  # (genes, thetas)

    tol = (0.05, 100) #(0.07, 800) for ampl 0.2  #np.linspace(0.01, 0.07, 7)
    patience = 20 #np.linspace(5, 30, 7)
    # Optimize Lambda (runs at most once: the result is non-zero afterwards)
    # NOTE(review): the search uses the literal tol=(0.04, 150), not `tol` above - confirm this is intended.
    if (best_lambdaW == 0):
        best_lambdaW, best_lambdaA = cross_val_lambda(M_tensor, gamma1_tensor, gamma2_tensor, lambdaW_values, lambdaA_values, A_split, A_split_2, tol=(0.04, 150), patience=20)
    # Train and Cross-Test (Model 2, initialised with the Model 1 activities)
    print("Seed :", seed, "Tolerance :", tol, "Patience :", patience)
    W1, A1, W2, A2 = cross_train(M_tensor, gamma1_tensor, gamma2_tensor, best_lambdaW, best_lambdaA, A_split, A_split_2, tol, patience)
    A1_list.append(A1)
    W1_list.append(W1)
    A2_list.append(A2)
    W2_list.append(W2)

In [None]:
# Held-out explained variance of Model 1: activities fitted on one replicate
# group, scored on the other group with the original matrix M_norm.
explv = np.array(
    [explained_variance_score(gamma2_norm, M_norm @ A_fit) for A_fit in A1_split_list]
    + [explained_variance_score(gamma1_norm, M_norm @ A_fit) for A_fit in A2_split_list]
)
print(explv.mean()*100)                  # mean EV (%)
print((explv.max()-explv.min())/2*100)   # half-range of EV (%)

# Same summary for Model 2, using the trained W of every run.
explv = np.array(
    [explained_variance_score(gamma2_norm, W_fit @ A_fit) for W_fit, A_fit in zip(W1_list, A1_list)]
    + [explained_variance_score(gamma1_norm, W_fit @ A_fit) for W_fit, A_fit in zip(W2_list, A2_list)]
)
print(explv.mean()*100)
print((explv.max()-explv.min())/2*100)

In [None]:
# Explained variance of M_norm @ A_split on gamma1_norm.
# NOTE(review): A_split is whatever the last executed cell bound it to - check cell order.
R_split = M_norm @ A_split
explained_variance_score(gamma1_norm, R_split)

In [None]:
# Agreement between runs: row-wise Pearson correlations of the activities A
# (per TF) and of the weights W (per gene), summarised in four heatmaps.
n_tfs = A1_list[0].shape[0]
n_genes = W1_list[0].shape[0]
A_list, W_list, A_split_list = A1_list, W1_list, A1_split_list

# Row/column 0 is Model 1, rows/columns 1..n_runs are the trained runs
heatmap_labels = ['Model 1'] + ["Rep "+str(l+1) for l in range(n_runs)]

# Panels: 0 mean A corr, 1 median A corr, 2 mean W corr, 3 median W corr.
# Every panel starts as the identity (self-correlation of 1 on the diagonal).
heatmap_vals = np.stack([np.eye(n_runs+1) for _ in range(4)])


def _rowwise_pearson(X, Y, n_rows):
    # Pearson r between row r of X and row r of Y, for the first n_rows rows
    return np.array([pearsonr(X[r, :], Y[r, :])[0] for r in range(n_rows)])


def _store_symmetric(row, col, A_corrs, W_corrs):
    # Write the four rounded summaries at (row, col) and (col, row)
    summaries = (np.mean(A_corrs), np.median(A_corrs), np.mean(W_corrs), np.median(W_corrs))
    for panel, value in enumerate(summaries):
        heatmap_vals[panel][row, col] = heatmap_vals[panel][col, row] = round(value, 3)


# Every pair of trained runs
for i, j in combinations(range(n_runs), 2):
    _store_symmetric(
        i+1, j+1,
        _rowwise_pearson(A_list[i], A_list[j], n_tfs),
        _rowwise_pearson(W_list[i], W_list[j], n_genes),
    )

# Each trained run against its own Model 1 starting point (A_split, M_norm)
for l in range(n_runs):
    _store_symmetric(
        l+1, 0,
        _rowwise_pearson(A_split_list[l], A_list[l], n_tfs),
        _rowwise_pearson(M_norm, W_list[l], n_genes),
    )

# Later cells read these names; bind them to the last run's values
if n_runs > 0:
    A_split, A1, W1 = A_split_list[n_runs-1], A_list[n_runs-1], W_list[n_runs-1]

for i, title in enumerate(["Mean A Corr", "Median A Corr", "Mean W Corr", "Median W Corr"]):
    plt.figure(figsize=(8, 6))
    plt.title(title)
    ax = sns.heatmap(heatmap_vals[i], cbar=True, annot=True, fmt=".2f", vmin=0, vmax=1, yticklabels=heatmap_labels,xticklabels=heatmap_labels)
    plt.show()

In [None]:
# Plot the activity of one binding protein, selected by name.
# Candidate names: Zfp36, Zfp36l1, Elavl1, Upf1, Pabpc1, Ythdf2, Alkbh5
# The [0][0] lookup raises IndexError when the name is not in tf_names.
BP_nb = np.where(tf_names == 'Zfp36l1')[0][0]
plot_binding_protein_activity(tf_names, A_split, process, theta_smooth, BP_nb=BP_nb)

In [None]:
# Font sizes for the figure produced by this cell (rcParams stay changed afterwards)
plt.rcParams.update({
    'font.size': 13,          # base font size
    'axes.labelsize': 13,     # axis label font size
    'axes.titlesize': 11,     
    'xtick.labelsize': 9,
    'ytick.labelsize': 11,
})
plt.figure(figsize=(5,5))
# Rescale activities to a common amplitude and compensate in W, then rank the
# binding proteins with W_key_TF on the first run.
# NOTE(review): W_key_TF is defined outside this file; presumably it ranks TFs by summed absolute weight - confirm.
A1_list_standardized, W1_list_standardized = standardize_amplitudes(A1_list, W1_list, target_amp=0.2)
print("tol=", tol, "ampl=", ampl_threshold, "Patience=", patience)
W = W1_list_standardized[0]
W_key_TF(W, tf_names, top_k=20)

In [None]:
def get_tf_targets(W, tf_names, gene_names, tf_query):
    """
    List the genes with a non-zero weight for one binding protein.

    Parameters
    ----------
    W : numpy.ndarray
        Weight matrix, genes x binding proteins.
    tf_names : numpy.ndarray
        Names matching the columns of ``W``.
    gene_names : sequence
        Names matching the rows of ``W``.
    tf_query : str
        Name of the binding protein to look up (IndexError if absent).

    Returns
    -------
    dict
        gene name -> weight, ordered by signed weight, largest first.
    """
    # Column of the requested binding protein
    column = W[:, np.where(tf_names == tf_query)[0][0]]

    # Rows with a non-zero weight, ranked by decreasing (signed) weight
    hit_rows = np.flatnonzero(column != 0)
    ranking = np.argsort(column[hit_rows])[::-1]
    ranked_rows = hit_rows[ranking]
    print(str(len(ranked_rows))+" targets for "+tf_query)

    return {gene_names[row]: column[row] for row in ranked_rows}

# Targets of Ptbp1 in W (W is the standardized weight matrix of the first run, set in an earlier cell)
targets_dic = get_tf_targets(W, tf_names, targetnames, tf_query="Ptbp1")
#targets_dic

In [None]:
# Strictly positive standardized weights of the first run, in ascending order
np.sort(W1_list_standardized[0][W1_list_standardized[0] > 0].flatten())

In [None]:
# Compare the activities of the first two runs, TF by TF.
A1, A2 = A1_list[0], A1_list[1]
residuals = A1 - A2
std_residuals = np.std(residuals, axis=1)

# Per-TF Pearson correlation between the two runs
n_tfs = A1.shape[0]
pearson_rs = np.array([pearsonr(A1[tf_row, :], A2[tf_row, :])[0] for tf_row in range(n_tfs)])

# Histogram of the residual spread => check for biases
fig, ax = plt.subplots(figsize=(6, 4))
ax.hist(std_residuals, bins=30, color='steelblue', alpha=0.8)
ax.set_xlabel("Std of Residuals (A1 - A2)")
ax.set_ylabel("Number of TFs")
ax.set_title("Distribution of Std Residuals per TF")
ax.grid(True)
fig.tight_layout()
plt.show()

# Correlation vs residual spread => separates shape from amplitude disagreement
fig, ax = plt.subplots(figsize=(6, 5))
ax.scatter(pearson_rs, std_residuals, alpha=0.8, edgecolor='k')
ax.set_xlabel("Pearson Correlation (A1 vs A2)")
ax.set_ylabel("Std of Residuals (A1 - A2)")
ax.set_title("Per-TF Reproducibility: Shape vs Magnitude")
ax.grid(True)
fig.tight_layout()
plt.show()

In [None]:
# Bind A1/A2 and W1/W2 to the FIRST TWO RUNS of the "train on group 1" direction
# (both come from A1_list / W1_list), then print all their entries in sorted order.
A1, A2 = A1_list[0], A1_list[1]
W1, W2 = W1_list[0], W1_list[1]
print(np.sort(W1.flatten()))
print(np.sort(A1.flatten()))

In [None]:
# Display the weight matrix currently bound to W1
W1

In [None]:
# Optional smoothing of the activities (currently disabled)
#A1 = fourier_fit(A1, theta_smooth)
#A2 = fourier_fit(A2, theta_smooth)
# Reconstructed rate matrices from the current W1/A1 and W2/A2
R1 = W1 @ A1
R2 = W2 @ A2

In [None]:
# Larger fonts for the figures that follow (global matplotlib state)
plt.rcParams.update({
    'font.size': 22,          # base font size
    'axes.labelsize': 20,     # axis label font size
    'axes.titlesize': 20,     
    'xtick.labelsize': 18,
    'ytick.labelsize': 18,
})

In [None]:
# Explained variance of the reconstruction R1 on gamma2_norm, gene by gene
expl = np.array([
    explained_variance_score(gamma2_norm[k,:], R1[k,:])
    for k in range(gamma1_norm.shape[0])
])
expl_sorted = np.sort(expl)

fig, ax = plt.subplots()
ax.hist(expl_sorted, bins=170)
ax.set_xlabel("Explained variance")
ax.set_xlim(-2, 1)
ax.set_ylabel("Number of genes")
ax.grid(True)
ax.set_title("Distribution of gene-wise EVs for degradation")
plt.show()

# Global EV over the whole matrix, then the mean of the gene-wise EVs (cell output)
print(explained_variance_score(gamma2_norm, R1))
expl.mean()

In [None]:
# Activity of Ptbp1 in A1, plus the share of its non-zero W1 entries that are positive.
# The ratio divides by the number of non-zero entries of that W1 column.
BP_nb = np.where(tf_names == 'Ptbp1')[0][0]
#BP_nb = 37
plot_binding_protein_activity(tf_names, A1, process, theta_smooth, BP_nb=BP_nb)
print(f"Positive W1 among target genes of {tf_names[BP_nb]} : {np.sum(W1[:, BP_nb] > 0)}/{np.sum(W1[:, BP_nb] != 0)} ({np.sum(W1[:, BP_nb] > 0)/np.sum(W1[:, BP_nb] != 0)*100:.2f}%)")

In [None]:
# Compare the reconstruction R1 with both replicate groups for one gene (Nusap1).
# The "Train"/"Test" labels treat gamma1_norm as the data R1 was fitted on.
n = np.where(targetnames == 'Nusap1')[0][0]
#n = 4972
print("Train")
plot_rate_comparison(targetnames, gamma1_norm, R1, process, theta_smooth, target_nb=n)
print("Test")
plot_rate_comparison(targetnames, gamma2_norm, R1, process, theta_smooth, target_nb=n)

In [None]:
# Reproducibility of the activities and of the weights between the two selected fits
# (compute_reproducibility is defined outside this file).
compute_reproducibility(A1, A2, gamma1_norm, gamma2_norm, metric="TF activities")
compute_reproducibility(W1, W2, gamma1_norm, gamma2_norm, metric="W site counts")
#compute_reproducibility(R1, R2, gamma1_norm, gamma2_norm, metric="Reconstruction")

In [None]:
# Font sizes for the heatmap below (global matplotlib state)
plt.rcParams.update({
    'font.size': 20,          # base font size
    'axes.labelsize': 18,     # axis label font size
    'axes.titlesize': 18,     
    'xtick.labelsize': 16,
    'ytick.labelsize': 16,
})
# NOTE(review): A1 is overwritten by its Fourier fit, so every later cell sees the
# fitted A1, and re-running this cell applies the fit to already fitted values.
A1 = fourier_fit(A1, theta_smooth, num_harmonics=3)
plot_heatmap(A1, cmap='RdBu_r', title=" RBPs activities on degradation across the cell cycle")

In [None]:
# Heatmap of BP activity along the cell cycle, restricted to a list of names (Export)
# NOTE(review): key_tfs holds transcription-factor names while M is loaded from an
# RBP target matrix - confirm these names can occur in tf_names.
key_tfs = ["Smad3", "Hbp1", "E2f1", "E2f2_E2f5", "E2f3", "E2f4", "E2f6", "E2f7", "E2f8", "Sp1", "Hes1", "Elf3", "Tfap4"]
tf_displayed = plot_heatmap_list(A1, tf_names, key_tfs, clip=True)
print(tf_displayed)

In [None]:
# Prior expectation per TF, consumed by the report loop at the end of the notebook:
#   "ranges"     - list of (start, end) intervals; presumably on the same 0-1 axis as theta_smooth - confirm
#   "inhibitory" - True if the TF is expected to act as an inhibitor
expected_activity = {
    "Smad3": {"ranges": [(0.1, 0.4)], "inhibitory": False}, #True?
    "Hbp1": {"ranges": [(0.1, 0.4)], "inhibitory": True},
    "E2f1": {"ranges": [(0.1, 0.4)], "inhibitory": False},
    "E2f2_E2f5": {"ranges": [(0.1, 0.4)], "inhibitory": False},
    "E2f3": {"ranges": [(0.1, 0.4)], "inhibitory": False},
    "E2f4": {"ranges": [(0.01, 0.25)], "inhibitory": True}, #Not 100% confident
    "E2f6": {"ranges": [(0.01, 0.25), (0.63, 0.9)], "inhibitory": True}, #Not 100% confident
    "E2f7": {"ranges": [(0.4, 0.9)], "inhibitory": True}, #Not 100% confident
    "E2f8": {"ranges": [(0.4, 0.9)], "inhibitory": True}, #Not 100% confident
    "Sp1": {"ranges": [(0.1, 0.63)], "inhibitory": False}, #Can be both?
    "Hes1": {"ranges": [(0.1, 0.4)], "inhibitory": True},
    #"Elf3": {"ranges": [(0.25, 1)], "inhibitory": False},
    #"Tfap4": {"ranges": [(0.63, 0.9)], "inhibitory": False}
}

In [None]:
#### TF EXPRESSION AND BIOLOGICAL MEANING ####

In [None]:
# Rates of the key TFs themselves, for comparison with their inferred activity.
# NOTE(review): "E2f2" and "E2f5" are listed separately here but expected_activity
# only has the combined key "E2f2_E2f5".
key_tfs = ["Smad3", "Hbp1", "E2f1", "E2f2", "E2f3", "E2f4", "E2f5", "E2f6", "E2f7", "E2f8", "Sp1", "Hes1"]

# Keep the TFs present both as rows of gamma and as columns of M (this rebinds
# the notebook-level name tf_names_filtered)
tf_names_filtered = np.array([tf for tf in key_tfs if tf in gamma.index and tf in M.columns])
print("TFs in common :", str(len(tf_names_filtered))+"/"+str(len(key_tfs)))

# NOTE(review): if gamma's index holds several replicate rows per gene, .loc returns
# all of them and rows of gamma_n no longer map one-to-one to tf_names_filtered - confirm.
gamma_f = gamma.loc[tf_names_filtered]
gamma_n = gamma_f.to_numpy()

# Double-centre the rates, then bring rates and activities to the same amplitude
#A_standard = A_standard - np.mean(A_standard, axis=1, keepdims=True)
gamma_n_norm = gamma_n - np.mean(gamma_n, axis=1, keepdims=True) - np.mean(gamma_n, axis=0, keepdims=True) + np.mean(gamma_n)
matrices_from, _ = standardize_amplitudes([gamma_n_norm, A1])
gamma_n_norm, A_standard = matrices_from

In [None]:
# For every selected TF: plot its rate against its inferred activity, compute
# their Spearman correlation (sign-flipped for expected inhibitors) and the
# z-score returned by compute_tf_activity_difference for the expected ranges.
corrs = []
z_vals = []
tf_name_list = list(tf_names)  # built once instead of on every lookup
for tf in range(len(tf_names_filtered)):
    tf_name = tf_names_filtered[tf]
    if tf_name not in expected_activity:
        # No prior for this TF: report it instead of failing with a KeyError
        print(f"No expected activity defined for {tf_name}, skipped\n")
        continue
    expected = expected_activity[tf_name]

    plot_TF_exp_activity(theta_smooth, gamma_n_norm, A_standard, tf_names, tf_names_filtered, tf)

    activity = A_standard[tf_name_list.index(tf_name)]
    corr = spearmanr(gamma_n_norm[tf], activity)[0]
    action = "inhibitor" if expected["inhibitory"] else "activator"
    if expected["inhibitory"]:
        # An inhibitor is expected to be anti-correlated with its own rate
        corr = -corr
    print(f"scRNA & A correlation : {corr:.3f} ({ action })\n")

    z_val = compute_tf_activity_difference(activity, theta_smooth, expected["ranges"], expected["inhibitory"])
    corrs.append(corr)
    z_vals.append(z_val)
    # Single quotes inside the f-string: reusing the outer quote type is a
    # SyntaxError on Python versions before 3.12.
    print(f"Expected activity range : {expected['ranges']}")
    print(f"TF activity biological z-score : {z_val:.2f} ({ action })")
print(f"\nGlobal correlation :{np.mean(corrs):.3f}")
print(f"Global z-score :{np.mean(z_vals):.2f}")