In [None]:
# Imports e configurações reproducíveis
import os
from pathlib import Path
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, Dropout
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns

# Seed NumPy and TensorFlow so repeated runs draw the same random numbers.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)

# Project root. The default is the original workstation path; set the
# CITOLOGY_PIPELINE_ROOT environment variable to run the notebook elsewhere
# without editing this cell. An empty value falls back to the default.
ROOT = Path(os.environ.get('CITOLOGY_PIPELINE_ROOT') or r'c:/Users/IA/Desktop/citology pipeline Train')
DATA_DIR = ROOT / 'Dataset' / 'pre-processado'
MODELS_DIR = ROOT / 'models'
os.makedirs(MODELS_DIR, exist_ok=True)

# CSV manifests describing the train / validation / test splits.
TRAIN_CSV = DATA_DIR / 'train_data.csv'
VAL_CSV = DATA_DIR / 'val_data.csv'
TEST_CSV = DATA_DIR / 'test_data.csv'

# Echo the resolved locations so a wrong root is noticed before training starts.
print('DATA_DIR =', DATA_DIR)
print('TRAIN_CSV =', TRAIN_CSV)
print('VAL_CSV =', VAL_CSV)
print('TEST_CSV =', TEST_CSV)


In [None]:
# Load the three split manifests (simple sanity check).
train_df = pd.read_csv(TRAIN_CSV)
val_df = pd.read_csv(VAL_CSV)
test_df = pd.read_csv(TEST_CSV)

print('Treino:', len(train_df), 'Val:', len(val_df), 'Test:', len(test_df))
print('Amostra das colunas do CSV:', train_df.columns.tolist())

# Show one example path so the reader can tell whether paths are relative or absolute.
# Positional access plus the emptiness guard avoids a KeyError when the CSV has no rows.
has_sample = 'image_path' in train_df.columns and not train_df.empty
sample_path = train_df['image_path'].iloc[0] if has_sample else None
print('Exemplo image_path:', sample_path)


In [None]:
# Global training parameters.

# Spatial size used as the default target size for the image iterators,
# and the matching 3-channel model input shape.
IMG_HEIGHT, IMG_WIDTH = 224, 224
INPUT_SHAPE = (IMG_HEIGHT, IMG_WIDTH, 3)

BATCH_SIZE = 32

# Epoch budget for each training stage.
EPOCHS_BASELINE = 10
EPOCHS_ROBUST = 20
EPOCHS_FINETUNE = 10

# Base directory for the image paths stored in the CSVs. Keep DATA_DIR when
# those paths are relative to it; otherwise set this to None.
IMAGE_ROOT = DATA_DIR


## Funções utilitárias: geradores e construtor de modelo
As funções a seguir criam geradores (com/sem augmentation) e constroem o modelo base com MobileNetV2.

In [None]:
def make_generators(train_df, val_df, test_df, image_root=IMAGE_ROOT, augment=False, batch_size=BATCH_SIZE, img_size=(IMG_HEIGHT, IMG_WIDTH)):
    """Create the train/validation/test image iterators from the split DataFrames.

    Args:
        train_df, val_df, test_df: DataFrames with an 'image_path' column and a
            'lesion_type' (class label) column.
        image_root: Base directory handed to Keras for resolving 'image_path';
            when None, no directory is passed.
        augment: If True, the training iterator applies random rotation, shifts,
            shear, zoom and horizontal flips. Validation and test images are
            only rescaled.
        batch_size: Batch size of the train and validation iterators. The test
            iterator always uses a batch size of 1, regardless of this value.
        img_size: (height, width) every image is resized to.

    Returns:
        Tuple (train_gen, val_gen, test_gen). Only train_gen shuffles.
    """
    directory = str(image_root) if image_root is not None else None
    rescale = 1. / 255

    if augment:
        train_datagen = ImageDataGenerator(rescale=rescale, rotation_range=20, width_shift_range=0.1, height_shift_range=0.1, shear_range=0.1, zoom_range=0.1, horizontal_flip=True, fill_mode='nearest')
    else:
        train_datagen = ImageDataGenerator(rescale=rescale)

    val_datagen = ImageDataGenerator(rescale=rescale)
    test_datagen = ImageDataGenerator(rescale=rescale)

    common_kwargs = dict(directory=directory, x_col='image_path', y_col='lesion_type', target_size=img_size, class_mode='categorical')

    train_gen = train_datagen.flow_from_dataframe(dataframe=train_df, batch_size=batch_size, shuffle=True, **common_kwargs)

    # Each iterator would otherwise infer its own class list from its own
    # DataFrame. If a class were missing from the validation or test split,
    # the label indices would silently shift relative to training. Pin the
    # evaluation iterators to the training mapping, ordered by class index.
    class_names = sorted(train_gen.class_indices, key=train_gen.class_indices.get)

    val_gen = val_datagen.flow_from_dataframe(dataframe=val_df, batch_size=batch_size, shuffle=False, classes=class_names, **common_kwargs)
    test_gen = test_datagen.flow_from_dataframe(dataframe=test_df, batch_size=1, shuffle=False, classes=class_names, **common_kwargs)

    return train_gen, val_gen, test_gen

def build_model(input_shape=INPUT_SHAPE, num_classes=3, base_trainable=False, learning_rate=1e-4):
    """Assemble and compile a MobileNetV2 classifier with a custom head.

    The ImageNet-pretrained backbone (without its original top) is followed by
    global average pooling, dropout, a 128-unit ReLU layer, dropout and a
    softmax output with `num_classes` units.

    Args:
        input_shape: Shape of the input images passed to MobileNetV2.
        num_classes: Number of units in the softmax output layer.
        base_trainable: Value assigned to the backbone's `trainable` flag.
        learning_rate: Learning rate of the Adam optimizer.

    Returns:
        Tuple (model, base_model): the compiled full model and the backbone.
    """
    backbone = MobileNetV2(weights='imagenet', include_top=False, input_shape=input_shape)
    backbone.trainable = base_trainable

    # Head layers, listed in the order they are applied to the backbone output.
    head_layers = (
        GlobalAveragePooling2D(),
        Dropout(0.4),
        Dense(128, activation='relu'),
        Dropout(0.2),
        Dense(num_classes, activation='softmax'),
    )
    tensor = backbone.output
    for head_layer in head_layers:
        tensor = head_layer(tensor)

    classifier = Model(inputs=backbone.input, outputs=tensor)
    classifier.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )

    return classifier, backbone


## Função de treino genérica
Treina um modelo com callbacks criados internamente (checkpoint, early stopping e redução da taxa de aprendizado) e salva o melhor checkpoint e o modelo final em `.keras`.

In [None]:
def train_and_save(model, train_gen, val_gen, epochs, checkpoint_path, early_stop_patience=6):
    """Train `model`, checkpoint the best epoch and save a final copy.

    Args:
        model: Compiled Keras model to train.
        train_gen: Training data iterator.
        val_gen: Validation data iterator; 'val_loss' drives every callback.
        epochs: Maximum number of epochs.
        checkpoint_path: File that receives the best (lowest val_loss) model.
        early_stop_patience: Epochs without val_loss improvement before
            training stops.

    Returns:
        Tuple (history, final_path). `final_path` lives in MODELS_DIR and is the
        checkpoint file name with '_final' inserted before the extension.
    """
    checkpoint_path = str(checkpoint_path)

    # A bare file name has an empty dirname, and os.makedirs('') raises.
    checkpoint_dir = os.path.dirname(checkpoint_path)
    if checkpoint_dir:
        os.makedirs(checkpoint_dir, exist_ok=True)

    callbacks = [
        ModelCheckpoint(checkpoint_path, monitor='val_loss', save_best_only=True, verbose=1),
        EarlyStopping(monitor='val_loss', patience=early_stop_patience, restore_best_weights=True, verbose=1),
        ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, verbose=1)
    ]

    history = model.fit(train_gen, validation_data=val_gen, epochs=epochs, callbacks=callbacks, verbose=1)

    # Split on the real extension instead of str.replace: replace() would touch
    # every '.keras' occurrence in the name and would leave other extensions
    # unchanged, making the final path collide with the checkpoint itself.
    stem, ext = os.path.splitext(os.path.basename(checkpoint_path))
    final_path = os.path.join(str(MODELS_DIR), f'{stem}_final{ext or ".keras"}')

    # Prefer the best checkpoint as the final model; if it cannot be reloaded,
    # fall back to the in-memory model so the run still produces an artifact.
    try:
        best = load_model(checkpoint_path)
        best.save(final_path)
    except Exception as e:
        print('Aviso: não foi possível recarregar checkpoint para salvar final:', e)
        model.save(final_path)

    return history, final_path


## Etapa 1 — Baseline
Treinamento simples por um número fixo de épocas, sem augmentation, para obter uma linha de base.

In [None]:
# Stage 1 data: rescaled images only, no augmentation.
train_gen, val_gen, test_gen = make_generators(
    train_df,
    val_df,
    test_df,
    image_root=IMAGE_ROOT,
    augment=False,
    batch_size=BATCH_SIZE,
)

# The number of output units follows the classes found in the training split.
num_classes = len(train_gen.class_indices)
print('Classes detectadas:', train_gen.class_indices)

# Build the model with the backbone frozen.
baseline_model, baseline_base = build_model(
    input_shape=INPUT_SHAPE,
    num_classes=num_classes,
    base_trainable=False,
    learning_rate=1e-4,
)
baseline_model.summary()

# Train for the baseline epoch budget, checkpointing under MODELS_DIR.
checkpoint_baseline = str(MODELS_DIR / 'baseline_checkpoint.keras')
history_baseline, baseline_final_path = train_and_save(
    baseline_model,
    train_gen,
    val_gen,
    epochs=EPOCHS_BASELINE,
    checkpoint_path=checkpoint_baseline,
)
print('Baseline final salvo em:', baseline_final_path)

# Basic training-history plot.
def plot_history(history, title=''):
    """Draw loss and accuracy curves (training vs. validation) side by side.

    Args:
        history: Object whose `history` dict holds the 'loss', 'val_loss',
            'accuracy' and 'val_accuracy' series.
        title: Prefix placed before ' - Loss' / ' - Accuracy' in panel titles.
    """
    curves = history.history

    # One entry per panel: title suffix, then (history key, legend label) pairs.
    panels = (
        (' - Loss', (('loss', 'loss'), ('val_loss', 'val_loss'))),
        (' - Accuracy', (('accuracy', 'acc'), ('val_accuracy', 'val_acc'))),
    )

    plt.figure(figsize=(10,4))
    for position, (suffix, series) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        for key, legend_label in series:
            plt.plot(curves[key], label=legend_label)
        plt.legend()
        plt.title(title + suffix)
    plt.show()

# Show the loss/accuracy curves recorded during the baseline training run.
plot_history(history_baseline, 'Baseline')


## Etapa 2 — Treinamento Robusto
Aplicar Data Augmentation, Transfer Learning (base congelada), callbacks e otimização simples de hiperparâmetros (busca por melhores `learning_rate`).

In [None]:
# Candidate learning rates (a simple one-dimensional search).
candidate_lrs = [1e-3, 1e-4, 5e-5]
best_val_acc = -1.0
best_model_path = None

# Iterators with augmentation on the training split.
train_gen_aug, val_gen_aug, test_gen_aug = make_generators(train_df, val_df, test_df, image_root=IMAGE_ROOT, augment=True, batch_size=BATCH_SIZE)

for lr in candidate_lrs:
    print('--- Treinando candidato lr=', lr)
    model_candidate, _ = build_model(input_shape=INPUT_SHAPE, num_classes=num_classes, base_trainable=False, learning_rate=lr)

    # Build the tag outside the f-string: reusing the enclosing quote character
    # inside a replacement field is a SyntaxError before Python 3.12.
    lr_tag = str(lr).replace('.', 'p')
    checkpoint_path = str(MODELS_DIR / f'robust_checkpoint_lr{lr_tag}.keras')
    history, final_path = train_and_save(model_candidate, train_gen_aug, val_gen_aug, epochs=EPOCHS_ROBUST, checkpoint_path=checkpoint_path)

    # The checkpoint was selected by val_loss; reload it and score it by
    # validation accuracy so candidates are compared on the same metric.
    candidate_path = checkpoint_path
    try:
        loaded = load_model(checkpoint_path)
        res = loaded.evaluate(val_gen_aug, verbose=0)
        val_acc = res[1] if len(res) > 1 else None
    except Exception as e:
        print('Falha ao carregar ou avaliar checkpoint:', e)
        # Use the last recorded validation accuracy, and point at the final
        # model written by train_and_save rather than at the checkpoint that
        # just failed, so a later reload of the winner does not fail again.
        val_acc = history.history.get('val_accuracy', [None])[-1]
        candidate_path = final_path

    print('lr=', lr, ' -> val_acc=', val_acc)
    if val_acc is not None and val_acc > best_val_acc:
        best_val_acc = val_acc
        best_model_path = candidate_path

print('Melhor modelo da etapa 2 salvo em:', best_model_path, 'com val_acc=', best_val_acc)

# Save a copy of the winner under a fixed name for the next stage.
if best_model_path is not None and os.path.exists(best_model_path):
    robust_best = load_model(best_model_path)
    robust_best_path = str(MODELS_DIR / 'robust_best.keras')
    robust_best.save(robust_best_path)
    print('Robust best salvo em:', robust_best_path)
else:
    print('Nenhum modelo válido encontrado na etapa 2.')


## Etapa 3 — Fine-Tuning
Descongelar camadas superiores da base, reduzir a taxa de aprendizado e re-treinar cuidadosamente.

In [None]:
# Load the best stage-2 model if available, otherwise fall back to the baseline checkpoint.
robust_best_path = str(MODELS_DIR / 'robust_best.keras')
if os.path.exists(robust_best_path):
    model_ft = load_model(robust_best_path)
    print('Carregado robust_best.keras para fine-tuning')
else:
    fallback = str(MODELS_DIR / 'baseline_checkpoint.keras')
    if os.path.exists(fallback):
        model_ft = load_model(fallback)
        print('Carregado baseline para fine-tuning (fallback)')
    else:
        raise FileNotFoundError('Nenhum modelo encontrado para fine-tuning. Rode a Etapa 1 ou 2 primeiro.')

# Strategy: unfreeze only the last N layers of the model (tunable).
UNFREEZE_LAST_N = 30
total_layers = len(model_ft.layers)
start_idx = max(0, total_layers - UNFREEZE_LAST_N)
bn_kept_frozen = 0
for i, layer in enumerate(model_ft.layers):
    in_tail = i >= start_idx
    # BatchNormalization layers stay frozen even inside the unfrozen tail:
    # updating their moving statistics during fine-tuning can wipe out what
    # the pretrained network has learned (see the Keras fine-tuning guide).
    is_batch_norm = isinstance(layer, tf.keras.layers.BatchNormalization)
    layer.trainable = in_tail and not is_batch_norm
    bn_kept_frozen += int(in_tail and is_batch_norm)
print(f'Tornadas treináveis as camadas a partir do índice {start_idx} (total {total_layers}).')
print('Camadas BatchNormalization mantidas congeladas nesse intervalo:', bn_kept_frozen)

# Re-compile with a reduced learning rate so the new trainable flags take effect.
FT_LR = 1e-5
model_ft.compile(optimizer=Adam(learning_rate=FT_LR), loss='categorical_crossentropy', metrics=['accuracy'])

# Iterators for fine-tuning (same augmentation settings as stage 2).
train_gen_ft, val_gen_ft, test_gen_ft = make_generators(train_df, val_df, test_df, image_root=IMAGE_ROOT, augment=True, batch_size=BATCH_SIZE)

checkpoint_ft = str(MODELS_DIR / 'final_finetuned_checkpoint.keras')
history_ft, final_ft_path = train_and_save(model_ft, train_gen_ft, val_gen_ft, epochs=EPOCHS_FINETUNE, checkpoint_path=checkpoint_ft, early_stop_patience=5)
print('Fine-tuning final salvo em:', final_ft_path)
plot_history(history_ft, 'Fine-Tune')


## Avaliação Final e Relatórios
Carregar o modelo final salvo e avaliar no conjunto de teste, gerando matriz de confusão e relatório de classificação.

In [None]:
# Pick the model to evaluate, in order of preference: fine-tuned > robust > baseline.
candidates = [str(MODELS_DIR / 'final_finetuned_checkpoint.keras'), str(MODELS_DIR / 'robust_best.keras'), str(MODELS_DIR / 'baseline_checkpoint.keras')]
model_eval = None
for p in candidates:
    if os.path.exists(p):
        print('Usando modelo para avaliação:', p)
        model_eval = load_model(p)
        break

if model_eval is None:
    raise FileNotFoundError('Nenhum modelo disponível para avaliação. Execute as etapas anteriores.')

# Build the test iterator. make_generators always gives the test iterator a
# batch size of 1 (the batch_size argument only affects train/val).
train_gen_eval, _, test_gen_eval = make_generators(train_df, val_df, test_df, image_root=IMAGE_ROOT, augment=False, batch_size=BATCH_SIZE)

# The model's output units follow the training class order. If the test
# iterator maps labels differently, every metric below would be silently wrong.
if test_gen_eval.class_indices != train_gen_eval.class_indices:
    raise ValueError(f'Mapeamento de classes do teste {test_gen_eval.class_indices} difere do treino {train_gen_eval.class_indices}.')

print('Avaliando no conjunto de teste...')
test_loss, test_acc = model_eval.evaluate(test_gen_eval, steps=len(test_gen_eval), verbose=1)
print(f'Test loss: {test_loss:.4f} | Test acc: {test_acc:.4f}')

# Full predictions for the report and confusion matrix. Rewind the iterator
# first so predictions line up with `test_gen_eval.classes`.
test_gen_eval.reset()
y_pred_probs = model_eval.predict(test_gen_eval, steps=len(test_gen_eval), verbose=1)
y_pred = np.argmax(y_pred_probs, axis=1)
y_true = test_gen_eval.classes

# Class names ordered by class index, plus the explicit index list. Passing
# the indices keeps the matrix and report the right size even when a class
# does not appear in y_true or y_pred.
labels = sorted(test_gen_eval.class_indices, key=test_gen_eval.class_indices.get)
label_ids = list(range(len(labels)))

cm = confusion_matrix(y_true, y_pred, labels=label_ids)
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=labels, yticklabels=labels)
plt.xlabel('Predito')
plt.ylabel('Verdadeiro')
plt.title('Matriz de Confusão (Teste)')
plt.show()

print('Relatório de classificação (Teste):')
print(classification_report(y_true, y_pred, labels=label_ids, target_names=labels))
