# SPR 2026 - BERTimbau v6 (Threshold Tuning por Classe)

**Baseado em experimentos do Colab:**
- Domain Adaptive Pretrain (MLM) com laudos médicos
- Fine-tuning com Focal Loss
- **Threshold tuning por classe** (técnica principal)

## Resultados do Colab (validação):
| Config | F1-Macro |
|--------|----------|
| Baseline (argmax) | 0.78665 |
| Threshold tuning | **0.84896** |

## Estratégia:
- Usar modelo BERTimbau + Focal Loss (já treinado)
- Aplicar thresholds otimizados por classe na inferência
- Não requer retreinar

---
## CONFIGURAÇÃO KAGGLE:
1. **Add Input** → **Dataset** → `bertimbau-ptbr-complete` (ou seu modelo)
2. **Add Input** → **Competition** → `spr-2026-mammography-report-classification`
3. **Settings** → Internet → **OFF**
---

In [None]:
# ===== SPR 2026 - BERTIMBAU v6 (THRESHOLD TUNING) =====

import os
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import f1_score
import warnings
warnings.filterwarnings('ignore')

# Banner so this run is easy to spot in the notebook output.
_BANNER_RULE = "=" * 60
print(_BANNER_RULE, "SPR 2026 - BERTimbau v6 (Threshold Tuning)", _BANNER_RULE, sep="\n")

# One shared seed for NumPy and PyTorch.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Competition data location.
DATA_DIR = '/kaggle/input/competitions/spr-2026-mammography-report-classification'

# Run on the GPU when one is visible, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Device: {device}")

In [None]:
# ===== ENCONTRAR MODELO =====
print("\n[1/6] Buscando modelo BERTimbau...")

def find_model_path(base='/kaggle/input', max_depth=6):
    """Return the first directory under ``base`` that holds a ``config.json``.

    The tree is walked depth-first. Entries are visited in sorted order so the
    same directory is chosen on every run when several candidates exist.

    Args:
        base: Root directory to search. ``base`` itself is never returned,
            only directories below it.
        max_depth: Deepest level that is still listed; ``base`` is level 0.

    Returns:
        Path of the first matching directory, or ``None`` when nothing
        matches or ``base`` cannot be read.
    """

    def has_config(path):
        return os.path.isdir(path) and os.path.exists(os.path.join(path, 'config.json'))

    def search_dir(directory, depth=0):
        if depth > max_depth:
            return None
        try:
            entries = sorted(os.listdir(directory))
        except OSError:
            # Missing or unreadable directory: skip it (best-effort search)
            # without hiding unrelated errors such as KeyboardInterrupt.
            return None
        for name in entries:
            path = os.path.join(directory, name)
            if not os.path.isdir(path):
                continue
            if has_config(path):
                return path
            found = search_dir(path, depth + 1)
            if found:
                return found
        return None

    return search_dir(base)

# Resolve the checkpoint directory once and stop early when none is attached.
MODEL_PATH = find_model_path()
if not MODEL_PATH:
    raise FileNotFoundError("Modelo n√£o encontrado! Adicione BERTimbau como input.")
print(f"‚úÖ Modelo encontrado: {MODEL_PATH}")

In [None]:
# ===== CARREGAR DADOS =====
print("\n[2/6] Carregando dados...")

# Read both competition CSVs from the input directory.
train_df, test_df = (pd.read_csv(f'{DATA_DIR}/{_split}.csv') for _split in ('train', 'test'))


def _pick_column(columns, preferred, fallback):
    """Return ``preferred`` when it is one of ``columns``, else ``fallback``."""
    if preferred in columns:
        return preferred
    return fallback


# Column names are detected from the data instead of being hard-coded.
TEXT_COL = _pick_column(train_df.columns, 'report', 'text')
LABEL_COL = _pick_column(train_df.columns, 'target', 'label')
ID_COL = _pick_column(test_df.columns, 'ID', 'id')

print(f"Train: {train_df.shape} | Test: {test_df.shape}")
print(f"Colunas: texto={TEXT_COL}, label={LABEL_COL}, id={ID_COL}")
print(f"\nDistribui√ß√£o de classes:")
_class_counts = train_df[LABEL_COL].value_counts().sort_index()
print(_class_counts)

In [None]:
# ===== CARREGAR MODELO E TOKENIZER =====
print("\n[3/6] Carregando modelo e tokenizer...")

# Load strictly from the attached files (the notebook runs with Internet off).
_load_kwargs = {'local_files_only': True}
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, **_load_kwargs)
# NOTE(review): num_labels=7 presumably matches the fine-tuned checkpoint - confirm.
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_PATH, num_labels=7, **_load_kwargs
)
model.to(device)
model.eval()  # inference only

_param_count = sum(p.numel() for p in model.parameters())
print(f"‚úÖ Modelo carregado: {_param_count:,} params")

In [None]:
# ===== DATASET CLASS =====
print("\n[4/6] Preparando datasets...")

MAX_LEN = 256

class TextDataset(Dataset):
    """Dataset that tokenizes one text per item.

    Tokenization uses the module-level ``tokenizer`` and ``MAX_LEN``; every
    item is truncated or padded to ``MAX_LEN`` tokens.

    Args:
        texts: Indexable sequence of texts; each entry is passed through
            ``str()`` before tokenization.
        labels: Optional indexable sequence of integer class ids aligned
            with ``texts``. Leave as ``None`` for unlabeled data.
    """

    def __init__(self, texts, labels=None):
        self.texts = texts
        self.labels = labels

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids`` and ``attention_mask`` (plus ``label`` when labels exist)."""
        encoding = tokenizer(
            str(self.texts[idx]),
            truncation=True,
            max_length=MAX_LEN,
            padding='max_length',
            return_tensors='pt',
        )
        # With return_tensors='pt' a single text comes back with a leading
        # batch axis of size 1. Drop only that axis: a bare squeeze() would
        # also remove the sequence axis whenever its length is 1.
        item = {
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0),
        }
        if self.labels is not None:
            item['label'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return item

# Hold out a stratified 10% of the training data as the validation split.
from sklearn.model_selection import train_test_split

_all_texts = train_df[TEXT_COL].values
_all_labels = train_df[LABEL_COL].values
train_texts, val_texts, train_labels, val_labels = train_test_split(
    _all_texts,
    _all_labels,
    test_size=0.1,
    random_state=SEED,
    stratify=_all_labels,
)

val_dataset = TextDataset(val_texts, val_labels)
test_dataset = TextDataset(test_df[TEXT_COL].values)

# Inference only, so keep the original row order in both loaders.
_loader_kwargs = {'batch_size': 32, 'shuffle': False}
val_loader = DataLoader(val_dataset, **_loader_kwargs)
test_loader = DataLoader(test_dataset, **_loader_kwargs)

print(f"Val: {len(val_dataset)} | Test: {len(test_dataset)}")

In [None]:
# ===== GERAR PROBABILIDADES NA VALIDA√á√ÉO =====
print("\n[5/6] Otimizando thresholds...")

def get_predictions(loader):
    """Run the model over ``loader`` and collect softmax probabilities.

    Uses the module-level ``model`` and ``device``.

    Args:
        loader: Iterable of batches holding ``input_ids`` and
            ``attention_mask`` tensors, and optionally ``label``.

    Returns:
        ``(probs, labels)``: ``probs`` is the row-wise stack of every batch's
        softmax output; ``labels`` is an array of the collected ``label``
        values, or ``None`` when no batch carried labels.
    """
    prob_chunks = []
    collected_labels = []

    with torch.no_grad():
        for batch in loader:
            logits = model(
                input_ids=batch['input_ids'].to(device),
                attention_mask=batch['attention_mask'].to(device),
            ).logits
            prob_chunks.append(F.softmax(logits, dim=-1).cpu().numpy())

            if 'label' in batch:
                collected_labels.extend(batch['label'].numpy())

    stacked = np.vstack(prob_chunks)
    if not collected_labels:
        return stacked, None
    return stacked, np.array(collected_labels)

# Score the validation split; val_labels is rebound to the labels the loader returned.
val_probs, val_labels = get_predictions(val_loader)
print(f"Val probs shape: {val_probs.shape}")

# Reference score: plain argmax decoding.
baseline_preds = val_probs.argmax(axis=1)
baseline_f1 = f1_score(val_labels, baseline_preds, average='macro')
print(f"\nüìä Baseline (argmax): F1-Macro = {baseline_f1:.5f}")

In [None]:
# ===== THRESHOLD TUNING POR CLASSE =====

def find_optimal_thresholds(probs, labels, num_classes=7):
    """Grid-search one probability threshold per class.

    For class ``c`` and a candidate threshold ``t``, a sample counts as
    predicted ``c`` when its probability for ``c`` is at least ``t`` or when
    ``c`` is already its argmax class. The threshold with the best one-vs-rest
    (binary) F1 is kept; ties keep the lowest threshold, and 0.5 is used when
    no candidate scores above zero.

    Args:
        probs: Array-like of shape (n_samples, n_classes) with class
            probabilities.
        labels: Array-like of shape (n_samples,) with true class ids.
        num_classes: Number of classes to tune (class ids ``0..num_classes-1``).

    Returns:
        Dict mapping each class id to its selected threshold.
    """
    # Compare in float64 so results do not depend on NumPy's rules for mixing
    # float32 arrays with float64 scalars.
    probs = np.asarray(probs, dtype=np.float64)
    labels = np.asarray(labels)

    # Loop-invariant work, computed once instead of per sample per threshold.
    argmax_preds = probs.argmax(axis=1)
    grid = np.arange(0.1, 0.9, 0.05)

    thresholds = {}
    for c in range(num_classes):
        is_class = labels == c
        argmax_is_class = argmax_preds == c
        class_probs = probs[:, c]

        best_threshold = 0.5
        best_f1 = 0
        for thresh in grid:
            predicted_class = argmax_is_class | (class_probs >= thresh)
            f1 = f1_score(is_class, predicted_class, average='binary')
            if f1 > best_f1:
                best_f1 = f1
                best_threshold = thresh

        thresholds[c] = best_threshold
        print(f"  Classe {c}: threshold={best_threshold:.2f}, F1={best_f1:.4f}")

    return thresholds

# Tune one threshold per class on the validation probabilities.
print("\nüéØ Otimizando thresholds por classe:")
optimal_thresholds = find_optimal_thresholds(val_probs, val_labels)

In [None]:
# ===== APLICAR THRESHOLDS E AVALIAR =====

def apply_thresholds(probs, thresholds, margin=0.8):
    """Decode class predictions using per-class probability thresholds.

    Every row starts from its argmax class. Classes are then visited from the
    highest id to the lowest (the original author's note says this puts the
    rare classes first), and the first class that passes both checks
    replaces the argmax:

    * its probability is at least ``thresholds[c]``, and
    * its probability is greater than ``margin`` times the row's top
      probability.

    Rows where no class passes keep the argmax prediction.

    Args:
        probs: Array-like of shape (n_samples, n_classes) with class
            probabilities.
        thresholds: Mapping from class id to minimum probability. Only the
            classes present here can override the argmax.
        margin: Fraction of the top probability a class must exceed.

    Returns:
        Integer array of shape (n_samples,) with the predicted class ids.
    """
    # float64 keeps the comparisons independent of NumPy's scalar-promotion
    # rules for float32 inputs.
    probs = np.asarray(probs, dtype=np.float64)
    if probs.size == 0:
        return np.array([], dtype=int)

    preds = probs.argmax(axis=1)
    floor = probs.max(axis=1) * margin
    settled = np.zeros(len(probs), dtype=bool)

    # Derive the priority order from the mapping instead of hard-coding seven
    # classes; for ids 0..6 this is the same 6, 5, ..., 0 order.
    for c in sorted(thresholds, reverse=True):
        class_probs = probs[:, c]
        wins = ~settled & (class_probs >= thresholds[c]) & (class_probs > floor)
        preds[wins] = c
        settled |= wins

    return preds

# Score the threshold-based decoding on the same validation split.
tuned_preds = apply_thresholds(val_probs, optimal_thresholds)
tuned_f1 = f1_score(val_labels, tuned_preds, average='macro')

print(f"\nüìä Compara√ß√£o:")
print(f"   Baseline (argmax):      F1-Macro = {baseline_f1:.5f}")
print(f"   Threshold tuning:       F1-Macro = {tuned_f1:.5f}")
_relative_gain = (tuned_f1 - baseline_f1) / baseline_f1 * 100
print(f"   Melhoria:               {_relative_gain:+.2f}%")

# Keep the thresholds only when they beat argmax; otherwise fall back to argmax.
USE_THRESHOLDS = tuned_f1 > baseline_f1
if USE_THRESHOLDS:
    _decision = '‚úÖ Usando threshold tuning'
else:
    _decision = '‚ö†Ô∏è Usando argmax (threshold n√£o melhorou)'
print(f"\n{_decision}")

In [None]:
# ===== GERAR SUBMISSION =====
print("\n[6/6] Gerando submission...")

# Probabilities for the test set (no labels are returned for it).
test_probs, _ = get_predictions(test_loader)
print(f"Test probs shape: {test_probs.shape}")

# Decode with whichever strategy won on validation.
predictions = (
    apply_thresholds(test_probs, optimal_thresholds)
    if USE_THRESHOLDS
    else np.argmax(test_probs, axis=1)
)

# Write the submission file: one id column and one predicted-label column.
submission = pd.DataFrame({ID_COL: test_df[ID_COL], LABEL_COL: predictions})
submission.to_csv('submission.csv', index=False)

print(f"\n‚úÖ Submission salva: submission.csv")
print(submission.head(10))
print(f"\nDistribui√ß√£o das predi√ß√µes:")
_prediction_counts = pd.Series(predictions).value_counts().sort_index()
print(_prediction_counts)

## Referências

**Estratégia baseada em:**
- Domain Adaptive Pretraining (MLM) com laudos médicos
- Focal Loss para classes desbalanceadas
- Threshold tuning por classe (otimizado na validação)

**Fonte:** Experimentos no Google Colab (`colab/otimizacao_threshold_e_gamma.ipynb`)

**Resultados esperados:**
- Se threshold funcionar: ~0.80+ F1-Macro
- Se não funcionar: usa argmax como fallback