In [None]:
from torch.amp import autocast, GradScaler
import os
import torch
from torch.utils.data import Dataset, DataLoader, random_split
import rasterio
from torchvision import transforms
import torchvision
from tqdm import tqdm
import numpy as np
from torch.utils.tensorboard import SummaryWriter
from datetime import datetime

torch.backends.cudnn.benchmark = True  # Speeds up convolutions on the GPU
# Select the 'high' float32 matmul precision mode (presumably trades a little
# precision for speed -- confirm against the PyTorch documentation).
torch.set_float32_matmul_precision('high')

# 1. Dataset class
class SegmentationDataset(Dataset):
    """Paired image/mask GeoTIFF dataset for semantic segmentation.

    Every ``.tif`` file in ``x_dir`` must have a file with the same name in
    ``y_dir``. Items are ``(image, mask)`` tensor pairs:

    * ``image``: float32, the first (up to) three bands of the raster divided
      by 255, optionally passed through ``transform``.
    * ``mask``: int64 class-index map of shape ``(H, W)``.

    Args:
        x_dir: Directory containing the input rasters.
        y_dir: Directory containing the mask rasters.
        transform: Optional callable applied to the image tensor only.

    Raises:
        ValueError: If the two directories do not contain the same ``.tif``
            file names.
    """

    def __init__(self, x_dir, y_dir, transform=None):
        self.x_dir = x_dir
        self.y_dir = y_dir
        self.transform = transform
        self.x_files = sorted(f for f in os.listdir(x_dir) if f.endswith('.tif'))
        self.y_files = sorted(f for f in os.listdir(y_dir) if f.endswith('.tif'))

        # Explicit raises instead of ``assert`` so the validation still runs
        # when Python is started with -O.
        if len(self.x_files) != len(self.y_files):
            raise ValueError("Número de arquivos em X e Y não corresponde")
        for x_name, y_name in zip(self.x_files, self.y_files):
            if x_name != y_name:
                raise ValueError(f"Arquivos não correspondentes: {x_name} vs {y_name}")

    def __len__(self):
        return len(self.x_files)

    @staticmethod
    def _mask_to_class_indices(mask):
        """Collapse a ``(bands, H, W)`` mask array into an ``(H, W)`` class map.

        Multi-band masks are reduced with ``argmax`` over the band axis; a
        single-band mask simply drops the band axis so the batched target is
        ``(N, H, W)`` rather than ``(N, 1, H, W)``.
        """
        if mask.ndim == 3:
            if mask.shape[0] > 1:
                return np.argmax(mask, axis=0)
            return mask[0]
        return mask

    def __getitem__(self, idx):
        x_path = os.path.join(self.x_dir, self.x_files[idx])
        y_path = os.path.join(self.y_dir, self.y_files[idx])

        with rasterio.open(x_path) as src:
            x = src.read()[:3]  # keep at most the first 3 bands (assumed RGB)

        with rasterio.open(y_path) as src:
            y = src.read()

        # Cast in numpy first so raster dtypes that torch.from_numpy may not
        # accept directly (e.g. uint16) are still handled.
        x = torch.from_numpy(x.astype(np.float32)) / 255.0
        y = torch.from_numpy(self._mask_to_class_indices(y).astype(np.int64))

        if self.transform is not None:
            x = self.transform(x)

        return x, y


# 2. Data splitting
def prepare_datasets(x_dir, y_dir, val_split=0.15, test_split=0.15, random_seed=42):
    """Build the full dataset and split it into train/validation/test subsets.

    Args:
        x_dir: Directory with the input rasters.
        y_dir: Directory with the mask rasters.
        val_split: Fraction of samples for validation (rounded down).
        test_split: Fraction of samples for testing (rounded down).
        random_seed: Seed of the generator that drives the random split.

    Returns:
        ``(train_dataset, val_dataset, test_dataset)``; the training subset
        receives whatever is left after validation and test are sized.
    """
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    dataset = SegmentationDataset(
        x_dir=x_dir,
        y_dir=y_dir,
        transform=transforms.Compose([normalize]),
    )

    # Subset sizes: validation and test are truncated, train takes the rest.
    n_total = len(dataset)
    n_val = int(val_split * n_total)
    n_test = int(test_split * n_total)
    n_train = n_total - n_val - n_test

    # Seeded generator so the split is reproducible.
    split_rng = torch.Generator().manual_seed(random_seed)
    train_dataset, val_dataset, test_dataset = random_split(
        dataset, [n_train, n_val, n_test], generator=split_rng
    )
    return train_dataset, val_dataset, test_dataset

# 3. DataLoader construction
def prepare_dataloaders(train_dataset, val_dataset, test_dataset, batch_size=8):
    """Wrap the three subsets in DataLoaders.

    The training loader shuffles and uses pinned memory; the validation and
    test loaders iterate in order. All loaders use 4 worker processes.

    Returns:
        ``(train_loader, val_loader, test_loader)``.
    """
    worker_count = 4

    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=worker_count,
        pin_memory=True,
    )
    val_loader, test_loader = (
        DataLoader(subset, batch_size=batch_size, shuffle=False, num_workers=worker_count)
        for subset in (val_dataset, test_dataset)
    )

    return train_loader, val_loader, test_loader

# 4. Model
def create_deeplabv3(output_channels=3):
    """Return a DeepLabV3-ResNet50 with its output layers resized.

    The network is created with the torchvision ``DEFAULT`` weights; layer 4
    of the main classifier (and of the auxiliary classifier, when the model
    has one) is replaced by a new 1x1 convolution with ``output_channels``
    outputs.
    """
    segmentation = torchvision.models.segmentation
    model = segmentation.deeplabv3_resnet50(
        weights=segmentation.DeepLabV3_ResNet50_Weights.DEFAULT
    )

    model.classifier[4] = torch.nn.Conv2d(256, output_channels, kernel_size=(1, 1))

    aux_head = model.aux_classifier
    if aux_head is not None:
        aux_head[4] = torch.nn.Conv2d(256, output_channels, kernel_size=(1, 1))

    return model

# 5. Log example segmentations to TensorBoard
def log_images(writer, model, dataloader, epoch, num_images=3, num_classes=3):
    """Write input images, ground-truth masks and predictions to ``writer``.

    Puts ``model`` in eval mode and logs the first ``num_images`` samples
    drawn from ``dataloader`` under the tags ``Validation/Image_<k>``,
    ``Validation/GroundTruth_<k>`` and ``Validation/Prediction_<k>``.

    Args:
        writer: Object exposing ``add_image(tag, img_tensor, global_step)``.
        model: Module whose forward pass returns a dict with an ``'out'``
            entry of per-class scores shaped ``(N, C, H, W)``.
        dataloader: Iterable of ``(inputs, masks)`` batches; inputs are
            expected to be normalized with the mean/std used below.
        epoch: Step value attached to every logged image.
        num_images: Maximum number of samples to log.
        num_classes: Number of classes; class index ``k`` is drawn as the
            gray level ``k / (num_classes - 1)``. The default of 3 keeps the
            original ``/ 2.0`` scaling.
    """
    model.eval()
    if num_images <= 0:
        # Nothing to log: do not run inference over the whole loader.
        return

    device = next(model.parameters()).device

    # Normalization constants are loop-invariant, so build them once.
    mean = torch.tensor([0.485, 0.456, 0.406], device=device).view(1, 3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225], device=device).view(1, 3, 1, 1)
    gray_scale = float(max(num_classes - 1, 1))

    images_logged = 0

    with torch.no_grad():
        for inputs, masks in dataloader:
            inputs, masks = inputs.to(device), masks.to(device)
            preds = torch.argmax(model(inputs)['out'], dim=1)

            # Undo the input normalization for visualization.
            denorm_images = (inputs * std + mean).clamp(0, 1)

            remaining = num_images - images_logged
            for i in range(min(remaining, inputs.size(0))):
                writer.add_image(f'Validation/Image_{images_logged}', denorm_images[i], epoch)

                # Simple colormap: class index -> gray level, repeated on 3 channels.
                mask_rgb = (masks[i].float() / gray_scale).clamp(0, 1).unsqueeze(0).repeat(3, 1, 1)
                pred_rgb = (preds[i].float() / gray_scale).clamp(0, 1).unsqueeze(0).repeat(3, 1, 1)

                writer.add_image(f'Validation/GroundTruth_{images_logged}', mask_rgb, epoch)
                writer.add_image(f'Validation/Prediction_{images_logged}', pred_rgb, epoch)

                images_logged += 1
                if images_logged >= num_images:
                    return

# 6. Training loop with validation and early stopping
def train_with_validation(model, train_loader, val_loader, criterion, optimizer, num_epochs=50, patience=5,
                          checkpoint_path='best_model.pth'):
    """Train ``model``, validating after every epoch, with early stopping.

    After each epoch the model is scored with ``evaluate`` on ``val_loader``.
    Whenever the validation IoU improves, the weights are saved to
    ``checkpoint_path``; training stops once ``patience`` consecutive epochs
    bring no improvement. Losses and IoU are written to a TensorBoard run
    under ``runs/deeplabv3_<timestamp>``, and example predictions are logged
    every 5 epochs.

    Args:
        model: Module returning a dict with an ``'out'`` entry; it must
            already live on the device to train on.
        train_loader: Iterable of ``(inputs, masks)`` training batches.
        val_loader: Iterable of ``(inputs, masks)`` validation batches.
        criterion: Loss callable ``criterion(outputs, masks)``.
        optimizer: Optimizer bound to ``model``'s parameters.
        num_epochs: Maximum number of epochs.
        patience: Epochs without IoU improvement tolerated before stopping.
        checkpoint_path: Where the best weights are stored.

    Returns:
        ``model`` with the best-validation-IoU weights loaded back in (when
        at least one checkpoint was written during this call).
    """
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    writer = SummaryWriter(f'runs/deeplabv3_{timestamp}')

    # Follow the model's own device (as evaluate/log_images do) instead of
    # guessing from CUDA availability, so inputs always match the weights.
    device = next(model.parameters()).device
    # float16 autocast + gradient scaling only make sense on CUDA.
    use_amp = device.type == 'cuda'
    amp_device_type = 'cuda' if use_amp else 'cpu'
    scaler = GradScaler(enabled=use_amp)

    # Start below any reachable IoU so the first epoch always yields a checkpoint.
    best_iou = float('-inf')
    epochs_no_improve = 0
    checkpoint_saved = False

    print(f"Usando dispositivo: {device}")
    if device.type == 'cuda':
        print(f"Nome da GPU: {torch.cuda.get_device_name(device)}")

    try:
        for epoch in range(num_epochs):
            model.train()
            train_loss = 0.0

            for inputs, masks in tqdm(train_loader, desc=f'Train Epoch {epoch+1}/{num_epochs}'):
                inputs, masks = inputs.to(device), masks.to(device)

                optimizer.zero_grad(set_to_none=True)

                with torch.amp.autocast(device_type=amp_device_type, dtype=torch.float16, enabled=use_amp):
                    outputs = model(inputs)['out']
                    loss = criterion(outputs, masks)

                # With the scaler disabled these calls reduce to a plain
                # backward() / optimizer.step().
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()

                train_loss += loss.item()

            # max(..., 1) avoids a ZeroDivisionError on an empty loader.
            avg_train_loss = train_loss / max(len(train_loader), 1)
            writer.add_scalar('Loss/train', avg_train_loss, epoch)

            val_loss, val_iou = evaluate(model, val_loader, criterion)
            writer.add_scalar('Loss/val', val_loss, epoch)
            writer.add_scalar('IoU/val', val_iou, epoch)

            if epoch % 5 == 0:
                log_images(writer, model, val_loader, epoch)

            print(f'Epoch {epoch+1}:')
            print(f'Train Loss: {avg_train_loss:.4f}')
            print(f'Val Loss: {val_loss:.4f} | Val IoU: {val_iou:.4f}')

            if val_iou > best_iou:
                best_iou = val_iou
                epochs_no_improve = 0
                torch.save(model.state_dict(), checkpoint_path)
                checkpoint_saved = True
                print('Melhor modelo salvo!')
            else:
                epochs_no_improve += 1
                print(f'No improvement for {epochs_no_improve} epochs')

                if epochs_no_improve >= patience:
                    print(f'Early stopping triggered after {epoch+1} epochs!')
                    break
    finally:
        # Flush and close the TensorBoard run even if training raises.
        writer.close()

    # Hand back the best weights, not those of the last (possibly worse) epoch.
    # Only reload a file written by this call, never a stale one from disk.
    if checkpoint_saved:
        model.load_state_dict(torch.load(checkpoint_path, map_location=device, weights_only=True))

    return model

# 7. Evaluation function
def evaluate(model, dataloader, criterion, num_classes=None):
    """Compute the mean loss and mean IoU of ``model`` over ``dataloader``.

    Intersections and unions are accumulated per class over the whole
    loader, and the IoU is averaged over the classes that occur in the
    predictions or the ground truth. A class absent from a batch therefore
    no longer drags the score down, which the earlier per-batch average
    (divided by ``len(dataloader) * num_classes``) did.

    Args:
        model: Module returning a dict with an ``'out'`` entry of per-class
            scores shaped ``(N, C, H, W)``.
        dataloader: Iterable of ``(inputs, masks)`` batches.
        criterion: Loss callable ``criterion(outputs, masks)``.
        num_classes: Number of classes to score. When ``None`` it is taken
            from the channel dimension of the first model output.

    Returns:
        ``(avg_loss, avg_iou)`` as Python floats; ``(0.0, 0.0)`` when the
        loader yields no batches.
    """
    model.eval()
    device = next(model.parameters()).device

    total_loss = 0.0
    num_batches = 0
    intersections = None
    unions = None

    with torch.no_grad():
        for inputs, masks in dataloader:
            inputs, masks = inputs.to(device), masks.to(device)
            outputs = model(inputs)['out']

            total_loss += criterion(outputs, masks).item()
            num_batches += 1

            if intersections is None:
                n_classes = outputs.shape[1] if num_classes is None else num_classes
                intersections = torch.zeros(n_classes, dtype=torch.long, device=device)
                unions = torch.zeros(n_classes, dtype=torch.long, device=device)

            preds = torch.argmax(outputs, dim=1)

            # Accumulate pixel counts on-device; no per-class .item() sync.
            for c in range(intersections.numel()):
                pred_mask = preds == c
                true_mask = masks == c
                intersections[c] += (pred_mask & true_mask).sum()
                unions[c] += (pred_mask | true_mask).sum()

    if num_batches == 0:
        return 0.0, 0.0

    avg_loss = total_loss / num_batches

    intersections = intersections.cpu().double()
    unions = unions.cpu().double()
    present = unions > 0  # classes seen in predictions or ground truth
    if present.any():
        avg_iou = (intersections[present] / unions[present]).mean().item()
    else:
        avg_iou = 0.0

    return avg_loss, avg_iou

import itertools
import random
import shutil

# 8. Main flow: hyperparameter search
if __name__ == "__main__":
    # Hyperparameter grid
    hyperparams = {
        'learning_rate': [0.01, 0.001, 0.0001],
        'weight_decay': [0, 1e-4, 1e-5],
        'optimizer': ['Adam', 'SGD']
    }

    # Every combination (cartesian product)
    keys, values = zip(*hyperparams.items())
    param_combinations = [dict(zip(keys, v)) for v in itertools.product(*values)]

    # Draw 10 combinations at random. Slicing the first 10 of the product
    # would never try learning_rate=0.0001, because the product varies the
    # learning rate slowest. The fixed seed keeps the selection reproducible.
    max_runs = 10
    selected_combinations = random.Random(42).sample(
        param_combinations, min(max_runs, len(param_combinations))
    )

    # Prepare the data once for all runs
    train_ds, val_ds, test_ds = prepare_datasets(
        x_dir='dataset/X/',
        y_dir='dataset/y/',
        val_split=0.15,
        test_split=0.15
    )

    train_loader, val_loader, test_loader = prepare_dataloaders(train_ds, val_ds, test_ds, batch_size=8)

    # Training writes its best weights here; every run would overwrite the
    # previous one, so each run's file is moved aside after training.
    shared_checkpoint = 'best_model.pth'

    # Results are collected here
    results = []

    for i, params in enumerate(selected_combinations):
        print(f"\n=== Executando combinação {i+1}/{len(selected_combinations)} ===")
        print("Hiperparâmetros:", params)

        # Build the model
        model = create_deeplabv3(output_channels=3)
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        model = model.to(device)

        # Optimizer for the current hyperparameters
        if params['optimizer'] == 'Adam':
            optimizer = torch.optim.Adam(
                model.parameters(),
                lr=params['learning_rate'],
                weight_decay=params['weight_decay']
            )
        else:  # SGD
            optimizer = torch.optim.SGD(
                model.parameters(),
                lr=params['learning_rate'],
                weight_decay=params['weight_decay'],
                momentum=0.9
            )

        criterion = torch.nn.CrossEntropyLoss()

        # Drop any stale checkpoint so a file left by a previous run/session
        # is never attributed to this combination.
        if os.path.exists(shared_checkpoint):
            os.remove(shared_checkpoint)

        # Train with early stopping
        trained_model = train_with_validation(
            model=model,
            train_loader=train_loader,
            val_loader=val_loader,
            criterion=criterion,
            optimizer=optimizer,
            num_epochs=25,
            patience=5
        )

        # Keep this run's checkpoint under its own name
        run_checkpoint = None
        if os.path.exists(shared_checkpoint):
            run_checkpoint = f'best_model_run{i+1}.pth'
            os.replace(shared_checkpoint, run_checkpoint)

        # Evaluate on the test split
        test_loss, test_iou = evaluate(trained_model, test_loader, criterion)
        print(f'Resultado: Test Loss: {test_loss:.4f} | Test IoU: {test_iou:.4f}')

        # Store the results
        results.append({
            'params': params,
            'test_loss': test_loss,
            'test_iou': test_iou,
            'checkpoint': run_checkpoint
        })

    # Summary of all runs
    print("\n=== Resumo dos Resultados ===")
    for i, res in enumerate(results):
        print(f"Combinação {i+1}:")
        print(f"Hiperparâmetros: {res['params']}")
        print(f"Test Loss: {res['test_loss']:.4f} | Test IoU: {res['test_iou']:.4f}")
        print("-" * 50)

    # Pick the best combination.
    # NOTE(review): selecting hyperparameters on the test split makes the
    # reported test IoU optimistic; consider selecting on validation IoU.
    best_result = max(results, key=lambda x: x['test_iou'])
    print("\n=== Melhor Combinação ===")
    print(f"Hiperparâmetros: {best_result['params']}")
    print(f"Test IoU: {best_result['test_iou']:.4f}")

    # Leave the winning run's weights at the path the inference step loads.
    if best_result['checkpoint'] is not None:
        shutil.copyfile(best_result['checkpoint'], shared_checkpoint)

## Inferência

In [2]:
import os
import torch
import rasterio
from torchvision import transforms
import numpy as np
import torchvision

# Settings
X_DIR = 'TILES/'  # Folder with the input images
OUTPUT_DIR = 'INFERENCIA/'  # Folder where the grayscale predictions are saved
MODEL_PATH = 'best_model.pth'  # Path to the saved best model
NUM_CLASSES = 3  # Number of classes in the model

# Create the output folder (no error if it already exists)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Device (GPU if available)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Usando dispositivo: {device}")

# Load the model
def create_deeplabv3(output_channels=3):
    """Return a DeepLabV3-ResNet50 with its output layers resized.

    The network is created with the torchvision ``DEFAULT`` weights; layer 4
    of the main classifier (and of the auxiliary classifier, when the model
    has one) is replaced by a new 1x1 convolution with ``output_channels``
    outputs.
    """
    segmentation = torchvision.models.segmentation
    model = segmentation.deeplabv3_resnet50(
        weights=segmentation.DeepLabV3_ResNet50_Weights.DEFAULT
    )

    model.classifier[4] = torch.nn.Conv2d(256, output_channels, kernel_size=(1, 1))

    aux_head = model.aux_classifier
    if aux_head is not None:
        aux_head[4] = torch.nn.Conv2d(256, output_channels, kernel_size=(1, 1))

    return model

model = create_deeplabv3(output_channels=NUM_CLASSES)
# weights_only=True restricts unpickling to tensors and plain containers, so
# a tampered checkpoint file cannot execute arbitrary code while loading.
state_dict = torch.load(MODEL_PATH, map_location=device, weights_only=True)
model.load_state_dict(state_dict)
model = model.to(device)
model.eval()

# Transforms (same normalization that is applied to the training inputs)
transform = transforms.Compose([
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Process every image (sorted so the run order is deterministic)
for filename in sorted(os.listdir(X_DIR)):
    if not filename.endswith('.tif'):
        continue

    input_path = os.path.join(X_DIR, filename)
    output_path = os.path.join(OUTPUT_DIR, filename)

    print(f"Processando: {filename}")

    # Load the image
    with rasterio.open(input_path) as src:
        x = src.read()[:3]  # keep only the first 3 bands (assumed RGB)
        meta = src.meta.copy()

    # Output metadata: one uint8 band. The source nodata value is dropped:
    # it may not be representable as uint8, and a value such as 0 would flag
    # a valid class index as "no data".
    meta.update({
        'count': 1,
        'dtype': 'uint8',
        'nodata': None
    })

    # Pre-processing
    x = torch.from_numpy(x).float() / 255.0
    x = transform(x)
    x = x.unsqueeze(0).to(device)

    # Inference
    with torch.no_grad():
        output = model(x)['out']
        # squeeze(0) removes only the batch axis, so tiles with a spatial
        # dimension of size 1 stay two-dimensional.
        pred = torch.argmax(output, dim=1).squeeze(0).cpu().numpy()

    # Class indices (0 .. NUM_CLASSES-1) stored as uint8; values are not rescaled
    pred_gray = pred.astype(np.uint8)

    # Save the georeferenced single-band TIFF
    with rasterio.open(output_path, 'w', **meta) as dst:
        dst.write(pred_gray, 1)  # write to band 1

print("Inferência concluída! Arquivos em grayscale salvos em:", OUTPUT_DIR)

Usando dispositivo: cuda
Processando: MOSAICO_2015_tile_1000.tif
Processando: MOSAICO_2015_tile_1001.tif
Processando: MOSAICO_2015_tile_1002.tif
Processando: MOSAICO_2015_tile_1003.tif
Processando: MOSAICO_2015_tile_1004.tif
Processando: MOSAICO_2015_tile_10049.tif
Processando: MOSAICO_2015_tile_1005.tif
Processando: MOSAICO_2015_tile_10050.tif
Processando: MOSAICO_2015_tile_10051.tif
Processando: MOSAICO_2015_tile_10052.tif
Processando: MOSAICO_2015_tile_10053.tif
Processando: MOSAICO_2015_tile_10054.tif
Processando: MOSAICO_2015_tile_10055.tif
Processando: MOSAICO_2015_tile_10056.tif
Processando: MOSAICO_2015_tile_10057.tif
Processando: MOSAICO_2015_tile_10058.tif
Processando: MOSAICO_2015_tile_10059.tif
Processando: MOSAICO_2015_tile_1006.tif
Processando: MOSAICO_2015_tile_10060.tif
Processando: MOSAICO_2015_tile_10061.tif
Processando: MOSAICO_2015_tile_10062.tif
Processando: MOSAICO_2015_tile_10063.tif
Processando: MOSAICO_2015_tile_10064.tif
Processando: MOSAICO_2015_tile_10065.ti