In [None]:
import os

# Process-wide settings, applied before the TensorFlow import in the next cell:
#   CUDA_DEVICE_ORDER      - enumerate GPUs by PCI bus id
#   CUDA_VISIBLE_DEVICES   - expose only device "0" to this process
#   TF_ENABLE_ONEDNN_OPTS  - "0" switches the oneDNN optimizations off
os.environ.update({
    "CUDA_DEVICE_ORDER": "PCI_BUS_ID",
    "CUDA_VISIBLE_DEVICES": "0",
    "TF_ENABLE_ONEDNN_OPTS": "0",
})

# <font color='red'>**Loading trained networks**</font>
## Useful functions

In [None]:
import numpy as np
from tqdm import tqdm
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.preprocessing.image import img_to_array
from numpy import asarray
import tensorflow as tf
AUTOTUNE = tf.data.AUTOTUNE
import matplotlib.pyplot as plt
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D
import imageio
import os
import pandas as pd
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from PIL import Image
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn import metrics
from sklearn.metrics import roc_curve, auc
from sklearn.metrics import precision_recall_fscore_support as score
from tensorflow.keras import layers
from sklearn import preprocessing
from tensorflow.keras.utils import plot_model

In [None]:
# %pip (instead of !pip) installs into the environment of the running kernel,
# so the import in the next cell finds the package.
# NOTE(review): the install is not pinned to a commit; pin one for reproducibility.
%pip install -q git+https://github.com/tensorflow/examples.git

In [None]:
from tensorflow_examples.models.pix2pix import pix2pix

In [None]:
# Let TensorFlow grow GPU memory on demand instead of reserving it all up front.
# Iterating over the list (rather than indexing gpus[0]) keeps this cell from
# raising IndexError when no GPU is visible.
gpus = tf.config.experimental.list_physical_devices('GPU')
if not gpus:
    print("No GPU detected; running on CPU.")
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
# Input-pipeline settings (read by toDataSet) and image geometry.
BUFFER_SIZE = 1000             # shuffle buffer size
BATCH_SIZE = 1                 # images per batch
IMG_WIDTH = IMG_HEIGHT = 256   # square images, in pixels
OUTPUT_CHANNELS = 3            # channels produced by the generators

### Import and reuse the Pix2Pix models

In [None]:
# Number of output channels passed to both generators (same value as in the
# constants cell above).
OUTPUT_CHANNELS = 3

# Two U-Net generators with instance normalization.
# NOTE(review): keep this construction order. load_emb_model looks up the layer
# named 'concatenate' in generator_g and 'concatenate_1' in generator_f;
# presumably those auto-generated names depend on generator_g being built
# first - confirm before reordering or re-running this cell in the same kernel.
generator_g = pix2pix.unet_generator(OUTPUT_CHANNELS, norm_type='instancenorm')
generator_f = pix2pix.unet_generator(OUTPUT_CHANNELS, norm_type='instancenorm')

# Two discriminators with instance normalization.
# NOTE(review): target=False presumably means the discriminator receives a
# single image rather than an (input, target) pair - see pix2pix.discriminator.
discriminator_x = pix2pix.discriminator(norm_type='instancenorm', target=False)
discriminator_y = pix2pix.discriminator(norm_type='instancenorm', target=False)

### Initializing optimizers, generators and discriminators

In [None]:
def _adam():
    """Build one Adam optimizer with the settings shared by all four networks."""
    return tf.keras.optimizers.Adam(2e-4, beta_1=0.5)


# One independent optimizer per network.
generator_g_optimizer = _adam()
generator_f_optimizer = _adam()

discriminator_x_optimizer = _adam()
discriminator_y_optimizer = _adam()

In [None]:
checkpoint_path = "../models/translation/rgb/"  # folder where the model checkpoints are stored

# The checkpoint tracks every network and optimizer under these keyword names.
ckpt = tf.train.Checkpoint(generator_g=generator_g,
                           generator_f=generator_f,
                           discriminator_x=discriminator_x,
                           discriminator_y=discriminator_y,
                           generator_g_optimizer=generator_g_optimizer,
                           generator_f_optimizer=generator_f_optimizer,
                           discriminator_x_optimizer=discriminator_x_optimizer,
                           discriminator_y_optimizer=discriminator_y_optimizer)

ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

# Restore the latest checkpoint exactly once (the previous version of this cell
# restored the same checkpoint twice and reported it twice).
latest_checkpoint = ckpt_manager.latest_checkpoint
if latest_checkpoint:
    # expect_partial(): the visible cells never run an optimizer step, so the
    # optimizer slot values stored in the checkpoint are never consumed; this
    # silences the "unresolved object" warnings that would otherwise follow.
    ckpt.restore(latest_checkpoint).expect_partial()
    print("Restored from {}".format(latest_checkpoint))
else:
    # NOTE(review): without a checkpoint the generators keep their freshly
    # initialized weights, so any embeddings computed below come from untrained networks.
    print("Initializing from scratch.")

# Making net test

In [None]:
# for layer in generator_g.layers:
#     print("layer.name: ", layer.name)
#     try:
#         print("output shape: ", layer.output_shape)
#     except:
#         print("no output shape")

In [None]:
# out1 = generator_g.get_layer('sequential_8')
# out2 = generator_g.get_layer('concatenate')
# print(out1.output_shape)
# print(out2.get_input_shape_at(0))
# print(out2.get_input_shape_at(-1))
# out2.get_weights()

In [None]:
# # Assuming 'out2' is a Concatenate layer
# input_layer_name1 = out2.get_input_at(0)[0].name.split('/')[0]
# input_layer_name2 = out2.get_input_at(0)[1].name.split('/')[0]

# print("Layer name for get_input_shape_at(0):", input_layer_name1)
# print("Layer name for get_input_shape_at(1):", input_layer_name2)

In [None]:
# #l1 = generator_g.get_layer(name='sequential_8')
# #custom embedding space dim
# initializer = tf.keras.initializers.Constant(1.)
# l1 = generator_g.get_layer(name='concatenate')
# input = l1.get_input_at(0)[0]
# x = layers.Conv2D(filters=input.shape[-1], kernel_size=(2,2), kernel_initializer=initializer)(input)
# x = layers.Reshape((input.shape[-1],))(x)
# emb2 = Model(inputs=generator_g.input, outputs=x)

In [None]:
#emb2.summary()

# until here

# <font color='red'>**Load and preprocess data**</font>

In [None]:
# Class name -> integer index, in the order the classes are listed.
str2idx = {name: position for position, name in enumerate(('CuNi1', 'CuNi2', 'CuNi3'))}

# Integer index -> class name (exact inverse of str2idx).
idx2str = {position: name for name, position in str2idx.items()}

## Helper functions

In [None]:
def ohe_class(index, num_classes=3):
    """
    One-Hot Encoding for Classification Task

    Converts an integer class label into a binary vector in which the element
    at position 'index' is 1 and every other element is 0.

    Parameters:
        index (int): The class label to encode. Must satisfy
                     0 <= index < num_classes.
        num_classes (int, optional): Length of the returned vector.
                                     Default is 3, the number of classes used
                                     in this notebook.

    Returns:
        numpy.ndarray: Integer array of length 'num_classes' with a single 1
                       at position 'index'.

    Raises:
        IndexError: If 'index' is outside [0, num_classes). Negative values are
                    rejected explicitly; NumPy would otherwise wrap them around
                    and silently encode the wrong class.

    Example:
        >>> ohe_class(1)
        array([0, 1, 0])
        >>> ohe_class(2, num_classes=5)
        array([0, 0, 1, 0, 0])
    """
    if not 0 <= index < num_classes:
        raise IndexError(
            "class index {} is out of range for {} classes".format(index, num_classes)
        )

    # Start from all zeros and mark the requested class
    ohe_label = np.zeros(num_classes, dtype=int)
    ohe_label[index] = 1

    return ohe_label


In [None]:
def normalize(image):
    """Cast 'image' to float32 and rescale it as ``image / 127.5 - 1``.

    For pixel values in [0, 255] the result lies in [-1, 1].
    """
    as_float = tf.cast(image, tf.float32)
    return (as_float / 127.5) - 1


def preprocess_image(image):
    """Preprocessing applied to each image of the dataset; currently only 'normalize'."""
    return normalize(image)

In [None]:
def rgb2gray(path, size, pixels):
    """
    Load an image as grayscale and replicate it over three channels.

    Parameters:
        path (str): Path of the image file to load.
        size (tuple): Target size handed to 'load_img'.
        pixels (numpy.ndarray): Only its shape is used, to allocate the output.

    Returns:
        numpy.ndarray: Array of shape 'pixels.shape' (NumPy default float dtype)
                       whose channels 0, 1 and 2 each hold the grayscale image.
    """
    replicated = np.zeros(pixels.shape)
    gray = load_img(path, target_size=size, color_mode= "grayscale")

    # Same grayscale plane written into each of the three channels
    for channel in range(3):
        replicated[:, :, channel] = gray

    return replicated

In [None]:
def load_images(path, rgb, size=(256, 256)):
    """
    Load Images and Corresponding Labels from a Directory into Memory.

    Every entry of 'path' is loaded as an image, resized to 'size' and converted
    to a NumPy array. The class label is taken from the part of the file name
    before the first underscore, mapped through 'str2idx' and one-hot encoded
    with 'ohe_class'.

    Parameters:
        path (str): The path to the directory containing the images. Every
                    entry in it is assumed to be an image file.
        rgb (bool): True keeps the images as loaded in RGB. False replaces each
                    image by its grayscale version replicated over three
                    channels (see 'rgb2gray'), so the array shape is unchanged.
        size (tuple, optional): Target size handed to Keras' 'load_img' as
                                'target_size', i.e. (height, width).
                                Default is (256, 256).

    Returns:
        numpy.ndarray: The stacked image data, one image per entry, in sorted
                       file-name order.
        list: One-hot encoded labels (NumPy arrays), in the same order as the
              image data.

    Raises:
        KeyError: If the prefix of a file name is not a key of 'str2idx'.

    Example:
        >>> data_path = "/path/to/images/"
        >>> image_data, labels = load_images(data_path, size=(128, 128), rgb=True)
        >>> print(image_data.shape)
        Output: (num_images, 128, 128, 3)  # Assuming num_images is the total number of images.
        >>> print(len(labels))
        Output: num_images  # Number of images, each with a corresponding one-hot encoded label.
    """
    data_list = []
    label_list = []

    if not rgb:
        # Announced once; printing per file floods the output and breaks the tqdm bar
        print("CONVIRTIENDO A GRAY SCALE!")

    # os.listdir returns entries in arbitrary order; sort for reproducible output
    for filename in tqdm(sorted(os.listdir(path))):
        image_path = os.path.join(path, filename)

        # Resolve the label first so a misnamed file fails with a clear message
        clase = filename.split('_')[0]
        if clase not in str2idx:
            raise KeyError(
                "unknown class prefix '{}' in file name '{}'".format(clase, filename)
            )

        # Load, resize and convert to a numpy array
        pixels = img_to_array(load_img(image_path, target_size=size, color_mode="rgb"))

        if not rgb:
            # Grayscale content, still three channels
            pixels = rgb2gray(image_path, size, pixels)

        data_list.append(pixels)
        label_list.append(ohe_class(str2idx[clase]))

    return np.asarray(data_list), label_list


In [None]:
def saving_emb(split, clase, group, embeddings, labels, dim):
    """
    Save Embeddings and Labels to Files.

    Converts 'embeddings' and 'labels' to NumPy arrays and writes them as
    'Embeddings.npy' and 'Labels.npy' inside
    '../embeddings/rgb/<train|test>_<group>/<clase>/' (relative to the current
    working directory). The directory is created when missing.

    Parameters:
        split (str): 'train' selects the 'train_' directory prefix; any other
                     value selects 'test_'.
        clase (str): The class name, used as the last directory component.
        group (str): Group name appended to the split prefix in the directory
                     name.
        embeddings (list): The embeddings (feature vectors) obtained from a model.
        labels (list): The labels corresponding to the embeddings.
        dim: Not used by this function; kept so existing calls keep working.
    """
    # Convert the input lists to NumPy arrays
    embeddings_arr = np.array(embeddings)
    labels_arr = np.array(labels)

    print("emb dimension: ", embeddings_arr.shape)
    print("label dimension: ", labels_arr.shape)

    # Destination directory, based on the split, group and class name
    prefix = "train_" if split == 'train' else "test_"
    file_name = "../embeddings/rgb/" + prefix + group + "/" + clase + '/'

    print("saving on: ", file_name)

    # exist_ok avoids the check-then-create race of a separate exists() test
    os.makedirs(file_name, exist_ok=True)

    # np.save appends the '.npy' extension
    np.save(file_name + "Embeddings", embeddings_arr)
    np.save(file_name + "Labels", labels_arr)


In [None]:
def toDataSet(path_origen, rgb):
    """
    Convert Images and Labels to TensorFlow Dataset.

    Loads the images and one-hot labels of a directory with 'load_images',
    pairs each image with its label, preprocesses the image, and returns the
    pairs cached, shuffled and batched.

    Images and labels are paired BEFORE shuffling. Shuffling the image dataset
    on its own and zipping it with an unshuffled label dataset would attach
    labels to the wrong images.

    Parameters:
        path_origen (str): The path to the directory containing the images.
        rgb (bool): Forwarded to 'load_images'; False converts the images to
                    three-channel grayscale.

    Returns:
        tf.data.Dataset: Dataset of (image_batch, label_batch) pairs. Images
                         are preprocessed with 'preprocess_image'; labels are
                         cast to int64. Both are batched with 'BATCH_SIZE'.

    Note:
        Relies on the notebook-level names 'load_images', 'preprocess_image',
        'BATCH_SIZE', 'AUTOTUNE' and 'BUFFER_SIZE'.
    """
    # Load images and labels using the 'load_images' function
    data, labels = load_images(path_origen, rgb)
    labels_int = tf.cast(labels, tf.int64)

    # One dataset of (image, label) elements keeps every pair together
    paired_ds = tf.data.Dataset.from_tensor_slices((data, labels_int))

    # Preprocess the image only; the label passes through untouched
    paired_ds = paired_ds.map(
        lambda image, label: (preprocess_image(image), label),
        num_parallel_calls=AUTOTUNE,
    )

    # Cache, shuffle and batch the pairs as a unit
    data_label_ds = paired_ds.cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

    return data_label_ds


# <font color='red'>**Generator embeddings**</font>

In [None]:
def load_emb_model(generator, dim):
    """
    Build an Embedding Model on Top of One of the Generators.

    Selects a notebook-level generator by name, takes the first tensor fed to
    its concatenate layer (node 0), passes it through a 2x2 convolution whose
    kernel is initialized to the constant 1, and reshapes the result to a flat
    vector. The returned model maps the generator input to that vector.

    Parameters:
        generator (str): "generator_g" selects 'generator_g' and its layer
                         named 'concatenate'. Any other value selects
                         'generator_f' and its layer named 'concatenate_1'.
        dim: Not used by this function; kept so existing calls keep working.

    Returns:
        tf.keras.Model: Model with the selected generator's input and the
                        flattened convolution output.

    Note:
        The number of convolution filters and the reshape length both equal the
        channel count of the tapped tensor. The Reshape presumably requires the
        convolution output to be 1x1 spatially - TODO confirm against the
        generator architecture.
    """
    print("making emb model")

    # Constant kernel for the extra convolution
    initializer = tf.keras.initializers.Constant(1.)

    use_g = generator == "generator_g"
    print("generator_g" if use_g else "generator_f")
    source_model = generator_g if use_g else generator_f
    concat_name = 'concatenate' if use_g else 'concatenate_1'

    # First of the tensors that enter the concatenate layer at node 0
    concat_layer = source_model.get_layer(name=concat_name)
    tapped = concat_layer.get_input_at(0)[0]
    channels = tapped.shape[-1]

    projected = layers.Conv2D(
        filters=channels,
        kernel_size=(2,2),
        kernel_initializer=initializer,
    )(tapped)
    flattened = layers.Reshape((channels,))(projected)

    return Model(inputs=source_model.input, outputs=flattened)

## Generating embeddings

In [None]:
# for layer in generator_g.layers:
#     print(layer.name)

In [None]:
# #============= parcialmente resuelto ================
# l1 = generator_g.get_layer(name='sequential_8')
# # Get the last layer of the Sequential model
# last_layer = l1.layers[-1]

# # Calculate the size for the reshape layer
# reshape_size = last_layer.output_shape[-1]  # Adjust this value as needed

# # Create a Reshape layer with the desired size
# reshape_layer = tf.keras.layers.Reshape(target_shape=(reshape_size,))

# # Create a new model by appending the reshape layer to the end of the original model
# reshaped_output = reshape_layer(last_layer.output)
# decoder_model = tf.keras.Model(inputs=l1.input, outputs=reshaped_output)
# decoder_model.summary()

In [None]:
# Run configuration. The generator follows the group:
# 'dry' -> 'generator_g', anything else -> 'generator_f'.
split = 'train'
group = 'wet'
rgb = True
dim = 512 #only for ablation study
gen_path = '../imgs_results/rgb/' + split + '_' + group + '/'

if group == "dry":
    generator = 'generator_g'
else:
    generator = 'generator_f'

#loading embedding model
emb2 = load_emb_model(generator, dim)
clases = ['CuNi1', 'CuNi2', 'CuNi3']

for tipo in clases:
    print("working on: ", tipo)
    embeddings, labels = [], []
    tipo_pth = gen_path + tipo + '/'
    print("tipo_pth: ", tipo_pth)
    clase = tipo
    print("clase: ", clase)
    print("convirtiendo a tf.Dataset...")
    data_ds = toDataSet(tipo_pth, rgb)
    # len() of a batched dataset counts batches, not images
    can = len(data_ds)
    print("cantidad: ", can)
    for img, _batch_labels in tqdm(data_ds):
        out = emb2(img).numpy()
        embeddings.extend(out)
        # One class-name label per embedding actually produced, so both lists
        # stay the same length for any BATCH_SIZE
        labels.extend([clase] * len(out))
    print("saving ", tipo, " class...")
    saving_emb(split, clase, group, embeddings, labels, dim)

In [1]:
import numpy as np
# Sanity check of one saved file.
# NOTE: the file loaded here is Embeddings.npy, so despite its name the
# variable `labels` holds the embeddings array, not class labels.
np_path = "../embeddings/rgb/test_dry/CuNi3/Embeddings.npy"
labels = np.load(np_path)
# Number of entries along the first axis of the loaded array
print(len(labels))

55


In [4]:
# Largest value in the array loaded from Embeddings.npy.
# NOTE(review): the recorded output is 0.0, i.e. no stored value is positive -
# confirm that the saved embeddings are not degenerate.
labels.max()

0.0