In [None]:
# Mount Google Drive into the Colab runtime. Every data path configured later
# in this notebook lives under /content/drive, so this cell must run first.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.transforms import v2
from torch.utils.data import DataLoader, Dataset, Subset
from sklearn.model_selection import StratifiedKFold
from concurrent.futures import ThreadPoolExecutor
import threading
import numpy as np
import os
from PIL import Image
from tqdm.notebook import tqdm
import time

In [None]:
# Select the compute device once; the rest of the notebook reads `device`.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

print(f"Using device: {device}")
print(f"GPU available: {torch.cuda.is_available()}")
if device.type == "cuda":
    # Only query the device name when a GPU was actually selected.
    print(f"GPU: {torch.cuda.get_device_name(0)}")

In [None]:
# --- Notebook configuration -------------------------------------------------
# Root folder of the spectrogram project on the mounted Google Drive.
drive_path = "/content/drive/MyDrive/Espectrogramas5a20/"
# Folder walked later only to report how many files exist before filtering.
spectrogram_path = os.path.join(drive_path, "Ramphastidae")
# Destination for the checkpoint written by cross_validate_model.
path_to_save_model = os.path.join(drive_path, "ModelosCNN")
os.makedirs(path_to_save_model, exist_ok=True)
# Default number of epochs per fold (bound as cross_validate_model's default).
number_epochs = 80
# Text file with one "image_path,class_name" entry per line.
filtered_files_path = "/content/drive/MyDrive/filtered_files_google_driveR.txt"
# Reference point for the "Total training time" reported in the last cell;
# it therefore also covers dataset preloading, not just training.
start_time = time.time()

In [None]:
class BirdCNN(nn.Module):
    """Convolutional classifier for single-channel spectrogram images.

    Four Conv-BatchNorm-ReLU stages (three followed by 2x2 max pooling, the
    last by 8x8 average pooling) feed a two-layer fully connected head.

    Args:
        num_classes: Number of output logits.
        input_size: ``(height, width)`` of the images the model will receive.
            It is only used to size the first linear layer. After the three
            max-pool stages each side must still be at least 8 pixels for the
            final ``AvgPool2d(8)``, i.e. both sides must be >= 64.

    Attributes:
        feature_size: Length of the flattened feature vector fed to the head.
    """

    def __init__(self, num_classes, input_size=(256, 512)):
        super().__init__()
        self.input_size = tuple(input_size)
        self.features = nn.Sequential(
            nn.Conv2d(1, 64, 5, padding=2),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(64, 128, 5, padding=2),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(128, 256, 3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(256, 512, 3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.AvgPool2d(8),
        )

        self.feature_size = self._infer_feature_size(self.input_size)

        self.classifier = nn.Sequential(
            nn.Linear(self.feature_size, 512),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(512, num_classes)
        )

    def _infer_feature_size(self, input_size):
        """Return the flattened size of ``self.features`` output for one image.

        The probe runs in eval mode: in training mode BatchNorm would fold the
        probe's statistics into its running mean/variance (``torch.no_grad``
        does not prevent buffer updates), polluting a freshly built model.
        A zero tensor is used so the global RNG state is not consumed.
        """
        height, width = input_size
        was_training = self.features.training
        self.features.eval()
        try:
            with torch.no_grad():
                probe = torch.zeros(1, 1, height, width)
                flat_size = torch.flatten(self.features(probe), 1).size(1)
        finally:
            # Always hand the feature extractor back in its original mode.
            self.features.train(was_training)
        return flat_size

    def forward(self, x):
        """Map a ``(batch, 1, H, W)`` tensor to ``(batch, num_classes)`` logits."""
        x = self.features(x)
        x = torch.flatten(x, 1)
        return self.classifier(x)

# Preprocessing pipeline handed to FilteredSpectrogramDataset, which applies it
# once per image while preloading (not on every __getitem__ call).
transformar = v2.Compose([
    # PIL image -> tensor image.
    v2.ToImage(),
    # Convert to float32; scale=True rescales integer pixel values to [0, 1].
    v2.ToDtype(torch.float32, scale=True),
    # Keep a single channel, matching the Conv2d(1, ...) input of BirdCNN.
    v2.Grayscale(),
    # Same (height, width) as the probe tensor BirdCNN uses to size its classifier.
    v2.Resize((256, 512)),
    # (x - 0.5) / 0.5, i.e. [0, 1] -> [-1, 1].
    v2.Normalize(mean=[0.5], std=[0.5])
])

class FilteredSpectrogramDataset(Dataset):
    """In-memory dataset of cropped spectrogram images listed in a text file.

    The file list holds one ``image_path,class_name`` entry per line. All
    images are opened, converted to 8-bit grayscale, cropped, transformed and
    cached at construction time, so ``__getitem__`` is a plain list lookup.

    Args:
        filelist_path: Path of the text file listing the samples.
        transform: Optional callable applied to each cropped PIL image.
        crop_box: ``(left, upper, right, lower)`` pixel box passed to
            ``Image.crop``; ``None`` disables cropping.
        max_workers: Number of threads used to preload the images.

    Attributes:
        samples: List of ``(image_path, class_name)`` tuples in file order.
        classes: Sorted list of distinct class names.
        class_to_idx: Mapping from class name to integer label.
        preloaded_images: Cached (transformed) image for every sample.

    Raises:
        ValueError: If a non-blank line is not of the form ``path,class``.
    """

    def __init__(self, filelist_path="filtered_files.txt", transform=None,
                 crop_box=(316, 60, 2181, 986), max_workers=8):
        self.transform = transform
        self.crop_box = crop_box
        self.samples = self._read_samples(filelist_path)

        self.classes = sorted({class_dir for _, class_dir in self.samples})
        self.class_to_idx = {cls: i for i, cls in enumerate(self.classes)}

        self.preloaded_images = [None] * len(self.samples)

        def load_image(indexed_sample):
            i, (img_path, _) = indexed_sample
            # The context manager releases the file handle as soon as the
            # pixel data has been copied by convert()/crop().
            with Image.open(img_path) as raw:
                img = raw.convert('L')
                if self.crop_box is not None:
                    img = img.crop(self.crop_box)
            if self.transform is not None:
                img = self.transform(img)
            # Each task writes to its own slot, so no lock is needed.
            self.preloaded_images[i] = img

        print("Preloading images with threading...")
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Draining the iterator both drives the progress bar and re-raises
            # any exception that happened inside a worker thread.
            for _ in tqdm(executor.map(load_image, enumerate(self.samples)),
                          total=len(self.samples),
                          desc="Preloading images"):
                pass

    @staticmethod
    def _read_samples(filelist_path):
        """Parse the file list into ``(image_path, class_name)`` tuples.

        Blank lines are skipped. The line is split on its LAST comma so image
        paths that themselves contain commas are still parsed correctly.
        """
        samples = []
        with open(filelist_path) as f:
            for line_no, line in enumerate(f, start=1):
                entry = line.strip()
                if not entry:
                    continue
                img_path, sep, class_dir = entry.rpartition(',')
                if not sep or not img_path or not class_dir:
                    raise ValueError(
                        f"{filelist_path}:{line_no}: expected 'image_path,class_name', "
                        f"got {entry!r}"
                    )
                samples.append((img_path, class_dir))
        return samples

    def __len__(self):
        return len(self.preloaded_images)

    def __getitem__(self, idx):
        """Return ``(cached_image, integer_label)`` for sample ``idx``."""
        return self.preloaded_images[idx], self.class_to_idx[self.samples[idx][1]]

In [None]:
# Build the in-memory dataset; this preloads and transforms every listed image.
dataset = FilteredSpectrogramDataset(
    filelist_path=filtered_files_path,
    transform=transformar
)

# Summary of what was loaded. "Original samples" counts every file found under
# spectrogram_path (all subfolders), for comparison with the filtered list.
print(f"Total classes detected: {len(dataset.classes)}")
print(f"Class names: {dataset.classes}")
print(f"Original samples: {sum([len(files) for _, _, files in os.walk(spectrogram_path)])}")
print(f"Filtered samples: {len(dataset)}")

In [None]:
def calculate_class_weights(dataset_obj):
    """Return inverse-frequency class weights computed over the whole dataset.

    The weights are proportional to ``1 / count`` and rescaled so they sum to
    the number of classes. The per-class counts and the resulting weights are
    printed, and the weights are returned as a float32 tensor on ``device``.
    """
    num_classes = len(dataset_obj.classes)

    # Tally the samples of each class with one scatter-add over label indices.
    label_ids = np.asarray(
        [dataset_obj.class_to_idx[name] for _, name in dataset_obj.samples],
        dtype=np.int64,
    )
    class_counts = np.zeros(num_classes)
    np.add.at(class_counts, label_ids, 1)

    # Invert the frequencies, then normalise.
    reciprocal = 1. / class_counts
    normalized_weights = reciprocal / reciprocal.sum() * num_classes

    print(f"Class distribution: {dict(zip(dataset_obj.classes, class_counts))}")
    print(f"Computed weights: {normalized_weights}")

    weights_tensor = torch.tensor(normalized_weights, dtype=torch.float32)
    return weights_tensor.to(device)

# Compute the initial class weights over the whole dataset.
class_weights = calculate_class_weights(dataset)
# log1p compresses the weights so rare classes are not over-emphasised.
# NOTE: this module-level value is only printed here; cross_validate_model
# recomputes its own smoothed weights from each fold's training split.
smoothed_weights = torch.log1p(class_weights)
print(f"Smoothed weights: {smoothed_weights}")

# Pull a single sample through a DataLoader to show the batched tensor shape.
sample, _ = next(iter(DataLoader(dataset, batch_size=1)))
print(f"Spectrogram shape: {sample.shape}")

def calculate_class_weights_for_split(train_indices, dataset_obj):
    """Return inverse-frequency class weights for one training split.

    Args:
        train_indices: Iterable of indices into ``dataset_obj.samples``.
        dataset_obj: Object exposing ``classes``, ``class_to_idx`` and
            ``samples`` (a list of ``(path, class_name)`` tuples).

    Returns:
        float32 tensor on ``device`` with one weight per class. Classes that
        occur in the split get a weight proportional to ``1 / count``, scaled
        so those weights sum to the number of classes present. When every
        class is present this equals "weights sum to the number of classes".
        Classes absent from the split get weight 0.

    Absent classes are excluded from the normalisation on purpose. Clamping
    their count to a tiny epsilon would give them an enormous inverse weight,
    and after normalisation every class that IS present would end up with a
    weight near zero.
    """
    num_classes = len(dataset_obj.classes)
    class_counts = np.zeros(num_classes)

    for idx in train_indices:
        _, label_str = dataset_obj.samples[idx]
        class_counts[dataset_obj.class_to_idx[label_str]] += 1

    present = class_counts > 0
    normalized_weights = np.zeros(num_classes)
    if present.any():
        inverse_weights = 1. / class_counts[present]
        normalized_weights[present] = (
            inverse_weights / inverse_weights.sum() * present.sum()
        )

    missing = [name for name, seen in zip(dataset_obj.classes, present) if not seen]
    if missing:
        print(f"Warning: classes absent from this split get weight 0: {missing}")

    print(f"Split class distribution: {dict(zip(dataset_obj.classes, class_counts.astype(int)))}")
    print(f"Split computed weights: {normalized_weights}")

    return torch.tensor(normalized_weights, dtype=torch.float32).to(device)

In [None]:
def print_fold_stats(train_indices, val_indices, dataset_obj):
    """Print, per class, how many samples fall in the train and validation parts.

    For every name in ``dataset_obj.classes`` one line is printed with the
    train count, the validation count and the share of that class's samples
    that landed in validation (0 when the class has no samples in either part).
    """
    def tally(indices):
        # One pass over the indices, counting only the known class names.
        per_class = {name: 0 for name in dataset_obj.classes}
        for sample_idx in indices:
            label = dataset_obj.samples[sample_idx][1]
            if label in per_class:
                per_class[label] += 1
        return per_class

    print("\nFold Distribution:")

    in_train = tally(train_indices)
    in_val = tally(val_indices)

    for class_name in dataset_obj.classes:
        train_count = in_train[class_name]
        val_count = in_val[class_name]
        total_count = train_count + val_count

        if total_count > 0:
            val_percentage = val_count / total_count * 100
        else:
            val_percentage = 0

        print(f"  {class_name}: {train_count} train, {val_count} val "
              f"({val_percentage:.1f}% in validation)")


def _infer_class_names(loader):
    """Return the class-name list of the dataset behind ``loader``.

    Wrappers that expose the wrapped dataset as ``.dataset`` (e.g. ``Subset``)
    are unwrapped. Falls back to the notebook-level ``dataset`` when the
    loader's dataset carries no ``classes`` attribute.
    """
    source = getattr(loader, 'dataset', None)
    while (source is not None and not hasattr(source, 'classes')
           and hasattr(source, 'dataset')):
        source = source.dataset
    if source is not None and hasattr(source, 'classes'):
        return source.classes
    return dataset.classes


def _snapshot_fold_state(model, optimizer, scheduler, fold_num, epoch, val_acc,
                         class_names, history):
    """Build a checkpoint dict that is fully detached from the live objects.

    ``state_dict()`` returns references to the live parameter tensors, and
    ``dict.copy()`` is shallow, so without a deep copy the "best" checkpoint
    would silently follow every later optimizer step.
    """
    import copy  # local import keeps this block self-contained in the notebook

    return {
        'fold': fold_num,
        'epoch': epoch,
        'model_state_dict': copy.deepcopy(model.state_dict()),
        'optimizer_state_dict': copy.deepcopy(optimizer.state_dict()),
        'scheduler_state_dict': copy.deepcopy(scheduler.state_dict()),
        'val_acc': val_acc,
        'class_names': class_names,
        'train_losses': list(history['train_losses']),
        'val_losses': list(history['val_losses']),
        'train_accs': list(history['train_accs']),
        'val_accs': list(history['val_accs'])
    }


def _evaluate(model, loader, criterion):
    """Run ``model`` over ``loader`` without gradients.

    Returns ``(summed_batch_loss, correct_predictions, sample_count)``.
    """
    model.eval()
    loss_sum, correct, total = 0.0, 0, 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss_sum += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)
    return loss_sum, correct, total


def train_single_fold(model, train_loader, val_loader, criterion, optimizer,
                      scheduler, epochs, fold_num, patience=10, class_names=None):
    """Train and validate one fold with early stopping on validation accuracy.

    Args:
        model: Network to train; moved to ``device`` in place.
        train_loader: DataLoader yielding ``(inputs, labels)`` training batches.
        val_loader: DataLoader yielding ``(inputs, labels)`` validation batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer over ``model``'s parameters.
        scheduler: LR scheduler stepped once per training BATCH (not per epoch).
        epochs: Maximum number of epochs.
        fold_num: Zero-based fold index, used for logging and in the checkpoint.
        patience: Stop after this many consecutive epochs without a new best
            validation accuracy.
        class_names: Class names stored in the checkpoint. When omitted they
            are taken from the dataset behind ``train_loader`` (falling back to
            the notebook-level ``dataset``).

    Returns:
        ``(best_acc, best_model_state)``. ``best_acc`` is the best validation
        accuracy in percent. ``best_model_state`` is a checkpoint dict holding
        deep copies of the model/optimizer/scheduler state at the best epoch
        plus the metric history up to that epoch. If validation accuracy never
        rose above 0, the checkpoint holds the final state instead.
    """
    if class_names is None:
        class_names = _infer_class_names(train_loader)

    model.to(device)
    best_acc = 0
    best_model_state = None
    history = {'train_losses': [], 'val_losses': [], 'train_accs': [], 'val_accs': []}

    epochs_no_improve = 0
    best_epoch = 0

    for epoch in range(epochs):
        # ---- Training phase ----
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            scheduler.step()

            train_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            train_total += labels.size(0)
            train_correct += (predicted == labels).sum().item()

        # ---- Validation phase ----
        val_loss, val_correct, val_total = _evaluate(model, val_loader, criterion)

        # ---- Metrics (guarded so an empty loader cannot divide by zero) ----
        avg_train_loss = train_loss / max(len(train_loader), 1)
        avg_val_loss = val_loss / max(len(val_loader), 1)
        train_acc = 100 * train_correct / train_total if train_total else 0.0
        val_acc = 100 * val_correct / val_total if val_total else 0.0
        current_lr = optimizer.param_groups[0]['lr']

        history['train_losses'].append(avg_train_loss)
        history['val_losses'].append(avg_val_loss)
        history['train_accs'].append(train_acc)
        history['val_accs'].append(val_acc)

        print(f"Fold {fold_num + 1} Epoch {epoch + 1}: "
              f"Train Loss: {avg_train_loss:.4f}, "
              f"Train Acc: {train_acc:.2f}%, "
              f"Val Loss: {avg_val_loss:.4f}, "
              f"Val Acc: {val_acc:.2f}%, "
              f"LR: {current_lr:.6f}")

        # ---- Keep the best checkpoint of this fold / early stopping ----
        if val_acc > best_acc:
            best_acc = val_acc
            best_epoch = epoch
            epochs_no_improve = 0
            best_model_state = _snapshot_fold_state(
                model, optimizer, scheduler, fold_num, epoch, best_acc,
                class_names, history
            )
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                print(f"Early stopping triggered at epoch {epoch + 1}")
                print(f"Best validation accuracy: {best_acc:.2f}% at epoch {best_epoch + 1}")
                break
    print(f"Fold {fold_num + 1} completed: Best Val Acc {best_acc:.2f}% at epoch {best_epoch + 1}")

    if best_model_state is None:
        # No epoch ever improved on 0% accuracy (or epochs == 0): return the
        # final state so callers always receive a usable checkpoint.
        best_model_state = _snapshot_fold_state(
            model, optimizer, scheduler, fold_num, best_epoch, best_acc,
            class_names, history
        )

    return best_acc, best_model_state


def cross_validate_model(dataset, num_epochs=number_epochs, n_splits=5, patience=10):
    """Run stratified k-fold cross-validation and save the best fold's checkpoint.

    For every fold a fresh ``BirdCNN``, Adam optimizer and OneCycleLR schedule
    are created, class weights are recomputed from that fold's training split,
    and ``train_single_fold`` is run with early stopping.

    Args:
        dataset: Dataset exposing ``samples``, ``classes`` and ``class_to_idx``.
        num_epochs: Maximum epochs per fold.
        n_splits: Number of folds.
        patience: Early-stopping patience forwarded to ``train_single_fold``.

    Returns:
        ``(mean_accuracy, best_model_state, fold_results, epochs_used)`` where
        ``fold_results`` holds each fold's best validation accuracy (percent),
        ``best_model_state`` is the checkpoint of the best fold (also written
        to ``path_to_save_model``), and ``epochs_used`` holds, per fold, the
        1-based epoch at which that best accuracy was reached.
    """
    labels = np.array(
        [dataset.class_to_idx[dataset.samples[i][1]] for i in range(len(dataset))]
    )

    # Stratified split keeps the class proportions similar in every fold.
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

    fold_results = []
    best_models = []
    epochs_used = []
    print(f"\nStarting {n_splits}-fold cross-validation with {len(dataset)} samples")
    print(f"Number of classes: {len(dataset.classes)}")
    print(f"Early stopping patience: {patience} epochs")

    for fold, (train_idx, val_idx) in enumerate(skf.split(range(len(dataset)), labels)):
        print(f"\n{'=' * 60}")
        print(f"Fold {fold + 1}/{n_splits}")
        print(f"{'=' * 60}")

        print_fold_stats(train_idx, val_idx, dataset)

        train_subset = Subset(dataset, train_idx)
        val_subset = Subset(dataset, val_idx)

        # Class weights are computed from this fold's training part only, so
        # no information from the validation part leaks into the loss.
        fold_class_weights = calculate_class_weights_for_split(train_idx, dataset)
        smoothed_weights = torch.log1p(fold_class_weights)

        # Data loaders for this fold.
        train_loader = DataLoader(
            train_subset,
            batch_size=32,
            shuffle=True,
            num_workers=2,
            pin_memory=True,
            persistent_workers=True)
        val_loader = DataLoader(
            val_subset,
            batch_size=32,
            shuffle=False,
            num_workers=2,
            pin_memory=True,
            persistent_workers=True)

        # Fresh model, optimizer and schedule for every fold.
        model = BirdCNN(num_classes=len(dataset.classes))
        criterion = nn.CrossEntropyLoss(weight=smoothed_weights)
        optimizer = optim.Adam(model.parameters(), lr=0.005)
        # OneCycleLR is stepped once per batch inside train_single_fold, hence
        # steps_per_epoch * epochs total steps.
        scheduler = torch.optim.lr_scheduler.OneCycleLR(
            optimizer,
            max_lr=0.01,
            steps_per_epoch=len(train_loader),
            epochs=num_epochs,
            pct_start=0.25
        )

        # Train and validate this fold. `patience` must be forwarded here,
        # otherwise train_single_fold silently uses its own default.
        fold_best_acc, fold_best_model_state = train_single_fold(
            model, train_loader, val_loader, criterion, optimizer, scheduler,
            num_epochs, fold, patience=patience
        )

        fold_results.append(fold_best_acc)
        best_models.append(fold_best_model_state)
        epochs_used.append(fold_best_model_state['epoch'] + 1)  # +1 because epochs are 0-indexed

        # The checkpoint's 'epoch' is the best epoch, not the number of epochs run.
        print(f"Fold {fold + 1} completed - Best Val Acc: {fold_best_acc:.2f}% "
              f"(best epoch {epochs_used[-1]}/{num_epochs})")

    print(f"\n{'=' * 70}")
    print("CROSS-VALIDATION RESULTS SUMMARY")
    print(f"{'=' * 70}")

    fold_accuracies = np.array(fold_results)
    mean_accuracy = np.mean(fold_accuracies)
    std_accuracy = np.std(fold_accuracies)

    print(f"Individual fold accuracies:")
    for i, acc in enumerate(fold_results):
        print(f"  Fold {i + 1}: {acc:.2f}%")

    print(f"\nMean CV Accuracy: {mean_accuracy:.2f}% ± {std_accuracy:.2f}%")
    print(f"Best fold: {np.argmax(fold_results) + 1} with {np.max(fold_results):.2f}%")
    print(f"Worst fold: {np.argmin(fold_results) + 1} with {np.min(fold_results):.2f}%")

    # Save the best model across all folds.
    best_fold_idx = np.argmax(fold_results)
    best_model_state = best_models[best_fold_idx]

    save_path = os.path.join(path_to_save_model, f"best_cv_model_fold{best_fold_idx + 1}.pth")
    torch.save(best_model_state, save_path)
    print(f"\nBest model saved from fold {best_fold_idx + 1} to: {save_path}")

    return mean_accuracy, best_model_state, fold_results, epochs_used

In [None]:
# Run the full 5-fold cross-validation and report the elapsed wall-clock time.
# NOTE: start_time is set in the configuration cell, so the reported time also
# covers dataset preloading and anything else executed since that cell ran.
try:
    cv_mean_accuracy, best_model, fold_accuracies, epochs_used = cross_validate_model(
        dataset,
        num_epochs=number_epochs,
        n_splits=5,
        patience=10
    )

    total_time = time.time() - start_time
    print(f"\nTotal training time: {total_time:.2f} seconds ({total_time / 60:.2f} minutes)")
    print(f"Final Mean Cross-Validation Accuracy: {cv_mean_accuracy:.2f}%")

except Exception as e:
    # Best-effort cell: report the failure with a full traceback instead of
    # aborting the notebook. If cross_validate_model raised, the result
    # variables above were not (re)assigned by this run.
    print(f"Error during cross-validation: {e}")
    import traceback

    traceback.print_exc()