In [None]:
import os
import torch
import numpy as np
import tensorflow as tf
import dataloader as dset
import matplotlib.pyplot as plt
import tensorflow.keras.backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Lambda
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Multiply
from tensorflow.keras.applications import VGG16
from tensorflow.keras.layers import MaxPooling2D
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping
from data_augmenter import enrich_and_shuffle_dataset
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import GlobalAveragePooling2D, ReLU, Flatten

In [None]:
def configure_gpu():
    """Turn on memory growth for every GPU that TensorFlow can see.

    Does nothing when no GPU is listed. A ``RuntimeError`` raised while
    applying the setting is reported on stdout instead of propagating.
    """
    detected = tf.config.list_physical_devices('GPU')
    if not detected:
        return

    try:
        for device in detected:
            tf.config.experimental.set_memory_growth(device, True)
    except RuntimeError as err:
        print("Failed to set memory growth: ", err)
    else:
        print("Memory growth set successfully")

# Apply the memory-growth setting now, ahead of the model-building cells below.
configure_gpu()

In [None]:
# Allow more than one OpenMP runtime to be loaded in the same process.
# NOTE(review): this is set after torch/tensorflow were imported in the first cell;
# confirm it still takes effect in your environment.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'

# Windows-style relative paths. Raw strings keep the backslashes literal and avoid the
# invalid-escape warnings that "\d" and "\l" trigger in normal string literals.
# The resulting values are byte-for-byte the same as before.
LMDB_PATH_HOST = r"..\dataset\lmdb.hwr_40-1.0"
# NOTE(review): the training label file name contains ".tst." - confirm this is the intended split.
TRN_DATA = r"..\dataset\data\lines.filtered_max_width.tst.55.shuf"
# NOTE(review): there is no separator between "data" and "lines" here, unlike TRN_DATA - verify the path.
TST_DATA = r"..\dataset\datalines.filtered_max_width.tst.55"

In [None]:
def create_pairs(images, labels, rng=None):
    """Build positive/negative image pairs for Siamese training.

    For every sample whose class has at least one other member, two pairs are
    emitted in this order:

    1. a positive pair ``[sample, other sample of the same class]`` labelled ``[1]``
    2. a negative pair ``[sample, sample of a different class]`` labelled ``[0]``
       (only when at least one other class exists)

    Samples that are the sole member of their class are skipped entirely, so
    they contribute neither a positive nor a negative pair.

    Args:
        images: Indexable collection of images; ``images[i]`` belongs to ``labels[i]``.
        labels: Class label per image (array-like; converted with ``np.asarray``).
        rng: Optional random source exposing ``choice`` (for example
            ``np.random.default_rng(seed)`` or ``np.random.RandomState(seed)``).
            When omitted the global ``np.random`` state is used, as before.

    Returns:
        Tuple ``(pairs, pair_labels)`` of NumPy arrays built from the collected
        pairs and their one-element label lists.
    """
    labels = np.asarray(labels)
    chooser = np.random if rng is None else rng

    unique_labels = np.unique(labels)
    # Sample positions grouped by class label.
    idx = {label: np.where(labels == label)[0] for label in unique_labels}

    imagePairs = []
    labelPairs = []

    for ind in range(len(images)):
        currImage = images[ind]
        label = labels[ind]

        # Same-class candidates, excluding the sample itself.
        same_class = idx[label]
        partners = same_class[same_class != ind]
        if partners.size == 0:
            # Singleton class: no positive pair possible, skip the sample altogether.
            continue

        indB = chooser.choice(partners)
        imagePairs.append([currImage, images[indB]])
        labelPairs.append([1])

        # Negative pair: pick another class first, then one of its samples.
        other_labels = unique_labels[unique_labels != label]
        if other_labels.size:
            diss_label = chooser.choice(other_labels)
            diss_idx = chooser.choice(idx[diss_label])
            imagePairs.append([currImage, images[diss_idx]])
            labelPairs.append([0])

    return (np.array(imagePairs), np.array(labelPairs))

In [None]:
# Open the LMDB-backed dataset using the label file configured in TRN_DATA.
train_set = dset.DatasetFromLMDB(lmdb_path=LMDB_PATH_HOST, labels_path=TRN_DATA)

# Walk the dataset once; each item unpacks to (image, label, image name).
X, y = [], []
samples = (train_set[position] for position in range(len(train_set)))
for image, label, _image_name in samples:
    X.append(image)
    y.append(label.item())  # label is a single-element tensor-like; keep the Python scalar

# Stack the per-sample tensors into one batch, then move axis 1 to the end.
# Presumably this converts channels-first (N, C, H, W) to channels-last for Keras - TODO confirm.
X_tensor = torch.stack(X)
X = X_tensor.numpy().transpose(0, 2, 3, 1)
y = np.array(y)

In [None]:
# Inspect the full image array. `X_train` is only created by the split cell further down,
# so referencing it here raises NameError on a fresh kernel (Restart & Run All).
X.shape

In [None]:
# Inspect the full label array. `y_train` is only created by the split cell further down,
# so referencing it here raises NameError on a fresh kernel (Restart & Run All).
y.shape

In [None]:
# Hold out 40% of the samples, then halve the hold-out: 60% train / 20% validation / 20% test.
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.4, random_state=42
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42
)

# Only the training split goes through the project's enrich/shuffle helper.
X_train, y_train = enrich_and_shuffle_dataset(X_train, y_train)

# Replace each split with (image pair, same/different label) arrays.
# The train -> test -> val order is kept because create_pairs draws from the global NumPy RNG.
X_train, y_train = create_pairs(X_train, y_train)
X_test, y_test = create_pairs(X_test, y_test)
X_val, y_val = create_pairs(X_val, y_val)

# Creating and Training the Model

In [None]:
def euclidean_distance(vectors):
    """Row-wise Euclidean distance between two batches of embeddings.

    Args:
        vectors: Two-element sequence ``(vector1, vector2)`` of tensors that
            can be subtracted from each other and reduced along axis 1.

    Returns:
        Tensor of distances with axis 1 kept as size 1.
    """
    left, right = vectors
    squared_gap = K.sum(K.square(left - right), axis=1, keepdims=True)
    # Floor the squared distance at epsilon so sqrt never receives exactly 0.
    floored = K.maximum(squared_gap, K.epsilon())
    return K.sqrt(floored)

def contrastive_loss(y_true, y_pred):
    """Contrastive loss over predicted pair distances, with a fixed margin of 1.0.

    Pairs with ``y_true == 1`` are penalised by the squared distance; pairs
    with ``y_true == 0`` are penalised by the squared shortfall of the
    distance below the margin.

    Args:
        y_true: Pair labels; cast to the dtype of ``y_pred`` before use.
        y_pred: Predicted distances.

    Returns:
        Mean loss over all elements.
    """
    margin = 1.0
    targets = K.cast(y_true, y_pred.dtype)
    pull_term = targets * K.square(y_pred)
    push_term = (1 - targets) * K.square(K.maximum(margin - y_pred, 0))
    return K.mean(pull_term + push_term)
    
# Hyper-parameters for the VGG16 experiment.
image_shape = (200, 50, 3)
batch_size = 32
epochs = 25

# ImageNet-pretrained VGG16 without its fully connected top.
vgg16_base = VGG16(include_top=False, weights='imagenet', input_shape=image_shape)

# Freeze every backbone layer; only the dense head added below stays trainable.
for layer in vgg16_base.layers:
    layer.trainable = False

# Embedding tower: backbone -> global average pooling -> Dense(512) -> Dense(128).
inputs = Input(shape=image_shape)
x = vgg16_base(inputs)
x = GlobalAveragePooling2D()(x)
x = Dense(512, activation='relu')(x)
embeddings = Dense(128, activation='relu')(x)
embedding_model = Model(inputs, embeddings)

# Both branches call the same embedding_model, so they share weights.
input_a = Input(shape=image_shape)
input_b = Input(shape=image_shape)
embedding_a = embedding_model(input_a)
embedding_b = embedding_model(input_b)

# The network output is the distance between the two embeddings.
distance = Lambda(euclidean_distance)([embedding_a, embedding_b])
model = Model(inputs=[input_a, input_b], outputs=distance)

# Stop after 5 epochs without val_loss improvement and roll back to the best weights.
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

model.compile(optimizer='adam', loss=contrastive_loss)

# Column 0 / column 1 of each pair array feed the first / second branch.
train_pairs = [X_train[:, 0], X_train[:, 1]]
val_pairs = [X_val[:, 0], X_val[:, 1]]
history = model.fit(
    train_pairs, y_train[:],
    validation_data=(val_pairs, y_val[:]),
    batch_size=batch_size,
    epochs=epochs,
    callbacks=[early_stopping_callback],
)

In [None]:
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Input, Lambda, GlobalAveragePooling2D, Dense
from tensorflow.keras.models import Model
import tensorflow.keras.backend as K

# Hyper-parameters for the ResNet50 run (same values as the VGG16 cell above).
image_shape = (200, 50, 3)   # shape passed to Input(...) and to the ResNet50 backbone
batch_size, epochs = 32, 25  # forwarded to model.fit

def euclidean_distance(vectors):
    """Euclidean distance between paired rows of two embedding batches.

    Same computation as the definition in the VGG16 cell; repeated here so
    this cell can be run without it.

    Args:
        vectors: Pair ``(vector1, vector2)`` of tensors reduced along axis 1.

    Returns:
        Distances with axis 1 kept (size 1).
    """
    first, second = vectors
    sum_of_squares = K.sum(K.square(first - second), axis=1, keepdims=True)
    # Clamp to at least epsilon before taking the square root.
    return K.sqrt(K.maximum(sum_of_squares, K.epsilon()))

def contrastive_loss(y_true, y_pred):
    """Mean contrastive loss for predicted distances (margin fixed at 1.0).

    Same computation as the definition in the VGG16 cell; repeated here so
    this cell can be run without it.

    Args:
        y_true: Pair labels; 1 selects the squared-distance term, 0 selects
            the squared margin-shortfall term.
        y_pred: Predicted distances; ``y_true`` is cast to this dtype.

    Returns:
        Mean of the per-element loss.
    """
    margin = 1.0
    labels = K.cast(y_true, y_pred.dtype)
    shortfall = K.maximum(margin - y_pred, 0)
    per_pair = labels * K.square(y_pred) + (1 - labels) * K.square(shortfall)
    return K.mean(per_pair)

# ImageNet-pretrained ResNet50 without its fully connected top.
resnet_base = ResNet50(include_top=False, weights='imagenet', input_shape=image_shape)

# Freeze every backbone layer; only the dense head added below stays trainable.
for layer in resnet_base.layers:
    layer.trainable = False

# Embedding tower: backbone -> global average pooling -> Dense(512) -> Dense(128).
inputs = Input(shape=image_shape)
x = resnet_base(inputs)
x = GlobalAveragePooling2D()(x)
x = Dense(512, activation='relu')(x)
embeddings = Dense(128, activation='relu')(x)
embedding_model = Model(inputs, embeddings)

# Two inputs routed through the same embedding_model (shared weights).
input_a = Input(shape=image_shape)
input_b = Input(shape=image_shape)
embedding_a = embedding_model(input_a)
embedding_b = embedding_model(input_b)

# The Siamese model outputs the distance between the two embeddings.
distance = Lambda(euclidean_distance)([embedding_a, embedding_b])
model = Model(inputs=[input_a, input_b], outputs=distance)

from tensorflow.keras.callbacks import EarlyStopping

# Stop after 5 epochs without val_loss improvement and roll back to the best weights.
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

model.compile(optimizer='adam', loss=contrastive_loss)

# Column 0 / column 1 of each pair array feed the first / second branch.
history = model.fit(
    [X_train[:, 0], X_train[:, 1]],
    y_train[:],
    validation_data=(
        [X_val[:, 0], X_val[:, 1]],
        y_val[:],
    ),
    batch_size=batch_size,
    epochs=epochs,
    callbacks=[early_stopping_callback],
)

# Evaluation

In [None]:
from sklearn.metrics import roc_curve, auc

batch_size = 32  # You can adjust this based on your system's capabilities

# Predicted distances for the held-out test pairs.
test_pairs = [X_test[:, 0], X_test[:, 1]]
predictions = model.predict(test_pairs, batch_size=batch_size)

# exp(-d) turns a distance into a score in (0, 1]: small distance -> score near 1,
# which lines up with label 1 marking same-class pairs.
similarity_scores = np.exp(-np.array(predictions))

# ROC points and the area under the curve.
fpr, tpr, _ = roc_curve(y_test, similarity_scores)
roc_auc = auc(fpr, tpr)

# ROC curve plus the chance-level diagonal for reference.
fig, ax = plt.subplots()
ax.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
ax.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel('False Positive Rate')
ax.set_ylabel('True Positive Rate')
ax.set_title('Receiver Operating Characteristic')
ax.legend(loc="lower right")
plt.show()

# My Trial

In [None]:
def euclidean_distance(vecs):
    """Euclidean distance between two embedding batches after L2-normalising each row.

    Unlike the earlier definitions of the same name in this notebook, both
    inputs are scaled to unit length (with the squared norm floored at
    epsilon) before the distance is taken.

    Args:
        vecs: Pair ``(imgA, imgB)`` of tensors reduced along axis 1.

    Returns:
        Distances with axis 1 kept (size 1).
    """
    def _floored_norm(batch):
        # Row-wise L2 norm; the squared sum is clamped to at least epsilon
        # so the result is never zero.
        squared = K.sum(K.square(batch), axis=1, keepdims=True)
        return K.sqrt(K.maximum(squared, K.epsilon()))

    imgA, imgB = vecs
    unit_a = imgA / _floored_norm(imgA)
    unit_b = imgB / _floored_norm(imgB)

    # Distance between the normalised vectors, using the same clamped square root.
    return _floored_norm(unit_a - unit_b)

def contrastiveLoss(y, y_preds, margin=1):
    """Contrastive loss over predicted distances with a configurable margin.

    Args:
        y: Pair labels; cast to the dtype of ``y_preds``. Label 1 selects the
            squared-distance term, label 0 the squared margin-shortfall term.
        y_preds: Predicted distances.
        margin: Distance below which label-0 pairs are still penalised.

    Returns:
        Mean of the per-element loss.
    """
    targets = tf.cast(y, y_preds.dtype)
    similar_term = targets * K.square(y_preds)
    dissimilar_term = (1 - targets) * K.square(K.maximum(margin - y_preds, 0))
    return K.mean(similar_term + dissimilar_term)
    
def siamese_model(input_shape, embeddingDim=48):
    """Build the convolutional embedding network used for each Siamese branch.

    Architecture: three 3x3 convolution stages (64, 128, 256 filters), each
    followed by batch normalisation. The first two stages also apply 2x2 max
    pooling and 50% dropout. The last stage is followed by global average
    pooling and a linear Dense projection.

    Args:
        input_shape: Shape of a single input image, passed to ``Input``.
        embeddingDim: Size of the output embedding vector.

    Returns:
        A Keras ``Model`` mapping an image to its embedding.
    """
    # Keep the input tensor under its own name. Previously it was stored in `x` and
    # immediately overwritten, while Model(...) referenced an undefined local `inputs`
    # (NameError, or an unrelated global tensor left over from another cell).
    inputs = Input(input_shape)

    # First convolutional stage (the original comment mentioned attention,
    # but no attention layer is applied here).
    x = Conv2D(64, (3, 3), activation="relu")(inputs)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.5)(x)

    # Second convolutional stage (likewise without an attention layer).
    x = Conv2D(128, (3, 3), activation="relu")(x)
    x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.5)(x)

    # Third convolutional stage followed by global average pooling.
    x = Conv2D(256, (3, 3), activation="relu")(x)
    x = BatchNormalization()(x)
    x = GlobalAveragePooling2D()(x)

    # Fully connected layer producing the embedding (no activation).
    outputs = Dense(embeddingDim)(x)

    model = Model(inputs, outputs)
    return model

In [None]:
# Build first, then train. Previously the compile/fit statements sat in a cell *before*
# the one that builds the custom network, so on a top-to-bottom run they recompiled and
# retrained whatever `model` was left over from the preceding experiment.
image_shape = (200, 50, 3)
batch_size = 32
epochs = 25

# Two image inputs routed through one shared embedding network.
imageA = Input(shape=image_shape)
imageB = Input(shape=image_shape)

model_build = siamese_model(image_shape)
modelA = model_build(imageA)
modelB = model_build(imageB)

# The Siamese model outputs the distance between the two embeddings.
distance = Lambda(euclidean_distance)([modelA, modelB])
model = Model(inputs=[imageA, imageB], outputs=distance)

# Stop after 5 epochs without val_loss improvement and roll back to the best weights.
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

model.compile(loss=contrastiveLoss, optimizer="adam")

# Column 0 / column 1 of each pair array feed the first / second branch.
history = model.fit(
    [X_train[:, 0], X_train[:, 1]], y_train[:],
    validation_data=([X_val[:, 0], X_val[:, 1]], y_val[:]),
    batch_size=batch_size,
    epochs=epochs,
    callbacks=[early_stopping_callback])