# Importing the required libraries

In [None]:
import os
import numpy as np
import tensorflow as tf
import pandas as pd
import cv2
import matplotlib.pyplot as plt
import glob 
import shutil
from tensorflow.keras import backend as K
from tensorflow.keras.applications.vgg19 import VGG19
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Concatenate, Subtract, concatenate, Input, Flatten, Activation, Dense, Dropout, Lambda, Conv2D, BatchNormalization, MaxPooling2D
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.callbacks import EarlyStopping

# Utility functions

In [None]:
def test_accuracy(model):
    """Evaluate ``model`` on the module-level test split and print the metrics.

    :param model: compiled model whose ``evaluate`` returns ``(loss, accuracy)``
    """
    # X_test / y_test are read from module scope, not passed in
    loss, accuracy = model.evaluate(X_test, y_test)
    print("Test Loss:", loss)
    print("Test Accuracy:", accuracy)
    
def plot_loss(history):
    """Plot the training and validation loss recorded for each epoch.

    :param history: object whose ``history`` mapping holds the ``'loss'`` and
        ``'val_loss'`` series (one value per epoch)
    """
    recorded = history.history
    train_loss, val_loss = recorded['loss'], recorded['val_loss']

    # Epochs are numbered from 1 on the x axis
    epochs = range(1, len(train_loss) + 1)

    # Training curve in blue, validation curve in red
    curves = (
        (train_loss, 'b', 'Train Loss'),
        (val_loss, 'r', 'Validation Loss'),
    )
    for values, colour, legend_label in curves:
        plt.plot(epochs, values, colour, label=legend_label)

    plt.title('Training and Validation Losses')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    # Display the figure
    plt.show()
    
    
def plot_accuracy(history):
    """Plot the training and validation accuracy recorded for each epoch.

    :param history: object whose ``history`` mapping holds the ``'accuracy'``
        and ``'val_accuracy'`` series (one value per epoch)
    """
    recorded = history.history
    train_accuracy, val_accuracy = recorded['accuracy'], recorded['val_accuracy']

    # Epochs are numbered from 1 on the x axis
    epochs = range(1, len(train_accuracy) + 1)

    # Training curve in blue, validation curve in red
    curves = (
        (train_accuracy, 'b', 'Train Accuracy'),
        (val_accuracy, 'r', 'Validation Accuracy'),
    )
    for values, colour, legend_label in curves:
        plt.plot(epochs, values, colour, label=legend_label)

    plt.title('Training and Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

    # Display the figure
    plt.show()
    
def predicting_on_dataset(X_pred, model, subset_size=300):
    """Show image pairs side by side, titled with the model's prediction.

    :param X_pred: two-element sequence ``[left_images, right_images]``; each
        element is sliced and indexed along its first axis
    :param model: object whose ``predict`` accepts a ``[left, right]`` list
    :param subset_size: maximum number of pairs to predict and display
    """
    left_images, right_images = X_pred[0], X_pred[1]

    # Slice each image stack (slicing the outer two-element list is a no-op)
    # and never request more pairs than are actually available.
    n_pairs = min(subset_size, len(left_images), len(right_images))
    if n_pairs <= 0:
        print("No image pairs to display.")
        return
    X_subset = [left_images[:n_pairs], right_images[:n_pairs]]

    # Make predictions on the selected pairs only
    predictions = model.predict(X_subset)

    # squeeze=False keeps `axes` two-dimensional even when n_pairs == 1
    fig, axes = plt.subplots(n_pairs, 2, figsize=(10, n_pairs * 2), squeeze=False)
    for i, (left_ax, right_ax) in enumerate(axes):
        # Left image of the pair
        left_ax.imshow(X_subset[0][i])
        left_ax.axis('off')

        # Right image of the pair
        right_ax.imshow(X_subset[1][i])
        right_ax.axis('off')

        # The title is the raw model output for this pair
        score = predictions[i]
        right_ax.set_title(score)

    plt.tight_layout()
    plt.show()
    
def data_aug(train_left, train_right, train_label, nb, save_folder):
    """Extend a duel dataset with randomly transformed copies of every pair.

    The originals are kept and ``nb`` augmented pairs are appended for each
    duel, all carrying the label of the duel they were derived from.

    :param train_left: left images, indexable by duel number
    :param train_right: right images, indexable by duel number
    :param train_label: one label per duel
    :param nb: number of augmented pairs to create per original duel
    :param save_folder: folder created if missing (nothing is written to it here)
    :return: ``([left_array, right_array], label_array)``
    """
    # Imported locally because the file header does not import it.
    from tensorflow.keras.preprocessing.image import ImageDataGenerator

    # The progress bar is cosmetic: fall back to plain iteration when the
    # optional ``progressbar`` package is not installed.
    try:
        import progressbar
        track = progressbar.ProgressBar()
    except ImportError:
        track = iter

    # Create saving folder (the helper originally called here is not defined
    # anywhere in this file, and its return value was never used).
    if save_folder:
        os.makedirs(save_folder, exist_ok=True)

    # Specify data generator parameters
    datagenargs = {
        'rotation_range': 2, 'width_shift_range': 0.2, 'height_shift_range': 0.2,
        'shear_range': 0.1,
        'zoom_range': 0.25, 'horizontal_flip': True, 'fill_mode': 'nearest'
    }

    # Create generators (left and right images are transformed independently)
    left_datagen = ImageDataGenerator(**datagenargs)
    right_datagen = ImageDataGenerator(**datagenargs)

    # Start from the original data; augmented samples are appended after it
    train_left_aug = list(train_left)
    train_right_aug = list(train_right)
    train_label_aug = list(train_label)

    print("Creating new inputs...")
    for duel in track(range(len(train_label))):
        # np.asarray lets tensor inputs through as well as NumPy arrays
        ori_left_img = np.asarray(train_left[duel])
        ori_right_img = np.asarray(train_right[duel])

        # One single-image iterator per original; indexing it with [0] applies
        # a fresh random transform each time, so it is built once per duel.
        left_flow = left_datagen.flow(ori_left_img[np.newaxis], batch_size=1)
        right_flow = right_datagen.flow(ori_right_img[np.newaxis], batch_size=1)

        for _ in range(nb):
            # Drop the batch axis added for the generator
            train_left_aug.append(left_flow[0].reshape(ori_left_img.shape))
            train_right_aug.append(right_flow[0].reshape(ori_right_img.shape))
            train_label_aug.append(train_label[duel])

    # Convert to array
    train_data_aug = [np.array(train_left_aug), np.array(train_right_aug)]
    train_label_aug = np.array(train_label_aug)

    return train_data_aug, train_label_aug

# Preparing the data

In [None]:
# Load the duel results; the first three columns are named below.
# Forward slashes replace the original backslashes: "\q" and "\d" are invalid
# escape sequences in a normal string literal and the path only resolved on
# Windows. This also matches the prediction-folder path used later on.
data = pd.read_csv("data/question_1/duels_question_1.csv", usecols=[0, 1, 2], header=None)
data.columns = ["Image 1", "Image 2", "labels"]

# Drop the duels labelled "No preference"
data = data[data["labels"] != "No preference"]

### Splitting and formatting the data for the comparison model using the duels data

In [None]:
shape = 224  # Side length (in pixels) every image is resized to


def _read_resized_image(image_path, shape):
    """Read an image, resize it to ``shape`` x ``shape`` and scale it by 1/255.

    Returns ``None`` when OpenCV cannot decode the file.
    """
    image = cv2.imread(image_path)
    if image is None:  # cv2.imread reports failure with None instead of raising
        return None
    image = cv2.resize(image, (shape, shape))
    return image.astype(np.float32) / 255.0


def _find_image_file(name, filenames):
    """Return the entry of ``filenames`` that best matches ``name``, or ``None``."""
    # Prefer an exact match on the file name or its stem ...
    for filename in filenames:
        if filename == name or os.path.splitext(filename)[0] == name:
            return filename
    # ... and only then fall back to the original substring rule.
    for filename in filenames:
        if name in filename:
            return filename
    return None


def prepare_dataset_arrays(image_folder, data, shape):
    """Load the two images of every duel listed in ``data``.

    Exactly one image is loaded per name, so the three returned sequences
    always have the same length and stay aligned row by row. Duels for which
    either image is missing or unreadable are skipped together with their
    label.

    :param image_folder: folder containing the image files
    :param data: DataFrame whose first three columns are image 1 name,
        image 2 name and label
    :param shape: side length the images are resized to
    :return: ``(image1_list, image2_list, labels)``
    """
    image1_names = data.iloc[:, 0].values
    image2_names = data.iloc[:, 1].values
    labels = data.iloc[:, 2].values

    # List the folder once and load each distinct image once, instead of
    # rescanning the directory and re-reading files for every duel.
    filenames = os.listdir(image_folder)
    loaded = {}

    def load(name):
        name = str(name)
        if name not in loaded:
            filename = _find_image_file(name, filenames)
            loaded[name] = (
                None if filename is None
                else _read_resized_image(os.path.join(image_folder, filename), shape)
            )
        return loaded[name]

    image1_array = []
    image2_array = []
    kept_rows = []

    for row, (image1_name, image2_name) in enumerate(zip(image1_names, image2_names)):
        image1 = load(image1_name)
        image2 = load(image2_name)
        if image1 is None or image2 is None:
            continue
        image1_array.append(image1)
        image2_array.append(image2)
        kept_rows.append(row)

    skipped = len(labels) - len(kept_rows)
    if skipped:
        print(f"Skipped {skipped} duel(s) with a missing or unreadable image.")
        labels = labels[kept_rows]

    return image1_array, image2_array, labels


# Forward slashes: the original "\q" / "\S" are invalid escape sequences and
# the backslash path only resolved on Windows.
image1_array, image2_array, labels = prepare_dataset_arrays("data/question_1/Sample_web_green", data, shape)


### Creating the prediction dataset

In [None]:
def prepare_prediction_siamese(directory, shape, pair_size=300):
    """Load the prediction images and split them into two stacks.

    :param directory: glob pattern matching the image files
    :param shape: side length the images are resized to
    :param pair_size: number of images placed in each of the two stacks
    :return: ``[first_stack, second_stack]`` as tensors; the first holds
        images ``[0, pair_size)`` and the second ``[pair_size, 2 * pair_size)``
        of the sorted file list
    """
    image_pred = []
    # glob returns paths in arbitrary order; sorting makes the split between
    # the two stacks reproducible from run to run.
    for image_path in sorted(glob.glob(directory)):
        image = cv2.imread(image_path)
        if image is None:  # not a decodable image (e.g. a directory): skip it
            continue
        image = cv2.resize(image, (shape, shape))
        image_pred.append(image.astype(np.float32) / 255.0)

    image_pred_1 = tf.convert_to_tensor(np.array(image_pred[:pair_size]))
    image_pred_2 = tf.convert_to_tensor(np.array(image_pred[pair_size:2 * pair_size]))

    return [image_pred_1, image_pred_2]


X_pred = prepare_prediction_siamese("data/question_1/ForPrediction/*/*", shape)

### Creating the Training, Validation and Testing datasets with a split of (60%, 20%, 20%)

In [None]:
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'


def prepare_dataset_for_network(image1_array, image2_array, labels):
    """Encode the labels and split the duels 60/20/20 into train/valid/test.

    ``"left"`` becomes ``[1, 0]`` and ``"right"`` becomes ``[0, 1]``. Duels
    with any other label are removed from the images as well as from the
    targets, so inputs and targets stay aligned.

    :param image1_array: left images, one per duel
    :param image2_array: right images, one per duel
    :param labels: one label per duel
    :return: ``(X_train, y_train), (X_valid, y_valid), (X_test, y_test)`` where
        each ``X_*`` is a ``[left, right]`` list of tensors
    :raises ValueError: if the three inputs do not have the same length
    """
    # Conversion of the lists into numpy arrays
    image1_array = np.array(image1_array)
    image2_array = np.array(image2_array)
    labels = np.asarray(labels, dtype=object)

    if not (len(image1_array) == len(image2_array) == len(labels)):
        raise ValueError(
            "image1_array, image2_array and labels must have the same length, got "
            f"{len(image1_array)}, {len(image2_array)} and {len(labels)}"
        )

    is_left = labels == "left"
    is_right = labels == "right"
    keep = is_left | is_right

    # Drop unknown labels from the images too; dropping them from the targets
    # only would shift every later target against its image pair.
    if not keep.all():
        image1_array = image1_array[keep]
        image2_array = image2_array[keep]

    # One-hot targets: column 0 is "left", column 1 is "right"
    labels_formatted = np.column_stack((is_left[keep], is_right[keep])).astype(int)

    labels_formatted = tf.convert_to_tensor(labels_formatted)
    image1_array = tf.convert_to_tensor(image1_array)
    image2_array = tf.convert_to_tensor(image2_array)

    # Split the data into training, validation, and test sets using array slicing
    train_size = int(0.6 * len(image1_array))
    valid_size = int(0.2 * len(image1_array))
    valid_end = train_size + valid_size

    X_train = [image1_array[:train_size], image2_array[:train_size]]
    y_train = labels_formatted[:train_size]

    X_valid = [image1_array[train_size:valid_end], image2_array[train_size:valid_end]]
    y_valid = labels_formatted[train_size:valid_end]

    # The test split takes whatever remains after the two rounded-down splits
    X_test = [image1_array[valid_end:], image2_array[valid_end:]]
    y_test = labels_formatted[valid_end:]

    return (X_train, y_train), (X_valid, y_valid), (X_test, y_test)


(X_train, y_train), (X_valid, y_valid), (X_test, y_test) = prepare_dataset_for_network(image1_array, image2_array, labels)

# Building the siamese network

## Building the model for the comparison between the two pictures

In [None]:
def comparison_siamese_model(input_shape, learning_rate=0.000001):
    """Build and compile the two-input comparison model on a shared VGG19.

    Both images go through the same VGG19 instance; the two feature maps are
    concatenated and fed to a small convolutional head with two outputs.

    :param input_shape: shape of one input image, e.g. ``(224, 224, 3)``
    :param learning_rate: learning rate passed to the Adam optimizer
    :return: compiled model taking ``[image_1, image_2]``
    """
    base_model = VGG19(weights='imagenet', include_top=False, input_shape=input_shape)
    # Only the last four layers of the backbone stay trainable
    for layer in base_model.layers[:-4]:
        layer.trainable = False

    # Create inputs for pairs of images
    input_1 = Input(shape=input_shape)
    input_2 = Input(shape=input_shape)

    # Get embeddings of the images using the shared VGG19 model
    output_1 = base_model(input_1)
    output_2 = base_model(input_2)

    concat = concatenate([output_1, output_2])

    # Classification head. The original also built a Flatten layer directly on
    # `concat` whose output was never used; that dead layer is removed.
    x = Conv2D(512, (3, 3), activation='relu', padding='same')(concat)
    x = Dropout(0.3)(x)
    x = Conv2D(512, (3, 3), activation='relu', padding='same')(x)
    x = Dropout(0.1)(x)
    x = Flatten()(x)
    # NOTE(review): two independent sigmoid units with binary cross-entropy are
    # kept as in the original; softmax with categorical cross-entropy is the
    # usual pairing for one-hot targets - confirm which is intended.
    output = Dense(2, activation='sigmoid')(x)

    # Create the complete siamese model
    siamese_model = Model(inputs=[input_1, input_2], outputs=output)
    siamese_model.compile(
        loss="binary_crossentropy",
        optimizer=Adam(learning_rate=learning_rate),
        metrics=['accuracy'],
    )

    # Print model summary
    siamese_model.summary()

    return siamese_model

# Train the siamese network

# Build the comparison model for 224x224 three-channel inputs
siamese_comparison_model = comparison_siamese_model((224, 224, 3))

# Watch the validation loss with a patience of 5 epochs and ask for the best
# weights to be restored
early_stopping = EarlyStopping(
    restore_best_weights=True,
    patience=5,
    monitor='val_loss',
)

# Keep the training history for the loss / accuracy plots
history = siamese_comparison_model.fit(
    X_train,
    y_train,
    validation_data=(X_valid, y_valid),
    callbacks=[early_stopping],
    epochs=5,
    batch_size=64,
)

### Testing accuracy check

In [None]:
# Evaluate the trained comparison model on the test split; the helper prints
# the resulting loss and accuracy
test_accuracy(siamese_comparison_model)

### Plotting the accuracy metric for the validation and training datasets

In [None]:
# Training vs. validation accuracy per epoch for the comparison model
plot_accuracy(history)

### Plotting the loss for the validation and training datasets

In [None]:
# Training vs. validation loss per epoch for the comparison model
plot_loss(history)

### Plotting some of the results from the prediction on the Prediction dataset

In [None]:
# NOTE(review): the same image stack (X_pred[0]) is passed as both the left
# and the right input, so every displayed pair compares an image with itself.
# Confirm this is intended rather than [X_pred[0], X_pred[1]].
predicting_on_dataset([X_pred[0],X_pred[0]], siamese_comparison_model)

# Building the ranking model

In [None]:
def prepare_label_for_ranking(labels):
    """Encode the duel labels as 0/1 and split them 60/20/20.

    ``"left"`` becomes 0 and ``"right"`` becomes 1; any other label is dropped.
    The split sizes are computed from the encoded labels themselves rather
    than from an unrelated module-level array.

    NOTE(review): the meta network in this file outputs
    ``sigmoid(left_score - right_score)``, so with this encoding a "left"
    label drives the left score below the right one. Confirm this matches the
    meaning of the labels and the descending sort used when ranking.

    :param labels: iterable of label strings, one per duel
    :return: ``(y_train, y_valid, y_test)`` tensors
    """
    encoding = {"left": 0, "right": 1}
    labels_formatted = np.array([encoding[label] for label in labels if label in encoding])
    labels_formatted = tf.convert_to_tensor(labels_formatted)

    # Split the data into training, validation, and test sets using array slicing
    n_samples = len(labels_formatted)
    train_size = int(0.6 * n_samples)
    valid_size = int(0.2 * n_samples)
    valid_end = train_size + valid_size

    y_train = labels_formatted[:train_size]
    y_valid = labels_formatted[train_size:valid_end]
    y_test = labels_formatted[valid_end:]

    return y_train, y_valid, y_test


y_train, y_valid, y_test = prepare_label_for_ranking(labels)

### Guillaume's model

In [None]:
def create_ranking_network(img_size):
    """
    Build the scoring network, which maps one image to a single score.

    :param img_size: height and width (in pixels) of the square input images
    :type img_size: int
    :return: scoring model, named ``Scoring_model``
    :rtype: keras.Model
    """
    input_shape = (img_size, img_size, 3)

    # VGG19 backbone; only its last four layers remain trainable
    backbone = VGG19(weights="imagenet", include_top=False, input_shape=input_shape)
    for frozen_layer in backbone.layers[:-4]:
        frozen_layer.trainable = False

    image_in = Input(shape=input_shape, name='input_image')
    features = Flatten(name='Flatten')(backbone(image_in))

    # Two Dense -> BatchNorm -> Dropout blocks:
    # (dense name, batch-norm name, dropout name, units, dropout rate)
    dense_blocks = (
        ('Dense_1', 'BN1', 'Drop_1', 32, 0.490),
        ('Dense_2', 'BN2', 'Drop_2', 128, 0.368),
    )
    for dense_name, bn_name, drop_name, units, drop_rate in dense_blocks:
        features = Dense(units, activation='relu', name=dense_name)(features)
        features = BatchNormalization(name=bn_name)(features)
        features = Dropout(drop_rate, name=drop_name)(features)

    # Single linear unit: the raw score
    score = Dense(1, name="Dense_Output")(features)
    return Model(image_in, score, name='Scoring_model')


def create_meta_network(img_size, weights=None):
    """
    Create the meta network which is used to teach the ranking network.

    Both images are scored by the same ranking network; the output is
    ``sigmoid(left_score - right_score)``.

    :param img_size: height and width (in pixels) of the square input images
    :type img_size: int
    :param weights: path to the weights used for initialization, if any
    :type weights: str
    :return: compiled meta network model
    :rtype: keras.Model
    """

    # Create the two input branches, sharing one scoring network
    input_left = Input(shape=(img_size, img_size, 3), name='left_input')
    input_right = Input(shape=(img_size, img_size, 3), name='right_input')
    base_network = create_ranking_network(img_size)
    left_score = base_network(input_left)
    right_score = base_network(input_right)

    # Subtract scores
    diff = Subtract()([left_score, right_score])

    # Pass difference through sigmoid function.
    prob = Activation("sigmoid", name="Activation_sigmoid")(diff)
    model = Model(inputs=[input_left, input_right], outputs=prob, name="Meta_Model")

    if weights:
        print('Loading weights ...')
        model.load_weights(weights)

    # The SGD optimizer the original built here was never passed to compile(),
    # and its `decay` argument is rejected by newer Keras optimizers, so the
    # dead statement is removed. Adam was, and remains, the optimizer in use.
    model.compile(optimizer=Adam(learning_rate=0.000001), loss="binary_crossentropy", metrics=['accuracy'])

    return model

# Build the pairwise meta network for 224-pixel inputs and show its layers
meta_network = create_meta_network(224)
meta_network.summary()

# Train on the duel pairs
meta_network.fit(
    X_train,
    y_train,
    validation_data=(X_valid, y_valid),
    epochs=20,
    batch_size=16,
)

In [None]:
# Fetch the shared scoring sub-model from the meta network by its layer name
# and save its weights to disk
ranking_model = meta_network.get_layer('Scoring_model')
ranking_model.save_weights('ranking_model_weights.h5')

In [None]:
# Rebuild a standalone scoring network for 224-pixel inputs and load the
# weights from 'ranking_model_weights.h5' into it
ranking_model = create_ranking_network(224)
ranking_model.load_weights('ranking_model_weights.h5')

In [None]:
# Predict scores for the images
scores = ranking_model.predict(X_pred[0])

# Create an array of indices to maintain the original order
indices = np.arange(len(scores))

# Sort the indices based on the scores in descending order
sorted_indices = sorted(indices, key=lambda x: scores[x], reverse=True)

# Set the number of columns for the grid
num_columns = 5

# Calculate the number of rows based on the number of images and columns
num_images = len(X_pred[0])
num_rows = int(np.ceil(num_images / num_columns))

# Create a figure and axes for the grid
fig, axes = plt.subplots(num_rows, num_columns, figsize=(15, 3*num_rows))

# Iterate over the sorted indices and plot the images in the grid
for i, index in enumerate(sorted_indices):
    row = i // num_columns
    col = i % num_columns

    # Plot the image with the corresponding score
    ax = axes[row, col]
    ax.imshow(X_pred[0][index])
    ax.axis('off')
    ax.set_title(f"Score: {scores[index]}")

# Adjust the layout and display the grid of images
plt.tight_layout()
plt.show()