In [None]:
#COLAB
# from google.colab import drive
# drive.mount('/content/drive')

# Initialization

In [1]:
import tensorflow as tf
import os
import zipfile
import matplotlib.pyplot as plt
import numpy as np
from numpy import load
from tensorflow.keras.optimizers import Adam
from tqdm.notebook import tqdm_notebook
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, precision_recall_curve, auc, roc_curve
import seaborn as sns
tf.get_logger().setLevel('ERROR')

In [None]:
# Report whether TensorFlow sees the first GPU; the notebook keeps running on CPU otherwise.
device_name = tf.test.gpu_device_name()
if device_name == '/device:GPU:0':
    print('Found GPU at: {}'.format(device_name))
else:
    print('GPU device not found')
    #raise SystemError('GPU device not found')

In [3]:
def create_env(f_name, base_dir='./models/'):
    """Create the working directory tree for a model run.

    The run folder is ``base_dir/f_name`` and receives one sub-folder per
    artefact type (EMA weights of each network and reconstructed images).
    Existing folders are left untouched, so the call is idempotent.

    The default ``base_dir`` matches the location used by ``load_env`` and by
    the notebook-level ``folder_path``; on Colab pass
    ``base_dir='./drive/My Drive/'`` instead.

    Args:
        f_name: Name of the run folder.
        base_dir: Directory in which the run folder is created.
    """
    folder_path = os.path.join(base_dir, f_name)

    if not os.path.exists(folder_path):
        # exist_ok guards against the folder appearing between check and creation.
        os.makedirs(folder_path, exist_ok=True)
        print(f"Cartella '{folder_path}'creata con successo!")
    else:
        print(f"La cartella'{folder_path}'esiste già.")

    subfolders = ['EMA_enc_w', 'EMA_gen_w', 'EMA_xz_w','EMA_xx_w','EMA_zz_w','Rec_Img']

    for subfolder in subfolders:
        subfolder_path = os.path.join(folder_path, subfolder)
        if not os.path.exists(subfolder_path):
            os.makedirs(subfolder_path, exist_ok=True)
            print(f"Sottocartella '{subfolder}' creata con successo in '{folder_path}'.")
        else:
            print(f"La sottocartella '{subfolder}' esiste già in '{folder_path}'.")


def load_env(f_name):
    """Make sure the run folder ``./models/<f_name>`` is available.

    If the folder already exists nothing is done. Otherwise, if
    ``./models/<f_name>.zip`` exists, the archive is extracted into
    ``./models/``. If neither exists only a message is printed.

    Args:
        f_name: Name of the run folder (and of the zip archive without extension).
    """
    #COLAB
    #extract_path = './drive/My Drive/'
    extract_path = './models/'
    env_dir = os.path.join(extract_path, f_name)
    archive_path = os.path.join(extract_path, f_name + '.zip')

    # Folder already unpacked: nothing to do.
    if os.path.exists(env_dir):
        print(f"Env '{f_name}' esistente caricato'.")
        return

    # Neither folder nor archive found.
    if not os.path.exists(archive_path):
        print('Cartella non trovata...')
        return

    with zipfile.ZipFile(archive_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    print('Estraggo dal file zip...')



In [None]:
# Run configuration
TRAIN_LABEL = 'Bird'      # class treated as "normal"
MODE = 64                 # 32 selects the 32x32 pipeline, anything else the 64x64 one
LATENT_DIM = 100
EMA = True
f_name = f'Model_ALAD_{MODE}_{TRAIN_LABEL}_{LATENT_DIM}'
CHECKPOINT = False        # True: reuse an existing run folder instead of creating one

#COLAB
#folder_path = './drive/My Drive/'+ f_name

folder_path = './models/'+ f_name

if CHECKPOINT:
    load_env(f_name)
else:
    create_env(f_name)

In [5]:
SEED = 3
# Deterministic TensorFlow kernels plus a seeded TensorFlow generator.
tf.config.experimental.enable_op_determinism()
tf.random.set_seed(SEED)
# NumPy's global generator is used by this notebook as well (np.random.normal
# for the latent noise, np.random.choice for the preview grid), so it must be
# seeded too for runs to be repeatable.
np.random.seed(SEED)

# Load the Data

In [6]:
# Map labels to 0 for 'normal' examples and 1 for 'anomalous' ones
def get_adapted_data(labels,normal):
    """Return a binary anomaly-label array.

    Args:
        labels: Array of class labels.
        normal: Label value considered normal.

    Returns:
        Array shaped like ``labels`` holding 0 where ``labels == normal``
        and 1 everywhere else.
    """
    is_normal = labels == normal
    return np.where(is_normal, 0, 1)

In [None]:
# Load the 32x32 dataset (CIFAR-10)
def load_data32(normal_label, class_names):
    """Load CIFAR-10 and prepare it for one-class anomaly detection.

    Pixels are rescaled to [-1, 1], 10% of the training set is held out as a
    validation set (fixed ``random_state=42``) and labels are converted to
    0 (normal) / 1 (anomalous) with respect to ``normal_label``.

    Args:
        normal_label: Index of the class treated as normal.
        class_names: List of class names; only its length is used, to validate
            ``normal_label``.

    Returns:
        Tuple ``(x_adapted, y_adapted, x_train_new, y_train_new, x_test_new,
        y_test_new, x_val_new, x_val_adapted, indici_normal,
        indici_normal_val, indici_anomaly)`` where the ``*_adapted`` arrays
        contain only normal examples and the ``indici_*`` arrays are row
        indices into the training / validation sets.

    Raises:
        KeyError: If ``normal_label`` is not a valid index into ``class_names``.
    """
    # Same failure mode as the former per-class lookup table.
    if not 0 <= normal_label < len(class_names):
        raise KeyError(normal_label)

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()

    x_train = x_train.reshape((x_train.shape[0], 32, 32, 3))
    x_test = x_test.reshape((x_test.shape[0], 32, 32, 3))

    # Scale to [0, 1], then centre to [-1, 1] (matches the generator's tanh output).
    x_train = (x_train.astype('float32') / 255.) * 2. - 1.
    x_test = (x_test.astype('float32') / 255.) * 2. - 1.

    # Split the training set into training and validation sets.
    x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.1, random_state=42)

    # Only the labels for the requested normal class are needed.
    y_train_new = get_adapted_data(labels=y_train, normal=normal_label)
    y_val_new = get_adapted_data(labels=y_val, normal=normal_label)
    y_test_new = get_adapted_data(labels=y_test, normal=normal_label)

    # The arrays above are already private to this function, no extra copy required.
    x_train_new, x_val_new, x_test_new = x_train, x_val, x_test

    indici_normal = np.where(y_train_new == 0)[0]
    indici_normal_val = np.where(y_val_new == 0)[0]
    indici_anomaly = np.where(y_train_new == 1)[0]

    x_adapted = x_train_new[indici_normal]
    y_adapted = y_train_new[indici_normal]
    x_val_adapted = x_val_new[indici_normal_val]

    # The former `x_val_adapted.shape()` call was removed: `shape` is a tuple,
    # so calling it raised TypeError before the function could return.

    return x_adapted, y_adapted, x_train_new, y_train_new, x_test_new, y_test_new, x_val_new, x_val_adapted, indici_normal, indici_normal_val, indici_anomaly


In [8]:
# Load the 64x64 dataset
def load_data64(normal_label, class_names):
    """Load the pre-converted 64x64 dataset from ``.npz`` files.

    Arrays are read from ``./data/data_64_conv/`` (train / test / val
    sub-folders, key ``'arr_0'``) and labels are converted to 0 (normal) /
    1 (anomalous) with respect to ``normal_label``.

    Args:
        normal_label: Index of the class treated as normal.
        class_names: List of class names; its length defines the valid indices.

    Returns:
        Tuple ``(x_adapted, y_adapted, x_train_new, y_train_new, x_test_new,
        y_test_new, x_val_new, x_val_adapted, indici_normal,
        indici_normal_val, indici_anomaly)``.
    """
    #COLAB
    #origin_dir = './drive/MyDrive/data_64/'
    origin_dir = './data/data_64_conv/'
    train_dir = os.path.join(origin_dir,'train_data_converted/')
    test_dir = os.path.join(origin_dir,'test_data_converted/')
    val_dir = os.path.join(origin_dir,'val_data_converted/')

    def _read(directory, filename):
        # Every archive stores its single array under the default key.
        return load(os.path.join(directory, filename))['arr_0']

    x_train = _read(train_dir, 'x_train_64.npz')
    y_train = _read(train_dir, 'y_train_64.npz')
    x_test = _read(test_dir, 'x_test_64.npz')
    y_test = _read(test_dir, 'y_test_64.npz')
    x_val = _read(val_dir, 'x_val_64.npz')
    y_val = _read(val_dir, 'y_val_64.npz')

    # Binary labels for every candidate class, stored as (train, test, val).
    datas = {
        class_idx: tuple(
            get_adapted_data(labels = np.copy(split), normal = class_idx)
            for split in (y_train, y_test, y_val)
        )
        for class_idx in range(len(class_names))
    }

    x_train_new = np.copy(x_train)
    x_test_new = np.copy(x_test)
    x_val_new = np.copy(x_val)

    y_train_new, y_test_new, y_val_new = datas[normal_label]

    indici_normal = np.where(y_train_new == 0)[0]
    indici_normal_val = np.where(y_val_new == 0)[0]
    indici_anomaly = np.where(y_train_new == 1)[0]

    # Keep only the normal examples for training / validation.
    x_adapted = x_train_new[indici_normal]
    y_adapted = y_train_new[indici_normal]
    x_val_adapted = x_val_new[indici_normal_val]

    return x_adapted, y_adapted, x_train_new, y_train_new, x_test_new, y_test_new, x_val_new, x_val_adapted, indici_normal, indici_normal_val, indici_anomaly

In [9]:
# Pick image size, class list and loader according to MODE (32 -> CIFAR-10, otherwise the 64x64 set).
if MODE == 32:
    batch_size = img_w = img_h = 32
    class_names =['Airplane', 'Automobile', 'Bird', 'Cat', 'Deer', 'Dog', 'Frog', 'Horse', 'Ship', 'Truck']
    load_fn = load_data32
else:
    batch_size = img_w = img_h = 64
    class_names =['Berry','Bird','Dog','Flower','Other']
    load_fn = load_data64

normal_label = class_names.index(TRAIN_LABEL)
(
    x_adapted, y_adapted,
    x_train_new, y_train_new,
    x_test_new, y_test_new,
    x_val_new, x_val_adapted,
    indici_normal, indici_normal_val, indici_anomaly,
) = load_fn(normal_label, class_names)


In [None]:
# Show a 3x3 grid of randomly drawn examples of the normal class
random_index = np.random.choice(indici_normal, 9, replace=False)
plt.figure(figsize=(10, 10))
for i,idx in enumerate(random_index):
    plt.subplot(3, 3, i + 1)
    # Undo the [-1, 1] scaling so the image can be displayed as uint8 RGB
    image = (x_train_new[idx] + 1) * 127.5
    image = np.clip(image, 0, 255).astype(np.uint8)
    plt.imshow(image)
    # Indices come from indici_normal, so the class shown is always the normal one
    plt.xlabel("Anomaly Label: {} | Original Class: {}".format(y_train_new[idx],class_names[normal_label]))
    plt.xticks([])
    plt.yticks([])
plt.tight_layout()
plt.show()

# Architecture ALAD (32x32)

In [11]:
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, BatchNormalization, LeakyReLU, Flatten, Dense, ReLU, SpectralNormalization
from tensorflow.keras import layers
# Shared kernel initializer (zero-mean normal, stddev 0.01) passed to the conv/dense layers below
init_kernel = tf.random_normal_initializer(mean=0.0, stddev=0.01)

In [12]:
class Encoder_32(tf.keras.Model):
    """Encoder for 32x32 RGB images.

    Three stride-2 spectrally-normalised convolutions (each followed by batch
    normalisation and LeakyReLU) reduce the input to 4x4, and a final 4x4
    VALID convolution yields one ``latent_dim``-channel vector per image.
    """

    def __init__(self, latent_dim):
        super().__init__()
        self.latent_dim = latent_dim

        def sn_conv(filters, stride, padding):
            # 4x4 convolution wrapped in spectral normalisation.
            return SpectralNormalization(
                layers.Conv2D(filters, kernel_size=4, kernel_initializer=init_kernel,
                              strides=stride, padding=padding))

        # Layers are created in a fixed order; auto-generated layer names depend on it.
        self.conv2 = sn_conv(128, 2, 'SAME')
        self.bn2 = layers.BatchNormalization()
        self.lr2 = layers.LeakyReLU(alpha=0.2)

        self.conv3 = sn_conv(256, 2, 'SAME')
        self.bn3 = layers.BatchNormalization()
        self.lr3 = layers.LeakyReLU(alpha=0.2)

        self.conv4 = sn_conv(512, 2, 'SAME')
        self.bn4 = layers.BatchNormalization()
        self.lr4 = layers.LeakyReLU(alpha=0.2)

        self.conv5 = sn_conv(latent_dim, 1, 'VALID')

        # Not referenced by call().
        self.leaky_relu = layers.LeakyReLU(alpha=0.2)

    def call(self, x_inp):
        """Encode a batch of images into latent vectors of size ``latent_dim``."""
        x = tf.reshape(x_inp, [-1, 32, 32, 3])

        hidden_stack = (
            self.conv2, self.bn2, self.lr2,
            self.conv3, self.bn3, self.lr3,
            self.conv4, self.bn4, self.lr4,
        )
        for layer in hidden_stack:
            x = layer(x)

        # Drop the two singleton spatial axes left by the final VALID convolution.
        return tf.squeeze(self.conv5(x), [1, 2])

In [13]:
class Generator_32(tf.keras.Model):
    """Generator for 32x32 RGB images.

    A latent vector is reshaped to a 1x1 feature map and upsampled by four
    stride-2 transposed convolutions (1 -> 4 -> 8 -> 16 -> 32); the output
    passes through tanh.
    """

    def __init__(self, latent_dim):
        super().__init__()
        self.latent_dim = latent_dim

        def deconv(filters, padding):
            # 4x4 stride-2 transposed convolution with the shared initializer.
            return layers.Conv2DTranspose(filters=filters, kernel_size=4,
                                          kernel_initializer=init_kernel,
                                          strides=2, padding=padding)

        # Layers are created in a fixed order; auto-generated layer names depend on it.
        self.conv1 = deconv(512, 'VALID')
        self.bn1 = layers.BatchNormalization()
        self.relu1 = layers.ReLU()

        self.conv2 = deconv(256, 'SAME')
        self.bn2 = layers.BatchNormalization()
        self.relu2 = layers.ReLU()

        self.conv3 = deconv(128, 'SAME')
        self.bn3 = layers.BatchNormalization()
        self.relu3 = layers.ReLU()

        self.conv5 = deconv(3, 'SAME')

        self.tanh = layers.Activation('tanh')

    def call(self, x_inp):
        """Decode a batch of latent vectors into images."""
        x = tf.reshape(x_inp, [-1, 1, 1, self.latent_dim])

        pipeline = (
            self.conv1, self.bn1, self.relu1,
            self.conv2, self.bn2, self.relu2,
            self.conv3, self.bn3, self.relu3,
            self.conv5, self.tanh,
        )
        for layer in pipeline:
            x = layer(x)

        return x

In [14]:
class DiscriminatorXZ_32(tf.keras.Model):
    """Joint discriminator over (image, latent) pairs for 32x32 images.

    The image branch and the latent branch are processed separately,
    concatenated on the channel axis, fused by a 1x1 convolution and mapped
    to a single logit.
    """

    def __init__(self, latent_dim):
        super().__init__()
        self.latent_dim = latent_dim

        def sn_conv(filters, kernel, stride):
            # Spectrally-normalised SAME convolution with the shared initializer.
            return SpectralNormalization(
                layers.Conv2D(filters, kernel_size=kernel, kernel_initializer=init_kernel,
                              strides=stride, padding='SAME'))

        # Layers are created in a fixed order; auto-generated layer names depend on it.
        # Image (x) branch
        self.conv2_x = sn_conv(128, 3, 2)
        self.bn2_x = layers.BatchNormalization()
        self.lr2_x = layers.LeakyReLU(alpha=0.2)

        self.conv3_x = sn_conv(256, 3, 2)
        self.bn3_x = layers.BatchNormalization()
        self.lr3_x = layers.LeakyReLU(alpha=0.2)

        self.conv4_x = sn_conv(512, 3, 2)
        self.bn4_x = layers.BatchNormalization()
        self.lr4_x = layers.LeakyReLU(alpha=0.2)

        # Latent (z) branch
        self.conv1_z = sn_conv(512, 1, 1)
        self.lr1_z = layers.LeakyReLU()
        self.dropout1_z = layers.Dropout(rate=0.2)

        self.conv2_z = sn_conv(512, 1, 1)
        self.lr2_z = layers.LeakyReLU()
        self.dropout2_z = layers.Dropout(rate=0.2)

        # Fusion layer
        self.conv_fuse = sn_conv(1024, 1, 1)
        self.relu_fuse = layers.LeakyReLU()
        self.dropout_fuse = layers.Dropout(rate=0.2)

        # Output layer
        self.conv_final = sn_conv(1, 1, 1)

    def call(self, inputs):
        """Score an ``(x, z)`` pair.

        Args:
            inputs: Pair ``(x_inp, z_inp)`` of image batch and latent batch.

        Returns:
            ``(logits, intermediate_layer)`` where ``intermediate_layer`` is the
            fused feature map just before the final convolution.
        """
        x_inp, z_inp = inputs

        x = x_inp
        for layer in (self.conv2_x, self.bn2_x, self.lr2_x,
                      self.conv3_x, self.bn3_x, self.lr3_x,
                      self.conv4_x, self.bn4_x, self.lr4_x):
            x = layer(x)
        # Flatten the 4x4x512 feature map into the channel axis.
        x = tf.reshape(x, [-1, 1, 1, 512*4*4])

        z = tf.reshape(z_inp, [-1, 1, 1, self.latent_dim])
        for layer in (self.conv1_z, self.lr1_z, self.dropout1_z,
                      self.conv2_z, self.lr2_z, self.dropout2_z):
            z = layer(z)

        # Fuse x and z
        intermediate_layer = tf.concat([x, z], axis=-1)
        for layer in (self.conv_fuse, self.relu_fuse, self.dropout_fuse):
            intermediate_layer = layer(intermediate_layer)

        logits = tf.squeeze(self.conv_final(intermediate_layer), [1, 2])

        return logits, intermediate_layer


In [15]:
class DiscriminatorXX_32(tf.keras.Model):
    """Discriminator over (image, reconstruction) pairs for 32x32 images.

    The two images are concatenated along axis 1 and passed through two
    spectrally-normalised 5x5 stride-2 convolutions, then flattened and
    mapped to one logit.
    """

    def __init__(self):
        super().__init__()

        def sn_conv(filters):
            # 5x5 stride-2 SAME convolution wrapped in spectral normalisation.
            return SpectralNormalization(
                layers.Conv2D(filters, kernel_size=5, kernel_initializer=init_kernel,
                              strides=2, padding='SAME'))

        # Layers are created in a fixed order; auto-generated layer names depend on it.
        self.concat_layer = layers.Concatenate(axis = 1)

        self.conv1 = sn_conv(64)
        self.relu1 = layers.LeakyReLU(alpha=0.2)
        self.dropout1 = layers.Dropout(rate=0.2)

        self.conv2 = sn_conv(128)
        self.relu2 = layers.LeakyReLU(alpha=0.2)
        self.dropout2 = layers.Dropout(rate=0.2)

        self.flatten = layers.Flatten()
        self.dense = layers.Dense(1)

    def call(self, inputs):
        """Score an ``(x, rec_x)`` pair.

        Returns:
            ``(logits, intermediate_layer)`` where ``intermediate_layer`` is the
            flattened feature vector fed to the final dense layer.
        """
        x, rec_x = inputs

        # Stack x and rec_x, then run the convolutional trunk.
        features = self.concat_layer([x, rec_x])
        for layer in (self.conv1, self.relu1, self.dropout1,
                      self.conv2, self.relu2, self.dropout2):
            features = layer(features)

        intermediate_layer = self.flatten(features)
        logits = self.dense(intermediate_layer)

        return logits, intermediate_layer

In [16]:
class DiscriminatorZZ_32(tf.keras.Model):
    """Discriminator over (latent, reconstructed latent) pairs (32x32 variant).

    The two latent vectors are concatenated on the last axis and passed
    through two dense layers (64 and 32 units, each with LeakyReLU and
    dropout) before a final one-unit dense layer.
    """

    def __init__(self):
        super().__init__()

        # Layers are created in a fixed order; auto-generated layer names depend on it.
        self.concat_layer = layers.Concatenate(axis=-1)

        self.dense1 = layers.Dense(64,kernel_initializer = init_kernel)
        self.relu1 = layers.LeakyReLU()
        self.dropout1 = layers.Dropout(rate=0.2)

        self.dense2 = layers.Dense(32,kernel_initializer = init_kernel)
        self.relu2 = layers.LeakyReLU()
        self.dropout2 = layers.Dropout(rate=0.2)

        self.dense3 = layers.Dense(1)

    def call(self, inputs):
        """Score a ``(z, rec_z)`` pair.

        Returns:
            ``(logits, intermediate_layer)`` where ``intermediate_layer`` is the
            activation fed to the last dense layer.
        """
        z, rec_z = inputs

        intermediate_layer = self.concat_layer([z, rec_z])
        for layer in (self.dense1, self.relu1, self.dropout1,
                      self.dense2, self.relu2, self.dropout2):
            intermediate_layer = layer(intermediate_layer)

        logits = self.dense3(intermediate_layer)

        return logits, intermediate_layer

# Architecture ALAD (64x64)

In [17]:
class Encoder_64(tf.keras.Model):
    """Encoder for 64x64 RGB images.

    Four stride-2 spectrally-normalised convolutions (each followed by batch
    normalisation and LeakyReLU) reduce the input to 4x4, and a final 4x4
    VALID convolution yields one ``latent_dim``-channel vector per image.
    """

    def __init__(self, latent_dim):
        super().__init__()
        self.latent_dim = latent_dim

        def sn_conv(filters, padding):
            # 4x4 stride-2 convolution wrapped in spectral normalisation.
            return SpectralNormalization(
                layers.Conv2D(filters, kernel_size=4, kernel_initializer=init_kernel,
                              strides=2, padding=padding))

        # Layers are created in a fixed order; auto-generated layer names depend on it.
        self.conv1 = sn_conv(64, 'SAME')
        self.bn1 = layers.BatchNormalization()
        self.lr1 = layers.LeakyReLU(alpha=0.2)

        self.conv2 = sn_conv(128, 'SAME')
        self.bn2 = layers.BatchNormalization()
        self.lr2 = layers.LeakyReLU(alpha=0.2)

        self.conv3 = sn_conv(256, 'SAME')
        self.bn3 = layers.BatchNormalization()
        self.lr3 = layers.LeakyReLU(alpha=0.2)

        self.conv4 = sn_conv(512, 'SAME')
        self.bn4 = layers.BatchNormalization()
        self.lr4 = layers.LeakyReLU(alpha=0.2)

        self.conv5 = sn_conv(latent_dim, 'VALID')

        # Not referenced by call().
        self.leaky_relu = layers.LeakyReLU(alpha=0.2)

    def call(self, x_inp):
        """Encode a batch of images into latent vectors of size ``latent_dim``."""
        x = tf.reshape(x_inp, [-1, 64, 64, 3])

        hidden_stack = (
            self.conv1, self.bn1, self.lr1,
            self.conv2, self.bn2, self.lr2,
            self.conv3, self.bn3, self.lr3,
            self.conv4, self.bn4, self.lr4,
        )
        for layer in hidden_stack:
            x = layer(x)

        # Drop the two singleton spatial axes left by the final VALID convolution.
        return tf.squeeze(self.conv5(x), [1, 2])

In [18]:
class Generator_64(tf.keras.Model):
    """Generator for 64x64 RGB images.

    A latent vector is reshaped to a 1x1 feature map and upsampled by five
    stride-2 transposed convolutions (1 -> 4 -> 8 -> 16 -> 32 -> 64); the
    output passes through tanh.
    """

    def __init__(self, latent_dim):
        super().__init__()
        self.latent_dim = latent_dim

        def deconv(filters, padding):
            # 4x4 stride-2 transposed convolution with the shared initializer.
            return layers.Conv2DTranspose(filters=filters, kernel_size=4,
                                          kernel_initializer=init_kernel,
                                          strides=2, padding=padding)

        # Layers are created in a fixed order; auto-generated layer names depend on it.
        self.conv1 = deconv(512, 'VALID')
        self.bn1 = layers.BatchNormalization()
        self.relu1 = layers.ReLU()

        self.conv2 = deconv(256, 'SAME')
        self.bn2 = layers.BatchNormalization()
        self.relu2 = layers.ReLU()

        self.conv3 = deconv(128, 'SAME')
        self.bn3 = layers.BatchNormalization()
        self.relu3 = layers.ReLU()

        self.conv4 = deconv(64, 'SAME')
        self.bn4 = layers.BatchNormalization()
        self.relu4 = layers.ReLU()

        self.conv5 = deconv(3, 'SAME')

        self.tanh = layers.Activation('tanh')

    def call(self, x_inp):
        """Decode a batch of latent vectors into images."""
        x = tf.reshape(x_inp, [-1, 1, 1, self.latent_dim])

        pipeline = (
            self.conv1, self.bn1, self.relu1,
            self.conv2, self.bn2, self.relu2,
            self.conv3, self.bn3, self.relu3,
            self.conv4, self.bn4, self.relu4,
            self.conv5, self.tanh,
        )
        for layer in pipeline:
            x = layer(x)

        return x

In [19]:
class DiscriminatorXZ_64(tf.keras.Model):
    """Joint discriminator over (image, latent) pairs for 64x64 images.

    The image branch and the latent branch are processed separately,
    concatenated on the channel axis, fused by a 1x1 convolution and mapped
    to a single logit.
    """

    def __init__(self, latent_dim):
        super().__init__()
        self.latent_dim = latent_dim

        def sn_conv(filters, kernel, stride):
            # Spectrally-normalised SAME convolution with the shared initializer.
            return SpectralNormalization(
                layers.Conv2D(filters, kernel_size=kernel, kernel_initializer=init_kernel,
                              strides=stride, padding='SAME'))

        # Layers are created in a fixed order; auto-generated layer names depend on it.
        # Image (x) branch
        self.conv1_x = sn_conv(64, 4, 2)
        self.bn1_x = layers.BatchNormalization()
        self.lr1_x = layers.LeakyReLU(alpha=0.2)

        self.conv2_x = sn_conv(128, 4, 2)
        self.bn2_x = layers.BatchNormalization()
        self.lr2_x = layers.LeakyReLU(alpha=0.2)

        self.conv3_x = sn_conv(256, 4, 2)
        self.bn3_x = layers.BatchNormalization()
        self.lr3_x = layers.LeakyReLU(alpha=0.2)

        self.conv4_x = sn_conv(512, 4, 2)
        self.bn4_x = layers.BatchNormalization()
        self.lr4_x = layers.LeakyReLU(alpha=0.2)

        # Latent (z) branch
        self.conv1_z = sn_conv(512, 1, 1)
        self.lr1_z = layers.LeakyReLU()
        self.dropout1_z = layers.Dropout(rate=0.2)

        self.conv2_z = sn_conv(512, 1, 1)
        self.lr2_z = layers.LeakyReLU()
        self.dropout2_z = layers.Dropout(rate=0.2)

        # Fusion layer
        self.conv_fuse = sn_conv(1024, 1, 1)
        self.relu_fuse = layers.LeakyReLU()
        self.dropout_fuse = layers.Dropout(rate=0.2)

        # Output layer
        self.conv_final = sn_conv(1, 1, 1)

    def call(self, inputs):
        """Score an ``(x, z)`` pair.

        Args:
            inputs: Pair ``(x_inp, z_inp)`` of image batch and latent batch.

        Returns:
            ``(logits, intermediate_layer)`` where ``intermediate_layer`` is the
            fused feature map just before the final convolution.
        """
        x_inp, z_inp = inputs

        # x branch
        x = x_inp
        for layer in (self.conv1_x, self.bn1_x, self.lr1_x,
                      self.conv2_x, self.bn2_x, self.lr2_x,
                      self.conv3_x, self.bn3_x, self.lr3_x,
                      self.conv4_x, self.bn4_x, self.lr4_x):
            x = layer(x)
        # Flatten the 4x4x512 feature map into the channel axis.
        x = tf.reshape(x, [-1, 1, 1, 512*4*4])

        # z branch
        z = tf.reshape(z_inp, [-1, 1, 1, self.latent_dim])
        for layer in (self.conv1_z, self.lr1_z, self.dropout1_z,
                      self.conv2_z, self.lr2_z, self.dropout2_z):
            z = layer(z)

        # Fuse x and z
        intermediate_layer = tf.concat([x, z], axis=-1)
        for layer in (self.conv_fuse, self.relu_fuse, self.dropout_fuse):
            intermediate_layer = layer(intermediate_layer)

        # Final projection to one logit per example
        logits = tf.squeeze(self.conv_final(intermediate_layer), [1, 2])

        return logits, intermediate_layer


In [20]:
class DiscriminatorXX_64(tf.keras.Model):
    """Discriminator over (image, reconstruction) pairs for 64x64 images.

    The two images are concatenated along axis 1 and passed through two
    spectrally-normalised 5x5 stride-2 convolutions, then flattened and
    mapped to one logit.
    """

    def __init__(self):
        super().__init__()

        def sn_conv(filters):
            # 5x5 stride-2 SAME convolution wrapped in spectral normalisation.
            return SpectralNormalization(
                layers.Conv2D(filters, kernel_size=5, kernel_initializer=init_kernel,
                              strides=2, padding='SAME'))

        # Layers are created in a fixed order; auto-generated layer names depend on it.
        self.concat_layer = layers.Concatenate(axis=1)

        self.conv1 = sn_conv(64)
        self.relu1 = layers.LeakyReLU(alpha=0.2)
        self.dropout1 = layers.Dropout(rate=0.2)

        self.conv2 = sn_conv(128)
        self.relu2 = layers.LeakyReLU(alpha=0.2)
        self.dropout2 = layers.Dropout(rate=0.2)

        self.flatten = layers.Flatten()
        self.dense = layers.Dense(1)

    def call(self, inputs):
        """Score an ``(x, rec_x)`` pair.

        Returns:
            ``(logits, intermediate_layer)`` where ``intermediate_layer`` is the
            flattened feature vector fed to the final dense layer.
        """
        x, rec_x = inputs

        # Stack x and rec_x, then run the convolutional trunk.
        features = self.concat_layer([x, rec_x])
        for layer in (self.conv1, self.relu1, self.dropout1,
                      self.conv2, self.relu2, self.dropout2):
            features = layer(features)

        intermediate_layer = self.flatten(features)
        logits = self.dense(intermediate_layer)

        return logits, intermediate_layer

In [21]:
class DiscriminatorZZ_64(tf.keras.Model):
    """Discriminator over (latent, reconstructed latent) pairs (64x64 variant).

    The two latent vectors are concatenated on the last axis and passed
    through two dense layers (64 and 32 units, each with LeakyReLU and
    dropout) before a final one-unit dense layer.
    """

    def __init__(self):
        super().__init__()

        # Layers are created in a fixed order; auto-generated layer names depend on it.
        self.concat_layer = layers.Concatenate(axis=-1)

        self.dense1 = layers.Dense(64,kernel_initializer = init_kernel)
        self.relu1 = layers.LeakyReLU()
        self.dropout1 = layers.Dropout(rate=0.2)

        self.dense2 = layers.Dense(32,kernel_initializer = init_kernel)
        self.relu2 = layers.LeakyReLU()
        self.dropout2 = layers.Dropout(rate=0.2)

        self.dense3 = layers.Dense(1)

    def call(self, inputs):
        """Score a ``(z, rec_z)`` pair.

        Returns:
            ``(logits, intermediate_layer)`` where ``intermediate_layer`` is the
            activation fed to the last dense layer.
        """
        z, rec_z = inputs

        intermediate_layer = self.concat_layer([z, rec_z])
        for layer in (self.dense1, self.relu1, self.dropout1,
                      self.dense2, self.relu2, self.dropout2):
            intermediate_layer = layer(intermediate_layer)

        logits = self.dense3(intermediate_layer)

        return logits, intermediate_layer

# Training utilities

In [22]:
# Lower-case alias of LATENT_DIM; used as the default latent size in init_models and passed to train_step
latent_dim = LATENT_DIM

In [23]:
def init_models(mode = MODE, latent_dim = latent_dim):
    """Instantiate the five ALAD networks for the requested resolution.

    Note that the defaults are bound to the values of ``MODE`` and
    ``latent_dim`` at the time this function is defined.

    Args:
        mode: 32 selects the ``*_32`` classes; any other value the ``*_64`` ones.
        latent_dim: Size of the latent space.

    Returns:
        Tuple ``(encoder, generator, discriminator_xz, discriminator_xx,
        discriminator_zz)``.
    """
    if mode == 32:
        enc_cls, gen_cls = Encoder_32, Generator_32
        dxz_cls, dxx_cls, dzz_cls = DiscriminatorXZ_32, DiscriminatorXX_32, DiscriminatorZZ_32
    else:
        enc_cls, gen_cls = Encoder_64, Generator_64
        dxz_cls, dxx_cls, dzz_cls = DiscriminatorXZ_64, DiscriminatorXX_64, DiscriminatorZZ_64

    # Tuple elements are evaluated left to right, so the construction order is
    # encoder, generator, D_xz, D_xx, D_zz.
    return enc_cls(latent_dim), gen_cls(latent_dim), dxz_cls(latent_dim), dxx_cls(), dzz_cls()

In [24]:
# Build the networks with init_models' defaults; these module-level objects are used by the training helpers below
encoder, generator, discriminator_xz, discriminator_xx, discriminator_zz = init_models()

Optimizers

In [25]:
# One independent Adam optimizer per network, all with the same hyper-parameters.
(
    discriminator_xz_optimizer,
    discriminator_xx_optimizer,
    discriminator_zz_optimizer,
    generator_optimizer,
    encoder_optimizer,
) = (Adam(learning_rate=0.0002, beta_1=0.5) for _ in range(5))

Loss

In [26]:
def discriminator_xz_loss(l_encoder, l_generator):
    """Loss of the (x, z) discriminator.

    Encoder logits are pushed towards label 1 and generator logits towards
    label 0; each sigmoid cross-entropy term is averaged and the two means
    are summed.
    """
    bce = tf.nn.sigmoid_cross_entropy_with_logits

    encoder_term = tf.reduce_mean(bce(labels=tf.ones_like(l_encoder), logits=l_encoder))
    generator_term = tf.reduce_mean(bce(labels=tf.zeros_like(l_generator), logits=l_generator))

    return generator_term + encoder_term


def discriminator_xx_loss(x_logit_real, x_logit_fake):
    """Loss of the (x, x) discriminator.

    Real-pair logits use label 1 and fake-pair logits label 0; the two
    element-wise sigmoid cross-entropies are added and then averaged.
    """
    bce = tf.nn.sigmoid_cross_entropy_with_logits

    real_term = bce(logits=x_logit_real, labels=tf.ones_like(x_logit_real))
    fake_term = bce(logits=x_logit_fake, labels=tf.zeros_like(x_logit_fake))

    return tf.reduce_mean(real_term + fake_term)


def discriminator_zz_loss(z_logit_real, z_logit_fake):
    """Loss of the (z, z) discriminator.

    Real-pair logits use label 1 and fake-pair logits label 0; the two
    element-wise sigmoid cross-entropies are added and then averaged.
    """
    bce = tf.nn.sigmoid_cross_entropy_with_logits

    real_term = bce(logits=z_logit_real, labels=tf.ones_like(z_logit_real))
    fake_term = bce(logits=z_logit_fake, labels=tf.zeros_like(z_logit_fake))

    return tf.reduce_mean(real_term + fake_term)


def cycle_consistency_loss(x_logit_real,x_logit_fake,z_logit_real,z_logit_fake):
    """Cycle-consistency term shared by the encoder and generator losses.

    Uses the labels opposite to those of the xx / zz discriminator losses
    (real pairs -> 0, fake pairs -> 1) and sums the averaged x and z costs.
    """
    bce = tf.nn.sigmoid_cross_entropy_with_logits

    def flipped_cost(logit_real, logit_fake):
        # Real pairs are labelled 0 and fake pairs 1.
        on_real = bce(logits=logit_real, labels=tf.zeros_like(logit_real))
        on_fake = bce(logits=logit_fake, labels=tf.ones_like(logit_fake))
        return tf.reduce_mean(on_real + on_fake)

    cost_x = flipped_cost(x_logit_real, x_logit_fake)
    cost_z = flipped_cost(z_logit_real, z_logit_fake)

    return cost_x + cost_z


def loss_gen(l_generator,cycle_loss):
    """Generator loss: adversarial xz term (generator logits vs. label 1) plus ``cycle_loss``."""
    ones = tf.ones_like(l_generator)
    per_example = tf.nn.sigmoid_cross_entropy_with_logits(labels=ones, logits=l_generator)
    adversarial_term = tf.reduce_mean(per_example)

    return adversarial_term + cycle_loss



def loss_enc(l_encoder,cycle_loss):
    """Encoder loss: adversarial xz term (encoder logits vs. label 0) plus ``cycle_loss``."""
    zeros = tf.zeros_like(l_encoder)
    per_example = tf.nn.sigmoid_cross_entropy_with_logits(labels=zeros, logits=l_encoder)
    adversarial_term = tf.reduce_mean(per_example)

    return adversarial_term + cycle_loss

In [27]:
@tf.function
def apply_gradients_enc(gradients):
    """Apply ``gradients`` (ordered like ``encoder.trainable_variables``) with ``encoder_optimizer``."""
    grads_and_vars = zip(gradients, encoder.trainable_variables)
    encoder_optimizer.apply_gradients(grads_and_vars)


@tf.function
def apply_gradients_gen(gradients):
    """Apply ``gradients`` (ordered like ``generator.trainable_variables``) with ``generator_optimizer``."""
    grads_and_vars = zip(gradients, generator.trainable_variables)
    generator_optimizer.apply_gradients(grads_and_vars)


@tf.function
def apply_gradients_dxz(gradients):
    """Apply ``gradients`` (ordered like ``discriminator_xz.trainable_variables``) with ``discriminator_xz_optimizer``."""
    grads_and_vars = zip(gradients, discriminator_xz.trainable_variables)
    discriminator_xz_optimizer.apply_gradients(grads_and_vars)


@tf.function
def apply_gradients_dxx(gradients):
    """Apply ``gradients`` (ordered like ``discriminator_xx.trainable_variables``) with ``discriminator_xx_optimizer``."""
    grads_and_vars = zip(gradients, discriminator_xx.trainable_variables)
    discriminator_xx_optimizer.apply_gradients(grads_and_vars)


@tf.function
def apply_gradients_dzz(gradients):
    """Apply ``gradients`` (ordered like ``discriminator_zz.trainable_variables``) with ``discriminator_zz_optimizer``."""
    grads_and_vars = zip(gradients, discriminator_zz.trainable_variables)
    discriminator_zz_optimizer.apply_gradients(grads_and_vars)


In [28]:
def init_train(x_train,batch_size,ran_from,ran_to,latent_dim):
    """Run one forward pass of all five networks in training mode.

    Args:
        x_train: Training images; the slice ``[ran_from:ran_to]`` is the batch.
        batch_size: Number of latent vectors to sample.
        ran_from: Start index of the batch.
        ran_to: End index (exclusive) of the batch.
        latent_dim: Size of each sampled latent vector.

    Returns:
        ``(l_encoder, l_generator, x_logit_real, x_logit_fake, z_logit_real,
        z_logit_fake)`` - the logits of the three discriminators.
    """
    # Latent noise comes from NumPy's global random generator.
    z_prior = np.random.normal(size = [batch_size, latent_dim])
    x_real = x_train[ran_from:ran_to]

    # Encode real images and decode sampled noise ...
    z_encoded = encoder(x_real, training = True)
    x_generated = generator(z_prior, training = True)

    # ... then close both cycles (x -> z -> x and z -> x -> z).
    x_reconstructed = generator(z_encoded, training=True)
    z_reconstructed = encoder(x_generated, training = True)

    l_encoder = discriminator_xz([x_real, z_encoded], training = True)[0]
    l_generator = discriminator_xz([x_generated, z_prior], training = True)[0]

    x_logit_real = discriminator_xx([x_real, x_real], training = True)[0]
    x_logit_fake = discriminator_xx([x_real, x_reconstructed], training = True)[0]

    z_logit_real = discriminator_zz([z_prior, z_prior], training = True)[0]
    z_logit_fake = discriminator_zz([z_prior, z_reconstructed], training = True)[0]

    return l_encoder, l_generator, x_logit_real, x_logit_fake, z_logit_real, z_logit_fake


def train_step(x_train,batch_size,ran_from,ran_to,latent_dim):
  """Run one training update on the batch ``x_train[ran_from:ran_to]``.

  The three discriminators are updated first; a second forward pass (with the
  updated discriminators and freshly sampled noise) is then used to update the
  encoder and the generator.

  Returns:
      ``(encoder_loss, generator_loss, disc_loss, disc_xz_loss, disc_xx_loss,
      disc_zz_loss)`` where ``disc_loss`` is the sum of the three
      discriminator losses.
  """

  # persistent=True because the tape is queried once per discriminator
  with tf.GradientTape(persistent=True) as disc_tape:

      l_encoder, l_generator, x_logit_real, x_logit_fake, z_logit_real, z_logit_fake = init_train(x_train,batch_size,ran_from,ran_to,latent_dim)

      # Discriminator loss

      disc_xz_loss = discriminator_xz_loss(l_encoder, l_generator)
      disc_xx_loss = discriminator_xx_loss(x_logit_real, x_logit_fake)
      disc_zz_loss = discriminator_zz_loss(z_logit_real, z_logit_fake)
      # Only reported to the caller; gradients are taken per discriminator
      disc_loss = disc_xz_loss + disc_xx_loss + disc_zz_loss

  # Gradient Update

  disc_xz_gradients = disc_tape.gradient(disc_xz_loss, discriminator_xz.trainable_variables)
  disc_xx_gradients = disc_tape.gradient(disc_xx_loss, discriminator_xx.trainable_variables)
  disc_zz_gradients = disc_tape.gradient(disc_zz_loss, discriminator_zz.trainable_variables)

  apply_gradients_dxz(disc_xz_gradients)
  apply_gradients_dxx(disc_xx_gradients)
  apply_gradients_dzz(disc_zz_gradients)


  # persistent=True because the tape is queried for both encoder and generator
  with tf.GradientTape(persistent = True) as recon_tape:

      l_encoder, l_generator, x_logit_real, x_logit_fake, z_logit_real, z_logit_fake = init_train(x_train, batch_size, ran_from, ran_to, latent_dim)
      # Reconstruction loss

      cycle_loss = cycle_consistency_loss(x_logit_real,x_logit_fake,z_logit_real,z_logit_fake)
      encoder_loss = loss_enc(l_encoder,cycle_loss)
      generator_loss = loss_gen(l_generator,cycle_loss)

  # Gradient update

  encoder_gradients = recon_tape.gradient(encoder_loss, encoder.trainable_variables)
  generator_gradients = recon_tape.gradient(generator_loss, generator.trainable_variables)

  apply_gradients_enc(encoder_gradients)
  apply_gradients_gen(generator_gradients)


  return encoder_loss, generator_loss,disc_loss, disc_xz_loss, disc_xx_loss, disc_zz_loss

# Compute the mean reconstruction error on the validation set
def val_recon_error(x_val):
    """Return the mean L2 reconstruction error over ``x_val``.

    Each element of ``x_val`` is encoded and decoded individually (inference
    mode) and the L2 norm of the flattened difference is recorded.

    Args:
        x_val: Iterable of validation images.

    Returns:
        Mean of the per-image L2 norms.
    """
    def l2_error(sample):
        reconstruction = generator(encoder(sample, training= False), training= False)
        residual = tf.reshape(sample - reconstruction, [-1])
        return tf.norm(residual, ord=2)

    return np.mean([l2_error(sample) for sample in x_val])


In [29]:
# Dummy train step used to initialise the network weights: a single batch of
# ones with shape (batch_size, img_w, img_h, 3) is pushed through train_step.
# rf and rt are handed over as train_step's ran_from / ran_to arguments.
rf = 0
rt = batch_size

train_step(tf.ones([batch_size,img_w,img_h,3]),batch_size,rf, rt, latent_dim)

(<tf.Tensor: shape=(), dtype=float32, numpy=13.190414>,
 <tf.Tensor: shape=(), dtype=float32, numpy=11.364692>,
 <tf.Tensor: shape=(), dtype=float32, numpy=3.9731746>,
 <tf.Tensor: shape=(), dtype=float32, numpy=1.2315513>,
 <tf.Tensor: shape=(), dtype=float32, numpy=1.3552995>,
 <tf.Tensor: shape=(), dtype=float32, numpy=1.3863239>)

In [30]:
def EMA_init(ema_v, model):
    """Register the trainable variables of ``model`` with ``ema_v`` and collect their averages.

    Args:
        ema_v: object exposing ``apply(variables)`` and ``average(variable)``
            (the notebook passes a ``tf.train.ExponentialMovingAverage``).
        model: object exposing ``trainable_variables``.

    Returns:
        dict mapping each variable's ``name`` to ``ema_v.average(variable)``.
    """
    tracked = model.trainable_variables
    ema_v.apply(tracked)
    return {variable.name: ema_v.average(variable) for variable in tracked}

def update_EMA(ema_encoder, ema_generator, ema_discriminator_xz, ema_discriminator_xx, ema_discriminator_zz):
    """Call ``apply`` on each EMA tracker with the current weights of its network.

    The networks are not parameters: the module-level ``encoder``,
    ``generator``, ``discriminator_xz``, ``discriminator_xx`` and
    ``discriminator_zz`` are read directly.
    """
    tracker_network_pairs = (
        (ema_encoder, encoder),
        (ema_generator, generator),
        (ema_discriminator_xz, discriminator_xz),
        (ema_discriminator_xx, discriminator_xx),
        (ema_discriminator_zz, discriminator_zz),
    )
    for tracker, network in tracker_network_pairs:
        tracker.apply(network.trainable_variables)

def retrieve_EMA_weights(ema_v,model):
    """Load the exponential moving averages held by ``ema_v`` into ``model``.

    Every trainable variable of ``model`` is overwritten in place with
    ``ema_v.average(variable)``.

    Args:
        ema_v: object exposing ``average(variable)`` (the notebook passes a
            ``tf.train.ExponentialMovingAverage`` that already tracks the
            model's variables).
        model: object exposing ``trainable_variables`` whose items support
            ``assign(value)``.
    """
    for var in model.trainable_variables:
        # assign() takes the NEW value as its first argument. The previous call
        # `var.assign(var, ema_v.average(var))` wrote the variable onto itself
        # and handed the average to the `use_locking` slot, so the averaged
        # weights never reached the model.
        var.assign(ema_v.average(var))

def init_weight_EMA(encoder, generator, discriminator_xz, discriminator_xx, discriminator_zz,ema_decay = 0.999):
    """Create one EMA tracker and one checkpoint per network.

    For each of the five networks a ``tf.train.ExponentialMovingAverage`` with
    decay ``ema_decay`` is created; ``EMA_init`` then registers the network's
    trainable variables with it, and the resulting name -> average mapping is
    wrapped in a ``tf.train.Checkpoint`` under the ``averaged_weights`` key.

    Returns:
        A 10-tuple: the five trackers followed by the five checkpoints, both in
        the order encoder, generator, discriminator_xz, discriminator_xx,
        discriminator_zz.
    """
    networks = (encoder, generator, discriminator_xz, discriminator_xx, discriminator_zz)

    # All trackers are created before any variable is registered.
    trackers = tuple(
        tf.train.ExponentialMovingAverage(decay=ema_decay) for _ in networks
    )
    checkpoints = tuple(
        tf.train.Checkpoint(averaged_weights=EMA_init(tracker, network))
        for tracker, network in zip(trackers, networks)
    )
    return trackers + checkpoints


In [31]:
# EMA trackers and their checkpoints for the five networks
# (trackers first, checkpoints second, same network order in both halves).
(
    ema_encoder,
    ema_generator,
    ema_discriminator_xz,
    ema_discriminator_xx,
    ema_discriminator_zz,
    checkpoint_encoder,
    checkpoint_generator,
    checkpoint_disc_xz,
    checkpoint_disc_xx,
    checkpoint_disc_zz,
) = init_weight_EMA(encoder, generator, discriminator_xz, discriminator_xx, discriminator_zz)

In [32]:
def save_model(model,filename):
    """Save the weights of ``model`` to ``<folder_path>/Models/<filename>``.

    ``folder_path`` is the module-level experiment folder, not a parameter.
    """
    model.save_weights(os.path.join(folder_path, f'Models/{filename}'))

def save_average(checkpoint,foldername):
    """Call ``checkpoint.save`` with the prefix ``<folder_path>/<foldername>/``.

    ``folder_path`` is the module-level experiment folder, not a parameter.
    """
    checkpoint.save(os.path.join(folder_path, f'{foldername}/'))


def load_w(folder_path, encoder, generator, discriminator_xz, discriminator_xx, discriminator_zz):
    """Load the saved weights of the five networks from ``<folder_path>/Models``.

    Each network's ``load_weights`` is called with the path of its own file:
    ``Encoder``, ``Generator``, ``DiscriminatorXZ``, ``DiscriminatorXX`` and
    ``DiscriminatorZZ`` respectively.
    """
    weights_dir = os.path.join(folder_path, 'Models')
    network_files = (
        (encoder, 'Encoder'),
        (generator, 'Generator'),
        (discriminator_xz, 'DiscriminatorXZ'),
        (discriminator_xx, 'DiscriminatorXX'),
        (discriminator_zz, 'DiscriminatorZZ'),
    )
    for network, stem in network_files:
        network.load_weights(os.path.join(weights_dir, stem))

def load_average(folder_path, checkpoint_encoder, checkpoint_generator, checkpoint_disc_xz, checkpoint_disc_xx, checkpoint_disc_zz):
    """Restore each EMA checkpoint from the latest checkpoint in its sub-folder.

    The sub-folders of ``folder_path`` are ``EMA_enc_w``, ``EMA_gen_w``,
    ``EMA_xz_w``, ``EMA_xx_w`` and ``EMA_zz_w``; for each one the result of
    ``tf.train.latest_checkpoint`` is passed to the checkpoint's ``restore``.
    """
    checkpoint_dirs = (
        (checkpoint_encoder, 'EMA_enc_w'),
        (checkpoint_generator, 'EMA_gen_w'),
        (checkpoint_disc_xz, 'EMA_xz_w'),
        (checkpoint_disc_xx, 'EMA_xx_w'),
        (checkpoint_disc_zz, 'EMA_zz_w'),
    )
    for ckpt, subdir in checkpoint_dirs:
        latest = tf.train.latest_checkpoint(os.path.join(folder_path, subdir))
        ckpt.restore(latest)

In [33]:
# Show (and optionally save) original vs. reconstructed images during training
def plot_combined_images(data, title, index, save, n):
    """Plot ``n`` randomly chosen images above their reconstructions.

    The top row shows ``data[idx]``; the bottom row shows the output of the
    module-level ``generator`` applied to the module-level ``encoder`` output
    for the same element. Pixel values are mapped with ``(x + 1) * 127.5`` and
    clipped to [0, 255] before display.

    Args:
        data: indexable collection of images.
        title: string inserted in the saved file name
            (``Epoch_<title>_OriginalvsRecon.png``).
        index: candidate positions into ``data``; ``n`` of them are drawn
            without replacement.
        save: when true, the figure is written to ``<folder_path>/Rec_Img``
            (``folder_path`` is a module-level name).
        n: number of image columns. Per-image titles are only drawn when
            ``n <= 5``.
    """
    def _to_displayable(img):
        # (x + 1) * 127.5 sends -1 to 0 and 1 to 255; anything outside is clipped.
        return np.clip((img + 1) * 127.5, 0, 255).astype(np.uint8)

    random_index = np.random.choice(index, n, replace=False)
    # squeeze=False keeps `axes` two-dimensional for every n. With the default,
    # n == 1 yields a 1-D array and the axes[row, col] lookups below fail.
    fig, axes = plt.subplots(2, n, figsize=(8, 8), squeeze=False)
    show_titles = n <= 5

    for i, idx in enumerate(random_index):
        # Top row: original image
        ax = axes[0, i]
        ax.imshow(_to_displayable(data[idx]))
        ax.set_xticks([])
        ax.set_yticks([])
        if show_titles:
            ax.set_title(f"Original {i+1}")

        # Bottom row: reconstructed image
        ax = axes[1, i]
        t = tf.convert_to_tensor(data[idx], dtype=tf.float32)
        x_recon = generator(encoder(t))
        ax.imshow(_to_displayable(np.squeeze(x_recon)))
        ax.set_xticks([])
        ax.set_yticks([])
        if show_titles:
            ax.set_title(f"Reconstructed {i+1}")

    plt.tight_layout()  # tidy up the spacing between the images
    if save:
        image_path = os.path.join(folder_path+'/Rec_Img','Epoch_'+ title + '_OriginalvsRecon.png')
        plt.savefig(image_path)
    plt.show()


In [34]:
# Save the results to a CSV file
def append_reconstruction_error_csv(epoch, error, file_path):
    """Append one ``(epoch, error)`` row to the CSV log at ``file_path``.

    When the file does not exist yet it is created with the header
    ``Epoch,Reconstruction Error``; otherwise the row is appended without a
    header.
    """
    row = pd.DataFrame([(epoch, error)], columns=['Epoch', 'Reconstruction Error'])
    already_there = os.path.isfile(file_path)
    row.to_csv(
        file_path,
        mode='a' if already_there else 'w',
        header=not already_there,
        index=False,
    )

In [35]:
# Plot how the reconstruction error evolves over the epochs
def plot_recon_error(file_path):
    """Read the CSV log at ``file_path`` and plot its ``Reconstruction Error`` column against ``Epoch``."""
    history = pd.read_csv(file_path)
    epochs, errors = history['Epoch'], history['Reconstruction Error']

    plt.plot(epochs, errors, marker='o')
    plt.title('Reconstruction Error Over Epochs')
    plt.xlabel('Epoch')
    plt.ylabel('Reconstruction Error')
    plt.grid(True)
    plt.show()


# Training

In [None]:
# Training configuration
num_epochs = 100
nr_batches_train = int(x_adapted.shape[0] / batch_size)
run = 0  # offset added to the epoch number in the CSV log and in the image titles
best_recon_error = float("inf")
error_log_file = os.path.join(folder_path,'reconstruction_errors.csv')

if CHECKPOINT:
    print('Recover Models...')
    # When resuming, logged epoch numbers are shifted by the epochs that are
    # missing from a 100-epoch run (zero while num_epochs is 100).
    run += (100 - num_epochs)
    load_w(folder_path, encoder, generator, discriminator_xz, discriminator_xx, discriminator_zz)
    if EMA:
      print('Recover EMA averages...')
      load_average(folder_path, checkpoint_encoder, checkpoint_generator, checkpoint_disc_xz, checkpoint_disc_xx, checkpoint_disc_zz )
      retrieve_EMA_weights(ema_encoder,encoder)
      retrieve_EMA_weights(ema_generator,generator)
      retrieve_EMA_weights(ema_discriminator_xx,discriminator_xx)
      retrieve_EMA_weights(ema_discriminator_xz,discriminator_xz)
      retrieve_EMA_weights(ema_discriminator_zz,discriminator_zz)

    # The restored model sets the error that a new epoch has to match or beat.
    best_recon_error = val_recon_error(x_val_adapted)
    print(f'Best Reconstruction Error was {best_recon_error}')

for epoch in range(num_epochs):

    print(f'Epoch {epoch+1}/{num_epochs}')

    # Running sums of the six losses, in train_step's return order:
    # encoder, generator, disc (total), disc_xz, disc_xx, disc_zz.
    epoch_loss_sums = [0.0] * 6

    for i in tqdm_notebook(range(nr_batches_train)):
        ran_from = i * batch_size
        ran_to = (i + 1) * batch_size

        batch_losses = train_step(x_adapted, batch_size, ran_from, ran_to, latent_dim)
        epoch_loss_sums = [total + loss for total, loss in zip(epoch_loss_sums, batch_losses)]

    # Mean loss per batch over the epoch. The losses used to be overwritten on
    # every batch, so only the LAST batch's value ended up divided by the batch
    # count. max(..., 1) avoids a division by zero when there are no batches.
    encoder_loss, generator_loss, disc_loss, disc_xz_loss, disc_xx_loss, disc_zz_loss = [
        total / max(nr_batches_train, 1) for total in epoch_loss_sums
    ]

    print("Epoch %d | loss gen = %.4f | loss enc = %.4f | loss_disc = %.4f | loss dis xz = %.4f | loss dis xx = %.4f | loss dis zz = %.4f | "% ((epoch+1), generator_loss,encoder_loss,disc_loss, disc_xz_loss, disc_xx_loss,disc_zz_loss))

    # Update EMA (once per epoch)
    if EMA:
        update_EMA(ema_encoder, ema_generator, ema_discriminator_xz, ema_discriminator_xx, ema_discriminator_zz)

    # Every 5 epochs: validate, log, plot, and keep the model if it is the best so far.
    if (((epoch+1) % 5) == 0):

        recon_error = val_recon_error(x_val_adapted)
        append_reconstruction_error_csv(epoch+run+1, recon_error, error_log_file)

        plot_combined_images(x_val_new, ''+str(epoch+run+1), indici_normal_val, save = True, n = 5)

        print("Reconstruction error on Validation = %.4f"%(recon_error))

        if recon_error <= best_recon_error:

            best_recon_error = recon_error

            print("Saving Best Model...")

            save_model(encoder,'Encoder')
            save_model(generator,'Generator')
            save_model(discriminator_xz,'DiscriminatorXZ')
            save_model(discriminator_xx,'DiscriminatorXX')
            save_model(discriminator_zz,'DiscriminatorZZ')

            if EMA:
                print('Save EMA averages...')
                save_average(checkpoint_encoder,'EMA_enc_w')
                save_average(checkpoint_generator,'EMA_gen_w')
                save_average(checkpoint_disc_xz,'EMA_xz_w')
                save_average(checkpoint_disc_xx,'EMA_xx_w')
                save_average(checkpoint_disc_zz,'EMA_zz_w')


plot_recon_error(error_log_file)

# Testing utilities

In [40]:
def test_scores(x_test, encoder, generator, discriminator_xx):
    """Compute four per-sample scores for every element of ``x_test``.

    Each element gets a leading batch axis, is reconstructed with
    ``generator(encoder(x))`` and is then scored with:

    * ``l1`` / ``l2``: L1 and L2 norm of the flattened pixel residual;
    * ``fm``: L1 norm of the flattened difference between the second output of
      ``discriminator_xx`` for ``[x, x]`` and for ``[x, reconstruction]``;
    * ``ch``: sigmoid cross-entropy of the first output of ``discriminator_xx``
      for ``[x, reconstruction]`` against all-ones labels.

    All networks are called with ``training=False``.

    Returns:
        Four lists ``(scores_l1, scores_l2, scores_fm, scores_ch)`` with one
        flattened NumPy array per element of ``x_test``.
    """
    def _as_vector(tensor):
        return tf.reshape(tensor, [-1])

    scores_l1, scores_l2, scores_fm, scores_ch = [], [], [], []

    for sample in x_test:
        batch = tf.expand_dims(sample, axis=0)
        reconstruction = generator(encoder(batch, training=False), training=False)

        # Only the intermediate features are needed from the (x, x) pass.
        _, features_real = discriminator_xx([batch, batch], training=False)
        logits_rec, features_rec = discriminator_xx([batch, reconstruction], training=False)

        cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(
            labels=tf.ones_like(logits_rec),
            logits=logits_rec,
        )

        pixel_residual = _as_vector(batch - reconstruction)
        feature_residual = _as_vector(features_real - features_rec)

        per_sample = (
            (scores_l1, tf.norm(pixel_residual, ord=1)),
            (scores_l2, tf.norm(pixel_residual, ord=2)),
            (scores_fm, tf.norm(feature_residual, ord=1)),
            (scores_ch, cross_entropy),
        )
        for bucket, value in per_sample:
            bucket.append(_as_vector(value).numpy())

    return scores_l1, scores_l2, scores_fm, scores_ch


In [41]:
def load_model(f_path, latent_dim):
    """Build the networks for ``latent_dim`` and load their saved state from ``f_path``.

    The five networks come from ``init_models`` and their weights from
    ``load_w``. When the module-level ``EMA`` flag is set, EMA trackers and
    checkpoints are created with ``init_weight_EMA``, the checkpoints are
    restored with ``load_average``, and ``retrieve_EMA_weights`` is called for
    the encoder, the generator and ``discriminator_xx``.

    Returns:
        ``(encoder, generator, discriminator_xx)``.
    """
    networks = init_models(latent_dim = latent_dim)
    encoder, generator, discriminator_xz, discriminator_xx, discriminator_zz = networks
    load_w(f_path, encoder, generator, discriminator_xz, discriminator_xx, discriminator_zz)

    if not EMA:
        return encoder, generator, discriminator_xx

    ema_state = init_weight_EMA(encoder, generator, discriminator_xz, discriminator_xx, discriminator_zz)
    # First five entries are the trackers, last five the checkpoints.
    ema_encoder, ema_generator, _, ema_discriminator_xx, _ = ema_state[:5]
    load_average(f_path, *ema_state[5:])

    # Only the three networks that are returned get their averages retrieved.
    for tracker, network in (
        (ema_encoder, encoder),
        (ema_generator, generator),
        (ema_discriminator_xx, discriminator_xx),
    ):
        retrieve_EMA_weights(tracker, network)

    return encoder, generator, discriminator_xx


def save_scores(models, lv, output_file):
    """Score every model on the test set and store one CSV row per model.

    ``models`` and ``lv`` are walked in parallel: each model folder name under
    ``./models/`` is loaded with its latent dimension, scored with
    ``test_scores`` on the module-level ``x_test_new``, and summarised as the
    mean L2 and FM scores plus the ROC AUC of each of the four scores against
    the module-level ``y_test_new``.

    If ``output_file`` already exists the new rows are added after the rows it
    contains; otherwise the file is created.
    """
    rows = []
    for model_name, latent in zip(models, lv):

        # COLAB: model_dir = os.path.join('./drive/My Drive/', model_name)
        model_dir = os.path.join('./models/', model_name)

        encoder, generator, discriminator_xx = load_model(model_dir, latent)

        print(f'Calcolo gli score per il modello{model_name}')

        scores_l1, scores_l2, scores_fm, scores_ch = test_scores(x_test_new, encoder, generator, discriminator_xx)

        row = {
            'model_name': model_name,
            'latent_dim': latent,
            'mean_recon_error(l2)': np.mean(scores_l2),
            'mean_recon_error(fm)': np.mean(scores_fm),
        }
        roc_inputs = (
            ('roc_l1', scores_l1),
            ('roc_l2', scores_l2),
            ('roc_fm', scores_fm),
            ('roc_ch', scores_ch),
        )
        for column, scores in roc_inputs:
            row[column] = roc_auc_score(y_test_new, scores)

        rows.append(row)

    table = pd.DataFrame(rows)

    # An existing file keeps its rows; the new ones are placed after them.
    if os.path.isfile(output_file):
        table = pd.concat([pd.read_csv(output_file), table], ignore_index=True)
    table.to_csv(output_file, index=False)


# Testing across different latent dimensions

In [None]:
# Switch for the multi-model comparison: when True, the `if TEST_ALL:` cell
# further down runs test_all, plot_mre and plot_roc.
TEST_ALL = True

In [None]:
def normalize_column(column):
    """Divide ``column`` by its norm as computed by ``np.linalg.norm``.

    No guard is applied for a zero norm.
    """
    return column / np.linalg.norm(column)


def load_scores(f):
    """Read the scores CSV ``f`` and add normalised copies of the two mean-error columns.

    ``mean_recon_error(l2)`` and ``mean_recon_error(fm)`` are each passed
    through ``normalize_column`` and stored under the same name with the
    ``_normalized`` suffix.
    """
    scores = pd.read_csv(f)
    for source in ('mean_recon_error(l2)', 'mean_recon_error(fm)'):
        scores[f'{source}_normalized'] = normalize_column(scores[source])
    return scores

# Load the scores of the models trained with different latent dimensions
def test_all(scores_file,df,models,lv):
    """Return the scores table, computing it first when ``scores_file`` is missing.

    If ``scores_file`` does not exist, ``save_scores(models, lv, scores_file)``
    creates it; in both cases the result of ``load_scores(scores_file)`` is
    returned. The ``df`` argument is accepted but its value is never read.
    """
    if not os.path.isfile(scores_file):
        print('File non presente, calcolo gli scores per ogni modello...')
        save_scores(models, lv, scores_file)
    else:
        print('File presente, carico i risultati...')

    return load_scores(scores_file)

In [None]:
def plot_mre(df):
    """Line chart of the two normalised mean reconstruction errors against ``latent_dim``.

    The figure is saved as ``./evaluation/mre_all_<TRAIN_LABEL>.png``
    (``TRAIN_LABEL`` is a module-level name) and then shown.
    """
    # (column, marker, legend label) for each plotted line
    lines = (
        ('mean_recon_error(l2)_normalized', 'o', 'Mean Recon Error (L2)'),
        ('mean_recon_error(fm)_normalized', 's', 'Mean Recon Error (FM)'),
    )
    latent_dims = df['latent_dim']

    plt.figure(figsize=(10, 6))
    for column, marker, label in lines:
        plt.plot(latent_dims, df[column], marker=marker, label=label)

    plt.xlabel('Latent Dimension')
    plt.ylabel('Mean Reconstruction Error')
    plt.title('Mean Reconstruction Errors for Different Models')
    plt.legend()
    plt.grid(True)

    # One tick per latent dimension present in the table.
    plt.xticks(latent_dims)

    plt.tight_layout()

    # COLAB: image_path = os.path.join('./drive/My Drive/', f'mre_all_{TRAIN_LABEL}.png')
    image_path = os.path.join('./evaluation/',f'mre_all_{TRAIN_LABEL}.png')
    plt.savefig(image_path)
    plt.show()

In [None]:
def add_value_labels(bars,ax):
    """Annotate each bar with its height, formatted to two decimals.

    The label is anchored at the horizontal centre of the bar's top edge and
    shifted 3 points upwards.
    """
    for rect in bars:
        top = rect.get_height()
        centre_x = rect.get_x() + rect.get_width() / 2
        ax.annotate(
            f'{top:.2f}',
            xy=(centre_x, top),
            xytext=(0, 3),  # 3 points of vertical offset
            textcoords="offset points",
            ha='center',
            va='bottom',
        )

In [None]:
def plot_roc(df):
    """Grouped bar chart of the four ROC AUC columns, one group per row of ``df``.

    Groups are labelled with ``latent_dim``; every bar is annotated with its
    value by ``add_value_labels``. The figure is saved as
    ``./evaluation/roc_all_<TRAIN_LABEL>.png`` (``TRAIN_LABEL`` is a
    module-level name) and then shown.
    """
    bar_width = 0.2
    x = np.arange(len(df['model_name']))

    fig, ax = plt.subplots(figsize=(10, 8))

    # (offset from the group centre in bar widths, column, legend label, colour)
    layout = (
        (-1.5, 'roc_l1', 'ROC AUC (L1)', 'lightblue'),
        (-0.5, 'roc_l2', 'ROC AUC (L2)', 'lightgreen'),
        (0.5, 'roc_fm', 'ROC AUC (FM)', 'lightcoral'),
        (1.5, 'roc_ch', 'ROC AUC (CH)', 'lightpink'),
    )
    bar_groups = [
        ax.bar(x + shift * bar_width, df[column], bar_width, label=label,
               color=colour, edgecolor='black', linewidth=1)
        for shift, column, label, colour in layout
    ]

    for group in bar_groups:
        add_value_labels(group, ax)

    ax.set_ylim(0, 1)
    ax.set_xlabel('Model')
    ax.set_ylabel('ROC AUC Score')
    ax.set_title('ROC AUC Scores for Different Models')
    ax.set_xticks(x)
    ax.set_xticklabels(df['latent_dim'])
    ax.legend()

    plt.tight_layout()

    # COLAB: image_path = os.path.join('./drive/My Drive/', f'roc_all_{TRAIN_LABEL}.png')
    image_path = os.path.join('./evaluation/',f'roc_all_{TRAIN_LABEL}.png')
    plt.savefig(image_path)
    plt.show()

In [None]:
if TEST_ALL:
  # COLAB: scores_file = os.path.join('./drive/My Drive/', f'score_{TRAIN_LABEL}.csv')
  scores_file = os.path.join('./evaluation/',f'score_{TRAIN_LABEL}.csv')

  # One model folder per latent dimension; the folder name ends with that dimension.
  lv = [100, 300, 500, 800]
  models = [f'Model_ALAD_64_{TRAIN_LABEL}_{latent}' for latent in lv]

  df = test_all(scores_file, pd.DataFrame(), models, lv)
  plot_mre(df)
  plot_roc(df)

# Evaluate a single model

In [None]:
# Score the test set with the model stored in folder_path.
# The four score lists stay empty when the folder is missing.
scores_l1, scores_l2, scores_fm, scores_ch = [], [], [], []

if not os.path.exists(folder_path):
  print(f'Cartella {folder_path} non trovata')

else:
  print('Caricamento modello...')
  encoder, generator, discriminator_xx = load_model(folder_path, latent_dim)
  scores_l1, scores_l2, scores_fm, scores_ch = test_scores(x_test_new, encoder, generator, discriminator_xx)
  print('Modello caricato con successo!!')

In [44]:
# Positions of the test samples by label: entries of y_test_new equal to 0 go to
# index_test_normal, entries equal to 1 to index_test_anomaly. [0] takes the
# first index array returned by np.where.
index_test_normal = np.where(y_test_new == 0)[0]
index_test_anomaly = np.where(y_test_new == 1)[0]

In [None]:
# Original vs. reconstruction for 10 images drawn at random from index_test_normal; the figure is saved.
plot_combined_images(x_test_new, 'Test_Normal', index_test_normal, save = True,n = 10)

In [None]:
# Original vs. reconstruction for 10 images drawn at random from index_test_anomaly; the figure is saved.
plot_combined_images(x_test_new, 'Test Anomaly', index_test_anomaly, save = True,n = 10)

In [None]:
# Compute the ROC AUC of each score (L1, L2, FM, CH) against the test labels
auc_roc_l1 = roc_auc_score(y_test_new, scores_l1)
auc_roc_l2 = roc_auc_score(y_test_new, scores_l2)
auc_roc_fm = roc_auc_score(y_test_new, scores_fm)
auc_roc_ch = roc_auc_score(y_test_new, scores_ch)

print(f'AUC-ROC L1: {auc_roc_l1:.2f}')
print(f'AUC-ROC L2: {auc_roc_l2:.2f}')
print(f'AUC-ROC FM: {auc_roc_fm:.2f}')
print(f'AUC-ROC CH: {auc_roc_ch:.2f}')

# Compute the ROC curve of each score (the thresholds are discarded)
fpr_l1, tpr_l1, _ = roc_curve(y_test_new, scores_l1)
fpr_l2, tpr_l2, _ = roc_curve(y_test_new, scores_l2)
fpr_fm, tpr_fm, _ = roc_curve(y_test_new, scores_fm)
fpr_ch, tpr_ch, _ = roc_curve(y_test_new, scores_ch)

plt.figure(figsize=(14, 6))

# One colour per score, shared by the curves and by the bars
colors = ['orange', 'green', 'coral', 'lightblue']
# Left subplot: the ROC curves
plt.subplot(1, 2, 1)
plt.plot(fpr_l1, tpr_l1, color = colors[0], lw=2, label=f'L1 (AUC = {auc_roc_l1:.2f})')
plt.plot(fpr_l2, tpr_l2, color = colors[1], lw=2, label=f'L2 (AUC = {auc_roc_l2:.2f})')
plt.plot(fpr_fm, tpr_fm, color = colors[2], lw=2, label=f'FM (AUC = {auc_roc_fm:.2f})')
plt.plot(fpr_ch, tpr_ch, color = colors[3], lw=2, label=f'CH (AUC = {auc_roc_ch:.2f})')

# Dashed diagonal from (0, 0) to (1, 1) as a reference line
plt.plot([0, 1], [0, 1], color='gray', lw=1, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right")
plt.grid(True)

# Right subplot: bar chart of the ROC AUC values
plt.subplot(1, 2, 2)
# NOTE: `models` holds the score labels here, rebinding the name that the
# TEST_ALL cell uses for the list of model folder names.
models = ['L1', 'L2', 'FM', 'CH']
auc_roc_scores = [auc_roc_l1, auc_roc_l2, auc_roc_fm, auc_roc_ch]


bars = plt.bar(models, auc_roc_scores, color=colors,edgecolor='black', linewidth=1)

# Write each value just above its bar
for bar, score in zip(bars, auc_roc_scores):
    plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01, f'{score:.2f}', ha='center', va='bottom')

plt.ylim([0, 1])
plt.xlabel('Scores')
plt.ylabel('AUC-ROC')
plt.title('AUC-ROC Scores')
plt.grid(axis='y', linestyle='--', alpha=0.7)

plt.tight_layout()

#COLAB
#image_path = os.path.join('./drive/My Drive/',f'roc_{TRAIN_LABEL}_{LATENT_DIM}_{MODE}.png')

# Local output path; the commented line above is the Colab/Drive alternative.
image_path = os.path.join('./evaluation/',f'roc_{TRAIN_LABEL}_{LATENT_DIM}_{MODE}.png')
plt.savefig(image_path)
plt.show()