## Teste com Dataset Animal

In [None]:
# Importa todas as bibliotecas
import nibabel as nib
import numpy as np
import copy
import itertools
import os
import cv2
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras import layers, models, callbacks, metrics, Input, Model, regularizers
import scipy.ndimage as ndi
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from tqdm import tqdm
from sklearn.metrics import confusion_matrix, classification_report, ConfusionMatrixDisplay, auc, precision_recall_curve
from sklearn.utils.class_weight import compute_class_weight
from sklearn.model_selection import train_test_split
import random

In [None]:
def load_animal_data(folder):
    """
    Carrega os dados de um animal do diretório fornecido

    Args:
        folder (str): Caminho da pasta contendo os dados dos animais.

    Returns:
        dict: Dados do paciente, incluindo imagens, máscaras e labels para os lados esquerdo e direito.
              Retorna None se o paciente não for encontrado.
    """

    types_image = ["Dog", "Cat"]
    dogs = os.path.join(folder, types_image[0])
    cats = os.path.join(folder, types_image[1])

    # Verifica se os diretórios existem
    if not os.path.exists(dogs) or not os.path.exists(cats):
        print(f"Estrutura de diretórios inválida para o animal fornecido.")
        return None

    # Listas temporárias para armazenar os dados originais
    dog_images = []
    cat_images = []
    dogs_label = [] 
    cats_label = []
    
    # Tamanho fixo para redimensionamento
    target_size = (224, 224)

    # Carrega as imagens e máscaras do lado esquerdo e direito
    for dog in sorted(os.listdir(dogs)):
        # Carrega os dados do cachorro
        data_dog = cv2.imread(os.path.join(dogs, dog))
        if data_dog is not None:
            data_dog = cv2.resize(data_dog, target_size)
            dog_images.append(data_dog)
            dogs_label.append(1)
    
    for cat in sorted(os.listdir(cats)):
        # Carrega os dados do gato
        data_cat = cv2.imread(os.path.join(cats, cat))
        if data_cat is not None:
            data_cat = cv2.resize(data_cat, target_size)
            cat_images.append(data_cat)
            cats_label.append(0)
            
    print(f"Total de recortes: {len(dog_images) + len(cat_images)}")
    
    return dog_images, cat_images, dogs_label, cats_label

In [None]:
def get_augmenter(input_shape, seed=42):
    """Cria uma camada de aumento de dados que aplica transformações aleatórias."""
    return models.Sequential([
        layers.Input(shape=input_shape),
        layers.RandomFlip("horizontal_and_vertical", seed=seed),
        layers.RandomRotation(0.25, seed=seed),
    ], name="augmenter")

def supervised_nt_xent_loss(projections, labels, temperature=0.5):
    """
    Calcula a perda NT-Xent supervisionada para classificação entre cães e gatos.
    Considera pares da mesma classe como positivos, mesmo que sejam imagens diferentes.
    """
    batch_size = tf.shape(projections)[0]
    
    # Normalizar as projeções
    projections = tf.math.l2_normalize(projections, axis=1)
    
    # Calcular matriz de similaridade
    similarity_matrix = tf.matmul(projections, projections, transpose_b=True)
    
    # Garantir que os labels tenham o formato correto
    labels = tf.reshape(labels, [-1])
    
    # Criar máscara para pares positivos (imagens da mesma classe)
    labels_matrix = tf.equal(labels[:, tf.newaxis], labels[tf.newaxis, :])
    positive_mask = tf.cast(labels_matrix, tf.float32)
    
    # Remover a diagonal (não comparar uma imagem com ela mesma)
    positive_mask = tf.linalg.set_diag(positive_mask, tf.zeros(batch_size))
    
    # Calcular a perda NT-Xent
    logits = similarity_matrix / temperature
    
    # Para estabilidade numérica, subtrair o máximo dos logits
    logits_max = tf.reduce_max(logits, axis=1, keepdims=True)
    logits = logits - logits_max
    
    # Calcular a loss para cada exemplo
    exp_logits = tf.exp(logits)
    
    # Máscara para pares negativos (imagens de classes diferentes)
    negative_mask = 1 - positive_mask
    negative_mask = tf.linalg.set_diag(negative_mask, tf.zeros(batch_size))
    
    # Calcular o denominador (soma sobre todos os pares negativos)
    denominator = tf.reduce_sum(exp_logits * negative_mask, axis=1, keepdims=True)
    
    # Calcular o numerador (soma sobre todos os pares positivos)
    numerator = tf.reduce_sum(exp_logits * positive_mask, axis=1, keepdims=True)
    
    # Calcular a loss para cada par positivo
    loss = -tf.math.log(numerator / (numerator + denominator + 1e-8))
    
    # Calcular a média apenas sobre os pares positivos existentes
    num_positive_pairs = tf.reduce_sum(positive_mask)
    total_loss = tf.reduce_sum(loss * positive_mask) / (num_positive_pairs + 1e-8)
    
    return total_loss

class SupervisedSimCLRModel(Model):
    def __init__(self, encoder, projection_head, augmenter):
        super().__init__()
        self.encoder = encoder
        self.projection_head = projection_head
        self.augmenter = augmenter
    
    def compile(self, optimizer, **kwargs):
        super().compile(**kwargs)
        self.optimizer = optimizer

    def train_step(self, data):
        # Recebemos imagens e labels
        images, labels = data

        # Gerar duas visões aumentadas de cada imagem
        augmented_1 = self.augmenter(images)
        augmented_2 = self.augmenter(images)
        
        # Concatenar todas as visões
        all_views = tf.concat([augmented_1, augmented_2], axis=0)
        
        # Replicar os labels para corresponder às visões aumentadas
        extended_labels = tf.concat([labels, labels], axis=0)
        
        with tf.GradientTape() as tape:
            embeddings = self.encoder(all_views)
            projections = self.projection_head(embeddings)
            
            # Calcular a perda usando todas as projeções e labels estendidos
            loss = supervised_nt_xent_loss(projections, extended_labels)
            
        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        
        return {"loss": loss}

    def test_step(self, data):
        images, labels = data
        
        # Gerar duas visões aumentadas de cada imagem
        augmented_1 = self.augmenter(images)
        augmented_2 = self.augmenter(images)
        
        # Concatenar todas as visões
        all_views = tf.concat([augmented_1, augmented_2], axis=0)
        
        # Replicar os labels para corresponder às visões aumentadas
        extended_labels = tf.concat([labels, labels], axis=0)
        
        embeddings = self.encoder(all_views)
        projections = self.projection_head(embeddings)
        
        loss = supervised_nt_xent_loss(projections, extended_labels)
        
        return {"loss": loss}

def build_encoder(input_shape):
    """Constrói um encoder similar ao usado no SimCLR"""
    return models.Sequential([
        layers.Input(shape=input_shape),
        layers.Conv2D(32, (3,3), activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2,2)),
        layers.Dropout(0.3),
        layers.Conv2D(64, (3,3), activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2,2)),
        layers.Dropout(0.3),
        layers.Conv2D(128, (3,3), activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.GlobalAveragePooling2D(),
        layers.Dense(256, activation='relu'),
        layers.BatchNormalization(),
    ], name="encoder")

def build_projection_head(embedding_dim=256, projection_dim=128):
    """Constrói uma cabeça de projeção"""
    return models.Sequential([
        layers.Input(shape=(embedding_dim,)),
        layers.Dense(projection_dim, activation='relu'),
        layers.BatchNormalization(),
        layers.Dense(projection_dim),  # Sem ativação na última camada
    ], name="projection_head")
    
def build_classifier(encoder, input_shape, num_classes):
    """
    Constrói um classificador tradicional usando o encoder pré-treinado.
    """
    # Congelar o encoder inicialmente para fine-tuning gradual
    encoder.trainable = False
        
    # Adicionar camadas de classificação
    model = models.Sequential([
        layers.Input(shape=input_shape),
        encoder,
        layers.Dense(num_classes, activation='softmax')
    ])
    
    # Criar o modelo
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
                  loss='binary_crossentropy',
                  metrics=['accuracy', metrics.Precision(name="precision"), metrics.Recall(name="recall")])
    
    return model

In [None]:
# Função de plot do treinamento do modelo
def plot_training_history(history):
    plt.figure(figsize=(12, 4))

    # Plot Loss
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Training Loss')
    if 'val_loss' in history.history:
        plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Loss Graphic')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    # Plot Accuracy
    plt.subplot(1, 2, 2)
    plt.plot(history.history['accuracy'], label='Training Accuracy')
    if 'val_accuracy' in history.history:
        plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title('Accuracy Graphic')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.show()

# Função de plot da matriz de confusão
def plot_confusion_matrix(y_true, y_pred):
    cm = confusion_matrix(y_true, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot(cmap=plt.cm.Blues)
    plt.title('Confusion Matrix')
    plt.show()


In [None]:
folder = "D:/Dataset-Dogs_Cats"
dogs_data, cats_data, dogs_label_data, cats_label_data = load_animal_data(folder)

In [None]:
print("\n--- Iniciando Etapa 1: Pré-treinamento Auto-Supervisionado com Labels ---")

# Preparar os dados de pré-treinamento (incluindo labels)
pretrain_data = np.concatenate([dogs_data[:1000], cats_data[:1000]], axis=0)
pretrain_label = np.concatenate([dogs_label_data[:1000], cats_label_data[:1000]], axis=0)
print(f"Total de imagens para treinamento: {len(pretrain_data)}")

input_shape = pretrain_data[0].shape
print(f"Shape das imagens para treinamento: {input_shape}")

encoder = build_encoder(input_shape)
projection_head = build_projection_head()
augmenter = get_augmenter(input_shape)

In [None]:
sscl_model = SupervisedSimCLRModel(encoder, projection_head, augmenter)
sscl_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01))

indices = np.arange(len(pretrain_data))
np.random.shuffle(indices)
pretrain_data = pretrain_data[indices]
pretrain_labels = pretrain_label[indices]

# Treinar o modelo contrastivo
history_sscl = sscl_model.fit(
    pretrain_data,
    pretrain_label, 
    batch_size=64,
    epochs=100
)

# Salvar os pesos do encoder pré-treinado
encoder.save_weights('animal_encoder_pretrained.weights.h5')
print("Pesos do encoder pré-treinado salvos com sucesso!")

In [None]:
print("\n--- Iniciando Etapa 2: Fine-tuning Supervisionado ---")

# Construir o classificador siamês usando o encoder pré-treinado
animal_classifier = build_classifier(encoder, input_shape)

animal_classifier.summary()

In [None]:
# Preparar os dados de treinamento (incluindo labels)
train_data = np.concatenate([dogs_data[1000:3000], cats_data[1000:3000]], axis=0)
train_label = np.concatenate([dogs_label_data[1000:3000], cats_label_data[1000:3000]], axis=0)
# Preparar os dados de validaçao (incluindo labels)
valid_data = np.concatenate([dogs_data[3000:3750], cats_data[3000:3750]], axis=0)
valid_label = np.concatenate([dogs_label_data[3000:3750], cats_label_data[3000:3750]], axis=0)
# Preparar os dados de teste (incluindo labels)
test_data = np.concatenate([dogs_data[3750:4100], cats_data[3750:4100]], axis=0)
test_label = np.concatenate([dogs_label_data[3750:4100], cats_label_data[3750:4100]], axis=0)

In [None]:
# Callbacks
checkpoint = callbacks.ModelCheckpoint(
    'animal_trained.weights.h5', 
    monitor='val_loss', 
    save_best_only=True,  
    save_weights_only=True, 
    mode='min'
)

# Treinamento do classificador
history = animal_classifier.fit(
    train_data,
    train_label,
    validation_data=(valid_data, valid_label),
    batch_size=128, 
    epochs=50, 
    callbacks=[checkpoint]
)

print("Treinamento concluído com sucesso!")

In [None]:
# Plotar o histórico do treinamento
plot_training_history(history)

In [None]:
# Carregando pesos da melhor época
animal_classifier.load_weights('animal_trained.weights.h5')

# Avaliar o modelo na validação
y_pred_train = (animal_classifier.predict(train_data) > 0.5).astype(int)

# Avaliar o modelo na validação
y_pred_valid = (animal_classifier.predict(valid_data) > 0.5).astype(int)

# Avaliar o modelo no teste
y_pred_test = (animal_classifier.predict(test_data) > 0.5).astype(int)

# Calcula a curva precision-recall
precision, recall, _ = precision_recall_curve(test_label, y_pred_test)

# Calcula a AUC precision-recall
auc_pr = auc(recall, precision)

# Plote a curva precision-recall
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, label=f'AUC = {auc_pr:.2f}')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc='best')
plt.grid(alpha=0.5)
plt.show()

In [None]:
# Gerar o relatório de classificação
print("Treino:")
print(classification_report(train_label, y_pred_train))
print("\n#########################################################\n")
print("Validação:")
print(classification_report(valid_label, y_pred_valid))
print("\n#########################################################\n")
print("Teste:")
#print(classification_report(y_test, y_pred_test)) 
print(classification_report(test_label, y_pred_test)) 

In [None]:
# Gerar a matriz de confusão
print("Validação:")
plot_confusion_matrix(valid_label, y_pred_valid)
print("Teste:")
plot_confusion_matrix(test_label, y_pred_test)