In [1]:
import os
import random
from pathlib import Path

import librosa
import matplotlib.pyplot as plt
import numpy as np
from sklearn import metrics

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, WeightedRandomSampler

# Reproducibility: a single seed shared by Python, NumPy and PyTorch so the
# value only has to be changed in one place.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Path to the dataset (relative to the notebook)
DATA_ROOT = Path("../data")
MACHINE_TYPES = ["fan", "pump", "slider", "ToyCar", "ToyConveyor", "valve"]

# Audio and feature configuration (defaults for load_audio / audio_to_logmelspec)
SAMPLE_RATE = 16000
N_FFT = 1024
HOP_LENGTH = 512
N_MELS = 128

# Patch configuration. The proxy-outlier setup mostly works on full
# spectrograms, but these values are kept ready in case sliding windows
# are used later.
PATCH_FRAMES = 64
PATCH_HOP = 32

print("PyTorch version:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())

# Pick the compute device once; every later cell moves models/tensors to `device`.
if torch.cuda.is_available():
    print("Number of CUDA devices:", torch.cuda.device_count())
    print("Current device:", torch.cuda.current_device())
    print("Device name:", torch.cuda.get_device_name(torch.cuda.current_device()))
    device = torch.device("cuda")
else:
    print("CUDA not available, falling back to CPU.")
    device = torch.device("cpu")

print("Using device:", device)


PyTorch version: 2.9.1+cu126
CUDA available: True
Number of CUDA devices: 1
Current device: 0
Device name: NVIDIA GeForce RTX 4070 Laptop GPU
Using device: cuda


In [2]:
def scan_files(data_root=DATA_ROOT, machine_types=None):
    """
    Scan the dataset directory and describe every ``*.wav`` file found.

    The expected layout is ``<data_root>/<machine_type>/<split>/*.wav`` with
    ``split`` being ``train`` or ``test``. Directories that do not exist are
    skipped silently, so an empty result usually means a wrong ``data_root``.

    Args:
        data_root     : dataset root, as a ``Path`` or a plain string.
        machine_types : machine type folder names to scan; defaults to
                        ``MACHINE_TYPES``.

    Returns:
        A list of dictionaries, sorted by file name within each folder:
            {
                'path': Path,
                'machine_type': str,
                'split': 'train' or 'test',
                'label': 0 (normal), 1 (anomaly) or None (no marker in name)
            }

    Per the original note, this structure matches the one used in the
    IDC-TransAE notebook so that ``file_list`` is interchangeable between
    both experiments.
    """
    if machine_types is None:
        machine_types = MACHINE_TYPES

    # Accept plain strings as well as Path objects.
    data_root = Path(data_root)

    all_files = []

    for mtype in machine_types:
        for split in ("train", "test"):
            split_dir = data_root / mtype / split
            if not split_dir.is_dir():
                continue

            for fname in sorted(split_dir.glob("*.wav")):
                name_lower = fname.name.lower()

                # Check the anomaly markers first: "abnormal" contains the
                # substring "normal", so testing "normal" first would label
                # such files as normal.
                if "anomaly" in name_lower or "abnormal" in name_lower:
                    label = 1
                elif "normal" in name_lower:
                    label = 0
                else:
                    # Fallback when the file name carries no explicit label.
                    label = None

                all_files.append({
                    "path": fname,
                    "machine_type": mtype,
                    "split": split,
                    "label": label,
                })

    return all_files


file_list = scan_files()

# scan_files() skips missing directories without complaint, so a wrong
# DATA_ROOT would otherwise only show up later as empty datasets.
assert file_list, f"No .wav files found under {DATA_ROOT}"

print(len(file_list))
file_list[:5]


30987


[{'path': WindowsPath('../data/fan/train/normal_id_00_00000000.wav'),
  'machine_type': 'fan',
  'split': 'train',
  'label': 0},
 {'path': WindowsPath('../data/fan/train/normal_id_00_00000001.wav'),
  'machine_type': 'fan',
  'split': 'train',
  'label': 0},
 {'path': WindowsPath('../data/fan/train/normal_id_00_00000002.wav'),
  'machine_type': 'fan',
  'split': 'train',
  'label': 0},
 {'path': WindowsPath('../data/fan/train/normal_id_00_00000003.wav'),
  'machine_type': 'fan',
  'split': 'train',
  'label': 0},
 {'path': WindowsPath('../data/fan/train/normal_id_00_00000004.wav'),
  'machine_type': 'fan',
  'split': 'train',
  'label': 0}]

In [3]:
def load_audio(path, sr=SAMPLE_RATE):
    """
    Read an audio file as a mono waveform at a fixed sample rate.

    Args:
        path: audio file to read; handed straight to ``librosa.load``.
        sr  : sample rate requested from ``librosa.load``.

    Returns:
        (waveform, rate) exactly as returned by ``librosa.load``:
        waveform is a 1D np.ndarray of samples, rate the sample rate.
    """
    waveform, rate = librosa.load(path, mono=True, sr=sr)
    return waveform, rate


def audio_to_logmelspec(
    audio,
    sr,
    n_fft=N_FFT,
    hop_length=HOP_LENGTH,
    n_mels=N_MELS,
):
    """
    Turn a waveform into a natural-log Mel power spectrogram.

    Args:
        audio      : 1D numpy array holding the waveform.
        sr         : sample rate of ``audio``.
        n_fft      : FFT size passed to librosa.
        hop_length : hop length passed to librosa.
        n_mels     : number of Mel bands passed to librosa.

    Returns:
        log-Mel spectrogram of shape (n_mels, n_frames).
    """
    mel_kwargs = dict(
        sr=sr,
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels,
        power=2.0,
    )
    mel_power = librosa.feature.melspectrogram(y=audio, **mel_kwargs)

    # The small offset keeps the logarithm finite for zero-energy bins.
    return np.log(mel_power + 1e-12)


In [4]:
def build_target_vs_po_lists(file_list, target_type):
    """
    Sort the scanned files into the four lists used by the experiment.

    Training:
        target_train : all NORMAL train samples of the target machine
        po_train     : all NORMAL train samples of every other machine
                       (the proxy outliers)

    Evaluation (target machine only):
        target_test_normal  : test samples with label 0
        target_test_anomaly : test samples with label 1

    Entries that match none of these rules (other labels, other splits,
    test files of non-target machines) are dropped.

    Returns a dictionary holding the four lists under the names above.
    """
    groups = {
        "target_train": [],
        "po_train": [],
        "target_test_normal": [],
        "target_test_anomaly": [],
    }

    for record in file_list:
        machine = record["machine_type"]
        part = record["split"]
        tag = record["label"]
        on_target = machine == target_type

        if part == "train":
            # Only normal clips are kept for training, whatever the machine.
            if tag == 0:
                groups["target_train" if on_target else "po_train"].append(record)
            continue

        # Evaluation only looks at the target machine's test clips.
        if part != "test" or not on_target:
            continue

        if tag == 0:
            groups["target_test_normal"].append(record)
        elif tag == 1:
            groups["target_test_anomaly"].append(record)

    return groups


In [5]:
# Build the four file lists with "fan" as the target machine; normal train
# clips of every other machine type become the proxy-outlier pool.
splits = build_target_vs_po_lists(file_list, target_type="fan")
# Show how many entries ended up in each list.
for k, v in splits.items():
    print(k, len(v))


target_train 3675
po_train 16444
target_test_normal 400
target_test_anomaly 1475


In [6]:
class ProxyOUDataset(Dataset):
    """
    Dataset for the binary target-vs-proxy-outlier task.

    Every item served by one instance carries the same label:
        label=1 -> target machine (normal)
        label=0 -> proxy outlier sample

    Audio is loaded and converted to a log-Mel spectrogram on every access;
    nothing is cached.
    """

    def __init__(self, entries, label, transform=None):
        """
        entries   : list of dicts as produced by scan_files(); only the
                    "path" key is read here
        label     : class label given to every item (1 = target, 0 = proxy outlier)
        transform : optional callable applied to the [1, n_mels, n_frames]
                    spectrogram tensor before it is returned
        """
        self.entries = entries
        self.label = label
        self.transform = transform

    def __len__(self):
        """Number of audio files in this dataset."""
        return len(self.entries)

    def __getitem__(self, idx):
        """
        Load item ``idx`` and return ``(spec, y)``:
            spec : float32 tensor [1, n_mels, n_frames] (after ``transform``,
                   when one was given)
            y    : scalar long tensor holding the class label
        """
        path = self.entries[idx]["path"]

        audio, sr = load_audio(path)
        spec = audio_to_logmelspec(audio, sr)   # (n_mels, n_frames)

        spec = torch.tensor(spec, dtype=torch.float32)

        # Add the channel axis expected by Conv2d: (C, H, W)
        spec = spec.unsqueeze(0)   # [1, n_mels, n_frames]

        # The transform was previously stored but never used; apply it here so
        # callers that pass one actually get transformed spectrograms.
        if self.transform is not None:
            spec = self.transform(spec)

        y = torch.tensor(self.label, dtype=torch.long)
        return spec, y


In [7]:
def collate_spectrogram_batch(batch):
    """
    Right-pad all spectrograms in the batch with zeros so that they share the
    same time dimension (n_frames), then stack them.

    The channel and Mel dimensions are read from the first spectrogram, so the
    function does not depend on any global feature configuration.

    batch: list of (spec, label), where
           spec: [C, n_mels, T_i]  (same C and n_mels for every item)
           label: scalar tensor
    return:
           x: [B, C, n_mels, T_max]
           y: [B]

    Raises:
        ValueError: if ``batch`` is empty.

    Note: the padding value is 0, not the spectrogram's own floor value.
    """
    if len(batch) == 0:
        raise ValueError("collate_spectrogram_batch received an empty batch")

    specs, labels = zip(*batch)  # tuples of tensors

    # Longest time axis in this batch
    max_w = max(s.shape[-1] for s in specs)

    # Everything except the time axis is taken from the data itself.
    lead_shape = tuple(specs[0].shape[:-1])
    x = torch.zeros((len(specs), *lead_shape, max_w), dtype=specs[0].dtype)

    for i, s in enumerate(specs):
        # Fill up to the item's own length; the remainder stays zero.
        x[i, ..., : s.shape[-1]] = s

    y = torch.stack(labels, dim=0)

    return x, y


In [8]:
# Small sanity check with a hand-made "batch" of two different lengths
a = torch.randn(1, N_MELS, 313)
b = torch.randn(1, N_MELS, 344)

xb, yb = collate_spectrogram_batch([(a, torch.tensor(1)), (b, torch.tensor(0))])

# Check the padding explicitly instead of only eyeballing the printed shapes.
assert xb.shape == (2, 1, N_MELS, 344)
assert torch.equal(xb[0, :, :, :313], a)      # shorter clip copied unchanged
assert not xb[0, :, :, 313:].any()            # its tail is zero padding
assert torch.equal(xb[1], b)                  # longest clip needs no padding
assert yb.tolist() == [1, 0]

print(xb.shape, yb.shape)

torch.Size([2, 1, 128, 344]) torch.Size([2])


In [9]:
# Wrap the two training file lists as datasets: target clips get label 1,
# proxy-outlier clips get label 0.
target_train_ds = ProxyOUDataset(splits["target_train"], label=1)
po_train_ds     = ProxyOUDataset(splits["po_train"], label=0)

print(len(target_train_ds), len(po_train_ds))

# Load one item to inspect what the dataset returns: the spectrogram tensor
# (with a leading channel axis) and its label tensor.
spec, label = target_train_ds[0]
spec.shape, label


3675 16444


(torch.Size([1, 128, 313]), tensor(1))

In [10]:
class SmallConvClassifier(nn.Module):
    """
    Compact CNN that classifies log-Mel spectrograms.

    Input : [B, 1, H, W] spectrogram batch
    Output: [B, 2] logits (target vs PO)
    """

    def __init__(self):
        super().__init__()

        # Three 3x3 conv stages. The first two halve both axes with a 2x2 max
        # pool; the last one pools adaptively to 8x8 so the classifier head
        # sees a fixed-size feature map whatever the input width.
        widths = (1, 16, 32, 64)
        last_stage = len(widths) - 1
        stages = []
        for stage_idx in range(1, len(widths)):
            stages.append(
                nn.Conv2d(widths[stage_idx - 1], widths[stage_idx], kernel_size=3, padding=1)
            )
            stages.append(nn.ReLU())
            if stage_idx < last_stage:
                stages.append(nn.MaxPool2d((2, 2)))
            else:
                stages.append(nn.AdaptiveMaxPool2d((8, 8)))
        self.conv = nn.Sequential(*stages)

        # Two-layer head on the flattened 64 x 8 x 8 feature map.
        head = [
            nn.Flatten(),
            nn.Linear(64 * 8 * 8, 128),
            nn.ReLU(),
            nn.Linear(128, 2),
        ]
        self.fc = nn.Sequential(*head)

    def forward(self, x):
        """Return the two class logits for every spectrogram in ``x``."""
        return self.fc(self.conv(x))


In [11]:
# Smoke test: push a random batch through an untrained model to confirm the
# forward pass runs and yields one pair of logits per input.
model = SmallConvClassifier().to(device)

# Dummy batch of 2 single-channel inputs of size 128 x 200.
dummy = torch.randn(2, 1, 128, 200).to(device)
out = model(dummy)
out


tensor([[-0.0706, -0.0140],
        [-0.0744, -0.0036]], device='cuda:0', grad_fn=<AddmmBackward0>)

In [12]:
def train_proxy_epoch(model, loader, optimizer, device):
    """
    Run one training pass over ``loader`` and update ``model`` in place.

    Args:
        model     : classifier returning [B, n_classes] logits.
        loader    : iterable of (inputs, targets) batches.
        optimizer : optimizer holding the model's parameters.
        device    : device the batches are moved to.

    Returns:
        (mean cross-entropy loss per sample, accuracy) over the whole pass.
    """
    model.train()
    loss_sum = 0
    n_correct = 0
    n_seen = 0

    for inputs, targets in loader:
        inputs, targets = inputs.to(device), targets.to(device)
        batch_n = inputs.size(0)

        optimizer.zero_grad()
        logits = model(inputs)
        batch_loss = F.cross_entropy(logits, targets)
        batch_loss.backward()
        optimizer.step()

        # cross_entropy averages over the batch; weight by batch size so the
        # final figure is a per-sample mean even with a smaller last batch.
        loss_sum += batch_loss.item() * batch_n
        n_correct += (logits.argmax(1) == targets).sum().item()
        n_seen += batch_n

    return loss_sum / n_seen, n_correct / n_seen


In [13]:
def eval_proxy_epoch(model, loader, device):
    """
    Evaluate ``model`` on ``loader`` without updating it.

    Args:
        model  : classifier returning [B, 2] logits.
        loader : iterable of (inputs, targets) batches.
        device : device the batches are moved to.

    Returns:
        (mean loss per sample, accuracy, scores, labels) where ``scores`` is a
        numpy array with the softmax probability of class 1 (target) for every
        sample and ``labels`` the matching ground-truth labels.
    """
    model.eval()
    loss_sum = 0
    n_correct = 0
    n_seen = 0
    target_probs = []
    true_labels = []

    with torch.no_grad():
        for inputs, targets in loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            batch_n = inputs.size(0)

            logits = model(inputs)

            # Batch-mean loss re-weighted by batch size -> per-sample mean.
            loss_sum += F.cross_entropy(logits, targets).item() * batch_n
            n_correct += (logits.argmax(1) == targets).sum().item()
            n_seen += batch_n

            # Column 1 of the softmax is the probability of class 1 (target).
            target_probs.extend(F.softmax(logits, dim=1)[:, 1].cpu().numpy())
            true_labels.extend(targets.cpu().numpy())

    mean_loss = loss_sum / n_seen
    accuracy = n_correct / n_seen
    return mean_loss, accuracy, np.array(target_probs), np.array(true_labels)


In [14]:
# Combine target + PO into one training dataset.
# Order matters: target entries come first and proxy-outlier entries second;
# the per-sample sampler weights built in the next cell assume this order.
train_ds = torch.utils.data.ConcatDataset([
    ProxyOUDataset(splits["target_train"], label=1),
    ProxyOUDataset(splits["po_train"], label=0)
])

# Evaluation set, target machine only: normal test clips are labelled 1
# (the classifier's "target" class) and anomalous clips 0.
test_ds = torch.utils.data.ConcatDataset([
    ProxyOUDataset(splits["target_test_normal"], label=1),
    ProxyOUDataset(splits["target_test_anomaly"], label=0),
])


In [15]:
# Class-balanced sampling: the proxy-outlier pool is much larger than the
# target pool, so every sample is weighted by the inverse size of its class.
num_target = len(splits["target_train"])
num_proxy = len(splits["po_train"])

# Fail early with a clear message instead of a ZeroDivisionError below.
assert num_target > 0, "no target training samples found"
assert num_proxy > 0, "no proxy-outlier training samples found"

# One weight per sample, in the same order as train_ds (target first, proxy second).
weights = [1/num_target]*num_target + [1/num_proxy]*num_proxy
assert len(weights) == len(train_ds), "sampler weights must line up with train_ds"

# Draw as many samples per epoch as the dataset holds, with replacement.
sampler = WeightedRandomSampler(weights, num_samples=num_target+num_proxy, replacement=True)

train_loader = DataLoader(
    train_ds,
    batch_size=16,
    sampler=sampler,
    collate_fn=collate_spectrogram_batch,
)

test_loader = DataLoader(
    test_ds,
    batch_size=8,
    shuffle=False,
    collate_fn=collate_spectrogram_batch,
)

In [None]:
# Start from a fresh model and optimizer, then run a single training pass
# over train_loader.
model = SmallConvClassifier().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# train_proxy_epoch returns (mean loss per sample, accuracy) for this epoch.
loss, acc = train_proxy_epoch(model, train_loader, optimizer, device)
print("Train loss:", loss, "Acc:", acc)

In [None]:
# Evaluation set for the target machine: normal clips are labelled 1 (the
# classifier's "target" class), anomalous clips 0. Rebuilt here so this cell
# can be re-run on its own.
test_ds = torch.utils.data.ConcatDataset([
    ProxyOUDataset(splits["target_test_normal"], label=1),
    ProxyOUDataset(splits["target_test_anomaly"], label=0)
])

# Use the padding collate function, as for the other loaders: the default
# collate stacks tensors and fails as soon as a batch mixes clip lengths.
test_loader = DataLoader(
    test_ds,
    batch_size=8,
    shuffle=False,
    collate_fn=collate_spectrogram_batch,
)

test_loss, test_acc, scores, labels = eval_proxy_epoch(model, test_loader, device)

# scores = P(class 1 = target/normal), labels use the same convention, so the
# AUC measures how well normal clips are ranked above anomalous ones.
auc = metrics.roc_auc_score(labels, scores)
print("Test Loss:", test_loss)
print("Test Accuracy:", test_acc)
print("AUC:", auc)


Test Loss: 1.4292188715338707
Test Accuracy: 0.37546666666666667
AUC: 0.6665152542372882


In [None]:
# Recap of the split sizes used for training and evaluation.
print("Train target samples:", len(splits["target_train"]))
print("Train proxy samples :", len(splits["po_train"]))
print("Test normal samples :", len(splits["target_test_normal"]))
print("Test anomaly samples:", len(splits["target_test_anomaly"]))


Train target samples: 3675
Train proxy samples : 16444
Test normal samples : 400
Test anomaly samples: 1475
