In [4]:
import os
import numpy as np
import pandas as pd
import random

In [5]:
import numpy as np

def toep(d, rate):
    return np.array([[(rate) ** abs(i - j) for i in range(d)] for j in range(d)])


def generate_X(d, corr_rate, n):
    """Generate Gaussian feature matrix X with Toeplitz covariance."""

    cov = toep(d, corr_rate)
    X = np.random.multivariate_normal(np.zeros(d), cov, size=n)
    return X

def compute_centricity_score(probs):
    """
    Compute a more robust centricity score that better differentiates between distributions.
    Returns a score between 0 and 1:
    - Score near 0: probabilities concentrated near 0 and 1
    - Score near 1: probabilities concentrated near 0.5
    """
    # Calculate distances from 0.5
    distances = np.abs(probs - 0.5)
    
    # Use a modified beta function that gives more weight to extreme values
    beta_weight = np.power(1 - 2 * distances, 2)
    
    # Add penalty for asymmetric distributions
    symmetry_penalty = np.abs(np.mean(probs > 0.5) - 0.5)
    
    # Combine scores with appropriate scaling
    score = np.mean(beta_weight) * (1 - symmetry_penalty)
    
    return score

def adjust_beta(X, target_score=0.5, score_threshold=0.001, max_iter=200, 
                learning_rate=0.1, patience=20):
    """
    Adjust beta to achieve a desired distribution of probabilities p(y=1).
    Uses a more robust optimization approach with adaptive steps.
    
    Parameters:
    - X: Feature matrix (n x d)
    - target_score: Desired centricity score (0 to 1)
    - score_threshold: Threshold for stopping when close enough to target
    - max_iter: Maximum iterations for adjustment
    - learning_rate: Initial learning rate for beta adjustments
    - patience: Number of iterations to wait before adjusting strategy
    
    Returns:
    - beta: Adjusted coefficients
    - score_history: History of scores during adjustment
    """
    n, d = X.shape
    
    # Initialize beta based on target score
    # Use smaller initial values for more extreme targets
    initial_scale = np.exp(-2 * np.abs(target_score - 0.5))
    beta = np.random.normal(0, initial_scale, size=d)
    
    score_history = []
    best_score_diff = float('inf')
    best_beta = beta.copy()
    patience_counter = 0
    
    for iteration in range(max_iter):
        # Compute probabilities
        logits = X @ beta
        probs = 1 / (1 + np.exp(-logits))
        
        # Compute score
        score = compute_centricity_score(probs)
        score_history.append(score)
        
        # Check if we're close enough to target
        score_diff = np.abs(score - target_score)
        if score_diff < score_threshold:
            break
            
        # Update best solution if we've improved
        if score_diff < best_score_diff:
            best_score_diff = score_diff
            best_beta = beta.copy()
            patience_counter = 0
        else:
            patience_counter += 1
        
        # Adaptive adjustment strategy
        if patience_counter >= patience:
            # Reset to best known solution and reduce learning rate
            beta = best_beta.copy()
            learning_rate *= 0.5
            patience_counter = 0
            
        # Compute gradient-like direction
        if score < target_score:
            # Need more centered probabilities
            beta *= (1 - learning_rate)
        else:
            # Need more extreme probabilities
            beta *= (1 + learning_rate)
            
        # Add small random perturbation to avoid local optima
        beta += np.random.normal(0, learning_rate * 0.01, size=d)
    
    # Return the best beta found
    return best_beta, score_history

def validate_distribution(X, beta, target_score, title="Probability Distribution"):
    """
    Validate and visualize the resulting probability distribution.
    
    Parameters:
    - X: Feature matrix
    - beta: Coefficient vector
    - target_score: Target centricity score
    - title: Plot title
    """
    import matplotlib.pyplot as plt
    
    # Compute probabilities
    logits = X @ beta
    probs = 1 / (1 + np.exp(-logits))
    actual_score = compute_centricity_score(probs)
    
    # Create histogram
    plt.figure(figsize=(10, 6))
    plt.hist(probs, bins=40, density=True)
    plt.title(f"{title}\nTarget Score: {target_score:.3f}, Actual Score: {actual_score:.3f}")
    plt.xlabel("Probability")
    plt.ylabel("Density")
    plt.grid(True, alpha=0.3)
    plt.show()
    
    return actual_score

In [6]:

# Define the percentages of missingness and number of replicates
experiment_name = "ExpB"
experiment_data_folder = os.path.join("data", experiment_name)

# Create necessary directories
for subdir in ["original_data", "test_data", "pred_data", "bayes_data"]:
    path = os.path.join(experiment_data_folder, subdir)
    if not os.path.exists(path):
        os.makedirs(path)

# Modified parameters
missingness_percentages = [0.35]
n_replicates = 3
Ds = [5]
Corrs = [0.33]
CentricityScores = [0.20, 0.35, 0.85]  # Replaced PropOfY1 with CentricityScores
n_train = 10000
n_test = 15000
n = n_train + n_test

print("# of setups = ", n_replicates * len(missingness_percentages) * len(Corrs) * len(CentricityScores))

N_MC = 100

# Modified set-up dataframe
df_set_up = pd.DataFrame({
    "rep": [],
    "n": [],
    "true_beta": [],
    "center_X": [],
    "set_up": [],
    "d": [],
    "corr": [],
    "prcNA": [],
    "centricity": [],
}).T

np.random.seed(1)
random.seed(1)

def generate_mask(n, d, prc):
    """Generate missing data mask."""
    return np.random.binomial(n=1, p=prc, size=(n, d))

def sigma(x):
    """Sigmoid function."""
    return 1 / (1 + np.exp(-x))

for rep in range(n_replicates):
    print("REP", rep)
    for d in Ds:
        for corr in Corrs:
            corr_str = str(corr).replace(".", "")
            
            for centricity in CentricityScores:
                centricity_str = str(centricity).replace(".", "")

                # Generate X using the provided function
                X_full = generate_X(d=d, corr_rate=corr, n=n)
                center_X = np.zeros(d)  # Since X is already centered in generate_X

                # Adjust beta to achieve desired centricity score
                beta0, score_history = adjust_beta(X_full, target_score=centricity)
                
                # Generate probabilities and outcomes
                y_probs = sigma(X_full @ beta0)
                y = np.random.binomial(n=1, p=y_probs)
                final_centricity = 4 * np.mean(y_probs * (1 - y_probs))

                for prc in missingness_percentages:
                    prc_str = str(prc).replace(".", "")
                    set_up = f"LOG_n{n}_d{d}_corr{corr_str}_prcNA{prc_str}_cent{centricity_str}_rep{rep}"

                    M = generate_mask(n, d, prc)
                    
                    X_obs = X_full.copy()
                    X_obs[M == 1] = np.nan

                    new_row = pd.Series({
                        "rep": rep,
                        "n": n,
                        "d": d,
                        "corr": corr,
                        "prcNA": prc,
                        "centricity": centricity,
                        "true_beta": beta0,
                        "center_X": center_X,
                        "set_up": set_up
                    })

                    df_set_up = pd.concat([df_set_up, new_row], axis=1, ignore_index=True)

                    # Save original data
                    data_to_save = {
                        "X_obs": X_obs,
                        "M": M,
                        "y": y,
                        "y_probs": y_probs,
                        "X_full": X_full
                    }
                    np.savez(os.path.join(experiment_data_folder, "original_data", f"{set_up}.npz"), **data_to_save)

                    # Save test data
                    X_test = X_obs[n_train:]
                    y_test = y[n_train:]
                    y_probs_test = y_probs[n_train:]
                    M_test = M[n_train:]
                    data_to_save_test = {
                        "X_obs": X_test,
                        "M": M_test,
                        "y": y_test,
                        "y_probs": y_probs_test,
                        "X_full": X_full[n_train:]
                    }
                    np.savez(os.path.join(experiment_data_folder, "test_data", f"{set_up}.npz"), **data_to_save_test)

                    # Calculate and save Bayes predictions
                    def toep(d, rho):
                        """Generate Toeplitz correlation matrix."""
                        return np.fromfunction(lambda i, j: rho ** np.abs(i - j), (d, d))

                    def get_y_prob_bayes(X_test, full_mu, full_cov, true_beta, n_mc):
                        """Calculate Bayesian predictions."""
                        n, d = X_test.shape
                        y_probs = np.zeros((n, n_mc))
                        
                        for i in range(n):
                            miss_idx = np.isnan(X_test[i])
                            if np.any(miss_idx):
                                X_samples = np.random.multivariate_normal(
                                    full_mu, full_cov, size=n_mc
                                )
                                X_samples[:, ~miss_idx] = X_test[i, ~miss_idx]
                                y_probs[i] = sigma(X_samples @ true_beta)
                            else:
                                y_probs[i] = sigma(X_test[i] @ true_beta)
                        
                        return y_probs

                    y_probs_bayes = get_y_prob_bayes(X_test, full_mu=center_X, 
                                                    full_cov=toep(d, corr), 
                                                    true_beta=beta0, n_mc=N_MC)
                    y_probs_bayes = y_probs_bayes.mean(axis=1)

                    data_to_save_bayes = {
                        "y_probs_bayes": y_probs_bayes
                    }
                    np.savez(os.path.join(experiment_data_folder, "bayes_data", f"{set_up}.npz"), **data_to_save_bayes)

# Save setup dataframe
df_set_up.T.to_csv(os.path.join(experiment_data_folder, "set_up.csv"), index=False)

# of setups =  9
REP 0
REP 1
REP 2
