In [3]:
import random
import numpy as np
import cv2 as cv
from scipy import ndimage
import os
from sklearn.metrics import roc_curve, roc_auc_score

In [4]:
DATASET_PATH = 'C:\\Users\\ester\\faculdade\\tcc\\database_gpds\\SignatureGPDSSyntheticSignaturesManuscripts\\firmasSINTESISmanuscritas\\'
NUM_OF_WRITERS_TO_LOAD = 10
GENUINE_SIGNATURES_PREFIX = 'c-'
FORGED_SIGNATURES_PREFIX = 'cf-'
TRAINING_SET_RATIO = 0.7
VALIDATION_SET_RATIO = 0.2
NUM_GENUINE_PER_WRITER = 24
NUM_FORGED_PER_WRITER = 30

In [5]:
def load_grayscale_image(image_path):
    '''
    Carrega a imagem em escala de cinzas e devolve um ndarray.
    '''
    image = cv.imread(image_path)
    gray_image = cv.cvtColor(image, cv.COLOR_BGR2GRAY)
    image_array = np.array(gray_image)

    return image_array

In [6]:
def load_data(directory_path, num_of_individuals):
    data = []
    individuals_counter = 0

    for subdir, _, files in os.walk(directory_path):
        if subdir == directory_path:
            continue

        individual = os.path.basename(subdir)
        genuine_signatures = [
            load_grayscale_image(os.path.join(directory_path, subdir, file)) \
            for file in files if file.startswith(GENUINE_SIGNATURES_PREFIX)
        ]
        forged_signatures = [
            load_grayscale_image(os.path.join(directory_path, subdir, file)) \
            for file in files if file.startswith(FORGED_SIGNATURES_PREFIX)
        ]
        data.append({
            'individual': int(individual),
            'genuine_signatures': genuine_signatures,
            'forged_signatures': forged_signatures
        })
        individuals_counter += 1
        if individuals_counter == num_of_individuals:
            break
    
    return data

In [7]:
dataset = load_data(DATASET_PATH, NUM_OF_WRITERS_TO_LOAD)

In [6]:
def otsu_thresholding(image_array):
    '''
    Aplica a limiarização de Otsu numa imagem.
    Recebe um ndarray.
    Retorna um ndarray onde os pixels brancos representam o objeto e os pixels pretos
    representam o plano de fundo.
    '''
    _, otsu_result = cv.threshold(image_array, 0, 1, cv.THRESH_BINARY + cv.THRESH_OTSU)
    otsu_inverted = otsu_result^1
    return otsu_inverted

def thin_operation(img, th_level):
    '''
    Aplica th_level operações de dilatação e, em seguida, th_level operações de
    erosão na imagem img.
    Recebe um ndarray contendo uma máscara binária de uma imagem.
    Devolve uma máscara binária também, com o resultado da operação.
    '''
    img = ndimage.binary_opening(img, iterations=th_level)
    img = ndimage.binary_erosion(img, iterations=th_level)
    return img.astype(int)

In [7]:
def calculate_patch_densities(image):
    '''
    Calcula as densidades de cada bloco 5x5 centrado em um pixel de assinatura.
    A densidade de um bloco é uma fração de quantos pixels do bloco são pixels
    de assinatura (pixels brancos, iguais a 1).
    Recebe uma máscara binária da assinatura.
    Retorna um array com os valores das densidades.
    '''
    ld = []
    pad = 2
    padded_image = np.pad(image, pad, mode='constant', constant_values=0)
    for i in range(image.shape[0]):
        for j in range(image.shape[1]):
            if image[i, j] == 1:
                patch = padded_image[i:i+5, j:j+5]
                patch_density = np.sum(patch) / 25
                ld.append(patch_density)
    
    if len(ld) == 0:
        ld.append(0)
    return np.array(ld)

def get_optimal_thinning_level(image_array):
    '''
    Calcula a quantidade ótima de operações de abertura/fechamento na imagem.
    Recebe um ndarray com a máscara binária da imagem.
    Retorna um inteiro, correspondente ao número ótimo de operações de
    abertura/fechamento.
    '''
    pd = []
    th_level = 0
    while True:
        thinned_image = thin_operation(image_array, th_level)
        ld = calculate_patch_densities(thinned_image)
        ld_mean = np.mean(ld)
        if len(pd) > 1 and np.abs(pd[-1] - ld_mean) < 0.12:
            break
        pd.append(ld_mean)
        th_level += 1
    pd_diff = np.abs(np.diff(pd))
    otl = np.argmax(pd_diff) + 1
    return otl

def get_most_optimal_thinning_level(genuine_signatures_of_an_individual_preprocessed_with_ostu):
    '''
    Retorna a quantidade ótima de operações de abertura/fechamento para o conjunto
    de imagens de assinaturas genuínas de um indivíduo.
    Recebe a lista de assinaturas genuínas de um único indivíduo.
    Retorna um inteiro, correspondente ao número ótimo de operações de abertura/fechamento.
    '''
    otl = []
    index = 0
    for signature_image in genuine_signatures_of_an_individual_preprocessed_with_ostu:
        signature_otl = get_optimal_thinning_level(signature_image)
        otl.append(signature_otl)
        index += 1
    motl = np.mean(otl)
    return motl

In [60]:
def apply_preprocessing(data):
    '''
    Aplica o pré-processamento nas imagens de assinaturas.
    Aplica limiarização de Otsu + abertura/fechamento.
    '''
    for individual_data in data:
        preprocessed_genuine_signatures = []
        for sig in individual_data['genuine_signatures']:
            otsu_sig = otsu_thresholding(sig)
            preprocessed_genuine_signatures.append(otsu_sig)

        preprocessed_forged_signatures = []
        for sig in individual_data['forged_signatures']:
            otsu_sig = otsu_thresholding(sig)
            preprocessed_forged_signatures.append(otsu_sig)

        motl = get_most_optimal_thinning_level(preprocessed_genuine_signatures)
        individual_data['motl'] = round(motl)
        individual_data['genuine_signatures_mask'] = \
            list(map(lambda sig: thin_operation(sig, individual_data['motl']), preprocessed_genuine_signatures))
        individual_data['forged_signatures_mask'] = \
            list(map(lambda sig: thin_operation(sig, individual_data['motl']), preprocessed_forged_signatures))

In [62]:
apply_preprocessing(dataset)

In [66]:
def transform(data_dict):
    data_dict.pop('genuine_signatures')
    data_dict.pop('forged_signatures')
    data_dict.pop('genuine_signatures_mask')
    data_dict.pop('forged_signatures_mask')
    return data_dict

#dataset2 = list(map(transform, dataset.copy()))

In [10]:
def calculate_feature_map(image, binary_mask):
    '''
    Obtém o mapa de features da imagem. O mapa consiste na aplicação de
    10 funções sob os pixels da imagem: gradientes de primeiro e segundo grau,
    magnitude do gradiente, direção do gradiente e normalização dos valores
    das coordenadas.
    Recebe a imagem (em escala de cinzas) e a máscara binária da assinatura.
    O mapa final terá apenas os valores resultantes correspondentes aos píxels
    da assinatura.
    Retorna um ndarray.
    '''
    image = image / 255.0
    h, w = image.shape
    Ix = np.gradient(image, axis=1)
    Iy = np.gradient(image, axis=0)
    Ixx = np.gradient(Ix, axis=1)
    Ixy = np.gradient(Ix, axis=0)
    Iyy = np.gradient(Iy, axis=0)
    gradient_magnitude = np.sqrt(Ix**2 + Iy**2)
    gradient_direction = np.arctan2(Iy, Ix)
    xn = np.tile(np.arange(w) / w, (h, 1))
    yn = np.tile(np.arange(h) / h, (w, 1)).T

    feature_map = np.stack([
        image, Ix, Iy, Ixx, Ixy, Iyy, gradient_magnitude, gradient_direction, xn, yn
    ], axis=0)
    feature_map_only_signature_pixels = feature_map * binary_mask[np.newaxis, :, :]
    return feature_map_only_signature_pixels

def calculate_signature_covariance_matrix(feature_map):
    '''
    Calcula a matriz de covariância, dado um mapa de features.
    Retorna um ndarray com dimensões (10, 10).
    '''
    feature_map = feature_map.reshape(10, -1)
    S = feature_map.shape[1]
    mu = np.mean(feature_map, axis=1, keepdims=True)
    inner_summup = (feature_map - mu) @ (feature_map - mu).T
    covariance_matrix = (1 / (S - 1)) * inner_summup
    return covariance_matrix

In [11]:
def calculate_signature_covariance_matrices(data):
    '''
    Calcula as matrizes de covariância para todas as assinaturas do conjunto
    de dados carregado.
    '''
    for individual_data in data:
        genuine_signatures_covariance_matrices = []
        for signature, signature_mask in zip(individual_data['genuine_signatures'], individual_data['genuine_signatures_mask']):
            feature_map = calculate_feature_map(signature, signature_mask)
            covariance_matrix = calculate_signature_covariance_matrix(feature_map)
            genuine_signatures_covariance_matrices.append(covariance_matrix)

        forged_signatures_covariance_matrices = []
        for signature, signature_mask in zip(individual_data['forged_signatures'], individual_data['forged_signatures_mask']):
            feature_map = calculate_feature_map(signature, signature_mask)
            covariance_matrix = calculate_signature_covariance_matrix(feature_map)
            forged_signatures_covariance_matrices.append(covariance_matrix)
        
        individual_data['genuine_scm'] = genuine_signatures_covariance_matrices
        individual_data['forged_scm'] = forged_signatures_covariance_matrices

In [65]:
calculate_signature_covariance_matrices(dataset)

In [20]:
def create_dicts(data, forgery_type):
    similar_pairs_dict = {}
    disimilar_pairs_dict = {}

    for individual_data in data:
        person_number = individual_data['individual']

        for i in range(NUM_GENUINE_PER_WRITER):
            for j in range(i + 1, NUM_GENUINE_PER_WRITER):
                if i != j:
                    key = (person_number, person_number)
                    value = (i, j)
                    similar_pairs_dict.setdefault(key, set()).add(value)

        if forgery_type == 'genuine_genuine_mixed_individuals':
            for other_d in data:
                if other_d['individual'] != person_number and (other_d['individual'], person_number) not in disimilar_pairs_dict:
                    for genuine_index in range(NUM_GENUINE_PER_WRITER):
                        for other_genuine_index in range(NUM_GENUINE_PER_WRITER):
                            key = (person_number, other_d['individual'])
                            value = (genuine_index, other_genuine_index)
                            disimilar_pairs_dict.setdefault(key, set()).add(value)
        
        elif forgery_type == 'genuine_forged_mixed_individuals':
            for other_d in data:
                if other_d['individual'] != person_number:
                    for genuine_index in range(NUM_GENUINE_PER_WRITER):
                        for forged_index in range(NUM_FORGED_PER_WRITER):
                            key = (person_number, other_d['individual'])
                            value = (genuine_index, forged_index)
                            disimilar_pairs_dict.setdefault(key, set()).add(value)

        elif forgery_type == 'genuine_forged_same_individuals':
            for genuine_index in range(NUM_GENUINE_PER_WRITER):
                for forged_index in range(NUM_FORGED_PER_WRITER):
                    key = (person_number, person_number)
                    value = (genuine_index, forged_index)
                    disimilar_pairs_dict.setdefault(key, set()).add(value)
    
    return similar_pairs_dict, disimilar_pairs_dict

In [21]:
def split_dataset(data, forgery_type):

    training_individuals = random.sample(data, int(len(data) * TRAINING_SET_RATIO))
    remaining_individuals = [d for d in data if d not in training_individuals]
    validation_individuals = random.sample(remaining_individuals, int(len(data) * VALIDATION_SET_RATIO))
    test_individuals = [d for d in remaining_individuals if d not in validation_individuals]

    similar_pairs_training_dict, disimilar_pairs_training_dict = create_dicts(training_individuals, forgery_type)
    similar_pairs_validation_dict, disimilar_pairs_validation_dict = create_dicts(validation_individuals, forgery_type)
    similar_pairs_test_dict, disimilar_pairs_test_dict = create_dicts(test_individuals, forgery_type)

    dicts = [similar_pairs_training_dict, disimilar_pairs_training_dict, similar_pairs_validation_dict, \
             disimilar_pairs_validation_dict, similar_pairs_test_dict, disimilar_pairs_test_dict]

    for dictionary in dicts:
        for key in dictionary:
            dictionary[key] = list(dictionary[key])

    return similar_pairs_training_dict, disimilar_pairs_training_dict, \
        similar_pairs_validation_dict, disimilar_pairs_validation_dict, \
        similar_pairs_test_dict, disimilar_pairs_test_dict

In [54]:
s_pairs_training, d_pairs_training, s_pairs_validation, d_pairs_validation, s_pairs_test, d_pairs_test = \
        split_dataset(dataset, 'genuine_forged_mixed_individuals')

In [28]:
max(s_pairs_training[10,10])

(22, 23)

In [85]:
n = 10
m = 2
p = 5
max_iterations = 200
learning_rate = 10e-4
epoch = 50
max_epochs = max_iterations // epoch
training_batch_size = 300
validation_batch_size = 200
zeta_s = 1
zeta_d = 20.0
sci = 0.001
aucs = []
minimum_loss = 10e+5
best_A = np.zeros((m, 2))
best_W = np.zeros((m, 2))
best_M = np.zeros((m, m))

A = np.random.rand(m, 2)
M = np.eye(m)
M0 = np.eye(m)
W = np.zeros((n, m*p))
for j in range(m):
    Q = np.random.randn(n, p)
    for i in range(j):
        Q -= np.dot(W[:, i*p:(i+1)*p], np.dot(W[:, i*p:(i+1)*p].T, Q))
    Q, _ = np.linalg.qr(Q)
    W[:, j*p:(j+1)*p] = Q

In [30]:
similar_pairs_training = s_pairs_training
disimilar_pairs_training = d_pairs_training
similar_pairs_validating = s_pairs_validation
disimilar_pairs_validating = d_pairs_validation
similar_pairs_test = s_pairs_test
disimilar_pairs_test = d_pairs_test
mixed_or_same_individuals_in_similar = 'same'
mixed_or_same_individuals_in_disimilar = 'mixed'

In [39]:
def get_batch(num_elements, source_pairs_dict, source_data, same_or_mixed, origin_list_1, origin_list_2):

    list_of_already_selected = []
    list_of_pairs = []

    if same_or_mixed == 'same':
        while len(list_of_pairs) < num_elements:
            random_key = random.choice(list(source_pairs_dict.keys()))
            pairs = source_pairs_dict[random_key]
            pair_index = random.randint(0, len(pairs) - 1)
            identifier = (random_key[0], pair_index)

            if identifier not in list_of_already_selected:
                list_of_already_selected.append(identifier)
                pair_of_indexes = pairs[pair_index]
                pair = (source_data[random_key[0] - 1][origin_list_1][pair_of_indexes[0]],
                        source_data[random_key[0] - 1][origin_list_2][pair_of_indexes[1]])
                list_of_pairs.append(pair)

    elif same_or_mixed == 'mixed':
        while len(list_of_pairs) < num_elements:

            random_key = random.choice(list(source_pairs_dict.keys()))
            pairs = source_pairs_dict[random_key]
            pair_index = random.randint(0, len(pairs) - 1)
            identifier = (random_key[0], random_key[1], pair_index)

            if identifier not in list_of_already_selected:
                list_of_already_selected.append(identifier)
                pair_with_indexes = pairs[pair_index]
                pair = (source_data[random_key[0] - 1][origin_list_1][pair_with_indexes[0]],
                        source_data[random_key[1] - 1][origin_list_2][pair_with_indexes[1]])
                list_of_pairs.append(pair)
                
    return np.array(list_of_pairs)

In [32]:
def get_alpha_beta_divergence(matrix1_p_x_p, matrix2_p_x_p, alpha, beta):
    '''
    Cálculo da subdistância gA(·, ·).
    Retorna uma tupla com: valor da divergência/distância, vetor de autovalores e matrix de autovetores.
    '''
    
    inverse2 = np.linalg.inv(matrix2_p_x_p)
    m1_dot_inverse2 = matrix1_p_x_p @ inverse2
    eigenvalues, eigenvectors = np.linalg.eig(m1_dot_inverse2)
    divergence = \
        (1 / (alpha * beta)) \
        * np.sum(np.log(np.divide((alpha * np.power(eigenvalues, beta) + beta * np.power(eigenvalues, -alpha)), (alpha + beta))))

    return divergence, eigenvalues, eigenvectors

def get_set_of_m_SPD_matrices(covariance_matrix_n_x_n, learnable_parameter_W_n_x_mp, m, p):
    '''
    Transformação de ponto em conjunto, Ts(·). Gera uma lista com m projeções, fW(·), de menor dimensão.
    '''
    Zi = np.transpose(learnable_parameter_W_n_x_mp) @ covariance_matrix_n_x_n @ learnable_parameter_W_n_x_mp
    rows, cols = Zi.shape
    block_diagonal_matrices = []
    start_row, start_col = 0, 0

    while len(block_diagonal_matrices) < m:
        end_row = min(start_row + p, rows)
        end_col = min(start_col + p, cols)
        block_diagonal_matrix = Zi[start_row:end_row, start_col:end_col]
        block_diagonal_matrices.append(block_diagonal_matrix)
        start_row += p
        start_col += p

    return np.array(block_diagonal_matrices)

def calculate_point_to_point_distance(learnable_parameter_M_m_x_m, local_distances_vector_Rm):
    '''
    Distância entre conjuntos, Ds(·, ·). Calculada com a função de integração hM (·).
    '''
    sum_of_d_M_d_terms = np.einsum('kl,k,l', learnable_parameter_M_m_x_m, local_distances_vector_Rm, local_distances_vector_Rm)
    return sum_of_d_M_d_terms

def calculate_dij_and_its_eigenvalues_and_eigenvectors(pairs_of_sets, A):
    '''
    pairs_of_sets -> list of tuples with 2 elements, each with size m.
    A -> m x 2
    dij_of_sets -> list (same length as pairs_of_sets) of lists with size m.
    eigenvalues_of_dij -> list (same length as pairs_of_sets) of lists (length m) of lists (p).
    '''
    dij_of_sets = []
    eigenvalues_of_dij = []
    eigenvectors_of_dij = []
    for Xi_set, Xj_set in pairs_of_sets:
        dij = []
        eigenvalues = []
        eigenvectors = []
        for Xki, Xkj, Ak in zip(Xi_set, Xj_set, A):
            alpha, beta = Ak
            dkij, lambda_kij, Ukij = get_alpha_beta_divergence(Xki, Xkj, alpha, beta)
            dij.append(dkij)
            eigenvalues.append(lambda_kij)
            eigenvectors.append(Ukij)
        
        dij_of_sets.append(dij)
        eigenvalues_of_dij.append(eigenvalues)
        eigenvectors_of_dij.append(eigenvectors)
    
    return np.array(dij_of_sets), np.array(eigenvalues_of_dij), np.array(eigenvectors_of_dij)

def apply_point_to_set_transformation(scm_pairs, W, m, p):

    pairs_of_sets = []
    for Xi, Xj in scm_pairs:
        Xi_set = get_set_of_m_SPD_matrices(Xi, W, m, p)
        Xj_set = get_set_of_m_SPD_matrices(Xj, W, m, p)
        pairs_of_sets.append((Xi_set, Xj_set))

    return np.array(pairs_of_sets)

In [33]:
def calculate_Dij(list_of_dij, M):
    list_of_Dij = []
    
    for dij in list_of_dij:
        Dij = calculate_point_to_point_distance(M, dij)
        list_of_Dij.append(Dij)
    
    return np.array(list_of_Dij)

In [34]:
def calculate_dL_dDij_S(Dij_S, zeta_s):
    Dij_minus_zeta_s = Dij_S - zeta_s
    Dij_minus_zeta_s[Dij_minus_zeta_s < 0] = 0
    dL_dDij = 2 * Dij_minus_zeta_s

    return dL_dDij

def calculate_dL_dDij_D(Dij_D, zeta_d):
    zeta_d_minus_Dij = zeta_d - Dij_D
    zeta_d_minus_Dij[zeta_d_minus_Dij < 0] = 0
    dL_dDij = -2 * zeta_d_minus_Dij

    return dL_dDij

def calculate_dL_ddij_S(dij_S, Dij_S, zeta_s, MT_plus_M):
    num_pairs, m = dij_S.shape

    dL_dDij = calculate_dL_dDij_S(Dij_S, zeta_s)
    reshaped_dij_S = dij_S.reshape(num_pairs, 1, m)
    reshaped_dL_dDij = dL_dDij[:, np.newaxis, np.newaxis]
    dL_dij_S = reshaped_dL_dDij * np.matmul(reshaped_dij_S, MT_plus_M)
    return dL_dij_S.reshape(num_pairs, m)

def calculate_dL_ddij_D(dij_D, Dij_D, zeta_d, MT_plus_M):
    num_pairs, m = dij_D.shape

    dL_dDij = calculate_dL_dDij_D(Dij_D, zeta_d)
    reshaped_dij_D = dij_D.reshape(num_pairs, 1, m)
    reshaped_dL_dDij = dL_dDij[:, np.newaxis, np.newaxis]
    dL_dij_D = reshaped_dL_dDij * np.matmul(reshaped_dij_D, MT_plus_M)
    return dL_dij_D.reshape(num_pairs, m)

def calculate_dL_dA(
  S,
  D,
  eignv_S,
  eignv_D,
  dij_S, 
  dij_D, 
  Dij_S, 
  Dij_D, 
  A, 
  zeta_s, 
  zeta_d,
  MT_plus_M):

  """
  Euclidean gradient of the loss function L with respect to the learnable parameter A.

  S, list of genuine, genuine pairs / tuples.
  eignv_S -> list (same length as S) of lists (length m) of lists (p).
  dij_S -> list (same length as S) of lists with size m.
  Dij_S -> list (same length as S).

  """

  def term1_ddkij_dalpha_k(alpha_k, beta_k, lambda_kij):
    term1_ddkij_dalpha_k = \
    (alpha_k * np.power(lambda_kij, beta_k) -
     alpha_k * beta_k * np.power(lambda_kij, -alpha_k) * np.log(lambda_kij)) / \
    (alpha_k * np.power(lambda_kij, beta_k) + beta_k * np.power(lambda_kij, -alpha_k))

    return term1_ddkij_dalpha_k
  
  def term1_ddkij_dbeta_k(alpha_k, beta_k, lambda_kij):
    term1_ddkij_dbeta_k = \
    (beta_k * np.power(lambda_kij, -alpha_k) -
     alpha_k * beta_k * np.power(lambda_kij, beta_k) * np.log(lambda_kij))/ \
    (alpha_k * np.power(lambda_kij, beta_k) + beta_k * np.power(lambda_kij, -alpha_k))
    
    return term1_ddkij_dbeta_k

  m, _ = A.shape
  _, _, p = eignv_S.shape
  dL_dA = np.zeros_like(A)

  for k in range(m):
    alpha_k, beta_k = A[k]

    term2_ddkij_dalpha_k = np.full((p,), alpha_k / (beta_k + alpha_k)) 
    term2_ddkij_dbeta_k = np.full((p,), beta_k / (beta_k + alpha_k))

    dL_dij_S = calculate_dL_ddij_S(dij_S, Dij_S, zeta_s, MT_plus_M)
    dL_dij_D = calculate_dL_ddij_D(dij_D, Dij_D, zeta_d, MT_plus_M)

    for Dij, dL_dij, lambda_ij in zip(Dij_S, dL_dij_S, eignv_S):

      term1_ddkij_dalpha_k_S = term1_ddkij_dalpha_k(alpha_k, beta_k, lambda_ij[k])
      term1_ddkij_dbeta_k_S = term1_ddkij_dbeta_k(alpha_k, beta_k, lambda_ij[k])

      term3_ddkij_dalpha_or_beta_S = \
      np.log((alpha_k * np.power(lambda_ij[k], beta_k) + beta_k * np.power(lambda_ij[k], -alpha_k)) / \
      (alpha_k + beta_k))

      ddkij_dalpha_k_S = (1 / np.power(alpha_k, 2) * beta_k) * (term1_ddkij_dalpha_k_S - term2_ddkij_dalpha_k - term3_ddkij_dalpha_or_beta_S)
      ddkij_dbeta_k_S = (1 / alpha_k * np.power(beta_k, 2)) * (term1_ddkij_dbeta_k_S - term2_ddkij_dbeta_k - term3_ddkij_dalpha_or_beta_S)

      ddkij_dalpha_k_S = np.sum(ddkij_dalpha_k_S)
      ddkij_dbeta_k_S = np.sum(ddkij_dbeta_k_S)

      dL_dA[k, 0] += (1 / len(S)) * dL_dij[k] * ddkij_dalpha_k_S
      dL_dA[k, 1] += (1 / len(S)) * dL_dij[k] * ddkij_dbeta_k_S
  
    for Dij, dL_dij, lambda_ij in zip(Dij_D, dL_dij_D, eignv_D):
      
      term1_ddkij_dalpha_k_D = term1_ddkij_dalpha_k(alpha_k, beta_k, lambda_ij[k])
      term1_ddkij_dbeta_k_D = term1_ddkij_dbeta_k(alpha_k, beta_k, lambda_ij[k])

      term3_ddkij_dalpha_or_beta_D = \
      np.log((alpha_k * np.power(lambda_ij[k], beta_k) + beta_k * np.power(lambda_ij[k], -alpha_k)) / \
      (alpha_k + beta_k))

      ddkij_dalpha_k_D = (1 / np.power(alpha_k, 2) * beta_k) * (term1_ddkij_dalpha_k_D - term2_ddkij_dalpha_k - term3_ddkij_dalpha_or_beta_D)
      ddij_dbeta_k_D = (1 / alpha_k * np.power(beta_k, 2)) * (term1_ddkij_dbeta_k_D - term2_ddkij_dbeta_k - term3_ddkij_dalpha_or_beta_D)

      ddkij_dalpha_k_D = np.sum(ddkij_dalpha_k_D)
      ddij_dbeta_k_D = np.sum(ddij_dbeta_k_D)

      dL_dA[k, 0] += (1 / len(D)) * dL_dij[k] * ddkij_dalpha_k_D
      dL_dA[k, 1] += (1 / len(D)) * dL_dij[k] * ddij_dbeta_k_D

  return np.clip(dL_dA, -150, 150)

def calculate_dL_dW(
    pairs_S,
    pairs_D,
    low_dim_sets_S,
    low_dim_sets_D,
    dij_S,
    dij_D,
    Dij_S,
    Dij_D,
    dij_eigenvalues_S,
    dij_eigenvalues_D,
    dij_eigenvectors_S,
    dij_eigenvectors_D,
    zeta_s,
    zeta_d,
    MT_plus_M,
    W, A, m, p):

    '''
    Calculates the Euclidean gradient of L with respect to W.

    S, list of genuine, genuine pairs of SPD matrices.
    D, list of genuine, forged pairs of SPD matrices.
    low_dim_sets_S, list of genuine, genuine pairs of low-dimensional sets of SPD matrices.
    low_dim_sets_D, list of genuine, forged pairs of low-dimensional sets of SPD matrices.
    '''

    dL_dW = np.zeros_like(W)

    dL_ddij_S = calculate_dL_ddij_S(dij_S, Dij_S, zeta_s, MT_plus_M)
    dL_ddij_D = calculate_dL_ddij_D(dij_D, Dij_D, zeta_d, MT_plus_M)

    zipped_collections = \
        zip(pairs_S + pairs_D, \
            low_dim_sets_S + low_dim_sets_D, \
            dij_eigenvalues_S + dij_eigenvalues_D, \
            dij_eigenvectors_S + dij_eigenvectors_D, \
            dL_ddij_S + dL_ddij_D)

    for Xi_Xj, Xi_set_Xj_set, lambda_ij, Uij, dL_ddij in zipped_collections:
        Xi, Xj = Xi_Xj
        Xi_set, Xj_set = Xi_set_Xj_set
        
        for k in range(m):
            alpha_k, beta_k = A[k]

            Wk = W[:, k*p : (k+1)*p]

            dL_dlambda_kij = \
                dL_ddij[k] * (1 / alpha_k*beta_k) * \
                ((alpha_k*beta_k*np.power(lambda_ij[k], beta_k-1) - alpha_k*beta_k*np.power(lambda_ij[k], -alpha_k-1)) / \
                 (alpha_k*np.power(lambda_ij[k], beta_k) + beta_k*np.power(lambda_ij[k], -alpha_k)))

            dL_dsigma_kij = np.diag(dL_dlambda_kij)

            transpose_Xki = np.transpose(Xi_set[k])
            inverse_transpose_Xki = np.linalg.inv(transpose_Xki)
            transpose_Xkj = np.transpose(Xj_set[k])
            inverse_transpose_Xkj = np.linalg.inv(transpose_Xkj)

            dL_dXki = Uij[k] @ dL_dsigma_kij @ np.transpose(Uij[k]) @ inverse_transpose_Xki
            dL_dXkj = -1 * inverse_transpose_Xkj @ transpose_Xki @ Uij[k] @ dL_dsigma_kij @ np.transpose(Uij[k]) @ inverse_transpose_Xkj

            dL_dWk = np.transpose(Xi) @ Wk @ dL_dXki + \
                     Xi @ Wk @ np.transpose(dL_dXki) + \
                     np.transpose(Xj) @ Wk @ dL_dXkj + \
                     Xj @ Wk @ np.transpose(dL_dXkj)

            dL_dW[:, k*p : (k+1)*p] = dL_dWk
    
    return dL_dW

def calculate_dL_dWR(dL_dW, W):
    '''
    Calculates the Riemannian gradient of L, with respect to W, from the Euclidean one.
    '''
    dL_dWR = dL_dW - W @ ((1 / 2) * (np.transpose(W) @ dL_dW + np.transpose(dL_dW) @ W))
    return dL_dWR

def update_W_parameter(W_tminus1, learning_rate, dL_dWR):
    W_t, _ = np.linalg.qr(W_tminus1 - learning_rate*dL_dWR)
    return W_t

def calculate_dL_dM(S, D, S_dij, D_dij, Dij_S, Dij_D, M, M0, zeta_s, zeta_d, sci):
  '''
  Euclidean gradient of the loss function with respect to the learnable parameter M.

  Receives:
  List of dij vectors representing the subdistance measures for D pairs (D_dij);
  List of dij vectors representing the subdistance measures for S pairs (S_dij);
  List of set-to-set distances for S pairs (distances_S);
  '''

  m, _ = M.shape
  num_pairs, _, _, _ = S.shape

  # (50,)
  dL_dDij_S = calculate_dL_dDij_S(Dij_S, zeta_s).reshape(num_pairs, 1)
  dL_dDij_D = calculate_dL_dDij_D(Dij_D, zeta_d).reshape(num_pairs, 1)

  term1 = (1 / len(S)) * np.sum(np.matmul(S_dij.reshape(num_pairs, m, 1), (dL_dDij_S * S_dij).reshape(num_pairs, 1, m)), axis=0)
  term2 = (1 / len(D)) * np.sum(np.matmul(D_dij.reshape(num_pairs, m, 1), (dL_dDij_D * D_dij).reshape(num_pairs, 1, m)), axis=0)
  term3 = sci * (np.linalg.inv(M0) - np.linalg.inv(M))

  dL_dM = term1 + term2 + term3
  return dL_dM

def calculate_dL_dMR(dL_dM, M):
    '''
    Calculates the Riemannian gradient of L, with respect to M, from the Euclidean one.
    '''
    dL_dMR = M @ ((1 / 2) * (dL_dM + dL_dM.T) @ M)
    return dL_dMR

def update_M_parameter(M_tminus1, learning_rate, dL_dMR):
    
    eigenvalues_Mtminus1, eigenvectors_Mtminus1 = np.linalg.eig(M_tminus1)

    # M_tminus1 = eigenvectors_Mtminus1 @ diag_Mtminus1 @ eigenvectors_Mtminus1.T
    eigenvalues_Mtminus1_power_1_by_2 = eigenvalues_Mtminus1 ** (1/2)
    eigenvalues_Mtminus1_power_minus_1_by_2 = eigenvalues_Mtminus1 ** (-1/2)
    
    M_tminus1_squareroot = eigenvectors_Mtminus1 @ np.diag(eigenvalues_Mtminus1_power_1_by_2) @ eigenvectors_Mtminus1.T
    M_tminus1_negative_squareroot = eigenvectors_Mtminus1 @ np.diag(eigenvalues_Mtminus1_power_minus_1_by_2) @ eigenvectors_Mtminus1.T

    inner_term = -learning_rate * (M_tminus1_negative_squareroot @ dL_dMR @ M_tminus1_negative_squareroot)

    eigenvalues_inner, eigenvectors_inner = np.linalg.eig(inner_term)

    expm = eigenvectors_inner @ np.diag(np.exp(eigenvalues_inner)) @ eigenvectors_inner.T

    Mt = M_tminus1_squareroot @ expm @ M_tminus1_squareroot

    return Mt

In [56]:
def run_validation(similar_training_batch, disimilar_training_batch, num_epochs):
    s_val = get_batch(validation_batch_size, similar_pairs_validating, dataset, mixed_or_same_individuals_in_similar, 'genuine_scm', 'genuine_scm')
    d_val = get_batch(validation_batch_size, disimilar_pairs_validating, dataset, mixed_or_same_individuals_in_disimilar, 'genuine_scm', 'forged_scm')

    s_pairs_of_sets_val = apply_point_to_set_transformation(s_val, W, m, p)
    d_pairs_of_sets_val = apply_point_to_set_transformation(d_val, W, m, p)

    S_dij_val, _, _ = calculate_dij_and_its_eigenvalues_and_eigenvectors(s_pairs_of_sets_val, A)
    D_dij_val, _, _ = calculate_dij_and_its_eigenvalues_and_eigenvectors(d_pairs_of_sets_val, A)

    S_Dij_val = calculate_Dij(S_dij_val, M)
    D_Dij_val = calculate_Dij(D_dij_val, M)
    total_val = np.concatenate((S_Dij_val, D_Dij_val))
    labels_val = np.concatenate((np.ones_like(S_Dij_val), np.zeros_like(D_Dij_val)))

    probabilities = np.zeros_like(total_val, dtype=float)
    probabilities[total_val < zeta_s] = 1.0
    probabilities[total_val > zeta_d] = 0.0
    in_between_mask = (total_val >= zeta_s) & (total_val <= zeta_d)
    probabilities[in_between_mask] = 1 - ((total_val[in_between_mask] - zeta_s) / (zeta_d - zeta_s))

    #fpr, tpr, thresholds = roc_curve(labels_val, probabilities)
    auc = roc_auc_score(labels_val, probabilities)
    continue_training = False
    aucs.append(auc)

    if auc < max(aucs, default=100):
        new_similar_training_batch = get_batch(training_batch_size, similar_pairs_training, dataset, 'same', 'genuine_scm', 'genuine_scm')
        new_disimilar_training_batch = get_batch(training_batch_size, disimilar_pairs_training, dataset, mixed_or_same_individuals_in_disimilar, 'genuine_scm', 'forged_scm')
        continue_training = True
        return auc, new_similar_training_batch, new_disimilar_training_batch, continue_training

    elif auc >= min(aucs, default=100) and num_epochs == max_epochs:
        return auc, similar_training_batch, disimilar_training_batch, continue_training

    return auc, similar_training_batch, disimilar_training_batch, True

In [87]:
s_pairs = get_batch(training_batch_size, similar_pairs_training, dataset, mixed_or_same_individuals_in_similar , 'genuine_scm', 'genuine_scm')
d_pairs = get_batch(training_batch_size, disimilar_pairs_training, dataset, mixed_or_same_individuals_in_disimilar , 'genuine_scm', 'forged_scm')

aucs = []

for t in range(1, max_iterations+1):
    s_pairs_of_sets = apply_point_to_set_transformation(s_pairs, W, m, p)
    d_pairs_of_sets = apply_point_to_set_transformation(d_pairs, W, m, p)

    S_dij, S_dij_eigenvalues, S_dij_eigenvectors = calculate_dij_and_its_eigenvalues_and_eigenvectors(s_pairs_of_sets, A)
    D_dij, D_dij_eigenvalues, D_dij_eigenvectors = calculate_dij_and_its_eigenvalues_and_eigenvectors(d_pairs_of_sets, A)

    S_Dij = calculate_Dij(S_dij, M)
    D_Dij = calculate_Dij(D_dij, M)

    MT_plus_M = M.T + M

    dL_dA = calculate_dL_dA(s_pairs, d_pairs, S_dij_eigenvalues, D_dij_eigenvalues, S_dij, D_dij, S_Dij, D_Dij, A, zeta_s, zeta_d, MT_plus_M)
    A = A - learning_rate * dL_dA

    dL_dW = calculate_dL_dW(s_pairs, d_pairs, s_pairs_of_sets, d_pairs_of_sets, S_dij, D_dij, S_Dij, D_Dij, S_dij_eigenvalues, D_dij_eigenvalues, S_dij_eigenvectors, D_dij_eigenvectors, zeta_s, zeta_d, MT_plus_M, W, A, m, p)
    dL_dWR = calculate_dL_dWR(dL_dW, W)
    W = update_W_parameter(W, learning_rate, dL_dWR)

    dL_dM = calculate_dL_dM(s_pairs, d_pairs, S_dij, D_dij, S_Dij, D_Dij, M, M0, zeta_s, zeta_d, sci)
    dL_dMR = calculate_dL_dMR(dL_dM, M)
    M = update_M_parameter(M, learning_rate, dL_dMR)

    if t % epoch == 0:
        auc, s_pairs, d_pairs, is_to_continue = \
            run_validation(s_pairs, d_pairs, t // epoch)
        
        if not is_to_continue:
            break

        if auc <= min(aucs):
            best_A = A.copy()
            best_M = M.copy()
            best_W = W.copy()

print('Maior AUC: '+str(max(aucs)))

Maior AUC: 0.5343375
