This notebook validates several methods for multiple imputation of missing data on a Cardiovascular Disease dataset.

In [None]:
import pandas as pd
import numpy as np
import copy
import os
import sys

# Add the parent directory to the module search path so that the project-local
# `helpers` package can be imported from this notebook.
# os.path.join/os.pardir keep this portable: a hard-coded '\\..' suffix only resolves
# to the parent directory on Windows.
module_path = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
if module_path not in sys.path:
    sys.path.append(module_path)

# Seed passed as `random_state` to the sampling, splitting and model-fitting calls below
RANDOM_STATE = 404

In [None]:
# Read the semicolon-separated dataset and discard the row identifier column
df = pd.read_csv('../data/cardio_train.csv', delimiter=';').drop(columns=['id'])
df.head()

In [None]:
# 'cardio' is the target column; every other column is a predictor
y = df['cardio']
X = df.drop(columns='cardio')

In [None]:
# Collects, per imputation method, the scores, accuracies and AUCs computed further below
results_dict = {}

# default_number_of_samples: how many values are generated per missing cell by the
#   sampling-based methods (default of imputing_missing_data's `number_of_samples`).
# fraction_of_data: share of X_test rows drawn for every evaluation subset.
default_number_of_samples = 10
fraction_of_data = 0.1

## Data manipulation

#### Removing outliers

In [None]:
# Flag rows whose blood pressure or height readings fall outside the accepted ranges.
# Boolean masks replace the original row-by-row loop: the same rows are selected, but
# without per-row Python overhead and without relying on index labels being positions.
# Explicit </> comparisons are used so that NaN readings are not flagged.
outlier_mask = (
    (X['ap_hi'] < 55) | (X['ap_hi'] > 200)
    | (X['ap_lo'] < 55) | (X['ap_lo'] > 120)
    | (X['height'] < 125) | (X['height'] > 200)
)
indices_to_remove = X.index[outlier_mask].tolist()

X = X.drop(indices_to_remove)
y = y.drop(indices_to_remove)

# Reindexing: later cells read rows of X by position using these labels
X = X.reset_index(drop=True)
y = y.reset_index(drop=True)

#### Standardization

In [None]:
from sklearn.preprocessing import StandardScaler, LabelEncoder

# Continuous columns are standardized; the remaining predictors are label-encoded
numeric_columns = ['age', 'height', 'weight', 'ap_hi', 'ap_lo']
categorical_columns = ['gluc', 'smoke', 'alco', 'active', 'cholesterol', 'gender']

# Zero-mean / unit-variance scaling of the continuous columns
standard_scaler = StandardScaler()
scaled_values = standard_scaler.fit_transform(X[numeric_columns])
X[numeric_columns] = scaled_values

# Each categorical column is re-coded independently to consecutive integer labels
encoded_columns = X[categorical_columns].apply(LabelEncoder().fit_transform)
X[categorical_columns] = encoded_columns

X.head()

In [None]:
# Summary statistics of the transformed predictors (sanity check of the scaling/encoding above)
X.describe()

#### Visualization

In [None]:
import matplotlib.pyplot as plt

# The figure is only rendered and stored when no file from an earlier run is present
file_path = "../images/without_missingness/density_plots.png"
if not os.path.isfile(file_path):
    plt.figure(figsize=(15, 8))

    # One density panel per feature on a 4x4 grid (subplot numbers start at 1)
    for panel_number, feature_name in enumerate(X.columns, start=1):
        plt.subplot(4, 4, panel_number)
        X[feature_name].plot(kind='density', color='blue', label=feature_name)
        plt.title(feature_name, fontsize=32)
        plt.xlabel('Scaled Value')
        plt.ylabel('Density')

    # Avoid overlapping titles/labels, then persist and show the figure
    plt.tight_layout()
    plt.savefig(file_path)
    plt.show()

In [None]:
import seaborn as sns

# The heatmap is only rendered and stored when no file from an earlier run is present
file_path = "../images/without_missingness/correlation_matrix.png"
if not os.path.isfile(file_path):
    plt.figure(figsize=(10, 8))

    # Pairwise correlations between all predictors, annotated with two decimals
    correlations = X.corr()
    sns.heatmap(correlations, annot=True, cmap='coolwarm', fmt=".2f", linewidths=.5)
    plt.title('Correlation Matrix')

    # Persist, then display
    plt.savefig(file_path)
    plt.show()

#### Splitting

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the rows for evaluation; the fixed seed keeps the split reproducible
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=RANDOM_STATE,
)
X_train.shape, X_test.shape

## Preparing 10 subsets with removed features

In [None]:
def remove_features(num_features_to_remove=None, feature_indices_to_remove=None):
    """
    Blank out whole feature columns in a fixed sample of the test set.

    A sample of ``X_test`` (``fraction_of_data`` of its rows, drawn with
    ``RANDOM_STATE``) is copied, the selected columns are overwritten with NaN,
    and the result is returned with ``object`` dtype.

    Parameters:
        num_features_to_remove (int, optional): Number of features to remove randomly.
            Ignored when ``feature_indices_to_remove`` is given. When both arguments
            are None, the count itself is drawn with ``np.random.randint``.
        feature_indices_to_remove (int or array-like, optional): Positional indices of
            the features to remove.

    Returns:
        pandas.DataFrame: Subset of data with removed features and NaN values.
    """
    # Sample a subset of data (same rows on every call because the seed is fixed)
    subset = X_test.sample(frac=fraction_of_data, random_state=RANDOM_STATE).copy()

    if feature_indices_to_remove is not None:
        # atleast_1d so that a single integer selects one column instead of yielding a
        # bare label that would be iterated character by character below
        features_to_remove = subset.columns[np.atleast_1d(feature_indices_to_remove)]
    else:
        if num_features_to_remove is None:
            # randint's upper bound is exclusive
            num_features_to_remove = np.random.randint(1, min(5, len(X_test.columns) - 1))
        # Previously this selection only ran when a count was passed in, leaving
        # `features_to_remove` undefined for the "both arguments None" call.
        # The last column is never a candidate for random removal.
        features_to_remove = np.random.choice(subset.columns[:-1], num_features_to_remove, replace=False)

    # Replace values of selected features with NaN
    # (np.nan rather than the np.NaN alias, which NumPy 2.0 removed)
    for feature in features_to_remove:
        subset[feature] = np.nan

    return subset.astype('object')

# Seed NumPy's global generator: remove_features picks columns with np.random, so
# without this the removed columns (and every score derived from them) change on
# each kernel restart.
np.random.seed(RANDOM_STATE)

list_of_subsets = []
subset_without_changes = X_test.sample(frac=fraction_of_data, random_state=RANDOM_STATE).copy()

# (number of removed features, number of subsets) -> 3 + 3 + 3 + 1 = 10 subsets
for num_removed, num_subsets in ((1, 3), (2, 3), (3, 3), (4, 1)):
    for _ in range(num_subsets):
        list_of_subsets.append(remove_features(num_removed))

# Print information about subsets and their missing columns
print(f'Subsets with {list_of_subsets[0].shape[0]} datapoints and their columns with missing values:')
for subset_index, current_subset in enumerate(list_of_subsets):
    # A column counts as removed when every one of its values is missing
    nan_columns = current_subset.columns[current_subset.isnull().all()]
    print(f"Subset {subset_index+1}: {', '.join(nan_columns)}")

## Shared functions

### Imputation

#### Simple Imputer

In [None]:
from sklearn.impute import SimpleImputer

def simple_impute(current_subset):
    """
    Fill missing values in place using SimpleImputer with the 'mean' strategy.

    Parameters:
        current_subset (pandas.DataFrame): Subset of data with missing values;
            modified in place.

    For every column of the subset that contains at least one missing value, the
    imputer is fitted on the same column of the global ``X_test`` and then applied
    to the subset's column. Columns listed in ``categorical_columns`` are rounded
    to the nearest whole number afterwards. Nothing is returned.
    """
    mean_imputer = SimpleImputer(missing_values=np.nan, strategy='mean')

    # Columns that need filling; imputing one column does not affect another
    columns_with_gaps = [
        name for name in current_subset.columns
        if pd.isna(current_subset[name]).any()
    ]

    for name in columns_with_gaps:
        # The column mean is learned from X_test, not from the subset itself
        mean_imputer.fit(X_test[[name]])
        current_subset[name] = mean_imputer.transform(current_subset[[name]])

        # A mean of category codes is generally fractional; snap it to a whole number
        if name in categorical_columns:
            current_subset[name] = np.round(current_subset[name])

#### Multivariate normal distribution & cGMM

In [None]:
from helpers.ConditionalGMM.condGMM import CondGMM

def cgmm_impute(gmm, missing_features_indices, current_row, number_of_samples):
    """
    Sample the missing features of one row from a GMM conditioned on its known features.

    Parameters:
        gmm (GMM): Fitted mixture exposing ``weights_``, ``means_`` and ``covariances_``.
        missing_features_indices (list): Positional indices of the missing features.
        current_row (pandas.Series): Current row with missing values.
        number_of_samples (int): Number of samples to generate.

    Returns:
        dict: ``'samples'`` (output of ``CondGMM.rvs``), ``'mu'`` (component means
        combined with the conditional weights) and ``'sigma'`` (output of
        ``CondGMM.conditional_component_covs()`` - presumably one conditional
        covariance per component; TODO confirm against the helper).
    """
    # Every position that is not missing is conditioned on
    observed_positions = [
        position for position in range(len(current_row))
        if position not in missing_features_indices
    ]
    observed_values = current_row.iloc[observed_positions].values

    conditional_model = CondGMM(gmm.weights_, gmm.means_, gmm.covariances_, observed_positions)

    # The seed is fixed, so repeated calls with the same inputs use the same random_state
    draws = conditional_model.rvs(observed_values, size=number_of_samples, random_state=RANDOM_STATE)

    # Mixture mean = weight-averaged component means (weights broadcast over features)
    component_means = conditional_model.conditional_component_means(observed_values)
    component_weights = conditional_model.conditional_weights(observed_values)[:, np.newaxis]
    mixture_mean = np.sum(component_weights * component_means, axis=0)
    component_covariances = conditional_model.conditional_component_covs()

    # Only when a NaN is present is the mean sanitised (np.nan_to_num)
    if any(np.isnan(entry) for entry in mixture_mean):
        mixture_mean = np.nan_to_num(mixture_mean)

    return {"samples": draws, "mu": mixture_mean, "sigma": component_covariances}

#### VAEAC & GAIN

In [None]:
def vaeac_gain_impute(model, missing_features_indices, current_row, number_of_samples):
    """
    Collect repeated model predictions for the missing features of a single row.

    Args:
        model: Object exposing ``predict(batch, verbose=0)`` (the VAE or GAIN
            generator); called once per requested sample.
        missing_features_indices (list): Positional indices of the missing features.
        current_row (pandas.Series): Current row with missing values.
        number_of_samples (int): Number of samples to generate.

    Returns:
        dict: ``{"samples": ndarray}`` where the array has shape
        ``(number_of_samples, len(missing_features_indices))``.

    NOTE(review): the row is forwarded as-is, i.e. with its missing entries still
    NaN when called from imputing_missing_data - confirm the model copes with NaN input.
    """
    draws = np.empty((number_of_samples, len(missing_features_indices)))

    for draw_number in range(number_of_samples):
        # The model expects a 2D batch: one row holding every feature, as float32
        single_row_batch = current_row.values.reshape(1, -1).astype(np.float32)
        reconstruction = model.predict(single_row_batch, verbose=0)

        # Keep only the columns that were missing in the input row
        draws[draw_number] = reconstruction[:, missing_features_indices]

    return {"samples": draws}

#### Impute multiple missing data function

In [None]:
def round_and_clip(value, min_val, max_val):
    """
    Round ``value`` with the built-in ``round`` and limit it to [min_val, max_val].

    The upper bound is applied before the lower bound; the result is returned as ``int``.
    """
    nearest = round(value)
    if nearest > max_val:
        nearest = max_val
    if nearest < min_val:
        nearest = min_val
    return int(nearest)

def cluster_categorical_features(value, col_index):
    """
    Snap a generated value onto the integer range observed for one column of ``X``.

    ``col_index`` is the position of the column in the global ``X``; its minimum and
    maximum there are the clipping bounds handed to ``round_and_clip``.
    """
    column_values = X[X.columns[col_index]]
    lowest, highest = column_values.min(), column_values.max()
    return round_and_clip(value, lowest, highest)

In [None]:
import json

def imputing_missing_data(subsets, method='simple', number_of_samples=default_number_of_samples, model=None):
    """
    Impute missing data in subsets, in place, using the chosen imputation method.

    For 'simple' every subset is handed to ``simple_impute``. For the sampling-based
    methods each missing cell is replaced by a JSON string holding the generated
    ``"samples"`` for that feature and - when the imputer reports them - the ``"mu"``
    and ``"sigma"`` it returned for the row.

    Parameters:
        subsets (list): List of subsets of data (DataFrames), modified in place.
        method (str): Imputation method ('simple', 'multivariate', 'cgmm', 'vaeac', or
            'gain'); matched case-insensitively.
        number_of_samples (int): Number of values generated per missing cell by the
            sampling-based methods.
        model: Trained model for certain imputation methods (fitted mixture for
            'multivariate'/'cgmm', generative model for 'vaeac'/'gain').

    Raises:
        ValueError: If ``method`` is not one of the supported names.
    """
    # Same case-insensitive handling as get_scoring / get_classification_result
    method = method.lower()

    mixture_methods = ('multivariate', 'cgmm')
    network_methods = ('vaeac', 'gain')
    if method != 'simple' and method not in mixture_methods + network_methods:
        # Previously an unknown method silently left every subset untouched
        raise ValueError(f"Unknown imputation method: {method!r}")

    for subset in subsets:
        if method == 'simple':
            # Simple Imputer (fills the subset in place)
            simple_impute(subset)
            continue

        for row_index, row in subset.iterrows():
            # Positions of the unknown features in this row
            missing_features_indices = [position for position, value in enumerate(row) if pd.isna(value)]

            # If all features are known, continue
            if not missing_features_indices:
                continue

            if method in mixture_methods:
                # Multivariate Imputer or Conditional GMM
                generated_data = cgmm_impute(model, missing_features_indices, row, number_of_samples)
            else:
                # Variational AutoEncoder or Generative Adversarial Imputation Network
                generated_data = vaeac_gain_impute(model, missing_features_indices, row, number_of_samples)

            # Distribution parameters are per row, so convert them once rather than per feature
            has_distribution = 'mu' in generated_data and 'sigma' in generated_data
            if has_distribution:
                # numpy arrays are not JSON serialisable; turn them into lists
                mu = generated_data['mu'].tolist() if isinstance(generated_data['mu'], np.ndarray) else generated_data['mu']
                sigma = generated_data['sigma'].tolist() if isinstance(generated_data['sigma'], np.ndarray) else generated_data['sigma']

            # Update unknown features with sampled data
            for feature_position, col_index in enumerate(missing_features_indices):
                samples = [sample[feature_position] for sample in generated_data['samples']]

                if has_distribution:
                    # Apply rounding and clipping to categorical features
                    if X.columns[col_index] in categorical_columns:
                        samples = [cluster_categorical_features(sample, col_index) for sample in samples]
                    payload = {"samples": samples, "mu": mu, "sigma": sigma}
                else:
                    payload = {"samples": samples}

                subset.at[row_index, subset.columns[col_index]] = json.dumps(payload)

### Scoring

In [None]:
from scipy.stats import norm
from properscoring import crps_ensemble

def get_scoring(subsets, method='simple', print_results=False):
    """
    Calculate scores (NMSE, Log Score, and CRPS) for features in subsets of data.

    The subsets are modified in place: every string cell is replaced by its
    JSON-decoded value before scoring. Ground-truth values are read from the global
    ``X``, using each row's index label as a position.

    Per imputed cell:
        * when the cell carries 'mu' and 'sigma', a Gaussian negative log density
          (log score) is recorded;
        * otherwise the CRPS of the cell's sample ensemble is recorded;
        * in both cases the squared errors of the samples feed the NMSE, i.e. the mean
          squared error divided by the variance of the feature in ``X_test``.

    Parameters:
        subsets (list): List of subsets of data.
        method (str): Imputation method ('simple', 'multivariate', 'cgmm', 'vaeac', or 'gain').
        print_results (bool): Whether to print scores or not.

    Returns:
        dict: ``{subset_index: {feature_name: {score_type: value}}}`` with values rounded
        to 3 decimals. Subsets without imputed columns are omitted.

    Raises:
        Exception: If a collected score is NaN, or if the MSE is non-zero although the
            feature's variance is zero.
    """
    # Convert method to lowercase for case-insensitive comparison
    method = method.lower()

    # Deserialize any strings in subsets
    for subset in subsets:
        for col in subset.columns:
            subset[col] = subset[col].apply(lambda x: json.loads(x) if isinstance(x, str) else x)

    all_subsets_scores = {}  # Dictionary to store scores for each subset
    epsilon = 1e-10  # Floor for the scale and for the density inside the log

    # Iterate through subsets
    for subset_index, subset in enumerate(subsets):
        feature_scores = {}  # Dictionary for each type of score per feature

        # Identify features with missing values
        if method == 'simple':
            # Imputed cells are plain numbers here, so the removed columns are looked up
            # in the untouched global list_of_subsets
            reference_subset = list_of_subsets[subset_index]
            missing_features_indices = [col_index for col_index, col in enumerate(reference_subset.columns) if reference_subset[col].isnull().all()]
        else:
            missing_features_indices = [col_index for col_index, col in enumerate(subset.columns) if subset[col].apply(lambda x: isinstance(x, (list, dict))).any()]

        if not missing_features_indices:
            continue  # Skip if no missing values

        # Calculate scores for each row in the subset
        for row_index, row in subset.iterrows():
            # A failed lookup must stop the run: printing and carrying on would score
            # this row against stale (or undefined) ground truth.
            original_values = X.iloc[row_index, missing_features_indices].values

            # Calculate scores for each feature with missing values
            for feature_index, col_index in enumerate(missing_features_indices):
                feature_name = subset.columns[col_index]
                generated_samples = row.iloc[col_index]
                original_value = original_values[feature_index]

                # Initialize dictionary only with NMSE, others will be added as needed
                if feature_name not in feature_scores:
                    feature_scores[feature_name] = {'nmse': []}

                # A bare value (simple imputation) is treated as a one-member ensemble
                if not isinstance(generated_samples, dict):
                    generated_samples = {'samples': [generated_samples]}

                # Calculate Log Score
                if 'mu' in generated_samples and 'sigma' in generated_samples:
                    mu = generated_samples['mu']
                    if isinstance(mu, (list, np.ndarray)):
                        mu_vector = np.asarray(mu, dtype=float).ravel()
                        if mu_vector.size == len(missing_features_indices):
                            # One mean per missing feature, in column order: use this
                            # feature's own mean rather than the average over all of them
                            mu = mu_vector[feature_index]
                        else:
                            mu = np.mean(mu_vector)

                    # NOTE(review): 'sigma' is whatever the imputer stored (for cgmm_impute,
                    # the output of conditional_component_covs()); its overall mean is used
                    # as the normal *scale*. If these are covariances, the scale should
                    # presumably be the square root of this feature's conditional
                    # variance - TODO confirm against CondGMM.
                    sigma = generated_samples['sigma']
                    if isinstance(sigma, (list, np.ndarray)):
                        sigma = np.mean(sigma)
                    sigma = max(sigma, epsilon)  # Ensure sigma is positive

                    # Check for NaN values (the cell then contributes to no score at all)
                    if np.isnan(mu) or np.isnan(sigma):
                        continue

                    # Log score calculation
                    log_score = -np.log(norm.pdf(original_value, loc=mu, scale=sigma) + epsilon)

                    feature_scores[feature_name].setdefault('log_score', []).append(log_score)

                # Calculate CRPS for ensemble predictions
                elif 'samples' in generated_samples:
                    # Scalar observation + 1-D ensemble. Repeating the observation to the
                    # ensemble's length makes both arguments the same shape, which
                    # properscoring interprets as deterministic forecasts and scores as
                    # plain absolute errors.
                    ensemble = np.asarray(generated_samples['samples'], dtype=float)
                    crps_score = crps_ensemble(original_value, ensemble)
                    feature_scores[feature_name].setdefault('crps', []).append(crps_score)

                # Calculate Mean Squared Error
                squared_errors = [(original_value - x)**2 for x in generated_samples['samples']]
                feature_scores[feature_name]['nmse'].append(squared_errors)

        # Average scores for each feature
        for feature_name, scores in feature_scores.items():
            for score_type, values in scores.items():
                if values:
                    # Check for NaN values
                    if np.isnan(values).any():
                        raise Exception("NaN values found in scores!")

                    # Calculate mean score
                    mean_score = np.mean(values)

                    if score_type == 'nmse':
                        variance = np.var(X_test[feature_name])

                        # Confirm that variance is not 0, handle it
                        if variance == 0:
                            if mean_score != 0:
                                raise Exception("Mean Squared Error cannot be different than 0 when variance is equal to 0!")
                            else:
                                mean_score = 0
                        else:
                            mean_score = mean_score / variance

                    # Round the mean score to 3 decimal places
                    feature_scores[feature_name][score_type] = np.round(mean_score, 3)

        all_subsets_scores[subset_index] = feature_scores

        # Print scores if required
        if print_results:
            print(f"Scores for Subset {subset_index + 1}:")
            for feature_name, scores in feature_scores.items():
                print(f"Feature {feature_name}: ", end="")
                for score_type, score_value in scores.items():
                    print(f"{score_type.upper()} = {score_value}, ", end="")
                print()  # New line for each feature

    return all_subsets_scores


### Classification

In [None]:
from joblib import load
from sklearn.metrics import accuracy_score, roc_auc_score
import warnings

# The subsets are passed to the classifier as plain arrays; silence the resulting
# feature-name warning
warnings.filterwarnings('ignore', message="X does not have valid feature names")

# Load the trained classifier model.
# The path is built with os.path.join: the backslash literal used before contains
# invalid escape sequences ('\h', '\p', '\c') and only resolves on Windows.
# NOTE: joblib's load unpickles the file - only load model files from a trusted source.
classifier = load(os.path.join('..', 'helpers', 'predictive_models', 'cardio_classifier.h5'))

def get_classification_result(subsets, method='simple', should_print=False):
    """
    Calculate accuracy and AUC scores for subsets of data using a trained classifier.

    The subsets are modified in place: every string cell is replaced by its
    JSON-decoded value. For sampling-based methods each row is expanded into one
    completed row per generated sample; the class-1 probabilities predicted by the
    global ``classifier`` are averaged over those rows and thresholded at 0.5.
    True labels are read from the global ``y`` via the subset's index.

    Parameters:
        subsets (list): List of subsets of data.
        method (str): Imputation method ('simple', 'multivariate', 'cgmm', 'vaeac', or 'gain').
        should_print (bool): Whether to print the table of accuracy and AUC scores.

    Returns:
        tuple: ``(accuracy_per_subset, auc_per_subset)`` - two lists with one value per
        subset, each rounded to 2 decimals.

    Raises:
        ValueError: If the imputed cells of one row hold differing numbers of samples,
            or no samples at all.
    """
    # Convert method to lowercase for case-insensitive comparison
    method = method.lower()

    # Deserialize any strings in subsets
    for subset in subsets:
        for col_name in subset.columns:
            subset[col_name] = subset[col_name].apply(lambda x: json.loads(x) if isinstance(x, str) else x)

    accuracy_per_subset = []
    auc_per_subset = []
    results_list = []

    for subset in subsets:
        subset_predicted_probs = []

        for _, row in subset.iterrows():
            if method != 'simple':
                # Split the row into imputed cells (dicts with samples) and observed values
                sampled_cells = [(col_index, value['samples']) for col_index, value in enumerate(row) if isinstance(value, dict)]
                observed_cells = [(col_index, value) for col_index, value in enumerate(row) if not isinstance(value, dict)]

                if sampled_cells:
                    # Use the ensemble size actually stored in the row, so subsets imputed
                    # with a non-default `number_of_samples` are handled as well
                    number_of_draws = len(sampled_cells[0][1])
                    if number_of_draws == 0 or any(len(samples) != number_of_draws for _, samples in sampled_cells):
                        raise ValueError("Imputed cells of a row must hold the same, non-zero number of samples")
                else:
                    # Fully observed row: keep the previous behaviour of repeating it
                    number_of_draws = default_number_of_samples

                # One completed row per draw, with every value at its original column position
                candidate_rows = []
                for draw in range(number_of_draws):
                    completed_row = np.zeros(shape=len(row))
                    for col_index, value in observed_cells:
                        completed_row[col_index] = value
                    for col_index, samples in sampled_cells:
                        completed_row[col_index] = samples[draw]
                    candidate_rows.append(completed_row)
            else:
                candidate_rows = [row.values.tolist()]

            # Predict probabilities and average the positive-class probability over all draws
            predicted_probs = classifier.predict_proba(np.vstack(candidate_rows))
            subset_predicted_probs.append(np.mean(predicted_probs[:, 1]))

        true_labels = y.loc[subset.index]

        # Convert probabilities to binary predictions based on the threshold
        subset_predictions = [1 if prob > 0.5 else 0 for prob in subset_predicted_probs]

        subset_accuracy = accuracy_score(true_labels, subset_predictions)
        subset_auc = roc_auc_score(true_labels, subset_predicted_probs)

        accuracy_per_subset.append(round(subset_accuracy, 2))
        auc_per_subset.append(round(subset_auc, 2))

        # Row of the printable summary table (accuracy as a percentage)
        results_list.append({'Accuracy': np.round(subset_accuracy * 100, 2), 'AUC': np.round(subset_auc, 3)})

    # Convert the list of dictionaries to a DataFrame
    results_table = pd.DataFrame(results_list)

    # Print classification scores in a table
    if should_print:
        print(results_table)

    return accuracy_per_subset, auc_per_subset


## SimpleImputer with mean strategy

#### Preparation

In [None]:
# Independent copy: imputation mutates the subsets in place, while get_scoring('simple')
# later inspects list_of_subsets for the all-NaN columns
imputer_subsets = copy.deepcopy(list_of_subsets)

#### Imputation

In [None]:
# Fill the removed columns of every subset in place
imputing_missing_data(subsets=imputer_subsets, method='simple')

imputer_subsets[0].head()

#### Scoring

In [None]:
simple_imputer_score = get_scoring(subsets=imputer_subsets, method='simple', print_results=True)

#### Classification

In [None]:
simple_imputer_accuracy, simple_imputer_auc = get_classification_result(
    subsets=imputer_subsets,
    method='simple',
)

In [None]:
# Keep this method's results for the final comparison
results_dict['simple_imputer'] = {
    'score': simple_imputer_score,
    'accuracy': simple_imputer_accuracy,
    'auc': simple_imputer_auc,
}

## Multivariate normal distribution

#### Preparation

In [None]:
# Independent copy of the subsets, since imputation writes into them in place
multivariate_subsets = copy.deepcopy(list_of_subsets)

In [None]:
from sklearn.mixture import GaussianMixture

# Create Gaussian Mixture Model with a single component
# (the name `gmm` is reassigned in the Conditional GMM section further below)
gmm = GaussianMixture(n_components=1, random_state=RANDOM_STATE)
gmm.fit(X_train)

#### Imputation

In [None]:
# Replace every missing cell with samples drawn from the one-component mixture
imputing_missing_data(
    subsets=multivariate_subsets,
    method='multivariate',
    number_of_samples=default_number_of_samples,
    model=gmm,
)

multivariate_subsets[0].head()

#### Scoring

In [None]:
multivariate_score = get_scoring(subsets=multivariate_subsets, method='multivariate', print_results=True)

#### Classification

In [None]:
multivariate_accuracy, multivariate_auc = get_classification_result(
    subsets=multivariate_subsets,
    method='multivariate',
    should_print=True,
)

In [None]:
# Keep this method's results for the final comparison
results_dict['multivariate'] = {
    'score': multivariate_score,
    'accuracy': multivariate_accuracy,
    'auc': multivariate_auc,
}

## Conditional GMM

#### Preparation

In [None]:
# Independent copy of the subsets, since imputation writes into them in place
cgmm_subsets = copy.deepcopy(list_of_subsets)

In [None]:
import matplotlib.pyplot as plt

def compute_bic(data, n_components_range):
    """
    Fit one Gaussian Mixture Model per candidate component count and report its BIC.

    Parameters:
        data (array-like): Input data; used both for fitting and for evaluating the BIC.
        n_components_range (range): Candidate numbers of components.

    Returns:
        list: BIC values, in the order of ``n_components_range``.
    """
    def bic_for(n_components):
        # Each candidate is fitted from scratch with the shared seed
        mixture = GaussianMixture(n_components=n_components, random_state=RANDOM_STATE)
        mixture.fit(data)
        return mixture.bic(data)

    return [bic_for(n_components) for n_components in n_components_range]

# Set this to a value found in a previous run to skip the BIC search below
optimal_n_components = None

if optimal_n_components is None:
    # Candidate component counts: 1 through 50
    n_components_range = range(1, 51)
    bic_values = compute_bic(X_train, n_components_range)

    # The preferred model is the one with the lowest BIC
    best_position = np.argmin(bic_values)
    optimal_n_components = n_components_range[best_position]

    # BIC as a function of the number of components
    plt.plot(n_components_range, bic_values, marker='o', label='BIC Values')
    plt.title('BIC for Gaussian Mixture Models')
    plt.xlabel('Number of Components')
    plt.ylabel('BIC Value')
    plt.grid(True)
    plt.legend()

    plt.savefig('../images/without_missingness/bic.png')
    plt.show()

In [None]:
# Create Gaussian Mixture Model with optimal number of components
gmm = GaussianMixture(n_components=optimal_n_components, random_state=RANDOM_STATE)
gmm.fit(X_train)

#### Imputation

In [None]:
# Replace every missing cell with samples drawn from the conditional mixture
imputing_missing_data(
    subsets=cgmm_subsets,
    method='cgmm',
    number_of_samples=default_number_of_samples,
    model=gmm,
)

cgmm_subsets[0].head()

#### Scoring

In [None]:
cgmm_score = get_scoring(subsets=cgmm_subsets, method='cgmm', print_results=True)

#### Classification

In [None]:
cgmm_accuracy, cgmm_auc = get_classification_result(
    subsets=cgmm_subsets,
    method='cgmm',
)

In [None]:
# Keep this method's results for the final comparison
results_dict['cgmm'] = {
    'score': cgmm_score,
    'accuracy': cgmm_accuracy,
    'auc': cgmm_auc,
}

## Variational Autoencoder with Arbitrary Conditioning

#### Preparation

In [None]:
# Independent copy of the subsets, since imputation writes into them in place
vaeac_subsets = copy.deepcopy(list_of_subsets)

In [None]:
from sklearn.decomposition import TruncatedSVD

# Perform Singular Value Decomposition (SVD) on training data
svd = TruncatedSVD(n_components=min(X_train.shape), random_state=RANDOM_STATE)
svd.fit(X_train)

# Calculate cumulative explained variance
explained_variance_ratio = svd.explained_variance_ratio_
cumulative_variance_ratio = np.cumsum(explained_variance_ratio)

# Index of the first component at which the cumulative ratio reaches 90%.
# np.argmax returns 0 for an all-False mask, so the "never reached" case is handled
# explicitly by falling back to the last component.
reaches_target = cumulative_variance_ratio >= 0.90
if reaches_target.any():
    threshold_index = int(np.argmax(reaches_target))
else:
    threshold_index = len(cumulative_variance_ratio) - 1
threshold = svd.singular_values_[threshold_index]

print(f"Starting threshold to preserve 90% of total variance: {threshold}")

# Analyze singular values
singular_values = svd.singular_values_
# '>=' so that the component crossing the 90% mark is itself counted: a strict '>'
# keeps one component too few (less than 90% preserved) and yields 0 when the very
# first component already reaches the target.
num_non_trivial = int(np.sum(singular_values >= threshold))

# Select latent space dimensionality
latent_dim = num_non_trivial

print(f"Number of non-trivial singular values: {num_non_trivial}")
print(f"Selected latent space dimensionality: {latent_dim}")

In [None]:
# The figure is only rendered and stored when no file from an earlier run is present
file_path = "../images/without_missingness/cumulative_explained_variance.png"

if not os.path.isfile(file_path):
    component_numbers = np.arange(1, len(cumulative_variance_ratio) + 1)
    chosen_component = threshold_index + 1  # components are numbered from 1 on the x-axis

    plt.figure(figsize=(10, 6))
    plt.plot(component_numbers, cumulative_variance_ratio, marker='o', linestyle='-')
    plt.title('Cumulative Explained Variance')
    plt.xlabel('Number of Components')
    plt.ylabel('Cumulative Explained Variance')

    # Reference lines: the 90% target and the component at which it is reached
    plt.axhline(y=0.9, color='r', linestyle='--', label='90% Explained Variance')
    plt.axvline(x=chosen_component, color='g', linestyle='--', label=f'Threshold Component ({chosen_component})')

    plt.legend()
    plt.grid(True)
    plt.savefig(file_path)
    plt.show()

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Lambda
from tensorflow.keras.models import Model

# Encoder: two ReLU layers followed by two linear heads that parameterise the latent
# distribution (mean and log-variance), each of size `latent_dim`.
input_dim = X_train.shape[1]
inputs = Input(shape=(input_dim,))
encoded = inputs
encoded = Dense(128, activation='relu')(encoded)
encoded = Dense(64, activation='relu')(encoded)
z_mean = Dense(latent_dim)(encoded)
z_log_var = Dense(latent_dim)(encoded)

# Reparameterization trick
def sampling(args):
    """Draw z = z_mean + exp(0.5 * z_log_var) * epsilon with epsilon ~ N(0, I), one draw per batch row."""
    z_mean, z_log_var = args
    epsilon = tf.random.normal(shape=(tf.shape(z_mean)[0], latent_dim))
    return z_mean + tf.exp(0.5 * z_log_var) * epsilon

z = Lambda(sampling)([z_mean, z_log_var])

# Define the decoder (mirrors the encoder; linear output with one unit per input feature)
decoded = z
decoded = Dense(64, activation='relu')(decoded)
decoded = Dense(128, activation='relu')(decoded)
outputs = Dense(input_dim)(decoded)

# Create the VAE model
vaeac = Model(inputs, outputs)

# Compile the model
# NOTE(review): the loss is plain MSE - no KL term on (z_mean, z_log_var) is added in this
# cell, and the inputs are not masked during training; confirm this is the intended
# stand-in for "arbitrary conditioning".
vaeac.compile(optimizer='adam', loss='mse')  # Use MSE as the reconstruction loss

# Train the model to reconstruct its own (fully observed) input
history = vaeac.fit(X_train, X_train, epochs=10, batch_size=32, verbose=1)

#### Imputation

In [None]:
# Replace every missing cell with repeated predictions of the trained autoencoder
imputing_missing_data(
    subsets=vaeac_subsets,
    method='vaeac',
    number_of_samples=default_number_of_samples,
    model=vaeac,
)

vaeac_subsets[0].head()

#### Scoring

In [None]:
vaeac_score = get_scoring(subsets=vaeac_subsets, method='vaeac')

#### Classification

In [None]:
vaeac_accuracy, vaeac_auc = get_classification_result(
    subsets=vaeac_subsets,
    method='vaeac',
)

In [None]:
# Keep this method's results for the final comparison
results_dict['vaeac'] = {
    'score': vaeac_score,
    'accuracy': vaeac_accuracy,
    'auc': vaeac_auc,
}

## Generative Adversarial Imputation Network

#### Preparation

In [None]:
# Independent copy of the subsets, since imputation writes into them in place
gain_subsets = copy.deepcopy(list_of_subsets)

In [None]:
# Load the pre-trained GAIN generator.
# The path is built with os.path.join: the backslash literal used before contains
# invalid escape sequences ('\h', '\g', '\c') and only resolves on Windows.
# NOTE: joblib's load unpickles the file - only load model files from a trusted source.
gain = load(os.path.join('..', 'helpers', 'generative_models', 'cardio_gain_generator.h5'))

#### Imputation

In [None]:
# Replace every missing cell with repeated predictions of the GAIN generator
imputing_missing_data(
    subsets=gain_subsets,
    method='gain',
    number_of_samples=default_number_of_samples,
    model=gain,
)

gain_subsets[0].head()

#### Scoring

In [None]:
gain_score = get_scoring(subsets=gain_subsets, method='gain')

#### Classification

In [None]:
gain_accuracy, gain_auc = get_classification_result(
    subsets=gain_subsets,
    method='gain',
)

In [None]:
# Keep this method's results for the final comparison
results_dict['gain'] = {
    'score': gain_score,
    'accuracy': gain_accuracy,
    'auc': gain_auc,
}

## Comparison of results

In [None]:
from tabulate import tabulate

# Define the directory where results are stored (portable path; created on first use
# so that listing it and writing into it cannot fail on a fresh checkout)
results_directory = os.path.join('..', 'results', 'without_missingness')
os.makedirs(results_directory, exist_ok=True)

# Get the list of existing result files to determine the next run number.
# Files are expected to be named "run_<number>.<ext>"; anything else starting with
# "run_" but lacking a numeric part is skipped instead of aborting the export.
existing_files = os.listdir(results_directory)
run_numbers = []
for existing_name in existing_files:
    if not existing_name.startswith("run_"):
        continue
    run_token = existing_name.split("_")[1].split(".")[0]
    if run_token.isdigit():
        run_numbers.append(int(run_token))

# Determine the next run number
next_run_number = max(run_numbers, default=0) + 1

# Create tables for accuracy, AUC, and scores: header row = method names,
# then one row per subset (numbered from 1)
method_names = list(results_dict.keys())
accuracy_table = [[""] + method_names]
auc_table = [[""] + method_names]
score_table = [[""] + method_names]

number_of_subsets = len(next(iter(results_dict.values()))["accuracy"])
for i in range(number_of_subsets):
    accuracy_table.append([i + 1] + [results_dict[key]["accuracy"][i] for key in method_names])
    auc_table.append([i + 1] + [results_dict[key]["auc"][i] for key in method_names])
    # Scores are keyed by subset index and may be absent for a subset
    score_table.append([i + 1] + [results_dict[key]["score"].get(i, "") for key in method_names])

# Generate the tabulated strings for accuracy, AUC, and scores
tabulated_accuracy_table = tabulate(accuracy_table, headers="firstrow", tablefmt="grid")
tabulated_auc_table = tabulate(auc_table, headers="firstrow", tablefmt="grid")
tabulated_score_table = tabulate(score_table, headers="firstrow", tablefmt="grid")

# Define the file name for the new result
new_file_name = f"run_{next_run_number}.txt"
file_path = os.path.join(results_directory, new_file_name)

# The evaluation subsets are sampled from X_test, so report their actual size rather
# than a fraction of the training set
number_of_datapoints = list_of_subsets[0].shape[0]

# Save accuracy, AUC, and score tables to the same file with separation
with open(file_path, "w") as file:
    file.write("Number of datapoints: " + str(number_of_datapoints) + "\n" + "Number of samples: " + str(default_number_of_samples) + "\n\n")
    file.write("Accuracy:\n")
    file.write(tabulated_accuracy_table + "\n\n")
    file.write("AUC:\n")
    file.write(tabulated_auc_table + "\n\n")
    file.write("Scores:\n")
    file.write(tabulated_score_table)

print(f"Results saved to {new_file_name}.")