In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from tqdm import trange
from scipy.stats import norm
from loguru import logger

import os

In [2]:
class Config(object):
    def __init__(self):
        self.seed = 2024
        self.n = 1000        # Number of labeled samples
        self.p = 10          # Number of features
        self.alpha = 0.05    # Significance level
        self.n_iter = 100
        self.savedir = 'results'
        self.verbose = True

        if not os.path.exists(self.savedir):
            os.makedirs(self.savedir)

In [3]:
def generate_data(n, m, p):
    X_labeled_ols = np.random.randn(n, p)
    X_labeled_rest = np.random.randn(n, p)
    X_unlabeled = np.random.randn(m, p)
    beta = np.random.randn(p)
    assert p >= 3, "p must be greater than or equal to 3"
    # Make first feature irrelevant
    beta[0] = 0
    # The second and third features are correlated with target
    beta[1] = 0.05
    beta[2] = 0.1

    # Fix variance of noise as 1
    y_ols = X_labeled_ols @ beta + np.random.randn(n)
    y_rest = X_labeled_rest @ beta + np.random.randn(n)

    return X_labeled_ols, X_labeled_rest, X_unlabeled, y_ols, y_rest, beta

In [4]:
def generate_orthogonal_design(n, m, p):
    # Generate random design matrices
    X_labeled_ols = np.random.randn(n, p)
    X_labeled_rest = np.random.randn(n, p)
    X_unlabeled = np.random.randn(m, p)
    
    # Generate orthogonal design matrices
    X_labeled_ols_orth = np.linalg.qr(X_labeled_ols)[0]
    X_labeled_rest_orth = np.linalg.qr(X_labeled_rest)[0]
    X_unlabeled_orth = np.linalg.qr(X_unlabeled)[0]
    # Generate random coefficients
    beta = np.random.randn(p)
    assert p >= 3, "p must be greater than or equal to 3"
    # Make first feature irrelevant
    beta[0] = 0
    # The second and third features are correlated with target
    beta[1] = 0.05
    beta[2] = 0.1

    # Fix variance of noise as 1
    y_ols = X_labeled_ols_orth @ beta + np.random.randn(n)
    y_rest = X_labeled_rest_orth @ beta + np.random.randn(n)

    return X_labeled_ols_orth, X_labeled_rest_orth, X_unlabeled_orth, y_ols, y_rest, beta

In [5]:
def fit_ols(X, y):
    beta_ols = np.linalg.inv(X.T @ X) @ X.T @ y
    return beta_ols

In [6]:
def hypothesis_testing(X, y, beta_ols, alpha):
    # Since we fixed the variance as known
    se = np.sqrt(np.diag(np.linalg.inv(X.T @ X)))
    z_stat = beta_ols / se
    p_values = 2 * (1 - norm.cdf(np.abs(z_stat)))
    return p_values

In [7]:
def run_simulation(config):
    np.random.seed(config.seed)
    n = config.n
    p = config.p
    alpha = config.alpha
    m_var = [100, 500, 1000, 1500, 2000, 2500, 3000]  # varying number of unlabeled samples
    
    results = pd.DataFrame()
    for m in m_var:
        print(f"Running simulation for m = {m}")
        type_I_error_feat_0 = np.zeros(config.n_iter)
        power_feat_1 = np.zeros(config.n_iter)
        power_feat_2 = np.zeros(config.n_iter)
        for i in trange(config.n_iter):    
            X_labeled_ols, X_labeled_rest, X_unlabeled, y_ols, y_rest = generate_data(n, m, p)
            # X_labeled_ols, y_ols for fitting OLS model
            # X_labeled_rest, y_rest combined with X_unlabeled for hypothesis testing

            # Fit OLS model        
            beta_ols = fit_ols(X_labeled_ols, y_ols)

            # Predict on unlabeled data
            y_unlabeled_pred = X_unlabeled @ beta_ols

            # Combine labeled and unlabeled data
            X = np.vstack([X_labeled_rest, X_unlabeled])
            y = np.hstack([y_rest, y_unlabeled_pred])

            # Fit new model on combined data and perform hypothesis testing
            beta_ols_combined = fit_ols(X, y)
            p_values = hypothesis_testing(X, y, beta_ols_combined, alpha)

            # Check if null hypothesis is rejected for first 3 feature
            # First feature is irrelevant, second and third are correlated with target
            type_I_error_feat_0[i] = p_values[0] < alpha
            power_feat_1[i] = p_values[1] < alpha
            power_feat_2[i] = p_values[2] < alpha

        # Save average type I error and power
        type_I_error_feat_0 = np.mean(type_I_error_feat_0)
        power_feat_1 = np.mean(power_feat_1)
        power_feat_2 = np.mean(power_feat_2)
        logger.info(f"m = {m}, Type I error (feature 0) = {type_I_error_feat_0}, Power (feature 1) = {power_feat_1}, Power (feature 2) = {power_feat_2}")
        results[m] = {
            'type_I_error_feat_0': type_I_error_feat_0,
            'power_feat_1': power_feat_1,
            'power_feat_2': power_feat_2
        }

    results.to_csv(os.path.join(config.savedir, 'results.csv'))
    return results

In [8]:
def plot_results(result, save_path):
    """
    Plot type I error and power for different features.
    
    Args:
        result (pd.DataFrame): DataFrame containing the results.
        save_path (str): Path to save the plot.
    """
    type_I_error_feat_0 = result.loc['type_I_error_feat_0']
    power_feat_1 = result.loc['power_feat_1']
    power_feat_2 = result.loc['power_feat_2']
    
    # Plot results
    plt.figure(figsize=(10, 6))
    sns.set_style('whitegrid')
    sns.color_palette("hls", 8)
    plt.plot(result.columns, type_I_error_feat_0, marker='o')
    plt.plot(result.columns, power_feat_1, marker='o')
    plt.plot(result.columns, power_feat_2, marker='o')
    plt.axhline(0.05, color='r', linestyle='--')
    plt.xlabel('Number of unlabeled samples, fixed n=1000')
    plt.ylabel('Probability')
    plt.title('Type I error and power')
    plt.legend(['Type I error (Feature 0)', 'Power (Feature 1)', 'Power (Feature 2)'])
    
    # Save the plot
    plt.savefig(save_path, format='pdf', bbox_inches='tight')
    plt.close()

save_path = os.path.join(Config().savedir, 'results.pdf')

In [37]:
result = run_simulation(Config())

# Plot results
plot_results(result, save_path)

Running simulation for m = 100


  0%|          | 0/100 [00:00<?, ?it/s]

100%|██████████| 100/100 [00:00<00:00, 1380.96it/s]
[32m2024-06-20 17:13:46.020[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun_simulation[0m:[36m45[0m - [1mm = 100, Type I error (feature 0) = 0.05, Power (feature 1) = 0.39, Power (feature 2) = 0.94[0m


Running simulation for m = 500


100%|██████████| 100/100 [00:00<00:00, 1832.97it/s]
[32m2024-06-20 17:13:46.076[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun_simulation[0m:[36m45[0m - [1mm = 500, Type I error (feature 0) = 0.04, Power (feature 1) = 0.58, Power (feature 2) = 1.0[0m


Running simulation for m = 1000


100%|██████████| 100/100 [00:00<00:00, 1587.51it/s]
[32m2024-06-20 17:13:46.141[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun_simulation[0m:[36m45[0m - [1mm = 1000, Type I error (feature 0) = 0.02, Power (feature 1) = 0.63, Power (feature 2) = 0.98[0m


Running simulation for m = 1500


100%|██████████| 100/100 [00:00<00:00, 1391.54it/s]
[32m2024-06-20 17:13:46.214[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun_simulation[0m:[36m45[0m - [1mm = 1500, Type I error (feature 0) = 0.12, Power (feature 1) = 0.67, Power (feature 2) = 1.0[0m


Running simulation for m = 2000


100%|██████████| 100/100 [00:00<00:00, 1261.70it/s]
[32m2024-06-20 17:13:46.294[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun_simulation[0m:[36m45[0m - [1mm = 2000, Type I error (feature 0) = 0.07, Power (feature 1) = 0.73, Power (feature 2) = 0.99[0m


Running simulation for m = 2500


100%|██████████| 100/100 [00:00<00:00, 1140.04it/s]
[32m2024-06-20 17:13:46.384[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun_simulation[0m:[36m45[0m - [1mm = 2500, Type I error (feature 0) = 0.18, Power (feature 1) = 0.77, Power (feature 2) = 1.0[0m


Running simulation for m = 3000


100%|██████████| 100/100 [00:00<00:00, 1044.98it/s]
[32m2024-06-20 17:13:46.481[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun_simulation[0m:[36m45[0m - [1mm = 3000, Type I error (feature 0) = 0.17, Power (feature 1) = 0.8, Power (feature 2) = 1.0[0m


In [9]:
def run_simulation_orth(config):
    np.random.seed(config.seed)
    n = config.n
    p = config.p
    alpha = config.alpha
    m_var = [100, 200, 300, 500, 800, 1000, 2000, 3000, 4000, 5000]  # varying number of unlabeled samples
    
    results = pd.DataFrame()
    for m in m_var:
        print(f"Running simulation for m = {m}")
        type_I_error_feat_0 = np.zeros(config.n_iter)
        power_feat_1 = np.zeros(config.n_iter)
        power_feat_2 = np.zeros(config.n_iter)
        for i in trange(config.n_iter):    
            X_labeled, X_unlabeled, y = generate_orthogonal_design(n, m, p)
            # X_labeled_ols, y_ols for fitting OLS model
            # X_labeled_rest, y_rest combined with X_unlabeled for hypothesis testing
            X_labeled_ols, X_labeled_rest = X_labeled[:n], X_labeled[n:]
            y_ols, y_rest = y[:n], y[n:]

            # Fit OLS model        
            beta_ols = fit_ols(X_labeled_ols, y_ols)

            # Predict on unlabeled data
            y_unlabeled_pred = X_unlabeled @ beta_ols

            # Combine labeled and unlabeled data
            X = np.vstack([X_labeled_rest, X_unlabeled])
            y = np.hstack([y_rest, y_unlabeled_pred])

            # Fit new model on combined data and perform hypothesis testing
            beta_ols_combined = fit_ols(X, y)
            p_values = hypothesis_testing(X, y, beta_ols_combined, alpha)

            # Check if null hypothesis is rejected for first 3 feature
            # First feature is irrelevant, second and third are correlated with target
            type_I_error_feat_0[i] = p_values[0] < alpha
            power_feat_1[i] = p_values[1] < alpha
            power_feat_2[i] = p_values[2] < alpha

        # Save average type I error and power
        type_I_error_feat_0 = np.mean(type_I_error_feat_0)
        power_feat_1 = np.mean(power_feat_1)
        power_feat_2 = np.mean(power_feat_2)
        logger.info(f"m = {m}, Type I error (feature 0) = {type_I_error_feat_0}, Power (feature 1) = {power_feat_1}, Power (feature 2) = {power_feat_2}")
        results[m] = {
            'type_I_error_feat_0': type_I_error_feat_0,
            'power_feat_1': power_feat_1,
            'power_feat_2': power_feat_2
        }

    results.to_csv(os.path.join(config.savedir, 'results_orth.csv'))
    return results

In [10]:
result_orth = run_simulation_orth(Config())

save_path_orth = os.path.join(Config().savedir, 'results_orth.pdf')

plot_results(result_orth, save_path_orth)

Running simulation for m = 100


  0%|          | 0/100 [00:00<?, ?it/s]


ValueError: too many values to unpack (expected 3)

In [43]:
# Test generate_orthogonal_design
n = 1000
m = 2000
p = 10
X_labeled, X_unlabeled, y = generate_orthogonal_design(n, m, p)
assert X_labeled.shape == (2 * n, p)
assert X_unlabeled.shape == (m, p)

# test whether the design matrices are orthogonal
assert np.allclose(X_labeled.T @ X_labeled, np.eye(p))
# print the norm of the correlation matrix
print(np.linalg.norm(X_labeled.T @ X_labeled - np.eye(p)))

9.221269758799057e-16
