# CardioIA - Fase 4: Visao Computacional

Pipeline base para pre-processamento e classificacao de imagens medicas simuladas.
- Dataset sugerido: NIH Chest X-Ray (Kaggle) ou outro ImageFolder.
- Inclui CNN simples (do zero) e esqueleto de transfer learning.
- Usa FakeData quando `cfg.use_fake_data=True` (ou quando `cfg.data_dir` nao existe) para validar o fluxo sem dataset real.

**Importante:** coloque o dataset real em `../data/raw/<dataset>/train|val|test` (estrutura ImageFolder). Defina `cfg.use_fake_data=False` e `cfg.data_dir` para o caminho correto. Se o caminho nao existir, o notebook cai em `FakeData` apenas para smoke test.

In [None]:
import os
from pathlib import Path
import time
from dataclasses import dataclass
from typing import Tuple

import torch
from torch import nn, optim
from torch.utils.data import DataLoader
import torchvision
from torchvision import transforms, datasets, models

# Report the installed PyTorch build, then select the compute device:
# CUDA when a GPU is usable, CPU otherwise.
print("Torch version:", torch.__version__)
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Device:", device)

## Config geral
Ajuste `cfg.data_dir` para o dataset real. Se o caminho nao existir, o notebook usa `FakeData` automaticamente (mesmo efeito de `cfg.use_fake_data=True`) para rodar o fluxo end-to-end.

In [None]:
@dataclass
class CFG:
    """Notebook-wide settings for data loading and training."""

    data_dir: Path = Path("../data/raw/chest_xray")  # dataset root; adjust to your dataset
    use_fake_data: bool = False  # set to True to force FakeData; FakeData is also used when data_dir does not exist
    img_size: Tuple[int, int] = (224, 224)  # size passed to transforms.Resize and to FakeData
    batch_size: int = 16  # batch size for both train and validation loaders
    num_workers: int = 0  # Windows + notebooks: keep 0 to avoid problems
    num_epochs: int = 3  # epochs for each training run in this notebook
    lr: float = 1e-3  # learning rate handed to Adam
    transfer_learning: bool = True  # when True, the ResNet18 transfer-learning cell runs
    fake_classes: int = 2  # number of classes generated by FakeData
    fake_samples: int = 200  # number of FakeData training samples


# Single shared configuration instance used by every cell below.
cfg = CFG()
# Make the dataset path absolute so later existence checks do not depend on
# how the relative path is interpreted afterwards.
cfg.data_dir = cfg.data_dir.resolve()
# Bare expression: shows the effective configuration as the cell output.
cfg

## Dataloaders
- Para dataset real, espera estrutura ImageFolder: `root/class_x/xxx.png`.
- FakeData habilita smoke test sem baixar nada.

In [None]:
# Preprocessing shared by train and validation: resize to cfg.img_size,
# convert to a tensor and normalise with the ImageNet channel statistics
# (the values expected by the pretrained torchvision backbones).
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
base_transforms = transforms.Compose([
    transforms.Resize(cfg.img_size),
    transforms.ToTensor(),
    normalize,
])

if cfg.use_fake_data or not cfg.data_dir.exists():
    # Smoke-test path: synthetic images, no download required.
    print("Usando FakeData (ajuste cfg.use_fake_data=False quando tiver dataset real).")
    fake_train = datasets.FakeData(
        size=cfg.fake_samples,
        image_size=(3, *cfg.img_size),
        num_classes=cfg.fake_classes,
        transform=base_transforms,
    )
    fake_val = datasets.FakeData(
        size=max(20, cfg.fake_samples // 5),
        image_size=(3, *cfg.img_size),
        num_classes=cfg.fake_classes,
        transform=base_transforms,
        # FakeData seeds each sample with `index + random_offset`. Without a
        # distinct offset the validation samples would be exact copies of the
        # first training samples, so validation metrics would only measure
        # memorisation.
        random_offset=cfg.fake_samples,
    )
    class_names = [str(i) for i in range(cfg.fake_classes)]
    num_classes = cfg.fake_classes
    train_loader = DataLoader(
        fake_train, batch_size=cfg.batch_size, shuffle=True, num_workers=cfg.num_workers
    )
    val_loader = DataLoader(
        fake_val, batch_size=cfg.batch_size, shuffle=False, num_workers=cfg.num_workers
    )
else:
    # Real dataset path: expects <data_dir>/train and <data_dir>/val, each in
    # ImageFolder layout (one sub-directory per class).
    train_dir = cfg.data_dir / "train"
    val_dir = cfg.data_dir / "val"
    if not train_dir.exists() or not val_dir.exists():
        raise SystemExit("Crie pastas train/ e val/ dentro do dataset ou habilite FakeData.")

    train_ds = datasets.ImageFolder(train_dir, transform=base_transforms)
    val_ds = datasets.ImageFolder(val_dir, transform=base_transforms)
    # Class names and count come from the training split.
    class_names = train_ds.classes
    num_classes = len(class_names)
    train_loader = DataLoader(train_ds, batch_size=cfg.batch_size, shuffle=True, num_workers=cfg.num_workers)
    val_loader = DataLoader(val_ds, batch_size=cfg.batch_size, shuffle=False, num_workers=cfg.num_workers)

# Bare expression: shows the class count and the first few class names.
num_classes, class_names[:5]

## Modelo 1: CNN simples
Rede pequena para baseline. Ajuste canais/filtros conforme o dataset.

In [None]:
class SmallCNN(nn.Module):
    """Compact three-stage convolutional baseline for 3-channel images.

    Each stage is Conv2d(3x3, padding=1) followed by ReLU. The first two
    stages end with a 2x2 max-pool; the last one ends with global average
    pooling, so the classifier always receives 128 features regardless of
    the input resolution.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes: int):
        super().__init__()

        # (in_channels, out_channels) for each convolutional stage.
        channel_plan = [(3, 32), (32, 64), (64, 128)]
        final_stage = len(channel_plan) - 1

        feature_layers = []
        for stage, (c_in, c_out) in enumerate(channel_plan):
            feature_layers.append(nn.Conv2d(c_in, c_out, kernel_size=3, padding=1))
            feature_layers.append(nn.ReLU(inplace=True))
            if stage == final_stage:
                # Collapse the spatial dimensions to 1x1.
                feature_layers.append(nn.AdaptiveAvgPool2d((1, 1)))
            else:
                feature_layers.append(nn.MaxPool2d(2))
        # Flat Sequential keeps the layer indices (and state_dict keys) stable.
        self.features = nn.Sequential(*feature_layers)

        head_layers = [
            nn.Flatten(),
            nn.Linear(128, 64),
            nn.ReLU(inplace=True),
            nn.Dropout(0.2),
            nn.Linear(64, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return class logits for a batch of images."""
        return self.classifier(self.features(x))


def accuracy(logits, targets):
    """Return the fraction of samples whose arg-max class equals the target.

    Args:
        logits: Tensor of per-class scores; the class axis is dimension 1.
        targets: Tensor of integer class indices, one per sample.

    Returns:
        The mean hit rate as a Python float.
    """
    predicted = logits.argmax(dim=1)
    hits = predicted.eq(targets)
    return hits.float().mean().item()


def train_one_epoch(model, loader, criterion, optimizer, *, target_device=None):
    """Run one optimisation pass over ``loader`` and return its mean metrics.

    Args:
        model: Module to optimise; it is switched to training mode.
        loader: Iterable yielding ``(images, labels)`` tensor batches.
        criterion: Callable mapping ``(logits, labels)`` to a scalar loss
            tensor. The per-batch loss is weighted by the batch size, which
            assumes a mean-reduced loss (the ``nn.CrossEntropyLoss`` default).
        optimizer: Optimizer that updates the trainable parameters.
        target_device: Device the batches are moved to. When omitted, the
            module-level ``device`` is used.

    Returns:
        Tuple ``(mean_loss, accuracy)`` averaged over all samples seen.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    run_device = device if target_device is None else target_device
    model.train()
    total_loss, total_correct, total_samples = 0.0, 0, 0
    for images, labels in loader:
        images, labels = images.to(run_device), labels.to(run_device)
        optimizer.zero_grad()
        logits = model(images)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()
        batch_size = labels.size(0)
        # Weight by batch size so a smaller final batch does not skew the mean.
        total_loss += loss.item() * batch_size
        # Count hits as integers; multiplying a float mean back by the batch
        # size would accumulate rounding error.
        total_correct += (logits.argmax(dim=1) == labels).sum().item()
        total_samples += batch_size
    if total_samples == 0:
        raise ValueError("loader produced no samples; cannot compute epoch metrics")
    return total_loss / total_samples, total_correct / total_samples


def evaluate(model, loader, criterion, *, target_device=None):
    """Score ``model`` on ``loader`` without updating any parameters.

    Args:
        model: Module to evaluate; it is switched to evaluation mode.
        loader: Iterable yielding ``(images, labels)`` tensor batches.
        criterion: Callable mapping ``(logits, labels)`` to a scalar loss
            tensor. The per-batch loss is weighted by the batch size, which
            assumes a mean-reduced loss (the ``nn.CrossEntropyLoss`` default).
        target_device: Device the batches are moved to. When omitted, the
            module-level ``device`` is used.

    Returns:
        Tuple ``(mean_loss, accuracy)`` averaged over all samples seen.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    run_device = device if target_device is None else target_device
    model.eval()
    total_loss, total_correct, total_samples = 0.0, 0, 0
    # Gradients are not needed for scoring; skipping them saves memory.
    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(run_device), labels.to(run_device)
            logits = model(images)
            loss = criterion(logits, labels)
            batch_size = labels.size(0)
            # Weight by batch size so a smaller final batch does not skew the mean.
            total_loss += loss.item() * batch_size
            # Count hits as integers; multiplying a float mean back by the
            # batch size would accumulate rounding error.
            total_correct += (logits.argmax(dim=1) == labels).sum().item()
            total_samples += batch_size
    if total_samples == 0:
        raise ValueError("loader produced no samples; cannot compute evaluation metrics")
    return total_loss / total_samples, total_correct / total_samples


# Baseline run: train the small CNN from scratch with Adam and cross-entropy,
# logging train/validation metrics after every epoch.
model = SmallCNN(num_classes=num_classes)
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=cfg.lr)

history = []
start = time.time()
for epoch in range(cfg.num_epochs):
    train_loss, train_acc = train_one_epoch(model, train_loader, criterion, optimizer)
    val_loss, val_acc = evaluate(model, val_loader, criterion)
    # One record per epoch; epochs are reported 1-based.
    history.append(
        {
            "epoch": epoch + 1,
            "train_loss": train_loss,
            "train_acc": train_acc,
            "val_loss": val_loss,
            "val_acc": val_acc,
        }
    )
    print(f"Epoch {epoch+1}/{cfg.num_epochs} | train_loss={train_loss:.4f} acc={train_acc:.3f} | val_loss={val_loss:.4f} acc={val_acc:.3f}")
print(f"Tempo total: {time.time() - start:.1f}s")
# Bare expression: shows the collected per-epoch metrics as the cell output.
history

## Modelo 2: Transfer Learning (ex.: ResNet18)
Congela os pesos do backbone e treina apenas o cabeçalho (head). Se nao quiser baixar pesos, passe `weights=None`.

In [None]:
# Transfer learning: reuse a pretrained ResNet18 as a frozen feature extractor
# and train only a freshly initialised classification head.
# The history list is created outside the `if` so it can be shown as the cell
# output; an expression nested inside the `if` body is never displayed.
tl_hist = []
if cfg.transfer_learning:
    if hasattr(models, "ResNet18_Weights"):
        # Newer torchvision (multi-weight API, 0.13+): select weights via the enum.
        weights = models.ResNet18_Weights.DEFAULT
        backbone = models.resnet18(weights=weights)
    else:
        # Older torchvision has no `weights=` keyword; passing it raises
        # TypeError, so use the legacy flag to load pretrained weights.
        weights = None
        backbone = models.resnet18(pretrained=True)

    # Freeze every pretrained parameter ...
    for param in backbone.parameters():
        param.requires_grad = False
    # ... then swap in a new head; its parameters are trainable by default.
    in_features = backbone.fc.in_features
    backbone.fc = nn.Linear(in_features, num_classes)
    backbone = backbone.to(device)

    # Only the head is optimised; same learning rate setting as the baseline.
    tl_optimizer = optim.Adam(backbone.fc.parameters(), lr=cfg.lr)
    tl_criterion = nn.CrossEntropyLoss()

    # NOTE: train_one_epoch calls model.train(), so the BatchNorm layers of
    # the frozen backbone still update their running statistics during
    # training even though their weights receive no gradient updates.
    for epoch in range(cfg.num_epochs):
        train_loss, train_acc = train_one_epoch(backbone, train_loader, tl_criterion, tl_optimizer)
        val_loss, val_acc = evaluate(backbone, val_loader, tl_criterion)
        tl_hist.append({"epoch": epoch + 1, "train_loss": train_loss, "train_acc": train_acc, "val_loss": val_loss, "val_acc": val_acc})
        print(f"[TL] Epoch {epoch+1}/{cfg.num_epochs} | train_loss={train_loss:.4f} acc={train_acc:.3f} | val_loss={val_loss:.4f} acc={val_acc:.3f}")
# Bare expression: shows the per-epoch metrics (empty when the run is disabled).
tl_hist

## Proximos passos
- Salvar metricas e matriz de confusao em `FASE4/reports/`.
- Exportar modelo (torch.save) para servir no Flask/app.
- Adicionar notebook ou script de fairness (IR ALEMA 1).
- Integrar com app mobile (IR ALEMA 2) consumindo um endpoint simples.