In [1]:
import os

import shutil

import torch
from collections import Counter
import numpy as np
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import os
import pandas as pd

from joblib import Parallel, delayed
import argparse
import torch.nn.functional as F



def f_1(x):
    return x[:, 0] + 0.25 * x[:, 1] ** 2+0.1*torch.tanh(0.5*x[:,2]-0.3)


# def f_2(x):
#     return x[:,0]**2/2-abs(x[:,4]*x[:,9])+torch.exp(0.1*x[:,14])-torch.sin(3.141592*x[:,19])
# def f_2(x):
#     return 2*(x[:,0]>0)-2*(x[:,0]<0)

def poisson_loss(logits,y_true):
    """
    Compute the Poisson negative log-likelihood loss.
    
    Args:
        y_true (torch.Tensor): True labels (0, 1, 2, ...), shape (batch_size,).
        logits (torch.Tensor): Output of the DNN (before exponentiation), shape (batch_size,).
    
    Returns:
        torch.Tensor: Mean negative log-likelihood loss over the batch.
    """
    # Convert logits to λ(x) = e^logits
    lambda_pred = torch.exp(logits)
    
    # Compute the negative log-likelihood
    loss = lambda_pred - y_true * logits  # Equivalent to λ(x) - Y * log(λ(x))
    return loss.mean()


def binomial_loss(logits, batch_Y, batch_trial):
    # Ensure batch_trial is a float tensor
    batch_trial = batch_trial.float()
    # Compute softplus for logits and -logits
    softplus_pos = F.softplus(logits)
    softplus_neg = F.softplus(-logits)
    # Calculate the loss components
    loss_elements = batch_Y * softplus_neg + (batch_trial - batch_Y) * softplus_pos
    # Return the mean loss over the batch
    return loss_elements.mean()


class SampleSet:
    def __init__(self, n, p,f_X,module='Bernoulli', mean=0, std=1,trials=None):
        """
        Initializes the SampleSet with n samples and p features for X.
        Y is generated based on the conditional probability P(Y=1|X).
        """
        self.n = n
        self.p = p
        self.mean = mean
        self.std = std
        self.r=None
        self.B=None
        self.module=module
        # Generate X with dimension (n, p)
        X_main = torch.normal(0.0, 1.0, size=(n, 2))
        X_noise = torch.normal(0.0, 1, size=(n, p - 2))
        self.X = torch.cat([X_main, X_noise], dim=1)

        self.subtrain=None
        self.subval=None
        self.counts=None
        self.trials=trials
        self.f_X=f_X
        # Compute z = f(X) and use it to generate P(Y=1|X) and Y
        self.z = self._compute_z(self.X)  # Save z values (f(X))
        self.Y = self._generate_Y(self.z)
    
    def _compute_z(self, X):
        """Computes f(X) = 1 - X_1 + X_2^2 + X_3 * X_4 and returns it."""
        return   self.f_X(X)
    
    def _generate_Y(self, z):
        if self.module=='Bernoulli':
            # Generate Y as a Bernoulli random variable with probability P(Y=1|X)
            P_Y_given_X = 1 / (1 + torch.exp(-z))
            Y = torch.bernoulli(P_Y_given_X)
        elif self.module=='Gaussian':
            Y = torch.normal(mean=z, std=1.0)
        elif self.module=='Binomial':
            if self.trials is None:
                self.trials = torch.randint(low=5, high=6, size=(self.n,))  # Random n_i =5

            P_Y_given_X = 1 / (1 + torch.exp(-z)) # Ensure the rate parameter is positive
            Y = torch.binomial(self.trials.float(), P_Y_given_X)

        elif self.module == 'Poisson':
        # Generate Y as a Poisson random variable with rate parameter (lambda) equal to exp(z)
            rate_param = torch.log(1+torch.exp(z))   # Ensure the rate parameter is positive
            Y = torch.poisson(rate_param)
        else:
        # Raise an error for unsupported modules
            raise ValueError(f"Unsupported module type: {self.module}. Expected one of: 'Bernoulli', 'Binomial', 'Poisson'.")
        return Y
    
    def get_z(self):
        """Returns the computed z values, which represent f(X)."""
        return self.z
    
    def get_sample_set(self):
        """Returns the main sample set (X, Y)."""
        return self.X, self.Y
    
    def get_sub_samples_with_validation(self, B, r):
        """
        Generates B sub-sample sets, each containing r samples randomly selected 
        from the main sample set, along with corresponding validation sets.
        
        Also counts the number of times each index is selected across all B sub-samples.
        
        Returns:
            train_samples: List of tuples, each containing (train_X, train_Y, train_indices)
            validation_samples: List of tuples, each containing (val_X, val_Y, val_indices)
            selection_counts: Dictionary with counts of each index's appearance in the B sub-samples.
        """
        train_samples = []
        validation_samples = []
        selection_counts = Counter({i: 0 for i in range(self.n)})   # To track appearances of each index
        indices = torch.arange(self.n)
        self.B=B
        self.r=r

        
        for _ in range(B):
            # Randomly select r unique indices for the sub-sample
            selected_indices = indices[torch.randperm(self.n)[:r]]
            
            # Update selection count for each index
            selection_counts.update(selected_indices.tolist())
            
            # Get validation indices (those not in selected_indices)
            val_indices = torch.tensor([i for i in indices if i not in selected_indices])

            # Separate sub-sample and validation sets, including original indices
            X_sub = self.X[selected_indices]
            Y_sub = self.Y[selected_indices]
            
            X_val = self.X[val_indices]
            Y_val = self.Y[val_indices]
            if self.module=="Binomial":
                trial_sub=self.trials[selected_indices]
                trial_val=self.trials[val_indices]
            
            # Append to train_samples and validation_samples lists
                train_samples.append((X_sub, Y_sub, selected_indices,trial_sub))
                validation_samples.append((X_val, Y_val, val_indices,trial_val))
            else:
                train_samples.append((X_sub, Y_sub, selected_indices))
                validation_samples.append((X_val, Y_val, val_indices))
        self.subtrain=train_samples
        self.subval=validation_samples
        self.counts=dict(selection_counts)
        return train_samples, validation_samples, dict(selection_counts)
        
        

    
    def save(self, file_path):
        """Saves the SampleSet instance to a file."""
        torch.save(self, file_path)
    
    @staticmethod
    def load(file_path):
        """Loads a SampleSet instance from a file."""
        return torch.load(file_path)



def clear_folder(folder_path):

    if not os.path.exists(folder_path):
        print(f"folder {folder_path} does not exist")
        return


    for item in os.listdir(folder_path):
        item_path = os.path.join(folder_path, item)

        if os.path.isfile(item_path):
            os.remove(item_path)

        elif os.path.isdir(item_path):
            shutil.rmtree(item_path)




class NeuralNetwork(nn.Module):
    def __init__(self, input_size, dropout_rate=0.0):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, 10)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.layer2 = nn.Linear(10, 15)
        self.bn2 = nn.BatchNorm1d(15)
        self.layer3 = nn.Linear(15, 20)
        self.dropout3 = nn.Dropout(dropout_rate)
        self.layer4 = nn.Linear(20, 30)
        self.bn4 = nn.BatchNorm1d(30)
        self.output = nn.Linear(30, 1)

    def forward(self, x):
        x = torch.relu(self.layer1(x))
        x = self.dropout1(x)
        x = torch.relu(self.bn2(self.layer2(x)))
        x = torch.relu(self.layer3(x))
        x = self.dropout3(x)
        x = torch.relu(self.bn4(self.layer4(x)))
        x = self.output(x)
        return x

def train_network(network, train_loader,mode="Bernoulli", num_epochs=100, learning_rate=0.1,weight_decay=0.05,tol=0.0001,patience=7):
    if mode == "Bernoulli":
        criterion = nn.BCEWithLogitsLoss() 
    elif mode=="Poisson":
        criterion=poisson_loss
    elif mode=="Binomial":
        criterion=binomial_loss
    else:
    # Raise an error for unsupported modules
        raise ValueError(f"Unsupported module type {mode}. Expected one of: 'Bernoulli', 'Gaussian', 'Exponential', 'Poisson'.")
    
    # BCELossWithLogits combines sigmoid and binary cross-entropy in one function
    length=len(train_loader.dataset)
    optimizer = optim.SGD(network.parameters(), lr=learning_rate,weight_decay=weight_decay)
    prev_epoch_loss = None  
    stable_count=0 
    # Training loop
    if mode=="Binomial":
        for epoch in range(num_epochs):
            running_loss = 0.0  
            for batch_X, batch_Y,batch_trial in train_loader:
                # Forward pass
                logits = network(batch_X).squeeze() # Get scalar logits, shape (batch_size)
                # print(torch.max(logits))  
                loss = criterion(logits, batch_Y.float(),batch_trial)  # Y needs to be float for BCELossWithLogits
                # Backward pass and optimization
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                running_loss += loss.item() * batch_X.size(0)
        
        # Compute average loss for the epoch
            epoch_loss = running_loss / length
            # print(f"Epoch {epoch}/{num_epochs}, Loss: {epoch_loss:.4f}")
            
            # Early‐stop check
            if prev_epoch_loss is not None:
                delta = abs(prev_epoch_loss - epoch_loss)
                if delta < tol:
                    stable_count += 1
                else:
                    stable_count = 0  # 重置计数

                if stable_count >= patience:
                    print(f"Early stop at epoch {epoch} after {patience} stable epochs with loss {epoch_loss}.")
                    break
            prev_epoch_loss = epoch_loss
        print(f"With loss {epoch_loss}.")

    else:
        for epoch in range(num_epochs):
            running_loss = 0.0  
            for batch_X, batch_Y in train_loader:
                # Forward pass
                logits = network(batch_X).squeeze() # Get scalar logits, shape (batch_size)
                # print(torch.max(logits))  
                loss = criterion(logits, batch_Y.float())  # Y needs to be float for BCELossWithLogits
                # Backward pass and optimization
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                running_loss += loss.item() * batch_X.size(0)
        
        # Compute average loss for the epoch
            epoch_loss = running_loss / length
            # print(f"Epoch {epoch}/{num_epochs}, Loss: {epoch_loss:.4f}")
            
            # Early‐stop check
            if prev_epoch_loss is not None:
                delta = abs(prev_epoch_loss - epoch_loss)
                if delta < tol:
                    stable_count += 1
                else:
                    stable_count = 0  # 重置计数

                if stable_count >= patience:
                    print(f"Early stop at epoch {epoch} after {patience} stable epochs with loss {epoch_loss}.")
                    break
            prev_epoch_loss = epoch_loss
        print(f"With loss {epoch_loss}.")

# Instantiate and train B neural networks, one for each sub-sample
def train_multiple_networks(sample_set, input_size, mode="Bernoulli",batchsize=64, dropout_rate=0.1, num_epochs=100, learning_rate=0.01,weight_decay=0.05,tol=0.0001,patience=7):
    train_samples, validation_samples, selection_counts = sample_set.subtrain, sample_set.subval, sample_set.counts
    
    networks = []  # List to hold trained neural networks
    for i, (train_data, val_data) in enumerate(zip(train_samples, validation_samples)):

        if sample_set.module=="Binomial":
            X_sub, Y_sub, _,trial_sub = train_data
            X_val, Y_val, _,trial_val = val_data

            
            # Prepare data loader for this sub-sample
            train_dataset = TensorDataset(X_sub, Y_sub,trial_sub)
            train_loader = DataLoader(train_dataset, batch_size=batchsize, shuffle=True)

  
        else:
            X_sub, Y_sub, _ = train_data
            X_val, Y_val, _ = val_data

            
            # Prepare data loader for this sub-sample
            train_dataset = TensorDataset(X_sub, Y_sub)
            train_loader = DataLoader(train_dataset, batch_size=batchsize, shuffle=True)
            
        # Instantiate a new network for this sub-sample
        network = NeuralNetwork(input_size,  dropout_rate=dropout_rate)
        # print(f"\nTraining network {i+1}/{B} on sub-sample {i+1}")
        
        # Train the network on the current sub-sample
        train_network(network, train_loader,mode=sample_set.module, num_epochs=num_epochs, learning_rate=learning_rate,weight_decay=weight_decay)
        
        # Append the trained network to the list of networks
        networks.append(network)

        # Validation performance (optional)
        with torch.no_grad():
            if sample_set.module=="Bernoulli":
                logits = network(X_val).squeeze()
                true_logits=f_1(X_val)
                accuracy = (torch.sign(logits) == torch.sign(true_logits)).float().mean() * 100
                print(f"Validation Accuracy for network {i+1}: {accuracy.item():.2f}%")
                print(torch.mean(abs(logits-true_logits)))
            elif sample_set.module == "Poisson":
                logits = network(X_val).squeeze()
                # print(torch.max(logits))
                true_lambda=torch.log(1+torch.exp(f_1(X_val)))
                estimated_lambda=torch.exp(logits)
                print(f" network {i+1}: {torch.max(true_lambda),torch.min(true_lambda)}%")
                # print(torch.max(estimated_lambda))
                # print(torch.max(true_lambda-estimated_lambda),torch.min(true_lambda-estimated_lambda))
                print(torch.mean(abs(true_lambda-estimated_lambda)),torch.std(abs(true_lambda-estimated_lambda)))
                # print(torch.mean(estimated_lambda))
            elif sample_set.module == "Binomial":
                logits = network(X_val).squeeze()
                true_logits=f_1(X_val)
                print(torch.mean(abs(torch.sigmoid(logits)-torch.sigmoid(true_logits))))
                # print(torch.max(abs(torch.sigmoid(logits)-torch.sigmoid(true_logits))))

            else:
                raise ValueError(f"Unsupported module type {sample_set.module}. Expected one of: 'Bernoulli', 'Gaussian', 'Exponential', 'Poisson'.")


    
    return networks





def ensemble_predict_batch_f(Xtest, networks, sample_set):
    ntest = Xtest.shape[0]
    n = sample_set.n       # Total number of original samples
    r = sample_set.r       # Size of each sub-sample
    B = len(networks)      # Number of sub-samples (number of neural networks)

    # Collect logits from all networks for the test set (shape: [ntest, B])
    all_outputs = torch.zeros(ntest, B)
    with torch.no_grad():
        for j, net in enumerate(networks):
            logits = net(Xtest).squeeze(1)
            all_outputs[:, j] = logits

    # Compute inclusion counts J_bji and mean inclusion J_dot_i for each training index i
    J_bji = sample_set.counts  # Dict mapping i -> count of i in each sub-sample
    J_dot_i = {i: J_bji[i] / B for i in range(n)}

    # Ensemble mean prediction for each test sample
    hatf_B = all_outputs.mean(dim=1)  # Shape: [ntest]

    # Initialize accumulators for variance correction terms
    sum_V2 = torch.zeros(ntest)      # Accumulate sum of hat_V_i^2 over i
    sum_Zdiff2 = torch.zeros(ntest)  # Accumulate sum of (Z_ji - hat_V_i)^2 over i and j
    if sample_set.module=='Binomial':
        for i in range(n):
            # Gather Z_{b_j i}(x*) for all sub-samples j (shape: [B, ntest])
            Zs = torch.zeros(B, ntest)
            for j in range(B):
                _, _, Jbjicount,_ = sample_set.subtrain[j]
                in_subset = 1.0 if (i in Jbjicount) else 0.0
                deviations = all_outputs[:, j] - hatf_B  # Shape: [ntest]
                Zs[j] = (in_subset - J_dot_i[i]) * deviations

            # Compute hat_V_i(x*) and accumulate
            hat_V_i = Zs.mean(dim=0)  # Shape: [ntest]
            sum_V2 += hat_V_i.pow(2)
            sum_Zdiff2 += (Zs - hat_V_i.unsqueeze(0)).pow(2).sum(dim=0)


    else: 
        # Loop over each original data index i
        for i in range(n):
            # Gather Z_{b_j i}(x*) for all sub-samples j (shape: [B, ntest])
            Zs = torch.zeros(B, ntest)
            for j in range(B):
                _, _, Jbjicount = sample_set.subtrain[j]
                in_subset = 1.0 if (i in Jbjicount) else 0.0
                deviations = all_outputs[:, j] - hatf_B  # Shape: [ntest]
                Zs[j] = (in_subset - J_dot_i[i]) * deviations

            # Compute hat_V_i(x*) and accumulate
            hat_V_i = Zs.mean(dim=0)  # Shape: [ntest]
            sum_V2 += hat_V_i.pow(2)
            sum_Zdiff2 += (Zs - hat_V_i.unsqueeze(0)).pow(2).sum(dim=0)

    # Correction factor: n(n-1)/(n-r)^2
    factor = (n - 1) / n * (n / (n - r))**2

    # Compute corrected variance terms
    term1 = factor * sum_V2
    term2 = factor * sum_Zdiff2 / (B * (B - 1))
    var_f = term1 - term2          # Bias-corrected variance estimate

    # Standard deviations
    sd_f_raw = torch.sqrt(term1)       # Without bias correction
    sd_f_correct = torch.sqrt(var_f)   # With bias correction


    return all_outputs, [hatf_B, sd_f_raw, sd_f_correct]




def run_one_repeat(rep_id, n, r, B, p, GLM_name, f_1, xtest):
    # 每个 repeat 训练 B 个网络，最后做一次 ensemble 预测
    ss = SampleSet(n, p, f_1, module=GLM_name)
    ss.get_sub_samples_with_validation(B, r)

    networks = train_multiple_networks(
        ss,
        input_size=p,
        mode=GLM_name,
        batchsize=r,
        num_epochs=500,
        learning_rate=0.1,
        weight_decay=0.02,
        dropout_rate=0.1,
        tol=0.0001,
        patience=5
    )

    _, Bf = ensemble_predict_batch_f(xtest, networks, ss)
    return Bf[0], Bf[1],Bf[2]




In [41]:

n = 400
p=10 # Main sample size
B = 1400   
GLM_name='Poisson' # 

In [42]:



function_name='f_1'
r=int(n**(0.9))
input_size = p
hidden_size1 = 128
hidden_size2 = 64

torch.manual_seed(126)
filename_data=f'{GLM_name}_{function_name}n{n}p{p}B{B}r{r}.pth'
filename_model=f'{GLM_name}_{function_name}n{n}p{p}B{B}r{r}h1{hidden_size1}h2{hidden_size2}'
sample_set = SampleSet(n, p, f_1, module=GLM_name)



X_sub,Y_sub=sample_set.get_sample_set()
X_sub1,Y_sub1=X_sub[0:int(n/2),],Y_sub[0:int(n/2)]
X_sub2,Y_sub2=X_sub[int(n/2):n,],Y_sub[int(n/2):n]

if GLM_name=="Binomial":
    trials_sub=sample_set.trials
    X_trial1=trials_sub[int(n/2):n]
    X_trial2=trials_sub[int(n/2):n,]
    train_dataset1 = TensorDataset(X_sub1, Y_sub1,X_trial1)

else: 
    train_dataset1 = TensorDataset(X_sub1, Y_sub1)



train_loader1 = DataLoader(train_dataset1, batch_size=int(n/2), shuffle=True)


network1 = NeuralNetwork(input_size, dropout_rate=0.1)

# Train the network on the current sub-sample
train_network(network1, train_loader1,mode=GLM_name, num_epochs=500, learning_rate=0.1,weight_decay=0.02)


def sigmoid(x):
    return 1/(1+torch.exp(-x))

if function_name=='f_1':
    f=f_1
else:
    f=f_2
    

if GLM_name=="Bernoulli":


    predictions_logits_train2 = network1(torch.tensor(X_sub2)).squeeze()  # Model predictions on training data
    true_logits2=torch.tensor(f_1(X_sub2))
    predictions_train2=1/(1+torch.exp(-predictions_logits_train2))
    true_f2=1/(1+torch.exp(-true_logits2))
    residuals2 = torch.abs(true_logits2 - predictions_logits_train2)  # Absolute residuals

    # quantile=torch.quantile(torch.cat((residuals1,residuals2),dim=0),0.95)

    torch.manual_seed(23)
    xtest=torch.normal(0, 1, (80,p))
    quantile = torch.quantile(residuals2, (1 - 0.05))


    predictions_logits_test=network1(torch.tensor(xtest)).squeeze()
    # predictions_logits_test=(network2(torch.tensor(xtest)).squeeze()+network2(torch.tensor(xtest)).squeeze())/2
    true_logits_test=torch.tensor(f_1(xtest))
    true_f_test=1/(1+torch.exp(-true_logits_test))
    predictions_test=1/(1+torch.exp(-predictions_logits_test))






    print("biasf:",torch.mean(predictions_logits_test-true_logits_test),"biasfsd:",torch.std(predictions_logits_test-true_logits_test))    
    print("MAEf:",torch.mean(abs(predictions_logits_test-true_logits_test)),"MAEfsd:",torch.std(abs(predictions_logits_test-true_logits_test)))    

    print("biaspsi:",torch.mean(predictions_test-true_f_test),"biaspsisd:",torch.std(predictions_test-true_f_test))
    print('MAEpsi:',(torch.mean(abs(true_f_test-predictions_test))),'MAEpsisd:',(torch.std(abs(true_f_test-predictions_test))))
    print("EpmSD:",torch.sqrt(torch.sum((predictions_logits_test-true_logits_test)**2)/len(predictions_logits_test)))

    within_confidence_interval = ((true_logits_test >= predictions_logits_test-quantile) & (true_logits_test <= predictions_logits_test+quantile))

    # within_confidence_interval = ((sigmoid(true_logits_test) >= sigmoid(predictions_logits_test)-quantile) & (sigmoid(true_logits_test) <= sigmoid(predictions_logits_test)+quantile))

    # Step 2: Count the number of times true_probability is within the confidence interval
    count_within_interval = within_confidence_interval.sum()

    # Step 3: Calculate the frequency
    frequency_within_interval = count_within_interval / len(predictions_logits_test) * 100  # as a percentage
    print(frequency_within_interval)
    print(quantile)
    CI_length=torch.mean(abs(1/(1+torch.exp(-predictions_logits_test+quantile))-1/(1+torch.exp(-predictions_logits_test-quantile))))
    print("AIC:",CI_length)
    CI_length_sd=torch.std(abs(1/(1+torch.exp(-predictions_logits_test+quantile))-1/(1+torch.exp(-predictions_logits_test-quantile))))
    print("sd:",CI_length_sd)

elif GLM_name=="Poisson":
    predictions_logits_train2 = network1(torch.tensor(X_sub2)).squeeze()  # Model predictions on training data
    true_logits2=torch.log(torch.log(1+torch.exp(torch.tensor(f(X_sub2)))))
    predictions_lambda2=torch.exp(predictions_logits_train2)
    true_lambda2=torch.exp(true_logits2)
    residuals2 = torch.abs(true_logits2 - predictions_logits_train2)  # Absolute residuals

    quantile = torch.quantile(residuals2, (1 - 0.05))


    xtest=torch.normal(0, 1, (80,p))


    predictions_logits_test=network1(torch.tensor(xtest)).squeeze()
    true_logits_test=torch.log(torch.log(1+torch.exp(torch.tensor(f(xtest)))))

    true_lambda_test=torch.exp(true_logits_test)
    predictions_lambda_test=torch.exp(predictions_logits_test)



    print("biasf:",torch.mean(predictions_logits_test-true_logits_test),"biasfsd:",torch.std(predictions_logits_test-true_logits_test))
    print("MAEf:",torch.mean(abs(predictions_logits_test-true_logits_test)),"MAEfsd:",torch.std(abs(predictions_logits_test-true_logits_test)))
    print("biaspsi:",torch.mean(predictions_lambda_test-true_lambda_test),"biaspsisd:",torch.std(predictions_lambda_test-true_lambda_test))
    print('MAEpsi:',torch.mean(abs(true_lambda_test-predictions_lambda_test)),'MAEpsisd:',torch.std(abs(true_lambda_test-predictions_lambda_test)))
    print("EmpSD:",torch.sqrt(torch.sum((predictions_logits_test-true_logits_test)**2)/len(predictions_logits_test)))

    # print(torch.max(predictions_logits_test))

    # print(predictions_lambda_test[1:60])
    # print(max(predictions_lambda_test+quantile))
    # print(max(true_lambda_test))
    print(quantile)

    within_confidence_interval = ((true_logits_test >= predictions_logits_test-quantile) & (true_logits_test <= predictions_logits_test+quantile))

    # Step 2: Count the number of times true_probability is within the confidence interval
    count_within_interval = within_confidence_interval.sum()

    # Step 3: Calculate the frequency
    frequency_within_interval = count_within_interval / len(predictions_logits_test) * 100  # as a percentage
    print(frequency_within_interval)

    CI_length=torch.mean(abs(torch.exp(predictions_logits_test+quantile)-torch.exp(predictions_logits_test-quantile)))
    print("AIL:",CI_length)
    CI_length_sd=torch.std(abs(torch.exp(predictions_logits_test+quantile)-torch.exp(predictions_logits_test-quantile)))
    print("sd:",CI_length_sd)
else: 

    predictions_logits_train2 = network1(torch.tensor(X_sub2)).squeeze()  # Model predictions on training data
    true_logits2=torch.tensor(f_1(X_sub2))
    predictions_train2=1/(1+torch.exp(-predictions_logits_train2))
    true_f2=1/(1+torch.exp(-true_logits2))
    residuals2 = torch.abs(true_logits2 - predictions_logits_train2)  # Absolute residuals

    # quantile=torch.quantile(torch.cat((residuals1,residuals2),dim=0),0.95)

    torch.manual_seed(23)
    xtest=torch.normal(0, 1, (80,p))
    quantile = torch.quantile(residuals2, (1 - 0.05))


    predictions_logits_test=network1(torch.tensor(xtest)).squeeze()
    # predictions_logits_test=(network2(torch.tensor(xtest)).squeeze()+network2(torch.tensor(xtest)).squeeze())/2
    true_logits_test=torch.tensor(f_1(xtest))
    true_f_test=1/(1+torch.exp(-true_logits_test))
    predictions_test=1/(1+torch.exp(-predictions_logits_test))






    print("biasf:",torch.mean(predictions_logits_test-true_logits_test))    
    print("MAEf:",torch.mean(abs(predictions_logits_test-true_logits_test)))    
    print("MAEfsd:",torch.std(abs(predictions_logits_test-true_logits_test)))   
    print("biaspsi:",torch.mean(predictions_test-true_f_test))
    print('MAEpsi:',(torch.mean(abs(true_f_test-predictions_test))))
    print('MAEpsisd:',(torch.std(abs(true_f_test-predictions_test))))
    print("EpmSD:",torch.sqrt(torch.sum((predictions_logits_test-true_logits_test)**2)/len(predictions_logits_test)))

    within_confidence_interval = ((true_logits_test >= predictions_logits_test-quantile) & (true_logits_test <= predictions_logits_test+quantile))

    # within_confidence_interval = ((sigmoid(true_logits_test) >= sigmoid(predictions_logits_test)-quantile) & (sigmoid(true_logits_test) <= sigmoid(predictions_logits_test)+quantile))

    # Step 2: Count the number of times true_probability is within the confidence interval
    count_within_interval = within_confidence_interval.sum()

    # Step 3: Calculate the frequency
    frequency_within_interval = count_within_interval / len(predictions_logits_test) * 100  # as a percentage
    print(frequency_within_interval)
    print(quantile)
    CI_length=torch.mean(abs(1/(1+torch.exp(-predictions_logits_test+quantile))-1/(1+torch.exp(-predictions_logits_test-quantile))))
    print("AIC:",CI_length)
    CI_length_sd=torch.std(abs(1/(1+torch.exp(-predictions_logits_test+quantile))-1/(1+torch.exp(-predictions_logits_test-quantile))))
    print("sd:",CI_length_sd)







With loss 0.5079498291015625.
biasf: tensor(-0.3823, grad_fn=<MeanBackward0>) biasfsd: tensor(1.3055, grad_fn=<StdBackward0>)
MAEf: tensor(0.9693, grad_fn=<MeanBackward0>) MAEfsd: tensor(0.9492, grad_fn=<StdBackward0>)
biaspsi: tensor(-0.0003, grad_fn=<MeanBackward0>) biaspsisd: tensor(0.8312, grad_fn=<StdBackward0>)
MAEpsi: tensor(0.5904, grad_fn=<MeanBackward0>) MAEpsisd: tensor(0.5813, grad_fn=<StdBackward0>)
EmpSD: tensor(1.3525, grad_fn=<SqrtBackward0>)
tensor(3.1146, grad_fn=<SqueezeBackward4>)
tensor(93.7500)
AIL: tensor(20.8727, grad_fn=<MeanBackward0>)
sd: tensor(16.9415, grad_fn=<StdBackward0>)


  predictions_logits_train2 = network1(torch.tensor(X_sub2)).squeeze()  # Model predictions on training data
  true_logits2=torch.log(torch.log(1+torch.exp(torch.tensor(f(X_sub2)))))
  predictions_logits_test=network1(torch.tensor(xtest)).squeeze()
  true_logits_test=torch.log(torch.log(1+torch.exp(torch.tensor(f(xtest)))))


In [32]:



function_name='f_1'
r=int(n**(0.9))
input_size = p
hidden_size1 = 128
hidden_size2 = 64

torch.manual_seed(13)
filename_data=f'{GLM_name}_{function_name}n{n}p{p}B{B}r{r}.pth'
filename_model=f'{GLM_name}_{function_name}n{n}p{p}B{B}r{r}h1{hidden_size1}h2{hidden_size2}'
xtest    = torch.load(f"xtest10.pt")
prediction_logit_list=torch.zeros(20, 80)
quantile_list=torch.zeros(20)
for repeat0 in range(20): 
    print(repeat0)
    sample_set = SampleSet(n, p, f_1, module=GLM_name)



    X_sub,Y_sub=sample_set.get_sample_set()
    X_sub1,Y_sub1=X_sub[0:int(n/2),],Y_sub[0:int(n/2)]
    X_sub2,Y_sub2=X_sub[int(n/2):n,],Y_sub[int(n/2):n]

    if GLM_name=="Binomial":
        trials_sub=sample_set.trials
        X_trial1=trials_sub[int(n/2):n]
        X_trial2=trials_sub[int(n/2):n,]
        train_dataset1 = TensorDataset(X_sub1, Y_sub1,X_trial1)

    else: 
        train_dataset1 = TensorDataset(X_sub1, Y_sub1)



    train_loader1 = DataLoader(train_dataset1, batch_size=int(n/2), shuffle=True)


    network1 = NeuralNetwork(input_size,  dropout_rate=0.1)

    # Train the network on the current sub-sample
    train_network(network1, train_loader1,mode=GLM_name, num_epochs=500, learning_rate=0.1,weight_decay=0.02)


    def sigmoid(x):
        return 1/(1+torch.exp(-x))

    if function_name=='f_1':
        f=f_1
    else:
        f=f_2
    



    if GLM_name=="Bernoulli":


        predictions_logits_train2 = network1(torch.tensor(X_sub2)).clone().detach().squeeze()  # Model predictions on training data
        true_logits2=torch.tensor(f_1(X_sub2))
        predictions_train2=1/(1+torch.exp(-predictions_logits_train2))
        true_f2=1/(1+torch.exp(-true_logits2))
        residuals2 = torch.abs(true_logits2 - predictions_logits_train2)  # Absolute residuals

        # quantile=torch.quantile(torch.cat((residuals1,residuals2),dim=0),0.95)

        
        
        quantile = torch.quantile(residuals2, (1 - 0.05))


        predictions_logits_test=network1(torch.tensor(xtest)).squeeze()
        prediction_logit_list[repeat0,:]=predictions_logits_test
        quantile_list[repeat0]=quantile
        # predictions_logits_test=(network2(torch.tensor(xtest)).squeeze()+network2(torch.tensor(xtest)).squeeze())/2
        # true_logits_test=torch.tensor(f_1(xtest))
        # true_f_test=1/(1+torch.exp(-true_logits_test))
        # predictions_test=1/(1+torch.exp(-predictions_logits_test))


    elif GLM_name=="Poisson":
        predictions_logits_train2 = network1(torch.tensor(X_sub2)).clone().detach().squeeze()  # Model predictions on training data
        true_logits2=torch.log(torch.log(1+torch.exp(torch.tensor(f(X_sub2)))))
        predictions_lambda2=torch.exp(predictions_logits_train2)
        true_lambda2=torch.exp(true_logits2)
        residuals2 = torch.abs(true_logits2 - predictions_logits_train2)  # Absolute residuals

        quantile = torch.quantile(residuals2, (1 - 0.05))


        


        predictions_logits_test=network1(torch.tensor(xtest)).squeeze()
        prediction_logit_list[repeat0,:]=predictions_logits_test
        quantile_list[repeat0]=quantile



    else: 

        predictions_logits_train2 = network1(torch.tensor(X_sub2)).squeeze()  # Model predictions on training data
        true_logits2=torch.tensor(f_1(X_sub2))
        predictions_train2=1/(1+torch.exp(-predictions_logits_train2))
        true_f2=1/(1+torch.exp(-true_logits2))
        residuals2 = torch.abs(true_logits2 - predictions_logits_train2)  # Absolute residuals

        # quantile=torch.quantile(torch.cat((residuals1,residuals2),dim=0),0.95)

        
        quantile = torch.quantile(residuals2, (1 - 0.05))


        predictions_logits_test=network1(torch.tensor(xtest)).squeeze()
        prediction_logit_list[repeat0,:]=predictions_logits_test
        quantile_list[repeat0]=quantile











0
With loss 0.2975127398967743.
1


  predictions_logits_train2 = network1(torch.tensor(X_sub2)).clone().detach().squeeze()  # Model predictions on training data
  true_logits2=torch.tensor(f_1(X_sub2))
  predictions_logits_test=network1(torch.tensor(xtest)).squeeze()


With loss 0.3306395709514618.
2
With loss 0.3748362064361572.
3
With loss 0.30596643686294556.
4
With loss 0.35511329770088196.
5
With loss 0.27200281620025635.
6
With loss 0.3251241147518158.
7
With loss 0.29820695519447327.
8
With loss 0.33640316128730774.
9
With loss 0.40474221110343933.
10
With loss 0.3220440149307251.
11
With loss 0.36167341470718384.
12
With loss 0.3439752161502838.
13
With loss 0.3581956624984741.
14
With loss 0.35045912861824036.
15
With loss 0.31318238377571106.
16
With loss 0.34927910566329956.
17
With loss 0.4166376292705536.
18
With loss 0.43191924691200256.
19
With loss 0.4177580773830414.


In [33]:
quantile_matrix = quantile_list.view(-1, 1)  
quantile_matrix = quantile_matrix.expand_as(prediction_logit_list) 

true_matrix = true_logits_test.unsqueeze(0).expand_as(prediction_logit_list)  # [20, 80]
quantile_matrix = quantile_list.view(-1, 1).expand_as(prediction_logit_list)  # [20, 80]

# Confidence interval bounds
lower_bound = prediction_logit_list - quantile_matrix
upper_bound = prediction_logit_list + quantile_matrix

# --------------------------
# Biasf and MAEf
mean_prediction = prediction_logit_list.mean(dim=0)  # [80]
bias_f = (mean_prediction - true_logits_test).mean().item()
mae_f = (mean_prediction - true_logits_test).abs().mean().item()

# --------------------------
# ψ' transformation: assume logistic model, so ψ'(x) = sigmoid(x)
true_psi = torch.sigmoid(true_logits_test)  # [80]
pred_psi = torch.sigmoid(mean_prediction)   # [80]

bias_psi = (pred_psi - true_psi).mean().item()
mae_psi = (pred_psi - true_psi).abs().mean().item()

# --------------------------
# Empirical Standard Deviation
emp_sd = prediction_logit_list.std(dim=0)  # [80]
emp_sd_mean = emp_sd.mean().item()

# --------------------------
# Coverage Probability
covered = (true_matrix >= lower_bound) & (true_matrix <= upper_bound)  # [20, 80]
cp = covered.float().mean(dim=0).mean().item()  # scalar: average over 80 points

# --------------------------
# AIL (average interval length) after sigmoid
upper_sigmoid = torch.sigmoid(upper_bound)
lower_sigmoid = torch.sigmoid(lower_bound)
ail = (upper_sigmoid - lower_sigmoid).mean().item()


In [36]:
ail

0.7787343859672546

In [5]:

    


if GLM_name=="Bernoulli":


    predictions_logits_train2 = network1(torch.tensor(X_sub2)).squeeze()  # Model predictions on training data
    true_logits2=torch.tensor(f_1(X_sub2))
    predictions_train2=1/(1+torch.exp(-predictions_logits_train2))
    true_f2=1/(1+torch.exp(-true_logits2))
    residuals2 = torch.abs(true_logits2 - predictions_logits_train2)  # Absolute residuals

    # quantile=torch.quantile(torch.cat((residuals1,residuals2),dim=0),0.95)

    
    
    quantile = torch.quantile(residuals2, (1 - 0.05))


    predictions_logits_test=network1(torch.tensor(xtest)).squeeze()
    # predictions_logits_test=(network2(torch.tensor(xtest)).squeeze()+network2(torch.tensor(xtest)).squeeze())/2
    true_logits_test=torch.tensor(f_1(xtest))
    true_f_test=1/(1+torch.exp(-true_logits_test))
    predictions_test=1/(1+torch.exp(-predictions_logits_test))






    print("biasf:",torch.mean(predictions_logits_test-true_logits_test),"biasf:",torch.std(predictions_logits_test-true_logits_test))    
    print("MAEf:",torch.mean(abs(predictions_logits_test-true_logits_test)),"MAEfsd:",torch.std(abs(predictions_logits_test-true_logits_test)))     
    print("biaspsi:",torch.mean(predictions_test-true_f_test),"biaspsisd:",torch.std(predictions_test-true_f_test))
    print('MAEpsi:',(torch.mean(abs(true_f_test-predictions_test))),'MAEpsisd:',(torch.std(abs(true_f_test-predictions_test))))
    # print('MAEpsisd:',(torch.std(abs(true_f_test-predictions_test))))
    print("EpmSD:",torch.sqrt(torch.sum((predictions_logits_test-true_logits_test)**2)/len(predictions_logits_test)))

    within_confidence_interval = ((true_logits_test >= predictions_logits_test-quantile) & (true_logits_test <= predictions_logits_test+quantile))

    # within_confidence_interval = ((sigmoid(true_logits_test) >= sigmoid(predictions_logits_test)-quantile) & (sigmoid(true_logits_test) <= sigmoid(predictions_logits_test)+quantile))

    # Step 2: Count the number of times true_probability is within the confidence interval
    count_within_interval = within_confidence_interval.sum()

    # Step 3: Calculate the frequency
    frequency_within_interval = count_within_interval / len(predictions_logits_test) * 100  # as a percentage
    print("Coverp:",frequency_within_interval)
    CI_length=torch.mean(abs(1/(1+torch.exp(-predictions_logits_test+quantile))-1/(1+torch.exp(-predictions_logits_test-quantile))))
    print("AIC:",CI_length)
    CI_length_sd=torch.std(abs(1/(1+torch.exp(-predictions_logits_test+quantile))-1/(1+torch.exp(-predictions_logits_test-quantile))))
    print("sd:",CI_length_sd)

elif GLM_name=="Poisson":
    


    
    true_logits_test=torch.log(torch.log(1+torch.exp(torch.tensor(f(xtest)))))
    
    true_lambda_test=torch.exp(true_logits_test)
    predictions_lambda_test=torch.exp(prediction_logit_list)



    print("biasf:",torch.mean(prediction_logit_list-true_logits_test),"biasfsd:",torch.std(prediction_logit_list-true_logits_test))
    print("biasf:",torch.mean(abs(prediction_logit_list-true_logits_test)),"biasfsd:",torch.std(abs(prediction_logit_list-true_logits_test)))
    print("biaspsi:",torch.mean(predictions_lambda_test-true_lambda_test),"biaspsisd:",torch.std(predictions_lambda_test-true_lambda_test))
    print('MAEpsi:',torch.mean(abs(true_lambda_test-predictions_lambda_test)),'MAEpsisd:',torch.std(abs(true_lambda_test-predictions_lambda_test)))
    print("EmpSD:",torch.sqrt(torch.sum((prediction_logit_list-true_logits_test)**2)/len(prediction_logit_list)))

    # print(torch.max(predictions_logits_test))

    # print(predictions_lambda_test[1:60])
    # print(max(predictions_lambda_test+quantile))
    # print(max(true_lambda_test))
    print(quantile)

    within_confidence_interval = ((true_logits_test >= prediction_logit_list-quantile_list) & (true_logits_test <= prediction_logit_list+quantile_list))

    # Step 2: Count the number of times true_probability is within the confidence interval
    count_within_interval = within_confidence_interval.sum()

    # Step 3: Calculate the frequency
    frequency_within_interval = count_within_interval / len(predictions_logits_test) * 100  # as a percentage
    print("CP:",frequency_within_interval)

    CI_length=torch.mean(abs(torch.exp(prediction_logit_list+quantile_list)-torch.exp(prediction_logit_list-quantile_list)))
    print("AIL:",CI_length)
    CI_length_sd=torch.std(abs(torch.exp(prediction_logit_list+quantile_list)-torch.exp(prediction_logit_list-quantile_list)))
    print("sd:",CI_length_sd)
else: 

    predictions_logits_train2 = network1(torch.tensor(X_sub2)).squeeze()  # Model predictions on training data
    true_logits2=torch.tensor(f_1(X_sub2))
    predictions_train2=1/(1+torch.exp(-predictions_logits_train2))
    true_f2=1/(1+torch.exp(-true_logits2))
    residuals2 = torch.abs(true_logits2 - predictions_logits_train2)  # Absolute residuals

    # quantile=torch.quantile(torch.cat((residuals1,residuals2),dim=0),0.95)

    
    quantile = torch.quantile(residuals2, (1 - 0.05))


    predictions_logits_test=network1(torch.tensor(xtest)).squeeze()
    # predictions_logits_test=(network2(torch.tensor(xtest)).squeeze()+network2(torch.tensor(xtest)).squeeze())/2
    true_logits_test=torch.tensor(f_1(xtest))
    true_f_test=1/(1+torch.exp(-true_logits_test))
    predictions_test=1/(1+torch.exp(-predictions_logits_test))






    print("biasf:",torch.mean(prediction_logit_list-true_logits_test),"biasf:",torch.std(predictions_logits_test-true_logits_test))    
    print("MAEf:",torch.mean(abs(predictions_logits_test-true_logits_test)),"MAEfsd:",torch.std(abs(prediction_logit_list-true_logits_test)))     
    print("biaspsi:",torch.mean(predictions_test-true_f_test),"biaspsisd:",torch.std(predictions_test-true_f_test))
    print('MAEpsi:',(torch.mean(abs(true_f_test-predictions_test))),'MAEpsisd:',(torch.std(abs(true_f_test-predictions_test))))
    # print('MAEpsisd:',(torch.std(abs(true_f_test-predictions_test))))
    print("EpmSD:",torch.sqrt(torch.sum((predictions_logits_test-true_logits_test)**2)/len(predictions_logits_test)))

    within_confidence_interval = ((true_logits_test >= predictions_logits_test-quantile) & (true_logits_test <= predictions_logits_test+quantile))

    # within_confidence_interval = ((sigmoid(true_logits_test) >= sigmoid(predictions_logits_test)-quantile) & (sigmoid(true_logits_test) <= sigmoid(predictions_logits_test)+quantile))

    # Step 2: Count the number of times true_probability is within the confidence interval
    count_within_interval = within_confidence_interval.sum()

    # Step 3: Calculate the frequency
    frequency_within_interval = count_within_interval / len(predictions_logits_test) * 100  # as a percentage
    print("CP:",frequency_within_interval)
    # print(quantile)
    CI_length=torch.mean(abs(1/(1+torch.exp(-predictions_logits_test+quantile))-1/(1+torch.exp(-predictions_logits_test-quantile))))
    print("AIC:",CI_length)
    CI_length_sd=torch.std(abs(1/(1+torch.exp(-predictions_logits_test+quantile))-1/(1+torch.exp(-predictions_logits_test-quantile))))
    print("sd:",CI_length_sd)







biasf: tensor(-0.1020, grad_fn=<MeanBackward0>) biasf: tensor(2.0035, grad_fn=<StdBackward0>)
MAEf: tensor(1.7192, grad_fn=<MeanBackward0>) MAEfsd: tensor(1.0157, grad_fn=<StdBackward0>)
biaspsi: tensor(-0.0352, grad_fn=<MeanBackward0>) biaspsisd: tensor(0.2792, grad_fn=<StdBackward0>)
MAEpsi: tensor(0.2489, grad_fn=<MeanBackward0>) MAEpsisd: tensor(0.1284, grad_fn=<StdBackward0>)
EpmSD: tensor(1.9936, grad_fn=<SqrtBackward0>)
Coverp: tensor(88.7500)
AIC: tensor(0.6732, grad_fn=<MeanBackward0>)
sd: tensor(0.2170, grad_fn=<StdBackward0>)


  predictions_logits_train2 = network1(torch.tensor(X_sub2)).squeeze()  # Model predictions on training data
  true_logits2=torch.tensor(f_1(X_sub2))
  predictions_logits_test=network1(torch.tensor(xtest)).squeeze()
  true_logits_test=torch.tensor(f_1(xtest))
