### Imports
pyplot per i grafici

numpy per la gestione degli array/tensori

os, glob, pathlib per gestione filesystem/cartelle/files

wandb per il tracking degli esperimenti

pandas per il dataframe proveniente dal .csv

time per aggiungere il timestamp al file del modello e delle immagini sintetiche


In [None]:
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np
import os
import glob
import pathlib
import wandb
from wandb.keras import WandbCallback
import pandas as pd
import time

### Definizione iperparametri
Frutto di lungo processo di trial/error. I valori dei learning rates utilizzati (per l'algoritmo Adam) evitano che le funzioni di costo di discriminator e generator non divergano dopo poche decine di epochs, consentendo un training stabile per oltre 1500 epochs (~2h su Colab, 4h sul mio hardware)


In [None]:
batch_size = 8          # default tutorial MNIST = 64, default Gennaro = 5
num_channels = 1
num_classes = 2         # default MNIST = 10 (cifre 0-9), ora Healthy o Patient -> 2
image_size = 64         # i .png del dataset sono 64x64
latent_dim = 128        # provato 256 per ovviare al mode collapse, poca differenza

lr_disc = 0.00002       
lr_gen = 0.00002        # default ADAM = 0.001, default tutorial MNIST = 0.0003, divergono (g_loss altissima)
decay_disc = 0.9        # default = 0.9, provato 0.5 ma diverge
decay_gen = 0.9         # default = 0.9, provato 0.5 ma diverge
n_epochs = 501         
lbl_smooth = 0          
patience = 200        # valore per l'EarlyStopping usato per i primi addedstramenti
loss_func = "binary_crossentropy"

### Preprocessing

##### Controllo percorsi e visualizzo immagini del dataset dal filesystem

In [None]:
home_path = str(pathlib.Path.home())
dataset_dir = pathlib.PurePath(home_path, 'Documents', 'Neural Networks', 'datasets')
print(f'Cartella dei datasets:', dataset_dir)

seed_name = "meander02"
src_path_train = pathlib.PurePath(dataset_dir, 'accXaccY', '64', '50_50', 'meander', '02', 'training')
print(f'Cartella del dataset di training', src_path_train)

sub_class = [f for f in os.listdir(src_path_train) if not f.startswith('.')]
print(f'Labels trovate nei dati di training:', sub_class)

fig = plt.figure(figsize=(10,5))

path = os.path.join(src_path_train,sub_class[0])

filelist = glob.glob(path+'/*')

for i in range(4):
    print(filelist[i])
    plt.subplot(240 + 1 + i)
    img = plt.imread(filelist[i])
    plt.imshow(img, cmap=plt.get_cmap('gray'))

src_path_test = pathlib.PurePath(dataset_dir, 'accXaccY', '64', '50_50', 'meander', '02', 'test')
print(f'\nCartella del dataset di test' ,src_path_test)

sub_class = [f for f in os.listdir(src_path_test) if not f.startswith('.')]
print(f'Labels trovate nei dati di test:', sub_class)

path = os.path.join(src_path_test, sub_class[1])

filelist = glob.glob(path+'/*')
for i in range(4,8):
    print(filelist[i])
    plt.subplot(240 + 1 + i)
    img = plt.imread(filelist[i])
    plt.imshow(img, cmap=plt.get_cmap('gray'))


##### Carico il dataset tramite il metodo di keras 'image_dataset_from_directory'
Il metodo genera un oggetto "tf.data.Dataset"

In [None]:
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    directory=src_path_train,
    validation_split=0.3,                   # stesso usato da Gennaro
    subset='training',                      # divisione del set
    labels='inferred',                      # prende i modi delle classi dalle sottocartelle del set
    label_mode='categorical',               # labels sono codificate come categorical vector con valori binari (sostituisce il metodo utils.to_categorical)
    class_names=['H', 'P'],                 # devono corrispondere ai nomi delle sottocartelle
    color_mode='grayscale',                 # canale dei colori
    batch_size=batch_size,                  
    image_size=(image_size, image_size),
    shuffle=False,                          # uso il seed originale, senza modificare l'ordine dei samples
    # seed=123
)
val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    directory=src_path_train,
    validation_split=0.3,
    subset='validation',
    image_size=(image_size, image_size),
    batch_size=batch_size,
    labels="inferred",
    label_mode='categorical',
    class_names=['H', 'P'],
    color_mode="grayscale",
    shuffle=False,
    # seed=123
)

test_ds = tf.keras.preprocessing.image_dataset_from_directory(
    directory = src_path_test,
    labels = "inferred",
    label_mode='categorical',
    class_names = ['H', 'P'],
    color_mode = "grayscale",
    batch_size = batch_size,                         # uso un singolo sample come batch per la fase di test
    image_size = (image_size, image_size),
    shuffle = False,
    # seed=123
)

# Verifico corretto caricamento dei diversi sets e delle labels
class_names_train = train_ds.class_names
print(f'Lista delle classi (train): {class_names_train}')

class_names_test = test_ds.class_names
print(f'Lista delle classi (test): {class_names_test}')

n_classes = len(class_names_train)

##### Salvo numero di samples per ogni set

In [None]:
target_names = [item for item in os.listdir(src_path_train) if os.path.isdir(os.path.join(src_path_train, item))]
nb_train_samples = ( sum([len(files) for _, _, files in os.walk(src_path_train)]) - 1 )
nb_validation_samples = round(nb_train_samples * 0.3) - 1
validation_steps = nb_validation_samples / batch_size
nb_synth_samples = 0
nb_test_samples = ( sum([len(files) for _, _, files in os.walk(src_path_test)]) - 1)
total_nb_samples = nb_train_samples + nb_test_samples
nb_classes = len(target_names)

##### Visualizzo immagini dopo aver caricato il dataset
Mi assicuro che il dataset venga caricato correttamente, con le labels come titolo

[1. 0.] = Healthy

[0. 1.] = Patient

In [None]:
plt.figure(figsize=(15, 15))
for images, labels in train_ds.shuffle(buffer_size=5).take(1):
  print(images.shape)
  for i in range(5):
    ax = plt.subplot(1, 5, i + 1)
    plt.imshow(images[i], cmap=plt.get_cmap('gray'))
    plt.title(np.array(labels[i]))

plt.figure(figsize=(15, 15))
for images, labels in test_ds.shuffle(buffer_size=5).take(1):
  for i in range(5):
    ax = plt.subplot(1, 5, i + 1)
    plt.imshow(images[i], cmap=plt.get_cmap('gray'))
    plt.title(np.array(labels[i]))


##### Verifica shapes di immagini e labels
mi deve restituire:

(batch_size, 64, 64, 1) per le immagini

(batch_size, 2) per le labels


In [None]:
for image_batch, labels_batch in train_ds:
  print(f'Shape delle immagini di training (batch):', image_batch.shape)
  # print(image_batch[0])
  print(f'Shape delle labels di training (batch):', labels_batch.shape)
  print(labels_batch[0])
  break

for image_batch, labels_batch in test_ds:
  print(f'Shape delle immagini di test (batch):', image_batch.shape)
  # print(image_batch[0])
  print(f'Shape delle labels di test (batch):', labels_batch.shape)
  print(labels_batch[0])
  break

##### Standardizzo i dati
Normalizzo i valori nell'intervallo [0,1] per utilizzare un'attivazione di tipo "sigmoid" nei modelli

In [None]:
normalization_layer = tf.keras.layers.Rescaling(1./255)                         # per normalizzare in [0, 1]
nl = '01'
# normalization_layer = tf.keras.layers.Rescaling(1./127.5, offset=-1)          # per normalizzare in [-1, 1]
# nl = '-11'
normalized_train_ds = train_ds.map(lambda x, y: (normalization_layer(x), y))
image_batch, labels_batch = next(iter(normalized_train_ds))
first_image = image_batch[0]
first_label = labels_batch[0]
# Testo che i valori siano effettivamente in [0,1]
print("Intervallo train set: ", np.min(first_image), np.max(first_image))
print("Intervallo labels: ", np.min(first_label), np.max(first_label))

normalized_val_ds = val_ds.map(lambda x, y: (normalization_layer(x), y))
image_batch, labels_batch = next(iter(normalized_val_ds))
first_image = image_batch[0]
# Testo che i valori siano effettivamente in [0,1].
print("Intervallo validation set: ", np.min(first_image), np.max(first_image))

normalized_test_ds = test_ds.map(lambda x, y: (normalization_layer(x), y))
image_batch, labels_batch = next(iter(normalized_test_ds))
first_image = image_batch[0]
# Testo che i valori siano effettivamente in [0,1].
print("Intervallo test set: ", np.min(first_image), np.max(first_image))


##### Definisco numero di canali in input alle due reti

In [None]:
generator_in_channels = latent_dim + num_classes        # 128 punti dello spazio + numero classi (H/P)
discriminator_in_channels = num_channels + num_classes  # numero di canali (1=grayscale) + num classi (H/P)
print(generator_in_channels, discriminator_in_channels)

### Definizione modelli
Sia Generatore che Discriminatore sono definite tramite le Sequential API di Keras

In [None]:
# initializer = tf.keras.initializers.RandomNormal(mean=0.0, stddev=0.02)         # best practice per stabilizzare il training di una DCGAN

##### Generatore e Discriminatore

Implementazione DCGAN

https://keras.io/examples/generative/dcgan_overriding_train_step/


In [None]:

discriminator = tf.keras.Sequential(
    [
        # L'input al generatore è un sample (reale o sintetico) con shape (64,64,num_canali+num_classi)
        tf.keras.layers.InputLayer(input_shape=(64, 64, discriminator_in_channels), name="disc_input"),
        # (64,64,3) -> (32,32,64) = downsampling
        tf.keras.layers.Conv2D(64, kernel_size=4, strides=2, padding="same", name="conv1"),
        tf.keras.layers.LeakyReLU(alpha=0.2, name="relu1"), # best practice
        # (32,32,64) -> (16,16,128) = downsampling
        tf.keras.layers.Conv2D(128, kernel_size=4, strides=2, padding="same", name="conv2"),
        tf.keras.layers.LeakyReLU(alpha=0.2, name="relu2"),
        # (16,16,128) -> (8,8,128) = downsampling
        tf.keras.layers.Conv2D(128, kernel_size=4, strides=2, padding="same", name="conv3"),
        tf.keras.layers.LeakyReLU(alpha=0.2, name="relu3"),
        tf.keras.layers.Flatten(name="flatten1"),
        tf.keras.layers.Dropout(0.2, name="dropout1"),
        # (8,8,128) -> prediction (fake/real)
        tf.keras.layers.Dense(1, activation="sigmoid", name="output"),
    ],
    name="discriminator",
)
# Mi assicuro che l'output abbia la giusta shape
assert discriminator.output_shape == (None, 1)
discriminator.summary()

generator = tf.keras.Sequential(
    [
        tf.keras.layers.InputLayer(input_shape=(generator_in_channels,), name="gen_input"),
        # Vogliamo generare latent_dim(128) + num_classes coefficienti per ricostruirli in una map da
        # 8x8x(128 + num_classes).
        tf.keras.layers.Dense((8 * 8 * generator_in_channels), name="dense1"),
        tf.keras.layers.Reshape((8, 8, generator_in_channels), name="reshape1"),
        # kernel deve avere dim multiple dello stride (no checkerboard effect)
        # (8,8,latent_dim+n_classes) -> (16,16,128) = upsampling  
        tf.keras.layers.Conv2DTranspose(128, kernel_size=4, strides=2, padding="same", name="t_conv1"),
        tf.keras.layers.LeakyReLU(alpha=0.2, name="relu1"),  # best practice
        # (16,16,128) -> (32,32,256) = upsampling
        tf.keras.layers.Conv2DTranspose(256, kernel_size=4, strides=2, padding="same", name="t_conv2"),
        tf.keras.layers.LeakyReLU(alpha=0.2, name="relu2"),
        # (32,32,256) -> (64,64,512) = upsampling
        tf.keras.layers.Conv2DTranspose(512, kernel_size=4, strides=2, padding="same", name="t_conv3"),
        tf.keras.layers.LeakyReLU(alpha=0.2, name="relu3"),
        # (64,64,512) -> (64,64,1) = output, immagine 64x64 greyscale
        tf.keras.layers.Conv2D(1, kernel_size=5, padding="same", activation="sigmoid", name="output"),
    ],
    name="generator",
)
# Mi assicuro che l'output abbia la giusta shape
assert generator.output_shape == (None, 64, 64, 1)
generator.summary()

In [None]:
tf.keras.utils.plot_model(model=discriminator, to_file="generator_graph.png", show_shapes=True)

In [None]:
tf.keras.utils.plot_model(model=generator, to_file="generator_graph.png", show_shapes=True)

##### Uso il generatore non addestrato per generare un'immagine

In [None]:
noise = tf.random.normal([2,generator_in_channels])
generated_image = generator(noise, training=False)
print(generated_image.shape)
plt.imshow(generated_image[0, :, :, 0], cmap='gray')

##### Model subclassing e override dei metodi train_step e test_step

Implementazione CGAN

https://keras.io/examples/generative/conditional_gan/

In [None]:

class ConditionalGAN(tf.keras.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super(ConditionalGAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim
        self.gen_loss_tracker = tf.keras.metrics.Mean(name="g_loss")
        self.disc_loss_tracker = tf.keras.metrics.Mean(name="d_loss")

    @property
    def metrics(self):
        return [self.gen_loss_tracker, self.disc_loss_tracker, self.train_mse_metric, self.train_acc_metric]   # aggiunto le due metriche custom (MSE e Accuracy)

    def compile(self, d_optimizer, g_optimizer, loss_fn, train_mse_metric, train_acc_metric):                   # aggiunto le due metriche custom (MSE e Accuracy)
        super(ConditionalGAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn
        self.train_mse_metric = train_mse_metric # MSE
        self.train_acc_metric = train_acc_metric # BinaryAccuracy
    
    # Le sezioni commentate in inglese sono quelle di default e non sono state modificate   
    def train_step(self, data):
        # Unpack the data.
        real_images, one_hot_labels = data
        
        # Add dummy dimensions to the labels so that they can be concatenated with
        # the images. This is for the discriminator.
        image_one_hot_labels = one_hot_labels[:, :, None, None]
        image_one_hot_labels = tf.repeat(
            image_one_hot_labels, repeats=[image_size * image_size]
        )
        image_one_hot_labels = tf.reshape(
            image_one_hot_labels, (-1, image_size, image_size, num_classes)
        )

        # Sample random points in the latent space and concatenate the labels.
        # This is for the generator.
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        random_vector_labels = tf.concat(
            [random_latent_vectors, one_hot_labels], axis=1
        )

        # Decode the noise (guided by labels) to fake images.
        generated_images = self.generator(random_vector_labels)

        # Combine them with real images. Note that we are concatenating the labels
        # with these images here.
        fake_image_and_labels = tf.concat([generated_images, image_one_hot_labels], -1)
        real_image_and_labels = tf.concat([real_images, image_one_hot_labels], -1)
        combined_images = tf.concat(
            [fake_image_and_labels, real_image_and_labels], axis=0
        )

        # Assemble labels discriminating real from fake images.
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )

        # Train the discriminator.
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights))
        

        # Sample random points in the latent space.
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        random_vector_labels = tf.concat(
            [random_latent_vectors, one_hot_labels], axis=1
        )

        # Assemble labels that say "all real images".
        misleading_labels = tf.zeros((batch_size, 1))

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)!
        with tf.GradientTape() as tape:
            fake_images = self.generator(random_vector_labels)
            fake_image_and_labels = tf.concat([fake_images, image_one_hot_labels], -1)
            predictions = self.discriminator(fake_image_and_labels)
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))

        # Monitor loss.
        self.disc_loss_tracker.update_state(d_loss)
        self.gen_loss_tracker.update_state(g_loss)
        
        # Aggiungo le metriche richieste
        self.train_acc_metric.update_state(labels, self.discriminator(combined_images, training=True))
        self.train_mse_metric.update_state(labels, self.discriminator(combined_images, training=True))
        # Restituisco tutte le metriche
        return { m.name: m.result() for m in self.metrics }
    
    # override del test step per implementazione di model.evaluate e aggiunta metriche
    def test_step(self, data):
        # Unpack the data.
        real_images, one_hot_labels = data
        # Add dummy dimensions to the labels so that they can be concatenated with
        # the images. This is for the discriminator.
        image_one_hot_labels = one_hot_labels[:, :, None, None]
        image_one_hot_labels = tf.repeat(
            image_one_hot_labels, repeats=[image_size * image_size]
        )
        image_one_hot_labels = tf.reshape(
            image_one_hot_labels, (-1, image_size, image_size, num_classes)
        )

        # Sample random points in the latent space and concatenate the labels.
        # This is for the generator.
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        random_vector_labels = tf.concat(
            [random_latent_vectors, one_hot_labels], axis=1
        )

        # Decode the noise (guided by labels) to fake images.
        generated_images = self.generator(random_vector_labels)

        # Combine them with real images. Note that we are concatenating the labels
        # with these images here.
        fake_image_and_labels = tf.concat([generated_images, image_one_hot_labels], -1)
        real_image_and_labels = tf.concat([real_images, image_one_hot_labels], -1)
        combined_images = tf.concat(
            [fake_image_and_labels, real_image_and_labels], axis=0
        )

        # Assemble labels discriminating real from fake images.
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )

        # Assemble labels that say "all real images".
        misleading_labels = tf.zeros((batch_size, 1))
        
        # Da qui in poi aggiungo le loss e le metriche evitando backpropagation e training
        val_d_logits = self.discriminator(combined_images, training=False)
        val_g_logits = self.discriminator(fake_image_and_labels, training=False)

        # Compute the loss value
        self.val_d_loss_value = self.loss_fn(labels, val_d_logits)
        self.val_g_loss_value = self.loss_fn(misleading_labels, val_g_logits)
        
        # Monitor loss.
        self.disc_loss_tracker.update_state(self.val_d_loss_value)
        self.gen_loss_tracker.update_state(self.val_g_loss_value)
        # Aggiungo le metriche richieste
        self.train_acc_metric.update_state(labels, self.discriminator(combined_images, training=False))
        self.train_mse_metric.update_state(labels, self.discriminator(combined_images, training=False))
        
        # Restituisco tutte le metriche
        return { m.name: m.result() for m in self.metrics }


### Logging e tracking

##### Definizione dei Callbacks

In [None]:
timestr = time.strftime("%Y%m%d-%H%M%S")

log_dir = os.path.join(os.getcwd(), 'logs')
csv_dir = os.path.join(log_dir, 'csv')
model_checkpoints_dir = os.path.join(log_dir, 'model_checkpoints')
model_checkpoints_timestamp = pathlib.Path(model_checkpoints_dir, timestr)
model_checkpoints_timestamp.mkdir(parents=True, exist_ok=True)
# Include the epoch in the file name (uses `str.format`)
checkpoint_path = pathlib.Path(model_checkpoints_timestamp, "cp-epo{epoch:04d}-g_loss{g_loss:.2f}-d_loss{d_loss:.2f}-acc{accuracy:.2f}-mse{mse:.2f}")
checkpoint_dir = os.path.dirname(checkpoint_path)
print(checkpoint_path)
print(checkpoint_dir)

csv_file = csv_dir + '/training_results_' + str(seed_name) + "_epoch" + str(n_epochs) + "_" + str(timestr) + '.csv'
print(csv_file)

def get_callbacks(log_dir):
  return [
    tf.keras.callbacks.EarlyStopping(   # usato solo inizialmente quando le loss divergevano subito
      monitor='g_loss',
      patience=patience,
      verbose=1,
      restore_best_weights=True,
    ),
    tf.keras.callbacks.CSVLogger(csv_file),   # costruisco un file csv con tutte le metriche
    tf.keras.callbacks.ModelCheckpoint(       # per salvare i modelli (circa 50MB per checkpoint). Salvo solo weights perchè non posso salvare l'intero modello con subclassing
      filepath=checkpoint_path,
      # monitor='val_g_loss',
      verbose=1,
      save_best_only=False,
      save_weights_only=True,
      save_freq='epoch',
      period=10,
    )
  ]


Custom callback per la generazione di immagini durante il training

In [None]:
generated_images_dir = "logs/generated_images/training"
timestamp_dir = pathlib.Path(generated_images_dir, timestr)
timestamp_dir.mkdir(parents=True, exist_ok=True)

class GANMonitor(tf.keras.callbacks.Callback):
    def __init__(self, num_img=3, latent_dim=generator_in_channels):
        self.num_img = num_img
        self.latent_dim = latent_dim

    def on_epoch_end(self, epoch, logs=None):
        # start = time.time()
        random_latent_vectors = tf.random.normal(shape=(self.num_img, self.latent_dim))
        generated_images = self.model.generator(random_latent_vectors)
        generated_images *= 255
        generated_images.numpy()
        for i in range(self.num_img):
            img = tf.keras.preprocessing.image.array_to_img(generated_images[i])
            if (epoch+1) % 10 == 0:
                # img.save(os.path.join(timestamp_dir, "synth_{}_epo{}_{}.png".format(i, epoch+1, timestr)))
                # print("\nSynthetic image saved with name synth_{}_epo{}_{}.png".format(i, epoch+1, timestr))
                img.save(os.path.join(timestamp_dir, "synth_{}_epo{}.png".format(i, epoch+1)))
                if (i==0):
                    print("\nImmagine generata: synth_{}_epo{}.png".format(i, epoch+1))
                else:
                    print("Immagine generata: synth_{}_epo{}.png".format(i, epoch+1))

##### Inizializzo parametri wandb

Usati per il logging sul sito utilizzato per tracciare gli esperimenti. Non rilevante.

In [None]:
wandb.init(project="wandb_projectname", entity="wandb_username", 
        config= {
                "dataset": seed_name,
                "train_samples": nb_train_samples,
                "validation_samples": nb_validation_samples,
                "synth_samples": nb_synth_samples,
                "test_samples": nb_test_samples,
                "latent_dim": latent_dim,
                "lr_discr": lr_disc,
                "decay_discr": decay_disc,
                "lr_gen": lr_gen,
                "decay_gen": decay_gen,
                "n_epochs": n_epochs,
                "batch_size": batch_size,
                "norm": nl,
                "loss_func": loss_func,
                "label_smoothing": lbl_smooth,
                }
)
config = wandb.config


### Training

In [None]:

cond_gan = ConditionalGAN(
    discriminator=discriminator, generator=generator, latent_dim=latent_dim
)
cond_gan.compile(
    d_optimizer=tf.keras.optimizers.Adam(learning_rate=lr_disc, beta_1=decay_disc), 
    g_optimizer=tf.keras.optimizers.Adam(learning_rate=lr_gen, beta_1=decay_gen),   
    loss_fn=tf.keras.losses.BinaryCrossentropy(),
    train_mse_metric=tf.keras.metrics.MeanSquaredError(name='mse'),                 # aggiunto
    train_acc_metric=tf.keras.metrics.BinaryAccuracy(name='accuracy')               # aggiunto
)

##### Stampo riepilogo degli iperparametri

Non rilevante.

In [None]:
print('Training di una C-GAN ...')
print(f'- dataset: ', seed_name)
print(f'- lista delle classi: ', class_names_train)
print(f'- # di classi: ', n_classes)
print(f'- # di campioni per il training: ', nb_train_samples)
print(f'- # di campioni per la validation: ', nb_validation_samples)
print(f'- # di campioni per il test: ', nb_test_samples)
print(f'- # totale di campioni: ', total_nb_samples) 
print(f'- percentuale di campioni per il training: ', round(nb_train_samples/total_nb_samples*100, 2), '%')
print(f'- percentuale di campioni per la validation: ', round(nb_validation_samples/nb_train_samples*100), '%')
print(f'- percentuale di campioni per il test: ', round(nb_test_samples/total_nb_samples*100, 2), ' %')
print(f'- dimensioni dello spazio latente: ', latent_dim)
print(f'- # di epochs: ', n_epochs - 1)
print(f'- diemnsioni dei batch: ', batch_size)
print(f'- learning rate disc.: ', lr_disc)
print(f'- learning rate gen.: ', lr_gen)
print(f'- normalizzazione: ', nl)
print(f'- # di validation steps: ', round(validation_steps))

##### Inizio training

In [None]:
history = cond_gan.fit(
    normalized_train_ds,
    epochs=n_epochs,
    validation_data=(normalized_val_ds),
    validation_steps=3, # generalmente validation_steps = validation_samples / batch_size
    callbacks=[GANMonitor(num_img=5), get_callbacks(log_dir=log_dir), WandbCallback()]
    )

##### Salvo il modello del generatore finale per utilizzo futuro

In [None]:
trained_generator = cond_gan.generator
# trained_generator.save("models/cgan_gen_" + str(seed_name) + "_epoch" + str(n_epochs) + "_" + str(timestr) + ".h5")
# print("Modello salvato.")

##### Fase di test
Uso il metodo model.evaluate() per avere le metriche corrispondenti al test dataset.

Ciò è possibile solo dopo l'override del train_step del metodo model.fit(), altrimenti restituisce errore per il subclassing

In [None]:
evaluate = cond_gan.evaluate(normalized_test_ds, return_dict=True)

##### Stampo le metriche come dataframe
(nel caso non volessi utilizzare wandb)

In [None]:
print("{:<15} {:<10}".format('Metric','Value'))
metric_value = []
for k, v in evaluate.items():
    metric, value = k, v
    print("{:<15} {:<10}".format("test_"+metric, value))
    metric_value.append(evaluate.get(metric))
print(metric_value)

##### Aggiungo le metriche del test dataset al csv generato dal callback
(nel caso non volessi utilizzare wandb)

In [None]:
df = pd.read_csv(csv_file)
df['test_accuracy'] = metric_value[3]
df['test_mse'] = metric_value[2]
df['test_d_loss'] = metric_value[1]
df['test_g_loss'] = metric_value[0]
df['dataset_name'] = seed_name
df.to_csv(csv_file)
last_row = df.iloc[-1]
print(last_row)

##### Termino il logging su wandb
1) Logging manuale delle metriche di test
2) Invio email al termine del training
3) Termino la run e salvo i dati

In [None]:
wandb.log({"test_g_loss": metric_value[0], "test_d_loss": metric_value[1], "test_mse": metric_value[2], "test_accuracy": metric_value[3]})
wandb.alert(title="Run finished", text="Run finished")

In [None]:
wandb.finish()

Caricamento weights da un'eventuale checkpoint e successiva compilazione per avere modello completo.
Il file del modello verrà utilizzato nello script per la generazione delle immagini "image_generation.ipynb"

In [None]:
# weight_file = "logs/model_checkpoints/20220508-010316 mea02_1000epo/cp-epo0030-g_loss0.78-d_loss0.66-acc0.66-mse0.23"
# cp_name = "cp-epo0030-g_loss0.78-d_loss0.66-acc0.66-mse0.23"
# cond_gan.load_weights(weight_file).expect_partial()
# print("Weights caricati con successo. Modello pronto per essere compilato")

# best_trained_generator = cond_gan.generator
# best_trained_generator.compile()
# timestr = time.strftime("%Y%m%d-%H%M%S")
# best_trained_generator.save("models/best_cgan_gen_" + cp_name + "_" + str(timestr) + ".h5")
# print("Modello migliore salvato.")
