In [6]:
# Importações essenciais para deep learning e processamento de dados
import os
import numpy as np
import pandas as pd
import cv2
from datetime import datetime, timedelta
import random
import time
import psutil
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import tensorflow as tf
from keras.applications import ResNet50
from keras.layers import Dense, GlobalAveragePooling2D, Dropout
from keras.models import Model
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
from keras.utils import to_categorical
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from collections import Counter

# Configuração de reprodutibilidade
tf.random.set_seed(42)
np.random.seed(42)
random.seed(42)

print(f"TensorFlow version: {tf.__version__}")
print(f"GPU disponível: {tf.config.list_physical_devices('GPU')}")

TensorFlow version: 2.20.0
GPU disponível: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [None]:
# Configurações do experimento
IMG_SIZE = 224  # Tamanho da imagem (224x224 pixels)
BATCH_SIZE = 32  # Tamanho do lote para treinamento
EPOCHS = 100  # Número máximo de épocas
VALIDATION_SPLIT = 0.3  # 30% dos dados para validação

# Caminhos dos datasets
FER2013_PATH = r"src/data/FER2013"
RAF_DB_PATH = r"src/data/ RAF-DB"
DFEW_DB_PATH = r"src/data/DFEW"


# Mapeamento das 7 emoções básicas
EMOTION_LABELS = {
    'anger': 0, 'disgust': 1, 'fear': 2, 'happy': 3, 
    'neutral': 4, 'sadness': 5, 'surprise': 6
}

print("Configurações definidas:")
print(f"- Tamanho da imagem: {IMG_SIZE}x{IMG_SIZE}")
print(f"- Batch size: {BATCH_SIZE}")
print(f"- Épocas máximas: {EPOCHS}")
print(f"- Classes de emoção: {len(EMOTION_LABELS)}")

Configurações definidas:
- Tamanho da imagem: 96x96
- Batch size: 32
- Épocas máximas: 100
- Classes de emoção: 7


In [4]:
class TrainingMonitor:
    """
    Classe para monitorar desempenho computacional durante o treinamento.
    Essencial para experimentos científicos reproduzíveis.
    """
    
    def __init__(self):
        self.start_time = None
        self.end_time = None
        self.peak_memory_mb = 0
        self.initial_memory_mb = 0
        self.process = psutil.Process()
        
    def start_monitoring(self):
        """Inicia o monitoramento de tempo e memória"""
        self.start_time = time.time()
        self.initial_memory_mb = self._get_memory_usage()
        self.peak_memory_mb = self.initial_memory_mb
        print(f"Iniciando treinamento ResNet50...")
        print(f"Horário de início: {time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"Memória inicial: {self.initial_memory_mb:.2f} MB")
        print("-" * 50)
        
    def _get_memory_usage(self):
        """Retorna uso atual de memória em MB"""
        return self.process.memory_info().rss / 1024 / 1024
        
    def update_peak_memory(self):
        """Atualiza o pico de memória se necessário"""
        current_memory = self._get_memory_usage()
        if current_memory > self.peak_memory_mb:
            self.peak_memory_mb = current_memory

# Instancia o monitor
monitor = TrainingMonitor()

# VER COM LU COMO ELA SALVO OS DADOS PRÉ-PROCESSADOS E ARRUMAR A CELULA ABAIXO

In [None]:
import pickle

def load_preprocessed_data():
    """
    Carrega dados já pré-processados e balanceados do formato pickle.
    Converte imagens de escala de cinza para RGB para compatibilidade com ResNet50.
    """
    print("Carregando dados pré-processados do pickle...")
    
    try:
        # Carrega os dados do pickle
        with open('X_train.pkl', 'rb') as f:
            X_train = pickle.load(f)
        with open('y_train.pkl', 'rb') as f:
            y_train = pickle.load(f)
        with open('X_test.pkl', 'rb') as f:
            X_test = pickle.load(f)
        with open('y_test.pkl', 'rb') as f:
            y_test = pickle.load(f)
        
        print(f"Dados carregados com sucesso:")
        print(f"- X_train: {X_train.shape}")
        print(f"- y_train: {y_train.shape}")
        print(f"- X_test: {X_test.shape}")
        print(f"- y_test: {y_test.shape}")
        
        # Converte de escala de cinza (H, W, 1) para RGB (H, W, 3)
        # ResNet50 foi treinado com 3 canais
        if X_train.shape[-1] == 1:
            X_train = np.repeat(X_train, 3, axis=-1)
            X_test = np.repeat(X_test, 3, axis=-1)
            print("Imagens convertidas de escala de cinza para RGB (3 canais)")
        
        # Verifica distribuição de classes
        train_distribution = dict(Counter(y_train))
        test_distribution = dict(Counter(y_test))
        
        print(f"Distribuição treino: {train_distribution}")
        print(f"Distribuição teste: {test_distribution}")
        
        return X_train, y_train, X_test, y_test
        
    except FileNotFoundError as e:
        print(f"Erro: Arquivo não encontrado - {e}")
        print("Certifique-se de que os arquivos pickle estão no diretório correto")
        return None, None, None, None
    except Exception as e:
        print(f"Erro ao carregar dados: {e}")
        return None, None, None, None

# Carrega os dados
X_train, y_train, X_test, y_test = load_preprocessed_data()
monitor.update_peak_memory()

In [None]:
def create_experiment_structure():
    """
    Cria estrutura de diretórios para salvar modelos e métricas.
    """
    # Cria timestamp único para o experimento
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    experiment_id = f"resnet50_emotion_{timestamp}"
    
    # Cria diretórios
    os.makedirs("models", exist_ok=True)
    os.makedirs("metrics", exist_ok=True)
    os.makedirs("plots", exist_ok=True)
    
    return experiment_id

def save_model_if_good_performance(model, accuracy, f1_score, experiment_id, threshold=0.85):
    """
    Salva modelo apenas se a performance for boa.
    
    Args:
        model: Modelo treinado
        accuracy: Acurácia do modelo
        f1_score: F1-score macro do modelo
        experiment_id: ID único do experimento
        threshold: Limite mínimo para salvar (default: 85%)
    """
    if accuracy >= threshold or f1_score >= threshold:
        model_path = f"models/resnet50_emotion_{experiment_id}.pkl"
        
        # Salva apenas os pesos para economia de espaço
        model.save_weights(f"models/weights_resnet50_{experiment_id}.h5")
        
        # Salva configuração do modelo
        model_config = {
            'architecture': 'ResNet50',
            'img_size': IMG_SIZE,
            'num_classes': 7,
            'experiment_id': experiment_id,
            'accuracy': accuracy,
            'f1_score': f1_score,
            'timestamp': datetime.now().isoformat()
        }
        
        with open(f"models/config_resnet50_{experiment_id}.pkl", 'wb') as f:
            pickle.dump(model_config, f)
        
        print(f"Modelo salvo! Performance: Acc={accuracy:.4f}, F1={f1_score:.4f}")
        return True
    else:
        print(f"Performance insuficiente para salvar. Acc={accuracy:.4f}, F1={f1_score:.4f} < {threshold}")
        return False

def save_metrics_to_csv(metrics_dict, experiment_id):
    """
    Salva métricas de performance em arquivo CSV.
    
    Args:
        metrics_dict: Dicionário com todas as métricas
        experiment_id: ID único do experimento
    """
    # Converte métricas para DataFrame
    metrics_df = pd.DataFrame([metrics_dict])
    
    # Arquivo CSV principal
    csv_path = "metrics/performance_metrics.csv"
    
    # Append ao CSV se já existir, senão cria novo
    if os.path.exists(csv_path):
        metrics_df.to_csv(csv_path, mode='a', header=False, index=False)
    else:
        metrics_df.to_csv(csv_path, index=False)
    
    # Salva também arquivo individual do experimento
    individual_csv = f"metrics/metrics_{experiment_id}.csv"
    metrics_df.to_csv(individual_csv, index=False)
    
    print(f"Métricas salvas em: {csv_path} e {individual_csv}")

# Inicializa estrutura do experimento
experiment_id = create_experiment_structure()
print(f"Experimento iniciado: {experiment_id}")

In [None]:
def create_resnet50_model():
    """
    Cria modelo ResNet50 com transfer learning para classificação de emoções.
    
    Arquitetura:
    - ResNet50 pré-treinado (ImageNet) como feature extractor
    - Global Average Pooling para redução dimensional
    - Camadas densas para classificação final
    - Dropout para regularização
    
    Returns:
        keras.Model: Modelo compilado pronto para treinamento
    """
    # Carrega ResNet50 pré-treinado no ImageNet
    base_model = ResNet50(
        weights='imagenet',          # Pesos pré-treinados
        include_top=False,           # Remove camadas de classificação originais
        input_shape=(IMG_SIZE, IMG_SIZE, 3)  # Define formato de entrada
    )
    
    # Congela camadas base para transfer learning
    base_model.trainable = False
    
    # Adiciona camadas de classificação customizadas
    x = base_model.output
    x = GlobalAveragePooling2D()(x)           # Reduz dimensionalidade espacial
    x = Dense(512, activation='relu')(x)       # Primeira camada densa
    x = Dropout(0.5)(x)                       # Regularização forte
    x = Dense(256, activation='relu')(x)       # Segunda camada densa
    x = Dropout(0.3)(x)                       # Regularização moderada
    predictions = Dense(7, activation='softmax')(x)  # Classificação final (7 emoções)
    
    # Cria modelo final
    model = Model(inputs=base_model.input, outputs=predictions)

    # Compilação otimizada para classificação multiclasse
    model.compile(
        optimizer=Adam(learning_rate=0.0001),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    
    return model

# Verifica se os dados foram carregados corretamente antes de criar o modelo
if X_train is not None:
    print("Criando modelo ResNet50 otimizado...")
    model, base_model = create_resnet50_model()
    
    print(f"Modelo criado com sucesso:")
    print(f"- Total de parâmetros: {model.count_params():,}")
    print(f"- Parâmetros treináveis: {sum([tf.keras.backend.count_params(p) for p in model.trainable_weights]):,}")
    print(f"- Camadas congeladas: {len([l for l in base_model.layers if not l.trainable])}")
    
    monitor.update_peak_memory()
else:
    print("Erro: Dados não carregados. Verifique a célula anterior.")

In [None]:
class MemoryCallback(tf.keras.callbacks.Callback):
    """
    Callback customizado para monitoramento de memória durante treinamento.
    Essencial para experimentos com recursos limitados.
    """
    
    def __init__(self, monitor):
        super().__init__()
        self.monitor = monitor
        
    def on_epoch_end(self, epoch, logs=None):
        self.monitor.update_peak_memory()
        if epoch % 5 == 0:  # Log a cada 5 épocas
            current_memory = self.monitor._get_memory_usage()
            print(f"Época {epoch+1} - Memória atual: {current_memory:.2f} MB")

def setup_training_callbacks(monitor):
    """
    Configura callbacks para treinamento otimizado.
    
    Returns:
        list: Lista de callbacks configurados
    """
    # Early Stopping: Para evitar overfitting
    early_stopping = EarlyStopping(
        monitor='val_loss',           # Métrica para monitorar
        patience=15,                  # Épocas sem melhoria antes de parar
        restore_best_weights=True,    # Restaura melhores pesos
        verbose=1,                    # Mostra quando para
        mode='min'                    # Minimizar loss
    )
    
    # Reduce Learning Rate: Ajuste adaptativo da taxa de aprendizado
    reduce_lr = ReduceLROnPlateau(
        monitor='val_loss',           # Métrica para monitorar
        factor=0.2,                   # Fator de redução (lr = lr * factor)
        patience=10,                  # Épocas sem melhoria antes de reduzir
        min_lr=1e-7,                  # Taxa mínima de aprendizado
        verbose=1,                    # Mostra quando reduz
        mode='min'
    )
    
    # Memory monitoring callback
    memory_callback = MemoryCallback(monitor)
    
    return [early_stopping, reduce_lr, memory_callback]

# Preparação dos dados para treinamento
print("Preparando dados para treinamento...")

# Divisão treino/validação estratificada
X_train_split, X_val, y_train_split, y_val = train_test_split(
    X_train, y_train, 
    test_size=VALIDATION_SPLIT,   # 30% para validação
    stratify=y_train,             # Mantém proporção por classe
    random_state=42               # Reprodutibilidade
)

# Converte labels para formato categorical (one-hot encoding)
y_train_cat = to_categorical(y_train_split, 7)
y_val_cat = to_categorical(y_val, 7)
y_test_cat = to_categorical(y_test, 7)

print(f"Dados de treino: {X_train_split.shape}")
print(f"Dados de validação: {X_val.shape}")
print(f"Dados de teste: {X_test.shape}")
print(f"Distribuição de classes - Treino: {dict(Counter(y_train_split))}")
print(f"Distribuição de classes - Validação: {dict(Counter(y_val))}")

# Configura callbacks
callbacks = setup_training_callbacks(monitor)
print("Callbacks configurados para treinamento otimizado")

In [None]:
def train_resnet50_model(model, X_train, y_train, X_val, y_val, monitor, callbacks):
    """
    Executa treinamento completo do ResNet50 com monitoramento detalhado.
    
    Returns:
        tuple: (history, training_metrics)
    """
    print("Iniciando treinamento ResNet50...")
    monitor.start_monitoring()
    
    # Inicia cronômetro específico do treinamento
    training_start_time = time.time()
    
    # Executa treinamento
    history = model.fit(
        X_train, y_train,
        batch_size=BATCH_SIZE,
        epochs=EPOCHS,
        validation_data=(X_val, y_val),
        callbacks=callbacks,
        verbose=1,
        shuffle=True
    )
    
    # Calcula tempo de treinamento
    training_end_time = time.time()
    training_duration = training_end_time - training_start_time
    
    # Métricas do treinamento
    training_metrics = {
        'training_time_seconds': training_duration,
        'training_time_formatted': str(timedelta(seconds=int(training_duration))),
        'epochs_completed': len(history.history['accuracy']),
        'best_train_accuracy': max(history.history['accuracy']),
        'best_val_accuracy': max(history.history['val_accuracy']),
        'final_train_loss': history.history['loss'][-1],
        'final_val_loss': history.history['val_loss'][-1]
    }
    
    print(f"Treinamento concluído em: {training_metrics['training_time_formatted']}")
    print(f"Épocas executadas: {training_metrics['epochs_completed']}")
    print(f"Melhor acurácia validação: {training_metrics['best_val_accuracy']:.4f}")
    
    return history, training_metrics

# Preparação dos dados se carregamento foi bem-sucedido
if X_train is not None and y_train is not None:
    
    # Divisão estratificada treino/validação
    X_train_split, X_val, y_train_split, y_val = train_test_split(
        X_train, y_train,
        test_size=VALIDATION_SPLIT,
        stratify=y_train,
        random_state=42
    )
    
    # Conversão para categorical (one-hot encoding)
    y_train_cat = to_categorical(y_train_split, 7)
    y_val_cat = to_categorical(y_val, 7)
    y_test_cat = to_categorical(y_test, 7)
    
    print(f"Preparação concluída:")
    print(f"- Treino: {X_train_split.shape} | Labels: {y_train_cat.shape}")
    print(f"- Validação: {X_val.shape} | Labels: {y_val_cat.shape}")
    print(f"- Teste: {X_test.shape} | Labels: {y_test_cat.shape}")
    
    # Configura callbacks
    callbacks = setup_training_callbacks(monitor)
    
    # Executa treinamento
    history, training_metrics = train_resnet50_model(
        model, X_train_split, y_train_cat, X_val, y_val_cat, monitor, callbacks
    )
    
    print("Treinamento finalizado com sucesso!")
    
else:
    print("Erro: Dados não disponíveis para treinamento")