In [1]:
from datasets import load_from_disk
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
import torchmetrics
from tqdm import tqdm
import torchaudio
from datasets import load_dataset, DatasetDict
import numpy as np
import random


SEED = 42
SAMPLE_RATE = 16000
BATCH_SIZE = 128
NUM_WORKERS = 24
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)
print(f"Using device: {DEVICE}")

  from .autonotebook import tqdm as notebook_tqdm


Using device: cuda


In [2]:
# dataset = load_dataset("Hibou-Foundation/big_ds_preprocessed_specto_noaugment_1_50g")
dataset = load_from_disk("../dataset/ds_2_noaugment_test.hf")

In [3]:
dataset = dataset.with_format("torch", columns=["audio", "label"])
print("Dataset splits:", {k: v.shape for k, v in dataset.items()})

Dataset splits: {'train': (61543, 2), 'val': (7222, 2), 'test': (7220, 2)}


In [4]:
# -----------------------------
# Collate function with Mel computation
# -----------------------------
def collate_fn(batch):
    xs = [b["audio"] for b in batch]
    ys = [b["label"] for b in batch]

    xs = torch.stack(xs, dim=0).unsqueeze(1)
    ys = torch.tensor(ys, dtype=torch.float32).unsqueeze(1)

    return xs, ys




train_loader = DataLoader(dataset["train"], batch_size=BATCH_SIZE, shuffle=True,
                          num_workers=NUM_WORKERS, collate_fn=collate_fn)
valid_loader = DataLoader(dataset["val"], batch_size=BATCH_SIZE, shuffle=False,
                          num_workers=NUM_WORKERS, collate_fn=collate_fn)
test_loader = DataLoader(dataset["test"], batch_size=BATCH_SIZE, shuffle=False,num_workers=NUM_WORKERS, collate_fn=collate_fn)

In [5]:
# -----------------------------
# Simplified CNN
# -----------------------------
class SimpleAudioCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, 3, padding=1),
            nn.BatchNorm2d(32),
            nn.PReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(32, 64, 3, padding=1),
            nn.BatchNorm2d(64),
            nn.PReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(64, 128, 3, padding=1),
            nn.BatchNorm2d(128),
            nn.PReLU(),
            nn.AdaptiveAvgPool2d((1, 1))
        )
        self.fc = nn.Sequential(
            nn.Linear(128, 128),
            nn.PReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 1)
        )

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)

model = SimpleAudioCNN().to(DEVICE)

In [16]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# -----------------------------
# Residual Block Definition
# -----------------------------
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.prelu1 = nn.PReLU()

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Shortcut connection: Adjust for channel difference if needed
        if in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, bias=False),
                nn.BatchNorm2d(out_channels)
            )
        else:
            self.shortcut = nn.Identity()

        self.prelu2 = nn.PReLU()

    def forward(self, x):
        # The main path
        residual = self.prelu1(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))

        # Add shortcut (identity)
        shortcut = self.shortcut(x)

        # Activation after addition
        out = self.prelu2(residual + shortcut)
        return out

# -----------------------------
# Advanced CNN for Drone Audio
# -----------------------------
class AdvancedAudioCNN(nn.Module):
    def __init__(self):
        super().__init__()

        # Initial Conv Block: Larger kernel to capture basic acoustic patterns
        self.conv_init = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=5, padding=2), # 5x5 kernel
            nn.BatchNorm2d(32),
            nn.PReLU(),
            nn.MaxPool2d(2)
        )

        # Feature extraction blocks with Residual Connections
        self.res_block1 = nn.Sequential(
            ResidualBlock(32, 64),
            nn.MaxPool2d(2)
        )

        self.res_block2 = nn.Sequential(
            ResidualBlock(64, 128),
            nn.MaxPool2d(2)
        )

        # # Deeper layer
        # self.res_block3 = nn.Sequential(
        #     ResidualBlock(128, 256),
        #     nn.MaxPool2d(2)
        # )

        # Global Pooling (Average and Max)
        # This will be applied in the forward pass

        # Fully Connected Layer (FC)
        # Input features will be 256 (from GAP) + 256 (from GMP) = 512
        self.fc = nn.Sequential(
            nn.Linear(128 * 2, 128),
            nn.PReLU(),
            nn.Dropout(0.5), # Slightly increased dropout for better generalization
            nn.Linear(128, 1),
            # nn.Sigmoid() # Add this if you want the output to be a probability in [0, 1]
        )

    def forward(self, x):
        # Input shape: (Batch, 1, Freq, Time)
        x = self.conv_init(x)
        x = self.res_block1(x)
        x = self.res_block2(x)
        # x = self.res_block3(x)

        # Apply Global Average Pooling (GAP) and Global Max Pooling (GMP)
        gap = F.adaptive_avg_pool2d(x, (1, 1)).view(x.size(0), -1) # Output shape: (Batch, 256)
        gmp = F.adaptive_max_pool2d(x, (1, 1)).view(x.size(0), -1) # Output shape: (Batch, 256)

        # Concatenate both features before the FC layer
        x = torch.cat([gap, gmp], dim=1) # Output shape: (Batch, 512)

        return self.fc(x)

# Assuming DEVICE is already defined
model = AdvancedAudioCNN().to(DEVICE)

In [17]:
# -----------------------------
# Loss, optimizer, metric
# -----------------------------
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.AdamW(model.parameters(), lr=3e-4, weight_decay=5e-4)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)
metric_acc = torchmetrics.classification.BinaryAccuracy().to(DEVICE)

In [None]:
# -----------------------------
# Training loop
# -----------------------------
EPOCHS = 20
best_val_acc = 0

for epoch in range(EPOCHS):
    model.train()
    train_loss, train_acc = 0, 0

    for x, y in tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS} [Train]"):
        x, y = x.to(DEVICE), y.to(DEVICE)
        optimizer.zero_grad()
        out = model(x)
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()

        train_loss += loss.item() * x.size(0)
        train_acc += metric_acc(out, y) * x.size(0)

    scheduler.step()
    train_loss /= len(dataset["train"])
    train_acc /= len(dataset["train"])

    # Validation
    model.eval()
    val_loss, val_acc = 0, 0
    with torch.no_grad():
        for x, y in tqdm(valid_loader, desc=f"Epoch {epoch+1}/{EPOCHS} [Valid]"):
            x, y = x.to(DEVICE), y.to(DEVICE)
            out = model(x)
            loss = criterion(out, y)
            val_loss += loss.item() * x.size(0)
            val_acc += metric_acc(out, y) * x.size(0)

    val_loss /= len(dataset["val"])
    val_acc /= len(dataset["val"])

    print(f"Epoch {epoch+1}/{EPOCHS} | Train Loss: {train_loss:.4f} | "
          f"Train Acc: {train_acc:.4f} | Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), "best_simple_cnn.pt")
        print("✅ Saved new best model!")

Epoch 1/20 [Train]: 100%|██████████| 481/481 [01:00<00:00,  7.91it/s]
Epoch 1/20 [Valid]: 100%|██████████| 57/57 [00:05<00:00, 10.71it/s]


Epoch 1/20 | Train Loss: 0.1347 | Train Acc: 0.9526 | Val Loss: 0.0763 | Val Acc: 0.9740
✅ Saved new best model!


Epoch 2/20 [Train]:  69%|██████▊   | 330/481 [00:45<00:23,  6.41it/s]

In [22]:
# Load model
model = SimpleAudioCNN().to(DEVICE)
model.load_state_dict(torch.load("best_simple_cnn.pt", map_location=DEVICE))
model.eval()
print("Loaded best model and set to eval mode")

Loaded best model and set to eval mode


In [23]:
all_preds = []
all_probs = []
all_labels = []

with torch.no_grad():
    for x, y in tqdm(test_loader, desc="Testing"):
        x = x.to(DEVICE)
        y = y.to(DEVICE)
        logits = model(x)
        probs = torch.sigmoid(logits)
        preds = (probs > 0.5).float()

        all_preds.extend(preds.cpu().numpy())
        all_probs.extend(probs.cpu().numpy())
        all_labels.extend(y.cpu().numpy().flatten())

Testing: 100%|██████████| 29/29 [00:05<00:00,  5.00it/s]


In [21]:
# -----------------------------
# Metrics
# -----------------------------
import pandas as pd
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

df_results = pd.DataFrame({
    "true_label": all_labels,
    "pred_label": all_preds,
    "confidence": all_probs
})

accuracy = accuracy_score(all_labels, all_preds)
print(f"Test Accuracy: {accuracy*100:.2f}%")

# print("\nClassification Report:")
# print(classification_report(all_labels, all_preds, target_names=dataset["train"].features["label"].names, digits=3))

cm = confusion_matrix(all_labels, all_preds)
print(cm)


Test Accuracy: 95.76%
[[5171  162]
 [ 144 1743]]
