In [None]:
### Package Imports ###
import tensorflow as tf
import keras
import numpy as np
import pandas as pd

import matplotlib
import matplotlib.pyplot as plt

import os
import time
import h5py
import functools
import mitdeeplearning as mdl
from tqdm import tqdm

In [None]:
### Package Setup ###
start_time = time.localtime(time.time())

CWD = os.getcwd()
# print(CWD)

matplotlib.rcParams['font.family'] = "Times New Roman"

### Ensure training on GPU ###
assert len(tf.config.list_physical_devices('GPU')) > 0
physical_devices = tf.config.experimental.list_physical_devices('GPU')
if physical_devices:
    for device in physical_devices:
        tf.config.experimental.set_memory_growth(device, True)

# Data Loading

In [None]:
### Functions: Load and Visualize Dataset ###

### Function: load dataset ###
@keras.saving.register_keras_serializable(package='capstone',name='load_dataset')
def load_dataset(path_to_training_data):
    with h5py.File(path_to_training_data) as f:
        # Print the keys (names) of all groups and datasets in the file
        print("Keys:", list(f.keys()))

        # Iterate through each key and print more detailed information
        for key in f.keys():
            if isinstance(f[key], h5py.Dataset):
                print(f"Dataset: {key}")
                print("  Shape:", f[key].shape)
                print("  Data type:", f[key].dtype)
                
    ### Instantiate Loader Function ###
    return mdl.lab2.TrainingDatasetLoader(path_to_training_data)

### Function: visualize dataset ###
@keras.saving.register_keras_serializable(package='capstone',name='visualize_dataset')
def visualize_dataset(loader):
    ### Visualize our data ###
    number_of_training_examples = loader.get_train_size()
    print(number_of_training_examples)
    (images, labels) = loader.get_batch(100)
    malignant_images = images[np.where(labels==1)[0]]
    benign_images = images[np.where(labels==0)[0]]

    idx_malignant = 23
    idx_benign = 9

    plt.figure(figsize=(5,5))
    plt.subplot(1, 2, 1)
    plt.imshow(malignant_images[idx_malignant])
    plt.title("Malignant"); plt.grid(False)

    plt.subplot(1, 2, 2)
    plt.imshow(benign_images[idx_benign])
    plt.title("Benign"); plt.grid(False)

In [None]:
### Instantiate Loaders: train/val 90/10 ###
loader_ISIC = load_dataset(f'{CWD}/datasets/split-90train-10val/train_ISIC.h5')
loader_ISIC_DiDI = load_dataset(f'{CWD}/datasets/split-90train-10val/train_ISIC_DiDI.h5')
loader_ISIC_ArGI = load_dataset(f'{CWD}/datasets/split-90train-10val/train_ISIC_ArGI.h5')

loader_val_ISIC = load_dataset(f'{CWD}/datasets/split-90train-10val/val_ISIC.h5')
loader_val_ISIC_DiDI = load_dataset(f'{CWD}/datasets/split-90train-10val/val_ISIC_DiDI.h5')
loader_val_ISIC_ArGI =  load_dataset(f'{CWD}/datasets/split-90train-10val/val_ISIC_ArGI.h5')

In [None]:
# ### Visualize Training Datasets ###
# visualize_dataset(loader_ISIC)
# visualize_dataset(loader_ISIC_DiDI)
# visualize_dataset(loader_ISIC_ArGI)

# Create Model Architectures

In [None]:
### Standard CNN ###

"""
Instantiation helper functions:
- resize_images: used in lambda function layer to resize the input images
"""
@keras.saving.register_keras_serializable(package='capstone', name='resize_images')
def resize_images(x):
    return tf.image.resize(x, (64, 64))

"""
Instantiation function:
- make_standard_ResNet50_V2: instantiates a new keras model with the ResNet50v2 CNN architecture. 
"""
@keras.saving.register_keras_serializable(package='capstone', name='make_standard_ResNet50_V2')
def make_standard_ResNet50_V2(n_outputs = 1):
    
    Resize = tf.keras.layers.Lambda(resize_images)
    Flatten = tf.keras.layers.Flatten
    Dense = functools.partial(tf.keras.layers.Dense, activation='relu')
    ResNet50V2 = tf.keras.applications.ResNet50V2(
        include_top=False,
        weights="imagenet", # Utilizing Transfer Learning, also maintains consistency
        input_tensor=None,
        input_shape=(64,64,3),
        pooling=None,
        classes=1000,
        classifier_activation="softmax",
    )
    ResNet50V2 = tf.keras.Model(inputs = ResNet50V2.layers[1].input, 
                                outputs = ResNet50V2.layers[-1].output)

    model = tf.keras.Sequential()
    
    model.add(Resize)
    model.add(ResNet50V2)
    model.add(Flatten())
    model.add(Dense(512))
    model.add(Dense(n_outputs, activation=None))

    return model

In [None]:
### DB-VAE ###

"""
Instantiation helper functions:
- make_decoder_network: creates decoder section of the debiasing-variational autoencoder. 
    Structure is akin to ResNet50v2 except inverted.
- sampling: VAE reparameterization trick (Amini 2024)
- vae_loss_function: as its name suggests, a VAE loss function, also a helper function for debiasing_loss_function
- debiasing_loss_function: as its name suggests, a loss function for the debiasing portion of the DB-VAE

"""
@keras.saving.register_keras_serializable(package='capstone', name='make_decoder_network')
def make_decoder_network(latent_dim = 100, n_filters = 12 ):
    """
    Layer Types, Functional Definition
    """
    Conv2DTranspose = functools.partial(tf.keras.layers.Conv2DTranspose, padding='same', activation='relu')
    Dense = functools.partial(tf.keras.layers.Dense, activation='relu')
    Reshape = tf.keras.layers.Reshape 
    BatchNormalization = tf.keras.layers.BatchNormalization
    LeakyReLU = tf.keras.layers.LeakyReLU
    # Decoder
    decoder = tf.keras.Sequential([
        Dense(units=4*4*6*n_filters),
        Reshape(target_shape=(4,4,6*n_filters)),

        Conv2DTranspose(256, (4, 4), strides=(2, 2), padding='same'),
        BatchNormalization(),
        LeakyReLU(alpha=0.2),

        Conv2DTranspose(128, (4, 4), strides=(2, 2), padding='same'),
        BatchNormalization(),
        LeakyReLU(alpha=0.2),

        Conv2DTranspose(64, (4, 4), strides=(2, 2), padding='same'),
        BatchNormalization(),
        LeakyReLU(alpha=0.2),

        Conv2DTranspose(3, (4, 4), strides=(2, 2), padding='same', activation='sigmoid')
    ])
    return decoder

@keras.saving.register_keras_serializable(package='capstone', name='sampling_VAE_reparameterization')
def sampling(z_mean, z_logsigma):
    batch, latent_dim = z_mean.shape
    epsilon = tf.random.normal(shape=(batch, latent_dim))
    z = z_mean + tf.math.exp(0.5 * z_logsigma) * epsilon
    return z

@keras.saving.register_keras_serializable(package='capstone', name='vae_loss_function')
def vae_loss_function(x, x_recon, mu, logsigma, kl_weight=0.0005):
  latent_loss = 0.5 * tf.reduce_sum(tf.exp(logsigma) + tf.square(mu) - 1.0 - logsigma, axis=1)
  reconstruction_loss = tf.reduce_mean(tf.abs(x-x_recon), axis=(1,2,3))
  vae_loss = kl_weight * latent_loss + reconstruction_loss
  return vae_loss

@keras.saving.register_keras_serializable(package='capstone',name='debiasing_loss_function')
def debiasing_loss_function(x, x_pred, y, y_logit, mu, logsigma):
  vae_loss = vae_loss_function(x, x_pred, mu, logsigma)
  classification_loss = tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=y_logit)
  malignance_indicator = tf.cast(tf.equal(y, 1), tf.float32)
  total_loss = tf.reduce_mean(
      classification_loss +
      malignance_indicator * vae_loss
  )
  return total_loss, classification_loss

"""
Instantiation:
Class definition of the DB_VAE
"""
@keras.saving.register_keras_serializable(package='capstone')
class DB_VAE(tf.keras.Model):
  def __init__(self, latent_dim):
    super(DB_VAE, self).__init__()
    self.latent_dim = latent_dim

    # Define the number of outputs for the encoder. Recall that we have
    # `latent_dim` latent variables, as well as a supervised output for the
    # classification.
    num_encoder_dims = 2*self.latent_dim + 1

    self.encoder = make_standard_ResNet50_V2(num_encoder_dims)
    self.decoder = make_decoder_network()

  def encode(self, x):
    encoder_output = self.encoder(x)
    y_logit = tf.expand_dims(encoder_output[:, 0], -1)
    z_mean = encoder_output[:, 1:self.latent_dim+1]
    z_logsigma = encoder_output[:, self.latent_dim+1:]

    return y_logit, z_mean, z_logsigma

  def reparameterize(self, z_mean, z_logsigma):
    z = sampling(z_mean, z_logsigma)
    return z

  def decode(self, z):
    reconstruction = self.decoder(z)
    return reconstruction

  def call(self, x):
    y_logit, z_mean, z_logsigma = self.encode(x)
    z = self.reparameterize(z_mean, z_logsigma)
    recon = self.decode(z)
    return y_logit, z_mean, z_logsigma, recon

  def predict(self, x):
    y_logit, z_mean, z_logsigma = self.encode(x)
    return y_logit
  

"""
Training helper functions:
- get_latent_mu: finds mean of latent distribution
- get_training_sample_probabilities: recomputes the sampling probabilities for images within a batch 
    based on how they distribute across the training data
"""
# Function to return the means for an input image batch
@keras.saving.register_keras_serializable(package='capstone',name='get_latent_mu')
def get_latent_mu(images, dbvae, batch_size=1024, latent_dim=100):
    N = images.shape[0]
    mu = np.zeros((N, latent_dim))
    for start_ind in range(0, N, batch_size):
        end_ind = min(start_ind+batch_size, N+1)
        batch = (images[start_ind:end_ind]).astype(np.float32)/255.
        _, batch_mu, _ = dbvae.encode(batch)
        mu[start_ind:end_ind] = batch_mu
    return mu

@keras.saving.register_keras_serializable(package='capstone',name='get_training_sample_probabilities')
def get_training_sample_probabilities(images, dbvae, bins=10, smoothing_fac=0.001, latent_dim=100):
    print("Recomputing the sampling probabilities")
    mu = get_latent_mu(images, dbvae)
    training_sample_p = np.zeros(mu.shape[0])
    for i in range(latent_dim):
        latent_distribution = mu[:,i]
        hist_density, bin_edges =  np.histogram(latent_distribution, density=True, bins=bins)
        bin_edges[0] = -float('inf')
        bin_edges[-1] = float('inf')
        bin_idx = np.digitize(latent_distribution, bin_edges)
        hist_smoothed_density = hist_density + smoothing_fac
        hist_smoothed_density = hist_smoothed_density / np.sum(hist_smoothed_density)
        p = 1.0/(hist_smoothed_density[bin_idx-1])
        p = p / np.sum(p)
        training_sample_p = np.maximum(p, training_sample_p)
    training_sample_p /= np.sum(training_sample_p)

    return training_sample_p

In [None]:
### Set Training Hyperparameters ###

### Hyperparameters for CNN Training ###
params_CNN = dict( 
  batch_size = 32,
  num_epochs = 25,
  learning_rate = 1e-2, # 5e-4 was initial rate set by Amini
  min_lr = 1e-5, # for adaptive learning rate
  factor = 0.8, # for adaptive learning rate
  patience_lr = 5, # for adaptive learning rate
  patience_stop = 10, # for early stopping
  optimizer = 'SGD', # 'Adam' or 'SGD'
)

### Hyperparameters for DB-VAE Training ###
params_DB_VAE = dict(
    batch_size = 32,
    num_epochs = 25, 
    learning_rate = 5e-4,
    latent_dim = 100,
    optimizer = 'SGD', # 'Adam' or 'SGD'
)

# Training

In [None]:
@keras.saving.register_keras_serializable(package='capstone',name='graph_metrics')
def graph_metrics(model_num, completed_epochs, metrics):
    size_axis_titles = 16
    size_title = 18
    size_legend = 14
    
    fig, [ax_acc, ax_loss] = plt.subplots(1, 2)
    fig.set_size_inches((16, 7))
    fig.suptitle(f'Model {model_num} Training', fontsize=size_title)
    
    ax_acc.set_xlabel('Epoch', fontsize=size_axis_titles)
    ax_acc.set_ylabel('Accuracy', fontsize=size_axis_titles)
    ax_acc.set_xbound(1, completed_epochs+1)
    ax_acc.set_ybound(0, 1.0)
    ax_acc.plot(metrics['train_acc'], label='Training')
    ax_acc.plot(metrics['val_acc'], label='Validation')
    ax_acc.legend(loc='upper right', fontsize=size_legend)

    ax_loss.set_xlabel('Epoch', fontsize=size_axis_titles)
    ax_loss.set_ylabel('Loss', fontsize=size_axis_titles)
    ax_loss.set_xbound(1, completed_epochs+1)
    ax_loss.set_ybound(0, 2.5)
    ax_loss.plot(metrics['train_loss'], label='Training')
    ax_loss.plot(metrics['val_loss'], label='Validation')
    ax_loss.legend(loc='upper right', fontsize=size_legend)

    fig.savefig(f'{CWD}/models/Model_{model_num}_Training.png', bbox_inches='tight')
    fig.show()

In [None]:
@keras.saving.register_keras_serializable(package='capstone',name='train_base_CNN')
def train_base_CNN(model_num, data_loader, params, validation_loader):
    model = make_standard_ResNet50_V2()
    if (params['optimizer'] == 'Adam'):
        optimizer = tf.keras.optimizers.Adam(params['learning_rate']) # Adam
    elif (params['optimizer'] == 'SGD'):
        optimizer = tf.keras.optimizers.SGD(learning_rate = params['learning_rate']) # stochastic gradient descent
    # Binary classification task necessitates binary crossentropy loss
    loss_fn = tf.keras.losses.BinaryCrossentropy(from_logits = True)

    train_acc_metric = tf.keras.metrics.BinaryAccuracy()
    train_loss_metric = tf.keras.metrics.BinaryCrossentropy()
    val_acc_metric = tf.keras.metrics.BinaryAccuracy()
    val_loss_metric = tf.keras.metrics.BinaryCrossentropy()

    @tf.function
    def train_step(x,y):
        with tf.GradientTape() as tape:
            logits = model(x, training=True)
            loss_value = loss_fn(y, logits)
        grads = tape.gradient(loss_value, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        train_acc_metric.update_state(y, logits)
        train_loss_metric.update_state(y, logits)
        return loss_value
    
    @tf.function
    def test_step(x,y):
        logits = model(x, training=False)
        val_acc_metric.update_state(y, logits)
        val_loss_metric.update_state(y, logits)

    completed_epochs = 0 
    ### Conditions for Early Stopping ###
    wait_val_loss = 0
    best_val_loss = float('inf')
    ### Conditions for Adjusting Learning Rate ###
    wait_val_acc = 0
    best_val_acc = float(0)

    ### Store Metrics ###
    TRAIN_ACC = []
    TRAIN_LOSS = []
    VAL_ACC = []
    VAL_LOSS = []

    for epoch in range(params['num_epochs']):
        print("\nStart of epoch %d" % (epoch,))
        start_time = time.time()

        ### Training Steps ###
        for idx in tqdm(range(data_loader.get_train_size()//params["batch_size"])):
            x, y = data_loader.get_batch(params['batch_size'])
            loss_value = train_step(x, y)
        
        train_acc = train_acc_metric.result()
        train_loss = train_loss_metric.result()
        train_acc_metric.reset_states()
        train_loss_metric.reset_states()
        print("Training acc over epoch: %.4f" % (train_acc.numpy()))
        print("Training loss over epoch: %.4f" % (train_loss.numpy()))

        ### Validation Steps ###
        for idx in tqdm(range(validation_loader.get_train_size()//params['batch_size'])):
            x_val, y_val = validation_loader.get_batch(params['batch_size'])
            test_step(x_val, y_val)
        
        val_acc = val_acc_metric.result()
        val_loss = val_loss_metric.result()
        val_acc_metric.reset_states()
        val_loss_metric.reset_states()
        print("Validation acc: %.4f" % (float(val_acc),))
        print("Validation loss: %.4f" % (float(val_loss),))
        print("Time taken: %.2fs" % (time.time() - start_time))

        ### Store Metrics ###
        TRAIN_ACC.append(float(train_acc))
        TRAIN_LOSS.append(float(train_loss))
        VAL_ACC.append(float(val_acc))
        VAL_LOSS.append(float(val_loss))

        completed_epochs += 1
        wait_val_loss += 1
        
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            wait_val_loss = 0
        if wait_val_loss >= params['patience_stop']:
            print("EARLY STOP")
            break

        wait_val_acc += 1
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            wait_val_acc = 0
        if wait_val_acc >= params['patience_lr']:
            if (optimizer.lr != params['min_lr']): # if the optimizer isn't at minimum LR
                print("learning rate changed")
                old_lr = optimizer.lr
                optimizer.lr = old_lr * params['factor'] # multiply optimizer LR by factor
                new_lr = optimizer.lr
                assert abs(old_lr - new_lr) < 1e-9 # ensure the rate actually did change
                # I set the tolerance to 1e-9; that's way below the min LR.
    
    ### Compile Model Training Metrics ###
    metrics = {
        'train_acc': TRAIN_ACC,
        'train_loss': TRAIN_LOSS,
        'val_acc': VAL_ACC,
        'val_loss': VAL_LOSS,
    }
    metrics = pd.DataFrame(metrics)
    print(metrics)

    ### Visualize Training Metrics ###
    graph_metrics(model_num, completed_epochs, metrics)

    ### Save Model ###
    model.save(f'{CWD}/models/Model_{model_num}.keras')
    
    return completed_epochs, metrics

In [None]:
@keras.saving.register_keras_serializable(package='capstone',name='train_DB_VAE')
def train_DB_VAE(model_num, data_loader, params, validation_loader):
    model = DB_VAE(params['latent_dim'])

    if (params['optimizer'] == 'Adam'):
        optimizer = tf.keras.optimizers.Adam(params['learning_rate']) # Adam
    elif (params['optimizer'] == 'SGD'):
        optimizer = tf.keras.optimizers.SGD(learning_rate = params['learning_rate']) # stochastic gradient descent

    ### We'll track typical binary losses as usual ###
    train_acc_metric = tf.keras.metrics.BinaryAccuracy()
    train_loss_metric = tf.keras.metrics.BinaryCrossentropy()
    val_acc_metric = tf.keras.metrics.BinaryAccuracy()
    val_loss_metric = tf.keras.metrics.BinaryCrossentropy()
    
    
    @tf.function
    def debiasing_train_step(x, y):
        with tf.GradientTape() as tape:
            y_logit, z_mean, z_logsigma, x_recon = model(x)
            loss, class_loss = debiasing_loss_function(x, x_recon, y, y_logit, z_mean, z_logsigma)
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

        train_acc_metric.update_state(y, y_logit)
        train_loss_metric.update_state(y, y_logit)
        return loss
    
    @tf.function
    def test_step(x,y):
        logits, _, _, _ = model(x, training=False)
        val_acc_metric.update_state(y, logits)
        val_loss_metric.update_state(y, logits)

    all_imgs = data_loader.get_all_train_faces()

    completed_epochs = 0 
    ### Store Metrics ###
    TRAIN_ACC = []
    TRAIN_LOSS = []
    VAL_ACC = []
    VAL_LOSS = []
    DB_LOSS = []
    for epoch in range(params["num_epochs"]):
        print("\nStart of epoch %d" % (epoch,))
        start_time = time.time()

        p_lesions = get_training_sample_probabilities(all_imgs, model)
        db_loss = np.empty(shape=[data_loader.get_train_size() // params["batch_size"]])

        for idx in tqdm(range(data_loader.get_train_size() // params["batch_size"])):
            # load a batch of data
            (x, y) = data_loader.get_batch(params["batch_size"], p_pos=p_lesions)

            # loss optimization
            loss = debiasing_train_step(x, y)
            db_loss[idx] = loss
        
        train_acc = train_acc_metric.result()
        train_loss = train_loss_metric.result()
        db_loss = db_loss.mean()
        train_acc_metric.reset_states()
        train_loss_metric.reset_states()
        print("Training acc over epoch: %.4f" % (train_acc.numpy()))
        print("Training loss over epoch: %.4f" % (train_loss.numpy()))
        print("Debiasing loss over epoch: %.4f" % (db_loss.mean()))

        for idx in tqdm(range(validation_loader.get_train_size()//params['batch_size'])):
            x_val, y_val = validation_loader.get_batch(params['batch_size'])
            test_step(x_val, y_val)
        
        val_acc = val_acc_metric.result()
        val_loss = val_loss_metric.result()
        val_acc_metric.reset_states()
        val_loss_metric.reset_states()
        print("Validation acc: %.4f" % (float(val_acc),))
        print("Validation loss: %.4f" % (float(val_loss),))
        print("Time taken: %.2fs" % (time.time() - start_time))
        
        ### Store Metrics ###
        TRAIN_ACC.append(float(train_acc))
        TRAIN_LOSS.append(float(train_loss))
        VAL_ACC.append(float(val_acc))
        VAL_LOSS.append(float(val_loss))
        DB_LOSS.append(float(db_loss))

        completed_epochs += 1

    ### Compile Model Training Metrics ###
    metrics = {
        'train_acc': TRAIN_ACC,
        'train_loss': TRAIN_LOSS,
        'val_acc': VAL_ACC,
        'val_loss': VAL_LOSS,
    }
    metrics = pd.DataFrame(metrics)
    print(metrics)

    ### Visualize Training Metrics ###
    graph_metrics(model_num, completed_epochs, metrics)

    ### Save Model ###
    model.save(f'{CWD}/models/Model_{model_num}.keras')

    return completed_epochs, metrics

In [None]:
models = []
completed = []
training_metrics = []

In [None]:
## Automated Training ###
if ('models' not in os.listdir(CWD)):
    os.mkdir(f'{CWD}/models/')
for i in range(1,7):
    if (i % 3 == 1):
        loader = loader_ISIC
        validation_loader = loader_val_ISIC
    elif (i % 3 == 2):
        loader = loader_ISIC_DiDI
        validation_loader = loader_val_ISIC_DiDI
    elif (i % 3 == 0):
        loader = loader_ISIC_ArGI
        validation_loader = loader_val_ISIC_ArGI
    
    if (i <= 3):
        completed_epochs, metrics = train_base_CNN(i, loader, params_CNN, validation_loader)
    elif (i <= 6):
        completed_epochs, metrics = train_DB_VAE(i, loader, params_DB_VAE, validation_loader)

    completed.append(completed_epochs)
    training_metrics.append(metrics)

    tf.keras.backend.clear_session()

## Research Ethics
This code makes use of the mitdeeplearning package (Amini, 2024) for the data loading function. 
The DB-VAE for Models 4-6 is inspired by the Debiasing Computer Vision Lab notebook from 6.S191.

### Copyright 2024 MIT 6.S191 Introduction to Deep Learning. All Rights Reserved. 
 
Licensed under the MIT License. You may not use this file except in compliance 
with the License. Use and/or modification of this code outside of 6.S191 must 
reference: 

© MIT 6.S191: Introduction to Deep Learning 
http://introtodeeplearning.com 

In [None]:
### Model Training Metadata ###

end_time = time.localtime(time.time())

f = open(f"{CWD}/models/MODEL_INFO.txt", "a")
f.write(f"Start Time: {start_time.tm_year}/{start_time.tm_mon}/{start_time.tm_mday}, {start_time.tm_hour}:{start_time.tm_min}:{start_time.tm_sec}\n")
f.write(f"End Time: {end_time.tm_year}/{end_time.tm_mon}/{end_time.tm_mday}, {end_time.tm_hour}:{end_time.tm_min}:{end_time.tm_sec}")
f.write('\n')
f.write("params_CNN = {\n")
f.write(f'    optimizer = {params_CNN["optimizer"]}\n')
f.write(f'    batch_size = {params_CNN["batch_size"]}\n')
f.write(f'    num_epochs = {params_CNN["num_epochs"]}\n')
f.write(f'    learning_rate = {params_CNN["learning_rate"]}\n')
f.write(f'    min_lr = {params_CNN["min_lr"]}\n')
f.write(f'    factor = {params_CNN["factor"]}\n')
f.write(f'    patience_lr = {params_CNN["patience_lr"]}\n')
f.write(f'    patience_stop = {params_CNN["patience_stop"]}\n')
f.write('}\n')
f.write('\n')
f.write("params_DB_VAE = {\n")
f.write(f'    optimizer = {params_DB_VAE["optimizer"]}\n')
f.write(f'    batch_size = {params_DB_VAE["batch_size"]}\n')
f.write(f'    num_epochs = {params_DB_VAE["num_epochs"]}\n')
f.write(f'    learning_rate = {params_DB_VAE["learning_rate"]}\n')
f.write(f'    latent_dim = {params_DB_VAE["latent_dim"]}\n')
f.write('}\n')
f.write('\n')
for i in range(6):
    f.write(f'Model {i+1} completed {completed[i]} training epochs.\n')
    training_metrics[i].to_excel(f'{CWD}/models/Model_{i+1}_training_metadata.xlsx')
f.close()