# Wednesday Noisy-ML Simulation Framework 

Case: n=1,000 training runs to assess the relationship between the noisy signal and model performance

Part of a paper submission: *The Invisible Performance of Regression Models on Noisy Measurements*

Author: Fatma-Elzahraa Eid, Broad Institute of MIT and Harvard 

In [1]:
# [0] Imports 
import os
import numpy as np
import pandas as pd
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # '2' to suppress TensorFlow informational message (oneDNN)
import tensorflow as tf
from tensorflow import keras
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
from scipy.stats import pearsonr, spearmanr
from scipy.stats import shapiro, anderson
import warnings


In [2]:
# [1] Simulation Class
class NoiseSimulation:
    """
    Class for performing noise simulation on a neural network model.
    Encapsulates all relevant functions for data generation, training, and evaluation.

    Each round generates random features, derives a noise-free target from them,
    creates several noisy replicates of that target, trains a small dense network
    on the first replicate, and records how the predictions correlate with the
    true and the noisy targets. Results accumulate in ``self.results`` and are
    written to a CSV file after every round.
    """

    # Names of the values returned by ``run_simulation``, in return order.
    # ``run`` relies on this order to store each value under the right key.
    _METRIC_NAMES = (
        'corr_Ytrue_Ynoisy1', 'corr_Ytrue_Ynoisy2', 'corr_Ynoisy1_Ynoisy2',
        'corr_pred_Ytrue', 'corr_pred_Ynoisy1', 'corr_pred_Ynoisy2',
        'corr_pred_YnoisyLow', 'corr_pred_YnoisyHigh',
        'var_Ynoisy1', 'var_Ynoisy2', 'corr_Yavg12_Ytrue', 'cov_Ynoisy1_Ynoisy2',
        'var_Ypred', 'cov_pred_Ynoisy1',
    )

    # Column order of the results table / CSV file (each key listed exactly once).
    _RESULT_COLUMNS = (
        'n_features', 'n_samples', 'corr_Ytrue_Ynoisy1', 'corr_Ytrue_Ynoisy2',
        'corr_Ynoisy1_Ynoisy2', 'corr_Yavg12_Ytrue', 'var_Ynoisy1', 'var_Ynoisy2',
        'corr_pred_Ytrue', 'corr_pred_Ynoisy1',
        'corr_pred_Ynoisy2', 'corr_pred_YnoisyLow', 'corr_pred_YnoisyHigh',
        'var_Ypred', 'cov_Ynoisy1_Ynoisy2',
        'cov_pred_Ynoisy1',
    )

    def __init__(self, results_folder='results', results_filename='Noise_Simulation_temp.csv',
                 n_rounds=100,
                 features_range=(10, 1000),
                 example_factor=200,
                 noise_factor_range=(0.01, 2.0),
                 noise_factor_low=(0.01, 1.0),
                 noise_factor_high=(1.01, 2.0)):
        """
        Initialize the NoiseSimulation class with necessary parameters.

        :param results_folder: Folder to save results (created if missing)
        :param results_filename: Name of the results CSV file
        :param n_rounds: Number of simulation rounds
        :param features_range: [low, high] bounds used to draw the number of features per round
        :param example_factor: Number of samples generated per feature (n_samples = n_features * example_factor)
        :param noise_factor_range: Range for noise factor
        :param noise_factor_low: Range for low noise factor
        :param noise_factor_high: Range for high noise factor
        """
        self.results_folder = results_folder
        self.results_filename = results_filename
        self.n_rounds = n_rounds
        # Store private list copies so that editing one instance's ranges can
        # never leak into the defaults or into another instance.
        self.features_range = list(features_range)
        self.example_factor = example_factor
        self.noise_factor_range = list(noise_factor_range)
        self.noise_factor_low = list(noise_factor_low)
        self.noise_factor_high = list(noise_factor_high)
        self.results = {column: [] for column in self._RESULT_COLUMNS}

        # Create the results folder if it does not exist (exist_ok avoids the
        # check-then-create race; an empty folder name means the current directory).
        if self.results_folder:
            os.makedirs(self.results_folder, exist_ok=True)
        self.results_filename_infolder = os.path.join(self.results_folder, self.results_filename)

    @staticmethod
    def generate_features(n_features=10, n_samples=1000):
        """
        Generate features using various statistical distributions.

        Every column is drawn independently from a distribution picked at random
        from a fixed pool (normal, uniform, log-normal, exponential, beta, gamma).

        :param n_features: Number of features to generate
        :param n_samples: Number of samples to generate
        :return: A 2D NumPy array of shape (n_samples, n_features) with generated features
        """
        distribution_pool = [
            lambda: np.random.normal(0, 1, n_samples),
            lambda: np.random.uniform(-1, 1, n_samples),
            lambda: np.random.lognormal(0, 1, n_samples),
            lambda: np.random.exponential(1, n_samples),
            lambda: np.random.beta(2, 5, n_samples),
            lambda: np.random.gamma(2, 2, n_samples)
        ]
        features = [np.random.choice(distribution_pool)() for _ in range(n_features)]
        if not features:
            # np.column_stack rejects an empty list; return an empty feature matrix instead.
            return np.empty((n_samples, 0))
        return np.column_stack(features)

    @staticmethod
    def generate_targets(X):
        """
        Generate target values based on transformations of standardized features.

        One non-linear expression is picked at random for each column of X; the
        target is the sum of the transformed columns.

        :param X: Standardized feature matrix of shape (n_samples, n_features)
        :return: Generated target values, a 1D array of length n_samples
        """
        expressions_pool = [
            lambda x: np.random.uniform(1.0, 2.0, 1)[0] * np.sin(x),
            lambda x: np.log(np.abs(x) + np.random.uniform(0.01, 2.0, 1)[0]),
            lambda x: x ** 2,
            lambda x: x ** 3,
            lambda x: np.sqrt(np.abs(x)),
            lambda x: np.exp(x / (np.abs(x) + np.random.uniform(0.01, 2.0, 1)[0]))
        ]
        X = np.asarray(X)
        # Pick every expression first and evaluate afterwards, so random numbers
        # are consumed in the same order as before.
        terms = [np.random.choice(expressions_pool) for _ in range(X.shape[1])]
        # Accumulate column by column instead of materializing all transformed
        # columns at once: that would temporarily need a second copy of X.
        total = np.zeros(X.shape[0])
        for i, term in enumerate(terms):
            total += term(X[:, i])
        return total

    @staticmethod
    def noise_factor(Y, noise_factor_range=(0.05, 0.1)):
        """
        Determine noise factor based on the standard deviation of target values.

        :param Y: Target values
        :param noise_factor_range: [low, high] range the noise factor is drawn from uniformly
        :return: Calculated noise level (noise factor times the standard deviation of Y)
        """
        # Noise factor is a scalar of the standard deviation of Y
        # so that whatever Y's range is, the noise level is proportional to it
        variability = np.std(Y)
        noise_factor = np.random.uniform(noise_factor_range[0], noise_factor_range[1])
        return noise_factor * variability

    @staticmethod
    def add_noise(Y, noise_level):
        """
        Add Gaussian noise to target values.

        :param Y: Target values (array-like of any shape)
        :param noise_level: Standard deviation of the Gaussian noise
        :return: Noisy target values with the same shape as Y
        """
        Y = np.asarray(Y)
        # Draw one independent noise value per element; for a 1D Y this is the
        # same draw as sampling Y.shape[0] values.
        return Y + np.random.normal(0, noise_level, Y.shape)

    @staticmethod
    def norm_y(Y):
        """
        Normalize target values to ensure normal distribution using log transformation.

        Y is returned unchanged when the Shapiro-Wilk test does not reject
        normality at the 5% level; otherwise log(Y) is returned, after shifting Y
        so that its minimum is 1 whenever it contains non-positive values.

        :param Y: Target values
        :return: Normalized target values
        """
        # Perform Shapiro-Wilk test for normality
        # Even if this test is not accurate for N>5000, we are not concerned about the exact p-value
        # Suppress the N>5000 warning only around this call, so the process-wide
        # warning filters stay untouched. The pattern is deliberately loose
        # because the exact wording of the message varies between SciPy versions.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", message=r".*p-value may not be accurate")
            _, p_value = shapiro(Y)

        # Define the significance level
        alpha = 0.05
        if p_value >= alpha:
            return Y

        # Perform negativity test
        if np.min(Y) <= 0:
            # Shift Y to ensure all values are positive, add 1 to avoid zero values
            Y = Y - np.min(Y) + 1
        return np.log(Y)

    @staticmethod
    def build_model(input_shape):
        """
        Build and compile a neural network model.

        The network has two hidden ReLU layers (16 and 8 units) and one linear
        output unit, compiled with the Adam optimizer and MSE loss.

        :param input_shape: Shape of the input layer, e.g. (n_features,)
        :return: Compiled Keras model
        """
        model = keras.Sequential([
            # An explicit Input object replaces the `input_shape=` layer argument,
            # which recent Keras versions warn about.
            keras.Input(shape=input_shape),
            keras.layers.Dense(16, activation='relu'),
            keras.layers.Dense(8, activation='relu'),
            keras.layers.Dense(1)
        ])
        model.compile(optimizer='adam', loss='mse', metrics=['mae'])
        return model

    def run_simulation(self, n_samples=1000, n_features=10, noise_factor_range=(0.01, 2.0),
                       noise_factor_low=(0.01, 1.0), noise_factor_high=(1.01, 2.0)):
        """
        Run a single round of noise simulation.

        :param n_samples: Number of samples to generate
        :param n_features: Number of features to generate
        :param noise_factor_range: Range for general noise factor
        :param noise_factor_low: Range for low noise factor
        :param noise_factor_high: Range for high noise factor
        :return: Tuple of 14 statistics, in this order: corr_Ytrue_Ynoisy1,
                 corr_Ytrue_Ynoisy2, corr_Ynoisy1_Ynoisy2, corr_pred_Ytrue,
                 corr_pred_Ynoisy1, corr_pred_Ynoisy2, corr_pred_YnoisyLow,
                 corr_pred_YnoisyHigh, var_Ynoisy1, var_Ynoisy2, corr_Yavg12_Ytrue,
                 cov_Ynoisy1_Ynoisy2, var_Ypred, cov_pred_Ynoisy1
        """
        # Generate features
        X = self.generate_features(n_features, n_samples)

        # Split features into train and test sets
        X_train, X_test = train_test_split(X, test_size=0.2, random_state=42)

        # Standardization for training and testing data (scaler is fit on train only)
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        n_train = X_train_scaled.shape[0]

        # Generate target values using the same transformations for train and
        # test: stack both parts, transform once, and split again further down.
        X_stacked = np.vstack((X_train_scaled, X_test_scaled))
        Y_stacked = self.generate_targets(X_stacked)

        # Normalize target values for better prediction performance
        Y_stacked = self.norm_y(Y_stacked)

        # Split target values into training and testing sets
        Ytrue_train, Ytrue_test = Y_stacked[:n_train], Y_stacked[n_train:]

        # Generate noisy replicates; noise levels are scaled by the spread of
        # the training targets.
        noise_level1 = self.noise_factor(Ytrue_train, noise_factor_range)
        noise_level2 = self.noise_factor(Ytrue_train, noise_factor_range)
        noise_level_low = self.noise_factor(Ytrue_train, noise_factor_low)
        noise_level_high = self.noise_factor(Ytrue_train, noise_factor_high)

        Y_replicate1 = self.add_noise(Y_stacked, noise_level1)
        Y_replicate2 = self.add_noise(Y_stacked, noise_level2)
        Y_replicateLow = self.add_noise(Y_stacked, noise_level_low)
        Y_replicateHigh = self.add_noise(Y_stacked, noise_level_high)

        Y_replicate1_train, Y_replicate1_test = Y_replicate1[:n_train], Y_replicate1[n_train:]
        Y_replicate2_train, Y_replicate2_test = Y_replicate2[:n_train], Y_replicate2[n_train:]
        # Only the test part of the low/high-noise replicates is evaluated.
        Y_replicateLow_test = Y_replicateLow[n_train:]
        Y_replicateHigh_test = Y_replicateHigh[n_train:]

        # Data Quality
        # NOTE(review): np.var uses ddof=0 while np.cov uses ddof=1 by default, so
        # the variances and covariances reported here differ by a factor of
        # n/(n-1). Left unchanged to keep published numbers reproducible.
        corr_Ytrue_Ynoisy1 = pearsonr(Y_replicate1_train, Ytrue_train)[0]
        corr_Ytrue_Ynoisy2 = pearsonr(Y_replicate2_train, Ytrue_train)[0]
        corr_Ynoisy1_Ynoisy2 = pearsonr(Y_replicate1_train, Y_replicate2_train)[0]
        var_Ynoisy1 = np.var(Y_replicate1_train)
        var_Ynoisy2 = np.var(Y_replicate2_train)
        cov_Ynoisy1_Ynoisy2 = np.cov(Y_replicate1_train, Y_replicate2_train)[0, 1]
        corr_Yavg12_Ytrue = pearsonr(0.5 * (Y_replicate1_train + Y_replicate2_train), Ytrue_train)[0]

        # Build and train the neural network model on the first noisy replicate.
        # Keras keeps global state for every model ever created; clear it first so
        # memory does not grow when this method is called round after round.
        keras.backend.clear_session()
        model = self.build_model((n_features,))
        early_stopping = keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)
        model.fit(X_train_scaled, Y_replicate1_train, epochs=1000, validation_split=0.2,
                  callbacks=[early_stopping], verbose=0)

        # Prediction and performance evaluation (verbose=0: no progress bar per round)
        Y_pred = model.predict(X_test_scaled, verbose=0).flatten()
        corr_pred_Ytrue = pearsonr(Ytrue_test, Y_pred)[0]
        corr_pred_Ynoisy1 = pearsonr(Y_replicate1_test, Y_pred)[0]
        corr_pred_Ynoisy2 = pearsonr(Y_replicate2_test, Y_pred)[0]
        corr_pred_YnoisyLow = pearsonr(Y_replicateLow_test, Y_pred)[0]
        corr_pred_YnoisyHigh = pearsonr(Y_replicateHigh_test, Y_pred)[0]
        var_Ypred = np.var(Y_pred)
        cov_pred_Ynoisy1 = np.cov(Y_pred, Y_replicate1_test)[0, 1]

        # Keep this order in sync with _METRIC_NAMES.
        return (corr_Ytrue_Ynoisy1, corr_Ytrue_Ynoisy2, corr_Ynoisy1_Ynoisy2,
                corr_pred_Ytrue, corr_pred_Ynoisy1, corr_pred_Ynoisy2,
                corr_pred_YnoisyLow, corr_pred_YnoisyHigh,
                var_Ynoisy1, var_Ynoisy2, corr_Yavg12_Ytrue, cov_Ynoisy1_Ynoisy2,
                var_Ypred, cov_pred_Ynoisy1)

    def run(self):
        """
        Run the simulation for a specified number of rounds.

        Each round draws the number of features from ``features_range`` (the
        upper bound is exclusive, as with ``np.random.randint``), sets
        ``n_samples = n_features * example_factor``, runs one simulation and
        appends the outcome to ``self.results``. The complete results table is
        rewritten to the CSV file after every round, so an interrupted run keeps
        all finished rounds.

        :raises ValueError: If ``run_simulation`` returns an unexpected number of values
        """
        for _ in range(self.n_rounds):
            n_features = int(np.random.randint(self.features_range[0], self.features_range[1]))
            n_samples = n_features * self.example_factor

            metrics = self.run_simulation(
                n_samples=n_samples, n_features=n_features,
                noise_factor_range=self.noise_factor_range,
                noise_factor_low=self.noise_factor_low,
                noise_factor_high=self.noise_factor_high
            )
            if len(metrics) != len(self._METRIC_NAMES):
                raise ValueError(
                    f"run_simulation returned {len(metrics)} values, "
                    f"expected {len(self._METRIC_NAMES)}"
                )

            # Append round-specific parameters to the results dictionary
            self.results['n_features'].append(n_features)
            self.results['n_samples'].append(n_samples)
            for name, value in zip(self._METRIC_NAMES, metrics):
                self.results[name].append(value)

            # Save results after each round
            results_df = pd.DataFrame(self.results)
            results_df.to_csv(self.results_filename_infolder, index=False)




In [None]:
# [2] Run the Simulation
# Configure a 1,000-round experiment and execute it; the constructor arguments
# are passed by keyword, one per line, grouped by what they control.
simulation = NoiseSimulation(
    # Output location
    results_folder='results',
    results_filename='Noise_Simulation_Output_n1000.csv',
    # Experiment size
    n_rounds=1000,
    features_range=[10, 1000],
    example_factor=200,
    # Noise settings
    noise_factor_range=[0.01, 2.0],
    noise_factor_low=[0.01, 1.0],
    noise_factor_high=[1.01, 2.0],
)
simulation.run()

