<a href="https://colab.research.google.com/github/IzadoraSC/hackathon_workcap_2025/blob/main/Hackathon_Worcap_2025_Team_GT_BR.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#HACKATHON - WORCAP - 2025

---
Team: GT-BR

Integrantes: Alejandro Lopez, Izadora S. de Carvalho, Joaquim AJR

Capitão: Alejandro Lopez

Repositório do Projeto: [GitHub](https://github.com/IzadoraSC/hackathon_workcap_2025)

Kaggle: [Hackaton WorCap 2025](https://www.kaggle.com/competitions/worcap-2025)

#U-Net Siamesa com módulos de atenção scSE


O objetivo principal deste notebook é implementar uma rede neural U-Net do tipo siamesa com módulos de atenção, a fim de focar tanto nas regiões corretas (espaço) quanto nas características corretas (canais), o que, segundo Murari et al. (2023), mostra-se especialmente útil em tarefas de segmentação.

#Instalar pacotes

In [None]:
!pip install rasterio -q

Collecting rasterio
  Downloading rasterio-1.4.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.1 kB)
Collecting affine (from rasterio)
  Downloading affine-2.4.0-py3-none-any.whl.metadata (4.0 kB)
Collecting cligj>=0.5 (from rasterio)
  Downloading cligj-0.7.2-py3-none-any.whl.metadata (5.0 kB)
Collecting click-plugins (from rasterio)
  Downloading click_plugins-1.1.1.2-py2.py3-none-any.whl.metadata (6.5 kB)
Downloading rasterio-1.4.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (22.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m22.3/22.3 MB[0m [31m74.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading cligj-0.7.2-py3-none-any.whl (7.1 kB)
Downloading affine-2.4.0-py3-none-any.whl (15 kB)
Downloading click_plugins-1.1.1.2-py2.py3-none-any.whl (11 kB)
Installing collected packages: cligj, click-plugins, affine, rasterio
Successfully installed affine-2.4.0 click-plugins-1.1.1.2 cligj-0.7.2 rasterio-1.4.3


#Importar pacotes

In [None]:
import os, random, math, csv
from pathlib import Path
import numpy as np
import rasterio
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.modules.padding import ReplicationPad2d
from torch.utils.data import Dataset, DataLoader, Subset
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import rasterio
import matplotlib.pyplot as plt
import re
from contextlib import ExitStack
import pandas as pd

# Montar Google Drive
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


#Data Loader



## Reprodutibilidade

In [None]:
# -------------------------
# Reprodutibilidade e cuDNN
# -------------------------

#Definimos uma função set_seed para fixar a seed aleatória (padrão 42).
#Isso garante que operações que envolvem aleatoriedade (embaralhar dataset, inicializar pesos, augmentações, etc.) sejam reprodutíveis entre execuções.

def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
set_seed(42)

torch.backends.cudnn.benchmark = True  # Ativa um otimizador interno do cuDNN (biblioteca da NVIDIA usada pelo PyTorch para convoluções). Acelera convs para shapes fixos.


#As duas linhas abaixo definem se o treinamento será feito na GPU (cuda) ou na CPU, dependendo do que está disponível e imprimem o dispositivo.

#Em seguida, imprime o dispositivo escolhido.

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)


Device: cuda


## Dataset bitemporal

In [None]:
# =========================
# Dataset bitemporal (T1, T2, máscara 0/1)
#Garantimos que cada par de imagens esteja alinhado com sua máscara e pronto para ser passado ao modelo.
# =========================
class SiameseDataset(Dataset):
    """
    Espera nomes 'recorte_*.tif' em T10_dir, T20_dir e mask_dir.
    Lê todas as bandas (C,H,W), normaliza cada tile para [0,1] via min-max local,
    e retorna tensores float32: t1, t2, mask(1,H,W).

    Se 'ids' for fornecido, usa apenas esses IDs; caso contrário, usa todos os IDs comuns.
    """
    def __init__(self, T10_dir, T20_dir, mask_dir, transform=None, ids=None):   # possui a responsabilidade de criar o objeto da classe SiameseDataset. Nela será contida todas as informações principais do objeto.
        self.T10_dir  = str(T10_dir)    #garantimos que os caminhos são string
        self.T20_dir  = str(T20_dir)
        self.mask_dir = str(mask_dir)
        self.transform = transform

        def recortes(d):    #Lista todos os arquivos .tif começando com recorte_. Retorna um conjunto {…} de nomes. Além disso, calculamos intersecção entre os nomes das trÊs pastas.

            return {n for n in os.listdir(d) if n.startswith("recorte_") and n.endswith(".tif")}
        common = recortes(self.T10_dir) & recortes(self.T20_dir) & recortes(self.mask_dir)
        if not common:
            raise RuntimeError("Nenhum 'recorte_*.tif' comum encontrado nos três diretórios.")


        ids_all = sorted([n.split('_', 1)[1].replace('.tif', '') for n in common])   #Extrai só o ID de cada arquivo (ex: recorte_123.tif → 123).Ordena em ordem crescente.
        if ids is None:
            self.ids = ids_all
        else:
            ids = set(ids)
            missing = ids - set(ids_all)
            if missing:
                raise ValueError(f"IDs não encontrados nos diretórios: {sorted(list(missing))[:5]} ...")
            self.ids = sorted(list(ids))

    def __len__(self):    #Retorna o número de amostras.
        return len(self.ids)



    @staticmethod
    def _read_image(path):    #Lê todas as bandas em formato (C, H, W). Retorna tensor torch.float32.
        with rasterio.open(path) as src:
            img = src.read().astype(np.float32)  # (C,H,W)
            img = np.nan_to_num(img, nan=0.0)     # Substitui NaN por 0.0. Normaliza min-max para [0, 1] por tile
            mn, mx = img.min(), img.max()
            if mx > mn:
                img = (img - mn) / (mx - mn)
            else:
                img = np.zeros_like(img, dtype=np.float32)   #Se todos os valores forem iguais, gera um array de zeros.
        return torch.from_numpy(img)  # (C,H,W) float32



    @staticmethod
    def _read_mask(path):  #Lê a mascara.
        with rasterio.open(path) as src:
            m = src.read(1).astype(np.float32)  # (H,W)
            m = np.nan_to_num(m, nan=0.0)      # Substitui NaN por 0.0.
            m = (m > 0).astype(np.float32)      #mascara binaria False=0, True=1
        return torch.from_numpy(m).unsqueeze(0)  # (1,H,W)


    def __getitem__(self, idx):  #Recupera o ID pelo índice.
        id_ = self.ids[idx]
        fname = f"recorte_{id_}.tif" #Monta o nome do arquivo.
        t1 = self._read_image(os.path.join(self.T10_dir,  fname))    #Lê t1, t2 e mask.
        t2 = self._read_image(os.path.join(self.T20_dir,  fname))
        m  = self._read_mask (os.path.join(self.mask_dir, fname))

        if self.transform is not None:      #Se houver transformações (JointAugment, por exemplo), aplica de forma sincronizada.
            t1, t2, m = self.transform(t1, t2, m)

        return t1, t2, m  #Retorna um triplo: (t1, t2, mask).

##Augmentation

In [None]:
# =========================
# Augment conjunto (sincronizado)
# =========================

#A ideia é aumentar a variabilidade do dataset para que o modelo não fique “decorando” posições fixas.

#Como trabalhamos com pares de imagens (antes e depois) + máscara, é essencial que a mesma transformação seja aplicada aos três ao mesmo tempo, mantendo alinhamento pixel a pixel.

class JointAugment:
    """Flips e rotações de 90° sincronizadas para t1, t2, mask."""
    def __call__(self, t1, t2, mask):
        # flip H
        if torch.rand(1).item() < 0.5:
            t1 = t1.flip(-1); t2 = t2.flip(-1); mask = mask.flip(-1)    #espelhamento horizontal aplicado em t1, t2 e mask
        # flip V
        if torch.rand(1).item() < 0.5:
            t1 = t1.flip(-2); t2 = t2.flip(-2); mask = mask.flip(-2)   # espelhamento vertical


        # geramos um número aleatorio de 0 a 3, sendo que cada numero representa a rotação (0/90/180/270) que será aplicada no dataset
        k = torch.randint(0, 4, (1,)).item()
        if k > 0:
            t1 = torch.rot90(t1, k, dims=(-2, -1))
            t2 = torch.rot90(t2, k, dims=(-2, -1))
            mask = torch.rot90(mask, k, dims=(-2, -1))
        return t1, t2, mask

#Construção do módulo de atenção

In [None]:
# =========================
# ATENÇÃO scSE (cSE + sSE)
#Concurrent Spatial and Channel 'Squeeze & Excitation' (Compactação e Excitação Espacial e de Canais Concorrentes)
# =========================

# Este módulo de atenção, Segundo Ngoc et al. (2024), melhoram a precisão ao concentrar-se em áreas ricas em informação, em vez de processar a imagem inteira.

# A combinação dos modulos cSE e sSE potencializa a capacidade de modulação de características do bloco, recalibrando de forma eficaz tanto a informação espacial quanto a de canais dentro da rede.


#O cSE responde “o que olhar?” (features/canais).


class cSE(nn.Module):
    """Channel Squeeze & Excitation."""
    def __init__(self, c, r=8):
        super().__init__()
        c_mid = max(c // r, 1)
        self.avg = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Conv2d(c, c_mid, 1, bias=True)
        self.fc2 = nn.Conv2d(c_mid, c, 1, bias=True)
    def forward(self, x):
        w = self.avg(x)
        w = F.relu(self.fc1(w), inplace=True)
        w = torch.sigmoid(self.fc2(w))
        return x * w


#O sSE responde “onde olhar?” (posições na imagem).



class sSE(nn.Module):
    """Spatial Squeeze & Excitation."""
    def __init__(self, c):
        super().__init__()
        self.conv = nn.Conv2d(c, 1, kernel_size=1, bias=True)
    def forward(self, x):
        w = torch.sigmoid(self.conv(x))
        return x * w


#O scSE junta os dois para responder: “quais canais são importantes e em quais posições eles importam mais?”


class scSE(nn.Module):
    """Concurrent Spatial & Channel SE (soma das atenções)."""
    def __init__(self, c, r=8):
        super().__init__()
        self.cse = cSE(c, r=r)
        self.sse = sSE(c)
    def forward(self, x):
        return self.cse(x) + self.sse(x)




#Modelo

##SiamUnet Diff + scSE

In [None]:
# =========================
# Modelo: SiamUnet_diff + scSE
# =========================

class SiamUnet_diff(nn.Module):
    def __init__(self, n_channels, n_classes=1, enable_attention=True, attn_reduction=8):
        super(SiamUnet_diff, self).__init__()
        self.enable_attention = enable_attention

        # --------------------------
        # ENCODER (compartilhado)
        # Cada "stage" dobra canais e reduz H,W pela metade via max-pool.
        # Dropout2d leve em cada bloco para regularizar.
        # --------------------------

        # Encoder (compartilhado)
        self.conv11 = nn.Conv2d(n_channels, 16, kernel_size=3, padding=1)
        self.bn11 = nn.BatchNorm2d(16); self.do11 = nn.Dropout2d(p=0.2)
        self.conv12 = nn.Conv2d(16, 16, kernel_size=3, padding=1)
        self.bn12 = nn.BatchNorm2d(16); self.do12 = nn.Dropout2d(p=0.2)

        self.conv21 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.bn21 = nn.BatchNorm2d(32); self.do21 = nn.Dropout2d(p=0.2)
        self.conv22 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.bn22 = nn.BatchNorm2d(32); self.do22 = nn.Dropout2d(p=0.2)

        self.conv31 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn31 = nn.BatchNorm2d(64); self.do31 = nn.Dropout2d(p=0.2)
        self.conv32 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.bn32 = nn.BatchNorm2d(64); self.do32 = nn.Dropout2d(p=0.2)
        self.conv33 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.bn33 = nn.BatchNorm2d(64); self.do33 = nn.Dropout2d(p=0.2)

        self.conv41 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn41 = nn.BatchNorm2d(128); self.do41 = nn.Dropout2d(p=0.2)
        self.conv42 = nn.Conv2d(128, 128, kernel_size=3, padding=1)
        self.bn42 = nn.BatchNorm2d(128); self.do42 = nn.Dropout2d(p=0.2)
        self.conv43 = nn.Conv2d(128, 128, kernel_size=3, padding=1)
        self.bn43 = nn.BatchNorm2d(128); self.do43 = nn.Dropout2d(p=0.2)

        # --------------------------
        # Atenção scSE por escala e por tempo
        # (assume classe scSE(channels, r) definida na celda acima
        # Se desativado, usa nn.Identity() para custo zero.
        # --------------------------

        # scSE em cada escala
        if enable_attention:
            self.att1_T1 = scSE(16, r=attn_reduction)
            self.att1_T2 = scSE(16, r=attn_reduction)
            self.att2_T1 = scSE(32, r=attn_reduction)
            self.att2_T2 = scSE(32, r=attn_reduction)
            self.att3_T1 = scSE(64, r=attn_reduction)
            self.att3_T2 = scSE(64, r=attn_reduction)
            self.att4_T1 = scSE(128, r=attn_reduction)
            self.att4_T2 = scSE(128, r=attn_reduction)
        else:
            self.att1_T1 = nn.Identity(); self.att1_T2 = nn.Identity()
            self.att2_T1 = nn.Identity(); self.att2_T2 = nn.Identity()
            self.att3_T1 = nn.Identity(); self.att3_T2 = nn.Identity()
            self.att4_T1 = nn.Identity(); self.att4_T2 = nn.Identity()


        # --------------------------
        # DECODER
        # Estratégia:
        # (i) upsample com ConvTranspose2d (stride=2 nos "upconv*"),
        # (ii) pad para igualar a dimensão da "skip",
        # (iii) concatenação com |skip_T1 - skip_T2|,
        # (iv) "conv transpose" com stride=1 (atua como conv 3x3) + BN + ReLU + Dropout.
        # --------------------------

        # Decoder
        self.upconv4 = nn.ConvTranspose2d(128, 128, kernel_size=3, padding=1, stride=2, output_padding=1)
        self.conv43d = nn.ConvTranspose2d(256, 128, kernel_size=3, padding=1)
        self.bn43d = nn.BatchNorm2d(128); self.do43d = nn.Dropout2d(p=0.2)
        self.conv42d = nn.ConvTranspose2d(128, 128, kernel_size=3, padding=1)
        self.bn42d = nn.BatchNorm2d(128); self.do42d = nn.Dropout2d(p=0.2)
        self.conv41d = nn.ConvTranspose2d(128, 64, kernel_size=3, padding=1)
        self.bn41d = nn.BatchNorm2d(64); self.do41d = nn.Dropout2d(p=0.2)

        self.upconv3 = nn.ConvTranspose2d(64, 64, kernel_size=3, padding=1, stride=2, output_padding=1)
        self.conv33d = nn.ConvTranspose2d(128, 64, kernel_size=3, padding=1)
        self.bn33d = nn.BatchNorm2d(64); self.do33d = nn.Dropout2d(p=0.2)
        self.conv32d = nn.ConvTranspose2d(64, 64, kernel_size=3, padding=1)
        self.bn32d = nn.BatchNorm2d(64); self.do32d = nn.Dropout2d(p=0.2)
        self.conv31d = nn.ConvTranspose2d(64, 32, kernel_size=3, padding=1)
        self.bn31d = nn.BatchNorm2d(32); self.do31d = nn.Dropout2d(p=0.2)

        self.upconv2 = nn.ConvTranspose2d(32, 32, kernel_size=3, padding=1, stride=2, output_padding=1)
        self.conv22d = nn.ConvTranspose2d(64, 32, kernel_size=3, padding=1)
        self.bn22d = nn.BatchNorm2d(32); self.do22d = nn.Dropout2d(p=0.2)
        self.conv21d = nn.ConvTranspose2d(32, 16, kernel_size=3, padding=1)
        self.bn21d = nn.BatchNorm2d(16); self.do21d = nn.Dropout2d(p=0.2)

        self.upconv1 = nn.ConvTranspose2d(16, 16, kernel_size=3, padding=1, stride=2, output_padding=1)
        self.conv12d = nn.ConvTranspose2d(32, 16, kernel_size=3, padding=1)
        self.bn12d = nn.BatchNorm2d(16); self.do12d = nn.Dropout2d(p=0.2)
        self.conv11d = nn.ConvTranspose2d(16, n_classes, kernel_size=3, padding=1)  # n_classes=1

    def _encode_once(self, x):

        """Passa uma imagem (T1 ou T2) por um encoder.
        Retorna features por escala + o "bottleneck" depois do último pool.
        """
        # Stage 1 (C=16, H/2)
        x11 = self.do11(F.relu(self.bn11(self.conv11(x))))
        x12 = self.do12(F.relu(self.bn12(self.conv12(x11))))
        x1p = F.max_pool2d(x12, kernel_size=2, stride=2)

        # Stage 2 (C=32, H/4)
        x21 = self.do21(F.relu(self.bn21(self.conv21(x1p))))
        x22 = self.do22(F.relu(self.bn22(self.conv22(x21))))
        x2p = F.max_pool2d(x22, kernel_size=2, stride=2)

        # Stage 3 (C=64, H/8)
        x31 = self.do31(F.relu(self.bn31(self.conv31(x2p))))
        x32 = self.do32(F.relu(self.bn32(self.conv32(x31))))
        x33 = self.do33(F.relu(self.bn33(self.conv33(x32))))
        x3p = F.max_pool2d(x33, kernel_size=2, stride=2)

        # Stage 4 (C=128, H/16)
        x41 = self.do41(F.relu(self.bn41(self.conv41(x3p))))
        x42 = self.do42(F.relu(self.bn42(self.conv42(x41))))
        x43 = self.do43(F.relu(self.bn43(self.conv43(x42))))
        x4p = F.max_pool2d(x43, kernel_size=2, stride=2)
        return (x12, x22, x33, x43, x4p)

    def forward(self, x1, x2):

        # --------------------------
        # Encoders siameses (mesmos pesos)
        # --------------------------

        # Encoders (pesos compartilhados)
        x12_1, x22_1, x33_1, x43_1, _    = self._encode_once(x1)
        x12_2, x22_2, x33_2, x43_2, x4p  = self._encode_once(x2)

        # Aplicar scSE antes de calcular diferenças
        if self.enable_attention:
            x12_1 = self.att1_T1(x12_1); x12_2 = self.att1_T2(x12_2)
            x22_1 = self.att2_T1(x22_1); x22_2 = self.att2_T2(x22_2)
            x33_1 = self.att3_T1(x33_1); x33_2 = self.att3_T2(x33_2)
            x43_1 = self.att4_T1(x43_1); x43_2 = self.att4_T2(x43_2)

        # Decoder 4d
        x4d = self.upconv4(x4p)
        pad4 = nn.ReplicationPad2d((0, x43_1.size(3) - x4d.size(3), 0, x43_1.size(2) - x4d.size(2)))
        x4d = torch.cat((pad4(x4d), torch.abs(x43_1 - x43_2)), 1)  # 256
        x43d = self.do43d(F.relu(self.bn43d(self.conv43d(x4d))))
        x42d = self.do42d(F.relu(self.bn42d(self.conv42d(x43d))))
        x41d = self.do41d(F.relu(self.bn41d(self.conv41d(x42d))))

        # Decoder 3d
        x3d = self.upconv3(x41d)
        pad3 = nn.ReplicationPad2d((0, x33_1.size(3) - x3d.size(3), 0, x33_1.size(2) - x3d.size(2)))
        x3d = torch.cat((pad3(x3d), torch.abs(x33_1 - x33_2)), 1)  # 128
        x33d = self.do33d(F.relu(self.bn33d(self.conv33d(x3d))))
        x32d = self.do32d(F.relu(self.bn32d(self.conv32d(x33d))))
        x31d = self.do31d(F.relu(self.bn31d(self.conv31d(x32d))))

        # Decoder 2d
        x2d = self.upconv2(x31d)
        pad2 = nn.ReplicationPad2d((0, x22_1.size(3) - x2d.size(3), 0, x22_1.size(2) - x2d.size(2)))
        x2d = torch.cat((pad2(x2d), torch.abs(x22_1 - x22_2)), 1)  # 64
        x22d = self.do22d(F.relu(self.bn22d(self.conv22d(x2d))))
        x21d = self.do21d(F.relu(self.bn21d(self.conv21d(x22d))))

        # Decoder 1d
        x1d = self.upconv1(x21d)
        pad1 = nn.ReplicationPad2d((0, x12_1.size(3) - x1d.size(3), 0, x12_1.size(2) - x1d.size(2)))
        x1d = torch.cat((pad1(x1d), torch.abs(x12_1 - x12_2)), 1)  # 32
        x12d = self.do12d(F.relu(self.bn12d(self.conv12d(x1d))))
        x11d = self.conv11d(x12d)
        return x11d  # logits (B,1,H,W)


##Losses e Métricas

In [None]:
# =========================
# Losses & Métricas
# =========================
def dice_loss_from_logits(logits, targets, eps=1e-6):
    probs = torch.sigmoid(logits)
    num = 2.0 * (probs * targets).sum(dim=(2, 3)) + eps
    den = probs.sum(dim=(2, 3)) + targets.sum(dim=(2, 3)) + eps
    return 1.0 - (num / den).mean()

def tversky_loss_from_logits(logits, targets, alpha=0.5, beta=0.5, eps=1e-6):
    probs = torch.sigmoid(logits)
    tp = (probs * targets).sum(dim=(2,3))
    fp = (probs * (1 - targets)).sum(dim=(2,3))
    fn = ((1 - probs) * targets).sum(dim=(2,3))
    tversky = (tp + eps) / (tp + alpha*fp + beta*fn + eps)
    return (1 - tversky).mean()

class ComboLoss:
    """BCEWithLogits(pos_weight) + Dice (e opcional Tversky)."""
    def __init__(self, bce_weight=0.5, pos_weight=None, use_tversky=False, tv_alpha=0.5, tv_beta=0.5):
        self.bce_weight = bce_weight
        self.pos_weight = pos_weight
        self.use_tversky = use_tversky
        self.tv_alpha = tv_alpha
        self.tv_beta = tv_beta
    def __call__(self, logits, targets):
        bce = F.binary_cross_entropy_with_logits(
            logits, targets, pos_weight=self.pos_weight
        )
        if self.use_tversky:
            aux = tversky_loss_from_logits(logits, targets, alpha=self.tv_alpha, beta=self.tv_beta)
        else:
            aux = dice_loss_from_logits(logits, targets)
        return self.bce_weight * bce + (1.0 - self.bce_weight) * aux

@torch.no_grad()
def iou_from_logits(logits, targets, thr=0.5, eps=1e-6):
    probs = torch.sigmoid(logits)
    preds = (probs > thr).float()
    inter = (preds * targets).sum(dim=(2, 3))
    union = preds.sum(dim=(2, 3)) + targets.sum(dim=(2, 3)) - inter + eps
    return ((inter + eps) / union).mean().item()

@torch.no_grad()
def dice_from_logits(logits, targets, thr=0.5, eps=1e-6):
    probs = torch.sigmoid(logits)
    preds = (probs > thr).float()
    num = 2.0 * (preds * targets).sum(dim=(2, 3)) + eps
    den = preds.sum(dim=(2, 3)) + targets.sum(dim=(2, 3)) + eps
    return (num / den).mean().item()

@torch.no_grad()
def roc_auc_from_loader(model, loader, max_points=50):
    """Calcula ROC AUC agregando todo o val (sem sklearn)."""
    model.eval()
    all_probs = []
    all_true  = []
    for t1, t2, y in loader:
        t1, t2 = t1.to(device), t2.to(device)
        with torch.amp.autocast('cuda', enabled=(device.type=='cuda')):
            logits = model(t1, t2)
            probs = torch.sigmoid(logits).detach().cpu().numpy()  # (B,1,H,W)
        all_probs.append(probs.reshape(-1))
        all_true.append(y.numpy().reshape(-1))
    p = np.concatenate(all_probs)
    g = np.concatenate(all_true).astype(np.uint8)
    # evita casos degenerados
    if g.max() == g.min():
        return float('nan'), 0.5  # sem positivos/negativos → AUC indefinido
    # Pontos de threshold uniformes em [0,1]
    ths = np.linspace(0, 1, max_points)
    tpr = []; fpr = []; best_thr = 0.5; best_j = -1.0
    P = (g == 1).sum(); N = (g == 0).sum()
    for th in ths:
        pred = (p >= th).astype(np.uint8)
        TP = np.logical_and(pred==1, g==1).sum()
        FP = np.logical_and(pred==1, g==0).sum()
        TN = np.logical_and(pred==0, g==0).sum()
        FN = np.logical_and(pred==0, g==1).sum()
        TPR = TP / max(P,1); FPR = FP / max(N,1)
        tpr.append(TPR); fpr.append(FPR)
        J = TPR - FPR
        if J > best_j:
            best_j = J; best_thr = float(th)
    # AUC pelo método do trapézio (ordenar por FPR)
    idx = np.argsort(fpr)
    auc = np.trapz(np.array(tpr)[idx], np.array(fpr)[idx])
    return float(auc), float(best_thr)


## Split automático

In [None]:
# =========================
# Split automático (estratificado por presença de mudança)
# =========================

def list_common_ids(T10_dir, T20_dir, mask_dir):
    # função auxiliar que lista arquivos do diretório 'd' cujo nome
    # começa com "recorte_" e termina com ".tif"; retorna um CONJUNTO (set)
    def rec(d): return {n for n in os.listdir(d) if n.startswith("recorte_") and n.endswith(".tif")}
    # interseção dos três conjuntos -> só os nomes que existem nas 3 pastas (T1, T2, máscara)
    common = rec(T10_dir) & rec(T20_dir) & rec(mask_dir)
    # para cada nome "recorte_<ID>.tif", extrai "<ID>" (tudo após o 1º "_") e remove ".tif";
    # devolve IDs ORDENADOS (list)
    return sorted([n.split('_', 1)[1].replace('.tif', '') for n in common])

def has_positive_change(mask_path):
    # abre a máscara no caminho 'mask_path' com rasterio (somente leitura)
    with rasterio.open(mask_path) as src:
        # lê a banda 1 (array 2D de inteiros/float)
        arr = src.read(1)
        # True se existe pelo menos UM pixel > 0; False caso contrário
        return bool((arr > 0).any())

def stratified_ids_by_mask(mask_dir, ids, val_ratio=0.2, seed=42):
    # RNG com semente fixa para reprodutibilidade do shuffle
    rng = random.Random(seed)
    pos, neg = [], []
    # varre todos os IDs candidate e separa em positivos/negativos
    for id_ in ids:
        mpath = os.path.join(mask_dir, f"recorte_{id_}.tif")
        if has_positive_change(mpath):
            pos.append(id_)
        else:
            neg.append(id_)

    # tamanho total e quantos vão para validação
    n_total = len(ids)
    n_val = max(1, int(round(n_total * val_ratio)))  # garante pelo menos 1

    # caso "normal": há tanto positivos quanto negativos
    if pos and neg:
        # fração de positivos no conjunto completo
        frac_pos = len(pos) / max(1, n_total)
        # quantos positivos vão para val (proporcional à fração, mas com limites)
        n_val_pos = min(len(pos), max(1, int(round(n_val * frac_pos))))
        # o resto das vagas vai para negativos
        n_val_neg = max(0, n_val - n_val_pos)
        # embaralha as listas para amostrar sem viés
        rng.shuffle(pos); rng.shuffle(neg)
        # escolhe os primeiros n_val_pos/n_val_neg para compor a validação
        val_ids = set(pos[:n_val_pos] + neg[:n_val_neg])
        # treino é o complemento (tudo que não foi para val)
        train_ids = [i for i in ids if i not in val_ids]
        # retorna:
        # - train_ids (lista)
        # - val_ids ordenados (lista)
        # - pos, neg (listas de TODOS os ids positivos/negativos, não só do treino)
        return train_ids, sorted(list(val_ids)), pos, neg

    # caso de borda: só há positivos OU só há negativos
    ids_copy = ids[:]
    rng.shuffle(ids_copy)
    # escolhe n_val IDs aleatórios para validação
    val_ids = set(ids_copy[:n_val])
    # treino é o restante
    train_ids = [i for i in ids if i not in val_ids]
    # recalcula pos/neg **apenas no treino** (já que só existe uma classe no total)
    pos_train, neg_train = [], []
    for id_ in train_ids:
        mpath = os.path.join(mask_dir, f"recorte_{id_}.tif")
        (pos_train if has_positive_change(mpath) else neg_train).append(id_)
    # retorna:
    # - train_ids (lista)
    # - val_ids ordenados (lista)
    # - pos_train, neg_train (listas filtradas do treino)
    return train_ids, sorted(list(val_ids)), pos_train, neg_train

# =========================
# Utilidades de balanceamento e pos_weight
# =========================
def compute_pixel_pos_weight(mask_dir, ids, clamp_max=None, use_sqrt=False):
    """
    Calcula pos_weight para BCEWithLogits: (negativos / positivos) em nível de PIXEL.
    - ids: lista de tiles do CONJUNTO DE TREINO.
    - clamp_max: se informado, faz clamp do pos_weight para evitar explosões (ex.: 100).
    - use_sqrt: se True, usa sqrt(neg/pos) em vez de (neg/pos) para suavizar.
    """
    pos = 0; neg = 0
    for id_ in ids:
        p = os.path.join(mask_dir, f"recorte_{id_}.tif")
        with rasterio.open(p) as src:
            m = src.read(1)                        # lê a banda da máscara
            m_bin = (m > 0).astype(np.uint8)      # assume 0 = neg, >0 = pos
            pos += int(m_bin.sum())               # total de pixels positivos
            neg += int(m_bin.size - m_bin.sum())  # total de pixels negativos

    pos = max(pos, 1)                             # evita div/0 se não houver positivos
    ratio = neg / pos
    if use_sqrt:
        ratio = ratio**0.5                        # suaviza o peso dos positivos

    if clamp_max is not None:
        ratio = min(ratio, clamp_max)             # limita peso máximo para estabilidade

    # BCEWithLogitsLoss espera um tensor no device correto; shape [1] funciona
    return torch.tensor([ratio], dtype=torch.float32, device=device)


def make_weighted_sampler(train_ids, pos_ids, neg_ids, frac_pos=0.7):
    """
    Oversampling em nível de TILE:
    - Atribui mais peso para tiles com mudança (pos_ids).
    - frac_pos: fração desejada de amostras positivas por época (ex.: 0.7 = 70%).
    """
    pos_set = set(pos_ids)
    neg_set = set(neg_ids)

    # Se faltar alguma classe, caia para “sem sampler” (ou 100% da classe disponível).
    if len(pos_set) == 0 and len(neg_set) == 0:
        # Nada a amostrar – devolve um sampler trivial
        return WeightedRandomSampler([1.0]*len(train_ids), num_samples=len(train_ids), replacement=True)
    if len(pos_set) == 0:
        # só negativos disponíveis
        w_neg = 1.0 / max(len(neg_set), 1)
        weights = [w_neg for _ in train_ids]
        return WeightedRandomSampler(weights, num_samples=len(train_ids), replacement=True)
    if len(neg_set) == 0:
        # só positivos disponíveis
        w_pos = 1.0 / max(len(pos_set), 1)
        weights = [w_pos for _ in train_ids]
        return WeightedRandomSampler(weights, num_samples=len(train_ids), replacement=True)

    # pesos proporcionais ao alvo de fração de positivos
    w_pos = frac_pos / max(len(pos_set), 1)
    w_neg = (1.0 - frac_pos) / max(len(neg_set), 1)

    weights = [w_pos if i in pos_set else w_neg for i in train_ids]
    return WeightedRandomSampler(weights, num_samples=len(train_ids), replacement=True)
# =========================
# Utilidades de balanceamento e pos_weight
# =========================
def compute_pixel_pos_weight(mask_dir, ids, clamp_max=None, use_sqrt=False):
    """
    Calcula pos_weight para BCEWithLogits: (negativos / positivos) em nível de PIXEL.
    - ids: lista de tiles do CONJUNTO DE TREINO.
    - clamp_max: se informado, faz clamp do pos_weight para evitar explosões (ex.: 100).
    - use_sqrt: se True, usa sqrt(neg/pos) em vez de (neg/pos) para suavizar.
    """
    pos = 0; neg = 0
    for id_ in ids:
        p = os.path.join(mask_dir, f"recorte_{id_}.tif")
        with rasterio.open(p) as src:
            m = src.read(1)                        # lê a banda da máscara
            m_bin = (m > 0).astype(np.uint8)      # assume 0 = neg, >0 = pos
            pos += int(m_bin.sum())               # total de pixels positivos
            neg += int(m_bin.size - m_bin.sum())  # total de pixels negativos

    pos = max(pos, 1)                             # evita div/0 se não houver positivos
    ratio = neg / pos
    if use_sqrt:
        ratio = ratio**0.5                        # suaviza o peso dos positivos

    if clamp_max is not None:
        ratio = min(ratio, clamp_max)             # limita peso máximo para estabilidade

    # BCEWithLogitsLoss espera um tensor no device correto; shape [1] funciona
    return torch.tensor([ratio], dtype=torch.float32, device=device)


def make_weighted_sampler(train_ids, pos_ids, neg_ids, frac_pos=0.7):
    """
    Oversampling em nível de TILE:
    - Atribui mais peso para tiles com mudança (pos_ids).
    - frac_pos: fração desejada de amostras positivas por época (ex.: 0.7 = 70%).
    """
    pos_set = set(pos_ids)
    neg_set = set(neg_ids)


    if len(pos_set) == 0 and len(neg_set) == 0:
        # Nada a amostrar – devolve um sampler trivial
        return WeightedRandomSampler([1.0]*len(train_ids), num_samples=len(train_ids), replacement=True)
    if len(pos_set) == 0:
        # só negativos disponíveis
        w_neg = 1.0 / max(len(neg_set), 1)
        weights = [w_neg for _ in train_ids]
        return WeightedRandomSampler(weights, num_samples=len(train_ids), replacement=True)
    if len(neg_set) == 0:
        # só positivos disponíveis
        w_pos = 1.0 / max(len(pos_set), 1)
        weights = [w_pos for _ in train_ids]
        return WeightedRandomSampler(weights, num_samples=len(train_ids), replacement=True)

    # pesos proporcionais ao alvo de fração de positivos
    w_pos = frac_pos / max(len(pos_set), 1)
    w_neg = (1.0 - frac_pos) / max(len(neg_set), 1)

    weights = [w_pos if i in pos_set else w_neg for i in train_ids]
    return WeightedRandomSampler(weights, num_samples=len(train_ids), replacement=True)


## Treino e Validação

In [None]:
# =========================
# Treino / Validação
# =========================
def train_one_epoch(model, loader, optimizer, criterion, scaler=None, max_norm=1.0):
    model.train()
    total_loss, total_iou, n = 0.0, 0.0, 0
    for t1, t2, y in loader:
        t1, t2, y = t1.to(device, non_blocking=True), t2.to(device, non_blocking=True), y.to(device, non_blocking=True)
        optimizer.zero_grad(set_to_none=True)
        with torch.amp.autocast('cuda', enabled=(device.type=='cuda')):
            logits = model(t1, t2)
            loss = criterion(logits, y)
        if scaler is not None and device.type == "cuda":
            scaler.scale(loss).backward()
            # grad clipping
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
            scaler.step(optimizer)
            scaler.update()
        else:
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
            optimizer.step()
        bs = t1.size(0)
        total_loss += loss.item() * bs
        total_iou  += iou_from_logits(logits, y) * bs
        n += bs
    return total_loss / n, total_iou / n

@torch.no_grad()
def evaluate(model, loader, criterion, use_best_thr=False, last_best_thr=0.5):
    model.eval()
    total_loss, total_iou, total_dice, n = 0.0, 0.0, 0.0, 0
    # calcula ROC AUC e threshold ótimo numa passada separada ao final
    all_logits = []
    all_targets = []
    for t1, t2, y in loader:
        t1, t2, y = t1.to(device, non_blocking=True), t2.to(device, non_blocking=True), y.to(device, non_blocking=True)
        with torch.amp.autocast('cuda', enabled=(device.type=='cuda')):
            logits = model(t1, t2)
            loss = criterion(logits, y)
        bs = t1.size(0)
        total_loss += loss.item() * bs
        total_iou  += iou_from_logits(logits, y, thr=last_best_thr if use_best_thr else 0.5) * bs
        total_dice += dice_from_logits(logits, y,  thr=last_best_thr if use_best_thr else 0.5) * bs
        n += bs
        all_logits.append(logits.detach().cpu())
        all_targets.append(y.detach().cpu())

    # ROC AUC + best threshold (Youden J)
    logits_cat  = torch.cat(all_logits, dim=0)
    targets_cat = torch.cat(all_targets, dim=0)
    probs = torch.sigmoid(logits_cat).numpy().reshape(-1)
    truth = targets_cat.numpy().reshape(-1).astype(np.uint8)

    if truth.max() != truth.min():
        # Calcula em ~100 pontos
        ths = np.linspace(0, 1, 100)
        tpr = []; fpr = []; best_thr = 0.5; best_j = -1
        P = (truth==1).sum(); N=(truth==0).sum()
        for th in ths:
            pred = (probs >= th).astype(np.uint8)
            TP = np.logical_and(pred==1, truth==1).sum()
            FP = np.logical_and(pred==1, truth==0).sum()
            TN = np.logical_and(pred==0, truth==0).sum()
            FN = np.logical_and(pred==0, truth==1).sum()
            TPR = TP / max(P,1); FPR = FP / max(N,1)
            tpr.append(TPR); fpr.append(FPR)
            J = TPR - FPR
            if J > best_j:
                best_j = J; best_thr = float(th)
        idx = np.argsort(fpr)
        roc_auc = float(np.trapz(np.array(tpr)[idx], np.array(fpr)[idx]))
    else:
        roc_auc = float('nan'); best_thr = last_best_thr

    return (total_loss / n, total_iou / n, total_dice / n, roc_auc, best_thr)


##Configuração e Execução

In [None]:
# =========================
# Config e Execução
# =========================
# caminhos das pastas
T10_dir  = "/content/drive/MyDrive/WorkCap/dataset_kaggle/dataset/t1"
T20_dir  = "/content/drive/MyDrive/WorkCap/dataset_kaggle/dataset/t2"
mask_dir = "/content/drive/MyDrive/WorkCap/dataset_kaggle/dataset/mask"

VAL_RATIO = 0.2
BATCH_SIZE = 8            # um pouco maior ajuda o OneCycleLR (ajuste conforme VRAM)
EPOCHS = 50               # mais épocas – com early stopping prático via best ckpt
LR_MAX = 3e-3             # pico do OneCycle
WD = 1e-4
GRAD_MAX_NORM = 1.0

# 1) Descobrir IDs e fazer split estratificado
ids_all = list_common_ids(T10_dir, T20_dir, mask_dir)
print(f"Total de amostras: {len(ids_all)}")

train_ids, val_ids, pos_train_ids, neg_train_ids = stratified_ids_by_mask(mask_dir, ids_all, val_ratio=VAL_RATIO, seed=42)
print(f"Split -> train: {len(train_ids)} | val: {len(val_ids)} | train_pos: {len(pos_train_ids)} | train_neg: {len(neg_train_ids)}")

# 2) Datasets
train_tf = JointAugment()
val_tf = None
train_ds = SiameseDataset(T10_dir, T20_dir, mask_dir, transform=train_tf, ids=train_ids)
val_ds   = SiameseDataset(T10_dir, T20_dir, mask_dir, transform=val_tf,   ids=val_ids)

# 3) DataLoaders (sampler balanceado para treino)
num_workers = 4
sampler = make_weighted_sampler(train_ids, pos_train_ids, neg_train_ids)
train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, sampler=sampler, num_workers=num_workers, pin_memory=True, drop_last=False)
val_dl   = DataLoader(val_ds,   batch_size=BATCH_SIZE, shuffle=False,     num_workers=num_workers, pin_memory=True)

# 4) Modelo, loss, otimizador, scheduler
c_in = train_ds[0][0].shape[0]   # nº de bandas em T1 (igual a T2)
model = SiamUnet_diff(n_channels=c_in, n_classes=1, enable_attention=True, attn_reduction=8).to(device)

# pos_weight por pixels no treino
pos_weight = compute_pixel_pos_weight(mask_dir, train_ids)
print(f"pos_weight(pixel) = {pos_weight.item():.3f}")

criterion = ComboLoss(bce_weight=0.6, pos_weight=pos_weight, use_tversky=False)
optimizer = torch.optim.AdamW(model.parameters(), lr=LR_MAX, weight_decay=WD)

# OneCycleLR: define steps_per_epoch pelo train_dl
steps_per_epoch = math.ceil(len(train_dl))
scheduler = torch.optim.lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=LR_MAX,
    epochs=EPOCHS,
    steps_per_epoch=steps_per_epoch,
    pct_start=0.1, div_factor=10.0, final_div_factor=1e2
)

scaler = torch.amp.GradScaler('cuda', enabled=(device.type=="cuda"))

# 5) Loop de treino com checkpoint no melhor IoU (thr adaptativo por ROC)
best_iou = -1.0
best_thr = 0.5
ckpt_path = "siamunet_scSE_best.pth"

hist = {
    "epoch": [],
    "train_loss": [],
    "train_iou": [],
    "val_loss": [],
    "val_iou": [],
    "val_dice": [],
    "val_roc_auc": [],
    "best_thr": [],
    "lr": []
}

for epoch in range(1, EPOCHS+1):
    tr_loss, tr_iou = train_one_epoch(model, train_dl, optimizer, criterion, scaler, max_norm=GRAD_MAX_NORM)
    # -> Chamaremos manualmente por batch seria o ideal; alternativa: usar scheduler.step() por iteração.
    # Para simplicidade (e compatibilidade), chamamos aqui um step extra proporcional:
    for _ in range(max(1, steps_per_epoch)):
        scheduler.step()

    va_loss, va_iou, va_dice, va_auc, thr_opt = evaluate(model, val_dl, criterion, use_best_thr=False, last_best_thr=best_thr)
    # Re-avalia com threshold ótimo encontrado, só para logging de IoU/Dice com thr ótimo
    va_loss2, va_iou2, va_dice2, _, _ = evaluate(model, val_dl, criterion, use_best_thr=True, last_best_thr=thr_opt)

    # salvar melhor por IoU (com thr ótimo)
    if va_iou2 > best_iou:
        best_iou = va_iou2
        best_thr = thr_opt
        torch.save({
            "model": model.state_dict(),
            "epoch": epoch,
            "val_iou": va_iou2,
            "val_dice": va_dice2,
            "val_auc": va_auc,
            "best_thr": best_thr,
            "c_in": c_in
        }, ckpt_path)

    # LR atual (primeiro param group)
    cur_lr = optimizer.param_groups[0]["lr"]

    # log
    hist["epoch"].append(epoch)
    hist["train_loss"].append(tr_loss)
    hist["train_iou"].append(tr_iou)
    hist["val_loss"].append(va_loss)
    hist["val_iou"].append(va_iou2)   # com thr ótimo
    hist["val_dice"].append(va_dice2) # com thr ótimo
    hist["val_roc_auc"].append(va_auc)
    hist["best_thr"].append(thr_opt)
    hist["lr"].append(cur_lr)

    print(f"[{epoch:02d}/{EPOCHS}] "
          f"train_loss={tr_loss:.4f} IoU={tr_iou:.3f} | "
          f"val_loss={va_loss:.4f} IoU@thr*={va_iou2:.3f} Dice@thr*={va_dice2:.3f} AUC={va_auc:.3f} thr*={thr_opt:.3f} | "
          f"best_IoU={best_iou:.3f}")

print("Treino finalizado. Melhor checkpoint salvo em:", ckpt_path)
print(f"Melhor threshold valid: {best_thr:.3f}")



Total de amostras: 945
Split -> train: 756 | val: 189 | train_pos: 604 | train_neg: 341
pos_weight(pixel) = 75.484


  roc_auc = float(np.trapz(np.array(tpr)[idx], np.array(fpr)[idx]))


[01/50] train_loss=1.2693 IoU=0.026 | val_loss=0.8492 IoU@thr*=0.172 Dice@thr*=0.229 AUC=0.936 thr*=0.485 | best_IoU=0.172
[02/50] train_loss=0.8897 IoU=0.089 | val_loss=0.5579 IoU@thr*=0.412 Dice@thr*=0.484 AUC=0.990 thr*=0.455 | best_IoU=0.412
[03/50] train_loss=0.6984 IoU=0.186 | val_loss=1.1870 IoU@thr*=0.201 Dice@thr*=0.269 AUC=0.880 thr*=0.485 | best_IoU=0.412
[04/50] train_loss=0.6213 IoU=0.242 | val_loss=0.4402 IoU@thr*=0.406 Dice@thr*=0.487 AUC=0.990 thr*=0.202 | best_IoU=0.412
[05/50] train_loss=0.6456 IoU=0.296 | val_loss=0.4459 IoU@thr*=0.338 Dice@thr*=0.418 AUC=0.993 thr*=0.081 | best_IoU=0.412
[06/50] train_loss=0.5221 IoU=0.330 | val_loss=0.3763 IoU@thr*=0.370 Dice@thr*=0.456 AUC=0.993 thr*=0.152 | best_IoU=0.412
[07/50] train_loss=0.5034 IoU=0.375 | val_loss=0.4058 IoU@thr*=0.403 Dice@thr*=0.490 AUC=0.992 thr*=0.040 | best_IoU=0.412
[08/50] train_loss=0.4601 IoU=0.398 | val_loss=0.3777 IoU@thr*=0.311 Dice@thr*=0.395 AUC=0.994 thr*=0.212 | best_IoU=0.412
[09/50] train_lo

##Salvando histórico em CSV; Gráficos; Inferência rápida

In [None]:
# -------------------------
# Salvar histórico em CSV
# -------------------------
hist_path = "training_history_scSE.csv"
with open(hist_path, "w", newline="") as f:
    w = csv.writer(f)
    w.writerow(list(hist.keys()))
    for i in range(len(hist["epoch"])):
        w.writerow([hist[k][i] for k in hist.keys()])
print("Histórico salvo em:", hist_path)

# -------------------------
# Gráficos (Loss, IoU, Dice, ROC AUC)
# -------------------------
def plot_and_save(x, y, title, ylabel, out_png):
    plt.figure(figsize=(7,4))
    plt.plot(x, y)
    plt.title(title)
    plt.xlabel("Época")
    plt.ylabel(ylabel)
    plt.grid(True, ls="--", alpha=0.4)
    plt.tight_layout()
    plt.savefig(out_png, dpi=150, bbox_inches="tight")
    plt.close()
    print("Figura salva:", out_png)

ep = hist["epoch"]
plot_and_save(ep, hist["train_loss"], "Loss (Treino)", "Loss", "fig_loss_train.png")
plot_and_save(ep, hist["val_loss"],   "Loss (Val)",    "Loss", "fig_loss_val.png")
plot_and_save(ep, hist["train_iou"],  "IoU (Treino)",  "IoU",  "fig_iou_train.png")
plot_and_save(ep, hist["val_iou"],    "IoU (Val)",     "IoU",  "fig_iou_val.png")
plot_and_save(ep, hist["val_dice"],   "Dice (Val)",    "Dice", "fig_dice_val.png")
plot_and_save(ep, hist["val_roc_auc"],"ROC AUC (Val)", "AUC",  "fig_auc_val.png")

# -------------------------
# Inferência rápida em um minibatch de validação
# -------------------------
@torch.no_grad()
def predict_batch(model, t1, t2, thr=0.5):
    model.eval()
    with torch.amp.autocast('cuda', enabled=(device.type=='cuda')):
        logits = model(t1.to(device), t2.to(device))
        probs = torch.sigmoid(logits)
    preds = (probs > thr).float()
    return preds.cpu(), probs.cpu()

for t1b, t2b, mb in val_dl:
    preds, probs = predict_batch(model, t1b, t2b, thr=best_thr)
    print("Pred shape:", preds.shape)  # (B,1,H,W)
    break


##Visualizando gráficos

In [None]:
ep = hist["epoch"]

fig, axes = plt.subplots(3, 2, figsize=(12, 12))  # 3 linhas x 2 colunas

# Loss (Treino)
axes[0,0].plot(ep, hist["train_loss"])
axes[0,0].set_title("Loss (Treino)")
axes[0,0].set_xlabel("Epoch"); axes[0,0].set_ylabel("Loss")
axes[0,0].grid(True, ls="--", alpha=0.4)

# Loss (Val)
axes[0,1].plot(ep, hist["val_loss"])
axes[0,1].set_title("Loss (Val)")
axes[0,1].set_xlabel("Epoch"); axes[0,1].set_ylabel("Loss")
axes[0,1].grid(True, ls="--", alpha=0.4)

# IoU (Treino)
axes[1,0].plot(ep, hist["train_iou"])
axes[1,0].set_title("IoU (Treino)")
axes[1,0].set_xlabel("Epoch"); axes[1,0].set_ylabel("IoU")
axes[1,0].grid(True, ls="--", alpha=0.4)

# IoU (Val)
axes[1,1].plot(ep, hist["val_iou"])
axes[1,1].set_title("IoU (Val)")
axes[1,1].set_xlabel("Epoch"); axes[1,1].set_ylabel("IoU")
axes[1,1].grid(True, ls="--", alpha=0.4)

# Dice (Val)
axes[2,0].plot(ep, hist["val_dice"])
axes[2,0].set_title("Dice (Val)")
axes[2,0].set_xlabel("Epoch"); axes[2,0].set_ylabel("Dice")
axes[2,0].grid(True, ls="--", alpha=0.4)

# ROC AUC (Val)
axes[2,1].plot(ep, hist["val_roc_auc"])
axes[2,1].set_title("ROC AUC (Val)")
axes[2,1].set_xlabel("Epoch"); axes[2,1].set_ylabel("AUC")
axes[2,1].grid(True, ls="--", alpha=0.4)

plt.tight_layout()
plt.show()

#Inferência

In [None]:
# ---------- CONFIG ----------
# Configuração: Definição das variáveis e caminhos necessários para a execução do script.
T10_INF = "/content/drive/MyDrive/WorkCap/dataset_kaggle/avaliacao/t1"
T20_INF = "/content/drive/MyDrive/WorkCap/dataset_kaggle/avaliacao/t2"
CKPT_PATH = "siamunet_diff_best.pth"
OUT_DIR = "/content/preds_siamunet"
SHOW_MAX = 315
THRESH = 0.4
SAVE_OUTPUTS = True

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

# ---------- Utilitários  ----------
# Funções auxiliares para manipulação de arquivos,
# leitura de imagens, visualização e salvamento de resultados.
if 'list_common_recortes' not in globals():
    def list_common_recortes(d1, d2):
        f = lambda d: {n for n in os.listdir(d) if n.startswith("recorte_") and n.endswith(".tif")}
        return sorted(list(f(d1) & f(d2)))

if 'read_image_norm' not in globals():
    def read_image_norm(path):
        with rasterio.open(path) as src:
            img = src.read().astype(np.float32)  # (C,H,W)
            img = np.nan_to_num(img, nan=0.0)
            mn, mx = img.min(), img.max()
            if mx > mn:
                img = (img - mn) / (mx - mn)
            else:
                img[:] = 0.0
            prof = src.profile
        return torch.from_numpy(img), prof  # (C,H,W), profile

if 'show_triplet' not in globals():
    def show_triplet(t1, t2, mbin, title=None, save_path=None):
        def to_rgb_first3(t):
            c, h, w = t.shape
            if c >= 3:
                rgb = torch.stack([t[0], t[1], t[2]], dim=0).permute(1,2,0).cpu().numpy()
            else:
                g = t[0].cpu().numpy()
                rgb = np.stack([g,g,g], axis=-1)
            return np.clip(rgb, 0, 1)

        rgb1 = to_rgb_first3(t1)
        rgb2 = to_rgb_first3(t2)
        mnp  = mbin.cpu().numpy()

        plt.figure(figsize=(12,4))
        plt.subplot(1,3,1); plt.imshow(rgb1); plt.title("T1"); plt.axis("off")
        plt.subplot(1,3,2); plt.imshow(rgb2); plt.title("T2"); plt.axis("off")
        plt.subplot(1,3,3); plt.imshow(mnp);  plt.title("Máscara (binária)"); plt.axis("off")
        if title: plt.suptitle(title, y=0.98)
        plt.tight_layout()
        if save_path is not None:
            Path(save_path).parent.mkdir(parents=True, exist_ok=True)
            plt.savefig(save_path, dpi=150, bbox_inches="tight")
        plt.show()
        plt.close()

if 'save_geotiff_like' not in globals():
    def save_geotiff_like(ref_profile, out_path, array, as_uint8=True):
        prof = ref_profile.copy()
        prof.update({"count": 1, "compress": "lzw"})
        if as_uint8:
            arr = (array * 255).astype(np.uint8) if array.dtype != np.uint8 else array
            prof.update({"dtype": rasterio.uint8})
        else:
            arr = array.astype(np.float32)
            prof.update({"dtype": rasterio.float32})
        Path(out_path).parent.mkdir(parents=True, exist_ok=True)
        with rasterio.open(out_path, "w", **prof) as dst:
            dst.write(arr, 1)

# ---------- Modelo (reutiliza o já instanciado) ----------
# supõe que `model` JÁ existe na RAM; se quiser recarregar pesos do ckpt, descomente:
# ckpt = torch.load(CKPT_PATH, map_location=device)
# model.load_state_dict(ckpt["model"], strict=True)

model.to(device).eval()

# ---------- LOOP DE INFERÊNCIA ----------
# Loop Principal Itera sobre os pares de imagens, executa o modelo e salva/exibe os resultados
# Obtém a lista de nomes de arquivos de imagem que existem em ambos os diretórios (T1 e T2)
ids = list_common_recortes(T10_INF, T20_INF)
print(f"Pares encontrados para inferência: {len(ids)}")
Path(OUT_DIR).mkdir(parents=True, exist_ok=True)

shown = 0
for name in ids:
    t1, prof1 = read_image_norm(os.path.join(T10_INF, name))
    t2, _     = read_image_norm(os.path.join(T20_INF, name))
    H, W = t1.shape[-2], t1.shape[-1]

    with torch.no_grad():
        logits = model(t1.unsqueeze(0).to(device), t2.unsqueeze(0).to(device))  # (1,1,h,w)
        probs  = torch.sigmoid(logits)[0,0]  # (h,w)
        if probs.shape[-2:] != (H, W):
            probs = F.interpolate(probs.unsqueeze(0).unsqueeze(0), size=(H,W),
                                  mode="bilinear", align_corners=False)[0,0]
        m_bin = (probs > THRESH).float().cpu()

    # Mostrar
    # Exibe o resultado se o limite de visualização não foi atingido
    if shown < SHOW_MAX:
        show_triplet(
            t1, t2, m_bin,
            title=name,
            save_path=(Path(OUT_DIR)/"figs"/f"{name}.png" if SAVE_OUTPUTS else None)
        )
        shown += 1

    # Salvar GeoTIFFs
    #  Salva os resultados em formato GeoTIFF se a flag estiver ativa
    if SAVE_OUTPUTS:
        save_geotiff_like(prof1, str(Path(OUT_DIR)/"tif_prob"/name),
                          probs.cpu().numpy().astype(np.float32), as_uint8=False)
        save_geotiff_like(prof1, str(Path(OUT_DIR)/"tif_bin"/name),
                          m_bin.numpy().astype(np.uint8), as_uint8=True)

print("Inferência concluída.")
print(f"Saídas em: {OUT_DIR}")


#Salvar predições em CSV

In [None]:
# --------------------------
# CONFIG (ajuste os caminhos)
# --------------------------
t1_dir = "/content/drive/MyDrive/WorkCap/dataset_kaggle/avaliacao/t1"
t2_dir = "/content/drive/MyDrive/WorkCap/dataset_kaggle/avaliacao/t2"
csv_path = "Siamunet_attentionscSE.csv"      # saída (binária 0/1)
save_probs_csv = False                        # também salvar probabilidades?
probs_csv_path = "predicted_change_probs.csv" # se True acima
THRESH = 0.4

# Se você já tem `model` com pesos na memória, deixe False.
LOAD_FROM_CKPT = False
CKPT_PATH = "siamunet_diff_best.pth"

VALID_EXTS = {".tif", ".tiff"}
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

# --------------------------
# Utilitários
# --------------------------
def natural_key(s: str):
    """Ordena 'recorte2' < 'recorte10' (1,2,3,...,10,11,...)"""
    return [int(t) if t.isdigit() else t.lower() for t in re.split(r'(\d+)', s)]

def list_images_by_id(folder):
    """Retorna dict: {nome_sem_ext: caminho_completo}, filtrando por .tif/.tiff"""
    out = {}
    for name in os.listdir(folder):
        if name.startswith('.'):
            continue
        path = os.path.join(folder, name)
        if not os.path.isfile(path):
            continue
        root, ext = os.path.splitext(name)
        if ext.lower() not in VALID_EXTS:
            continue
        out[root] = path
    return out

def read_image_minmax(path):
    """Lê TIFF (todas bandas) e normaliza por tile para [0,1]."""
    with rasterio.open(path) as src:
        img = src.read().astype(np.float32)  # (C,H,W)
        img = np.nan_to_num(img, nan=0.0)
        mn, mx = img.min(), img.max()
        if mx > mn:
            img = (img - mn) / (mx - mn)
        else:
            img[:] = 0.0
    return torch.from_numpy(img)  # (C,H,W) float32

# --------------------------
# Preparar dados e modelo
# Cria mapeamentos de ID para caminho de arquivo para T1 e T2
t1_map = list_images_by_id(t1_dir)
t2_map = list_images_by_id(t2_dir)

# Encontra a interseção de IDs (arquivos com mesmo nome base)

ids_t1 = set(t1_map.keys())
ids_t2 = set(t2_map.keys())

# ORDEM NATURAL PELO NOME
common_ids = sorted(ids_t1 & ids_t2, key=natural_key)
only_t1 = sorted(ids_t1 - ids_t2, key=natural_key)
only_t2 = sorted(ids_t2 - ids_t1, key=natural_key)

# Assume que todas as imagens têm as mesmas dimensões.
# Usa o primeiro par de imagens para definir o cabeçalho do CSV.
print(f"T1 válidos: {len(ids_t1)} | T2 válidos: {len(ids_t2)} | IDs em comum: {len(common_ids)}")
if only_t1: print(f"Só em T1 ({len(only_t1)}): {only_t1[:10]} ...")
if only_t2: print(f"Só em T2 ({len(only_t2)}): {only_t2[:10]} ...")
assert len(common_ids) > 0, "Não há interseção de nomes entre T1 e T2."

# Header (H, W, C) a partir do primeiro par
first_id = common_ids[0]
with rasterio.open(t1_map[first_id]) as src0:
    C = src0.count
    H, W = src0.height, src0.width
num_pixels = H * W
header = ["id"] + [f"pixel_{i}" for i in range(num_pixels)]
print(f"Header: {H}x{W}({num_pixels} px), C={C}")

# --------------------------
# Modelo (reutiliza o existente se LOAD_FROM_CKPT=False)
# --------------------------
if LOAD_FROM_CKPT:
    # Recrie o MESMO modelo usado no treino antes de carregar (com atenção)
    # Certifique-se de que a classe SiamUnet_diff está definida
    model = SiamUnet_diff(n_channels=C, n_classes=1, use_scse=True, scse_reduction=16, fuse_mode='absdiff').to(device)
    ckpt = torch.load(CKPT_PATH, map_location=device)
    model.load_state_dict(ckpt["model"], strict=True)
    print(f"Checkpoint carregado (época {ckpt.get('epoch','?')})")
else:
    # Assume que o modelo já está carregado na memória
    model = model.to(device)
    print("Modelo existente utilizado")

model.eval()

# --------------------------
# Escrever CSV(s)
# --------------------------
skipped_shape = 0
# Contador de pares de imagens pulados por terem shapes diferentes.
# Cria os diretórios de saída se não existirem.
Path(os.path.dirname(csv_path) or ".").mkdir(parents=True, exist_ok=True)
if save_probs_csv:
    Path(os.path.dirname(probs_csv_path) or ".").mkdir(parents=True, exist_ok=True)

with ExitStack() as stack:
    fbin = stack.enter_context(open(csv_path, "w", newline=""))
    writer_bin = csv.writer(fbin)
    writer_bin.writerow(header)
# Se a flag for True, abre e prepara o arquivo CSV para as probabilidades
    if save_probs_csv:
        fprb = stack.enter_context(open(probs_csv_path, "w", newline=""))
        writer_prb = csv.writer(fprb)
        writer_prb.writerow(header)
    else:
        writer_prb = None

    with torch.no_grad():
        for i, id_ in enumerate(common_ids, 1):
            p1, p2 = t1_map[id_], t2_map[id_]

            # Leitura + normalização (igual ao treino)
            t1 = read_image_minmax(p1).unsqueeze(0).to(device)  # [1,C,H,W]
            t2 = read_image_minmax(p2).unsqueeze(0).to(device)  # [1,C,H,W]

            # Verificar shapes
            if t1.shape != t2.shape:
                print(f"Shape diferente em {id_}: T1 {tuple(t1.shape)} vs T2 {tuple(t2.shape)}. Pulando.")
                skipped_shape += 1
                continue

            # Inferência
            # passa o par de imagens pelo modelo
            logits = model(t1, t2)              # [1,1,h,w]
            probs  = torch.sigmoid(logits)[0,0] # [h,w]

            # Garantir tamanho (H,W) para casar com header
            if probs.shape[-2:] != (H, W):
                probs = F.interpolate(
                    probs.unsqueeze(0).unsqueeze(0),
                    size=(H, W),
                    mode="bilinear",
                    align_corners=False
                )[0,0]

            # CORREÇÃO: Mantém o nome original com underline e adiciona extensão .tif
            id_col = f"{id_}.tif"

            # Binária
            pred_mask = (probs > THRESH).to(torch.uint8).cpu().numpy().reshape(-1)
            writer_bin.writerow([id_col] + pred_mask.tolist())

            # (Opcional) Probabilidades
            if writer_prb is not None:
                writer_prb.writerow([id_col] + probs.cpu().numpy().astype(np.float32).reshape(-1).tolist())
            # Imprime o progresso a cada 25 imagens.
            if i % 25 == 0:
                print(f"Progresso: {i}/{len(common_ids)} pares...")

print(f"CSV binário salvo em: {csv_path}")
if save_probs_csv:
    print(f"CSV de probabilidades salvo em: {probs_csv_path}")
if skipped_shape:
    print(f"ℹ Pares pulados por shape inconsistente: {skipped_shape}")

Device: cpu
T1 válidos: 315 | T2 válidos: 325 | IDs em comum: 315
Só em T2 (10): ['recorte_3 (1)', 'recorte_30 (1)', 'recorte_298 (1)', 'recorte_299 (1)', 'recorte_300 (1)', 'recorte_301 (1)', 'recorte_302 (1)', 'recorte_303 (1)', 'recorte_304 (1)', 'recorte_305 (1)'] ...
Header: 128x128(16384 px), C=4
Modelo existente utilizado
Progresso: 25/315 pares...
Progresso: 50/315 pares...
Progresso: 75/315 pares...
Progresso: 100/315 pares...
Progresso: 125/315 pares...
Progresso: 150/315 pares...
Progresso: 175/315 pares...
Progresso: 200/315 pares...
Progresso: 225/315 pares...
Progresso: 250/315 pares...
Progresso: 275/315 pares...
Progresso: 300/315 pares...
CSV binário salvo em: Siamunet_attentionscSE.csv


# Padronizar o csv ao padrão Kaggle e mostrar primeiras linhas


In [None]:

# >>> ajuste o(s) caminho(s) se necessário <<<
csv_masks = "Siamunet_attentionscSE.csv"  # CSV de máscaras binárias 0/1
csv_probs = "predicted_change_probs.csv"    # (opcional) CSV de probabilidades

# opções de visualização (só pra ficar legível)
pd.set_option("display.max_rows", 20)
pd.set_option("display.max_columns", 30)  # aumente se quiser ver mais colunas
pd.set_option("display.width", 0)

# helper p/ exibir em notebook ou terminal
try:
    from IPython.display import display
except Exception:
    display = print

def padronizar_id_recorte(x: str) -> str:
    """
    Retorna 'recorte_123.tif' a partir de qualquer variação:
      - 'recorte_123.tif'
      - 'recorte123.tif'
      - 'recorte_123.tiff'
      - caminho completo '/.../recorte_123.tif'
      - apenas '123' (como fallback) -> 'recorte_123.tif'
    """
    base = os.path.basename(str(x))
    root, _ext = os.path.splitext(base)

    # Se vier só o número (ex.: "6"), vira "recorte_6"
    if re.fullmatch(r"\d+", root):
        root = f"recorte_{root}"

    # Inserir "_" se vier "recorte123"
    root = re.sub(r"^(recorte)(\d+)$", r"\1_\2", root, flags=re.IGNORECASE)
    # Se já vier "recorte_123", mantém
    root = re.sub(r"^(recorte)_(\d+)$", r"\1_\2", root, flags=re.IGNORECASE)

    # Normaliza extensão para .tif
    return f"{root}.tif"

def ajustar_id_col(df: pd.DataFrame) -> pd.DataFrame:
    """Aplica a padronização na coluna 'id', se existir."""
    if "id" in df.columns:
        df = df.copy()
        df["id"] = df["id"].apply(padronizar_id_recorte)
    return df

# --- máscaras binárias ---
if os.path.exists(csv_masks):
    dfm = pd.read_csv(csv_masks)
    dfm = ajustar_id_col(dfm)
    print(f"Máscaras – shape: {dfm.shape}")
    pixel_cols = [c for c in dfm.columns if str(c).startswith("pixel_")]

    # Exibição compacta
    if len(pixel_cols) > 20:
        cols_view = ["id"] + pixel_cols[:20]
        print("(mostrando apenas as 20 primeiras colunas de pixels)")
        display(dfm[cols_view].head(10))
    else:
        display(dfm.head(10))

    # (Opcional) salvar uma cópia já padronizada
    out_masks_norm = os.path.splitext(csv_masks)[0] + "_norm_ids.csv"
    dfm.to_csv(out_masks_norm, index=False)
    print(f"CSV (máscaras) com IDs padronizados salvo em: {out_masks_norm}")
else:
    print(f"CSV de máscaras não encontrado em: {csv_masks}")

# --- probabilidades (opcional) ---
if os.path.exists(csv_probs):
    dfp = pd.read_csv(csv_probs)
    dfp = ajustar_id_col(dfp)
    print(f"\nProbabilidades – shape: {dfp.shape}")
    pixel_cols = [c for c in dfp.columns if str(c).startswith("pixel_")]

    if len(pixel_cols) > 20:
        cols_view = ["id"] + pixel_cols[:20]
        print("(mostrando apenas as 20 primeiras colunas de pixels)")
        display(dfp[cols_view].head(10))
    else:
        display(dfp.head(10))

    # (Opcional) salvar uma cópia já padronizada
    out_probs_norm = os.path.splitext(csv_probs)[0] + "_norm_ids.csv"
    dfp.to_csv(out_probs_norm, index=False)
    print(f"CSV (probabilidades) com IDs padronizados salvo em: {out_probs_norm}")
else:
    print(f"ℹ CSV de probabilidades não encontrado (opcional): {csv_probs}")


Máscaras – shape: (315, 16385)
(mostrando apenas as 20 primeiras colunas de pixels)


Unnamed: 0,id,pixel_0,pixel_1,pixel_2,pixel_3,pixel_4,pixel_5,pixel_6,pixel_7,pixel_8,pixel_9,pixel_10,pixel_11,pixel_12,pixel_13,pixel_14,pixel_15,pixel_16,pixel_17,pixel_18,pixel_19
0,recorte_1.tif,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,recorte_2.tif,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
2,recorte_3.tif,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
3,recorte_4.tif,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
4,recorte_5.tif,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
5,recorte_6.tif,1,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0
6,recorte_7.tif,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,0,0
7,recorte_8.tif,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
8,recorte_9.tif,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
9,recorte_10.tif,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


CSV (máscaras) com IDs padronizados salvo em: Siamunet_attentionscSE_norm_ids.csv
ℹ CSV de probabilidades não encontrado (opcional): predicted_change_probs.csv


In [None]:
# 1) Verifique o tamanho (opcional)
!du -sh /content/preds_siamunet/figs

# 2) Compacte em ZIP
!zip -r -q /content/preds_siamunet/figs.zip /content/preds_siamunet/figs

# 3) Baixe para o seu computador
from google.colab import files
files.download('/content/preds_siamunet/figs')


##Bibliografia

[1] NGOC, Hoang; NGUYEN VINH, Nghi; LÊ, Nhi; NGUYEN, Nam; LE, Thu; DINH NGUYEN, Vinh. Enhancing Semantic Scene Segmentation for Indoor Autonomous Systems Using Advanced Attention-Supported Improved UNet. 2024. DOI: https://doi.org/10.21203/rs.3.rs-4587262/v1.

[2] CHICCHON, Miguel; BEDON, Hector; DEL-BLANCO, Carlos; SIPIRAN, Ivan. Semantic Segmentation of Fish and Underwater Environments Using Deep Convolutional Neural Networks and Learned Active Contours. IEEE Access, v. PP, p. 1-1, 2023. DOI: https://doi.org/10.1109/ACCESS.2023.3262649.

[3] VERMA, Sagar; GUPTA, Kavya. Post Wildfire Burnt-up Detection using Siamese UNet. In: EUROPEAN CONFERENCE ON MACHINE LEARNING AND PRINCIPLES AND PRACTICE OF KNOWLEDGE DISCOVERY IN DATABASES (ECML PKDD), 2023, Turin. Anais... Turin: [s.n.], set. 2023. Disponível em: https://hal.science/hal-04225474.

[4] ZHANG, Xiangrong; HE, Ling; QIN, Kai; DANG, Qi; SI, Hongjie; TANG, Xu; JIAO, Licheng. SMD-Net: Siamese Multi-Scale Difference-Enhancement Network for Change Detection in Remote Sensing. Remote Sensing, v.14, n.7: 1580. 2022. DOI: https://doi.org/10.3390/rs14071580.