<a href="https://colab.research.google.com/github/blessingoraz/baby-cry-classifier/blob/main/01_training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

CNN for audio files
- Do some processing
- Check audio lengths/duration and sample rate
- How do you detect noise?
- Convert audio to images (check out mel spectrograms)
- Split dataset to training and test

Training
- Transfer learning
- Adjusting learning rate
- check-pointing
- Regularization and Dropout
- Data Augmentation
- Training a larger model
Using the model

In [59]:
!pip install torch torchvision torchaudio scikit-learn



In [83]:
import numpy as np
from sklearn.metrics import f1_score, classification_report

# Load the pre-computed splits in a fixed order: train, validation, test.
# Features are stored as (N, 1, n_mels, time) and labels as (N,).
X_train, y_train, X_val, y_val, X_test, y_test = map(
    np.load,
    (
        "processed/X_train.npy",
        "processed/y_train.npy",
        "processed/X_val.npy",
        "processed/y_val.npy",
        "processed/X_test.npy",
        "processed/y_test.npy",
    ),
)

# Notebook display: confirm that training features and labels line up.
X_train.shape, y_train.shape


((306, 1, 128, 219), (306,))

## Dataset + DataLoader (with SpecAugment)

We'll apply simple augmentation during training only:

Random time masking

Random freq masking

These work great for spectrograms and are easy.

In [61]:
import torch
from torch.utils.data import Dataset

class BabyCryNpyDataset(Dataset):
    """Serve (spectrogram, label) pairs from in-memory numpy arrays.

    Instead of loading PIL images from disk, spectrograms are read from a
    numpy array. ``transform`` works like a torchvision transform: it receives
    the float32 spectrogram tensor and returns the tensor to hand out.

    Args:
        X: array of spectrograms, indexed along its first axis
           (the notebook uses shape (N, 1, n_mels, time)).
        y: array of integer class ids, one per spectrogram.
        transform: optional callable applied to each spectrogram tensor.

    Raises:
        ValueError: if ``X`` and ``y`` do not contain the same number of items.
    """

    def __init__(self, X, y, transform=None):
        # Fail early: a mismatch would otherwise only surface as an IndexError
        # (or as silently unused samples) deep inside a DataLoader worker.
        if len(X) != len(y):
            raise ValueError(
                f"X and y must have the same length, got {len(X)} and {len(y)}"
            )
        self.X = X
        self.y = y
        self.transform = transform

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        spec = torch.tensor(self.X[idx], dtype=torch.float32)
        label = torch.tensor(self.y[idx], dtype=torch.long)

        # Compare against None instead of relying on truthiness, so that a
        # transform object that happens to be falsy (e.g. defines __len__
        # and is empty) is still applied.
        if self.transform is not None:
            spec = self.transform(spec)

        return spec, label


In [62]:
# Minimal stand-in for torchvision.transforms.Compose
class Compose:
    """Chain callables: each one receives the previous one's output."""

    def __init__(self, transforms):
        # Keep the caller's sequence as-is; it is walked in order on each call.
        self.transforms = transforms

    def __call__(self, x):
        result = x
        for step in self.transforms:
            result = step(result)
        return result


In [63]:
# Simple SpecAugment transforms
import torchaudio.transforms as T

class SpecAugment:
    """Randomly mask a frequency band and a time span of a spectrogram.

    With probability ``p`` the input goes through torchaudio's
    FrequencyMasking and then TimeMasking; otherwise it is returned untouched.
    ``freq_mask`` and ``time_mask`` are forwarded as the masks' size parameters.
    """

    def __init__(self, freq_mask=12, time_mask=24, p=0.7):
        self.p = p
        self.freq = T.FrequencyMasking(freq_mask_param=freq_mask)
        self.time = T.TimeMasking(time_mask_param=time_mask)

    def __call__(self, x):
        # x: (1, n_mels, time)
        apply_masks = torch.rand(1).item() < self.p
        if not apply_masks:
            return x
        # Frequency mask first, then time mask.
        return self.time(self.freq(x))


In [64]:
# Resize to 224x224
import torch.nn.functional as F

class ResizeSpec:
    """Bilinearly resize a (C, H, W) spectrogram tensor to (C, height, width)."""

    def __init__(self, height=224, width=224):
        self.height, self.width = height, width

    def __call__(self, x):
        # F.interpolate expects a batch axis: (C, H, W) -> (1, C, H, W).
        batched = x[None]
        resized = F.interpolate(
            batched,
            size=(self.height, self.width),
            mode="bilinear",
            align_corners=False,
        )
        # Drop the temporary batch axis again.
        return resized[0]


### Create train/val transforms

In [65]:
# Per-split preprocessing pipelines:
#   train      -> resize, then SpecAugment masking
#   validation -> resize only
#   test       -> resize only
train_transforms = Compose(
    [
        ResizeSpec(height=224, width=224),  # plays the role of transforms.Resize
        SpecAugment(freq_mask=12, time_mask=24, p=0.7),  # training-only augmentation
    ]
)

val_transforms = Compose([ResizeSpec(height=224, width=224)])

test_transforms = Compose([ResizeSpec(height=224, width=224)])


### Create DataLoaders

In [66]:
from torch.utils.data import DataLoader

# One dataset per split, each paired with its own transform pipeline.
train_dataset = BabyCryNpyDataset(X_train, y_train, transform=train_transforms)
val_dataset   = BabyCryNpyDataset(X_val, y_val, transform=val_transforms)
# The test split uses the dedicated test pipeline (it previously reused
# val_transforms, leaving test_transforms unused), so the two can diverge
# later without silently affecting test results.
test_dataset  = BabyCryNpyDataset(X_test, y_test, transform=test_transforms)

# Pinned host memory only speeds up host-to-GPU copies; requesting it on a
# CPU-only machine has no benefit and makes PyTorch emit a warning.
use_pinned_memory = torch.cuda.is_available()
loader_kwargs = dict(batch_size=32, num_workers=2, pin_memory=use_pinned_memory)

# Only the training loader shuffles; evaluation order stays fixed.
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
val_loader   = DataLoader(val_dataset, shuffle=False, **loader_kwargs)
test_loader  = DataLoader(test_dataset, shuffle=False, **loader_kwargs)

# sanity check: pull one training batch and display its shapes
xb, yb = next(iter(train_loader))
xb.shape, yb.shape


(torch.Size([32, 1, 224, 224]), torch.Size([32]))

### Handle class imbalance (weighted loss)

Because the dataset is highly imbalanced, I used class-weighted cross-entropy to penalize misclassification of minority classes more heavily. This encourages the model to learn discriminative features for all cry types rather than overfitting to the majority class.

In [67]:
import numpy as np
from collections import Counter

# Inverse-frequency class weights: total_samples / (num_classes * class_count).
# NOTE(review): this assumes the labels are exactly the ids 0..num_classes-1
# and that each of them occurs in y_train — confirm against preprocessing.
counts = Counter(y_train.tolist())
num_classes = len(counts)

class_counts = np.asarray(
    [counts[class_id] for class_id in range(num_classes)],
    dtype=np.float32,
)
class_weights = torch.tensor(
    class_counts.sum() / (num_classes * class_counts),
    dtype=torch.float32,
)

class_weights


tensor([3.8250, 3.8250, 7.6500, 2.1250, 0.1678, 5.4643, 3.1875, 2.3906])

In [68]:
# Show each class id next to its loss weight and its training-sample count.
for i, w in enumerate(class_weights):
    weight_value = w.item()
    print(f"Class {i}: weight={weight_value:.2f}, count={counts[i]}")


Class 0: weight=3.83, count=10
Class 1: weight=3.83, count=10
Class 2: weight=7.65, count=5
Class 3: weight=2.12, count=18
Class 4: weight=0.17, count=228
Class 5: weight=5.46, count=7
Class 6: weight=3.19, count=12
Class 7: weight=2.39, count=16


### Transfer Learning Model (ResNet)

I chose ResNet18 as my baseline because it provides a strong balance between model capacity and stability, which is especially important for small, imbalanced datasets. Its residual connections help prevent overfitting and make transfer learning more effective on spectrogram-based audio data.

We also add Dropout before the classifier head.

In [69]:
import torch.nn as nn
import torchvision.models as models

class CryResNet(nn.Module):
    """ResNet backbone with a Dropout + Linear classification head.

    Args:
        num_classes: number of output logits.
        backbone: "resnet18" or "resnet34".
        pretrained: load torchvision's DEFAULT weights when True.
        dropout: dropout probability applied right before the final Linear.
        freeze_backbone: when True, every backbone parameter loaded from
            torchvision is excluded from gradient updates.

    Raises:
        ValueError: for any other ``backbone`` value.
    """

    def __init__(self, num_classes, backbone="resnet18", pretrained=True, dropout=0.3, freeze_backbone=True):
        super().__init__()

        # Pick the torchvision constructor and its matching weights enum.
        if backbone == "resnet18":
            build, weight_enum = models.resnet18, models.ResNet18_Weights
        elif backbone == "resnet34":
            build, weight_enum = models.resnet34, models.ResNet34_Weights
        else:
            raise ValueError("backbone must be resnet18 or resnet34")
        self.base_model = build(weights=weight_enum.DEFAULT if pretrained else None)

        # Freezing happens before the head is swapped in, so the new head
        # below keeps its default requires_grad=True.
        if freeze_backbone:
            self.base_model.requires_grad_(False)

        # Swap the original fc layer for Dropout -> Linear(num_classes).
        head_inputs = self.base_model.fc.in_features
        self.base_model.fc = nn.Sequential(
            nn.Dropout(dropout),
            nn.Linear(head_inputs, num_classes),
        )

    def forward(self, x):
        # x: (B, 1, n_mels, time); tile the single channel three times to
        # match the 3-channel input the ResNet stem expects.
        return self.base_model(x.repeat(1, 3, 1, 1))


### Train the Model

In [70]:
import os
import torch.optim as optim

# Use the GPU when PyTorch can see one; otherwise stay on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Baseline: pretrained ResNet18 with dropout in front of the classifier head.
model = CryResNet(
    num_classes=num_classes,
    backbone="resnet18",
    pretrained=True,
    dropout=0.3,
).to(device)

# Class-weighted cross-entropy; moving the module also moves its weight buffer.
criterion = nn.CrossEntropyLoss(weight=class_weights)
criterion = criterion.to(device)

optimizer = optim.AdamW(params=model.parameters(), lr=1e-3, weight_decay=1e-4)

# Halve the learning rate once the monitored value (mode="min") has not
# improved for more than `patience` epochs.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode="min",
    factor=0.5,
    patience=2,
)

I used a ReduceLROnPlateau scheduler to automatically lower the learning rate when validation loss stopped improving, allowing for more stable fine-tuning on a small and imbalanced dataset.

### Training + validation loops
Because the dataset is highly imbalanced, accuracy alone is insufficient: macro-F1 and per-class recall should also be tracked during validation so that minority cry categories are not ignored. The training loop below currently reports loss and accuracy only (see the note that follows).

NB: Do this later:
- use macro F1
- summarize results in a table

In [85]:
def macro_f1_from_logits(logits, labels):
    """Return the macro-averaged F1 of the argmax predictions against ``labels``.

    ``logits`` is reduced with argmax over dim 1; both tensors are detached and
    moved to the CPU before being handed to scikit-learn's ``f1_score``.
    """
    predicted_ids = logits.argmax(dim=1)
    y_pred = predicted_ids.detach().cpu().numpy()
    y_true = labels.detach().cpu().numpy()
    return f1_score(y_true, y_pred, average="macro")


In [84]:
import os

def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean_batch_loss, accuracy)``.

    When ``optimizer`` is given the model is put in train mode and updated
    after every batch; otherwise the model is put in eval mode and gradients
    are disabled.
    """
    is_training = optimizer is not None
    model.train(is_training)

    loss_sum = 0.0
    num_correct = 0
    num_seen = 0

    with torch.set_grad_enabled(is_training):
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)

            if is_training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if is_training:
                loss.backward()
                optimizer.step()

            loss_sum += loss.item()
            _, predicted = torch.max(outputs, 1)
            num_seen += labels.size(0)
            num_correct += (predicted == labels).sum().item()

    # The loss is averaged over batches, accuracy over individual samples.
    return loss_sum / len(loader), num_correct / num_seen


def train_and_evaluate(
    model,
    optimizer,
    scheduler,
    train_loader,
    val_loader,
    criterion,
    num_epochs,
    device,
    ckpt_dir="models",
    ckpt_name="best.pt"):
    """Train ``model`` for ``num_epochs`` and checkpoint the best epoch.

    Each epoch runs one training pass and one validation pass, steps
    ``scheduler`` with the validation loss, prints loss/accuracy for both
    splits, and saves a checkpoint whenever validation accuracy improves.

    Args:
        model: network to train; it is left in eval mode after each epoch.
        optimizer: optimizer stepped once per training batch.
        scheduler: scheduler whose ``step`` accepts the validation loss.
        train_loader: iterable of ``(inputs, labels)`` training batches.
        val_loader: iterable of ``(inputs, labels)`` validation batches.
        criterion: loss called as ``criterion(outputs, labels)``.
        num_epochs: number of epochs to run.
        device: device that inputs and labels are moved to.
        ckpt_dir: directory for the checkpoint (created if missing).
        ckpt_name: checkpoint file name inside ``ckpt_dir``.

    Returns:
        ``(best_val_acc, best_epoch, best_path)`` where ``best_epoch`` is
        1-based and ``best_path`` is the checkpoint file written for it.
        With ``num_epochs == 0`` nothing is saved and ``(0.0, 0, best_path)``
        is returned.
    """
    os.makedirs(ckpt_dir, exist_ok=True)
    best_path = os.path.join(ckpt_dir, ckpt_name)
    best_val_acc = 0.0
    best_epoch = 0

    for epoch in range(num_epochs):
        train_loss, train_acc = _run_epoch(
            model, train_loader, criterion, device, optimizer=optimizer
        )
        val_loss, val_acc = _run_epoch(model, val_loader, criterion, device)

        # Scheduler watches val_loss
        scheduler.step(val_loss)

        # ===== CHECKPOINT =====
        # Always save on the first epoch: with a strict ">" against the
        # initial 0.0, a run whose validation accuracy stays at 0 would never
        # write a file, and the returned path would point at nothing (or at a
        # stale checkpoint left behind by an earlier run).
        if best_epoch == 0 or val_acc > best_val_acc:
            best_val_acc = val_acc
            best_epoch = epoch + 1

            torch.save({
                "epoch": best_epoch,
                "model_state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
                "best_val_acc": best_val_acc,
            }, best_path)

        print(f"Epoch {epoch+1}/{num_epochs}")
        print(f"  Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
        print(f"  Val   Loss: {val_loss:.4f}, Val   Acc: {val_acc:.4f}")

    return best_val_acc, best_epoch, best_path


### Tuning the Learning Rate
- Try multiple values: [0.0001, 0.001, 0.01, 0.1]
- Train for a few epochs each
- Compare validation accuracy
- Choose the rate with best performance and smallest train/val gap

In [73]:
def make_model(learning_rate=1e-3):
    """Build a fresh CryResNet on ``device`` together with its AdamW optimizer.

    Returns ``(model, optimizer)``; the optimizer uses ``learning_rate`` and a
    weight decay of 1e-4.
    """
    print(f"Number of classes: {num_classes}")

    net = CryResNet(num_classes=num_classes).to(device)
    opt = optim.AdamW(net.parameters(), lr=learning_rate, weight_decay=1e-4)
    return net, opt

Testing different learning rates:

In [74]:
import random
import numpy as np


def set_seed(seed=42):
    """Seed Python's ``random``, NumPy and PyTorch (plus CUDA when available)."""
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)

In [75]:
# Learning-rate sweep: train a fresh model per rate and record the best
# validation accuracy and the epoch at which it was reached.
results = []
learning_rates = [0.0001, 0.001, 0.01, 0.1]

for lr in learning_rates:
    print(f'\n=== Learning Rate: {lr} ===')
    # Reseed before every run so each rate starts from the same RNG state.
    set_seed(42)
    model, optimizer = make_model(learning_rate=lr)

    criterion = nn.CrossEntropyLoss(weight=class_weights).to(device)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.5, patience=2)
    # train_and_evaluate returns three values (accuracy, epoch, checkpoint
    # path); unpacking only two raised "too many values to unpack".
    # Every run writes to the default checkpoint path, so the file on disk
    # belongs to the most recent learning rate only.
    best_val_acc, best_epoch, best_path = train_and_evaluate(
        model=model,
        optimizer=optimizer,
        scheduler=scheduler,
        train_loader=train_loader,
        val_loader=val_loader,
        criterion=criterion,
        num_epochs=20,
        device=device
    )
    results.append((lr, best_val_acc, best_epoch))

print("\nSummary (best val acc per LR):")
for lr, acc, ep in results:
    print(f"lr={lr:<7} best_val_acc={acc:.4f} at epoch {ep}")


=== Learning Rate: 0.0001 ===
Number of classes: 8
Epoch 1/20
  Train Loss: 2.2762, Train Acc: 0.0980
  Val   Loss: 2.1879, Val   Acc: 0.2621
Epoch 2/20
  Train Loss: 2.2079, Train Acc: 0.1275
  Val   Loss: 2.2277, Val   Acc: 0.4272
Epoch 3/20
  Train Loss: 2.2493, Train Acc: 0.2353
  Val   Loss: 2.2581, Val   Acc: 0.5534
Epoch 4/20
  Train Loss: 2.2623, Train Acc: 0.1928
  Val   Loss: 2.1853, Val   Acc: 0.6505
Epoch 5/20
  Train Loss: 2.1338, Train Acc: 0.2190
  Val   Loss: 2.1666, Val   Acc: 0.5922
Epoch 6/20
  Train Loss: 2.1276, Train Acc: 0.2288
  Val   Loss: 2.1373, Val   Acc: 0.5340
Epoch 7/20
  Train Loss: 2.1820, Train Acc: 0.1732
  Val   Loss: 2.0818, Val   Acc: 0.4563
Epoch 8/20
  Train Loss: 2.2176, Train Acc: 0.2026
  Val   Loss: 2.0263, Val   Acc: 0.3786
Epoch 9/20
  Train Loss: 2.0726, Train Acc: 0.2157
  Val   Loss: 2.0202, Val   Acc: 0.2136
Epoch 10/20
  Train Loss: 2.1080, Train Acc: 0.2353
  Val   Loss: 2.0565, Val   Acc: 0.1553
Epoch 11/20
  Train Loss: 2.0968, Tra

The best learning rate is 0.01 (best validation accuracy: 0.7087).

### Adding Inner Layers

In ResNet, the model already does pooling + flatten internally, so you can just replace the fc head with a small MLP (inner layer + ReLU + dropout + output).

In [76]:
import torch
import torch.nn as nn
import torchvision.models as models

class CryResNet(nn.Module):
    """ResNet backbone whose classifier is a small MLP head.

    The original ``fc`` layer is replaced by
    Linear(in_features, size_inner) -> ReLU -> Linear(size_inner, num_classes).

    Args:
        num_classes: number of output logits.
        backbone: "resnet18" or "resnet34".
        pretrained: load torchvision's DEFAULT weights when True.
        freeze_backbone: when True, backbone parameters get no gradients.
        size_inner: width of the hidden layer in the head.

    Raises:
        ValueError: for any other ``backbone`` value.
    """

    def __init__(
        self,
        num_classes,
        backbone="resnet18",
        pretrained=True,
        freeze_backbone=True,
        size_inner=256,
    ):
        super().__init__()

        # Pick the torchvision constructor and its matching weights enum.
        if backbone == "resnet18":
            build, weight_enum = models.resnet18, models.ResNet18_Weights
        elif backbone == "resnet34":
            build, weight_enum = models.resnet34, models.ResNet34_Weights
        else:
            raise ValueError("backbone must be resnet18 or resnet34")
        self.base_model = build(weights=weight_enum.DEFAULT if pretrained else None)

        # Optionally stop gradients for everything loaded so far.
        if freeze_backbone:
            self.base_model.requires_grad_(False)

        # New head: inner layer -> ReLU -> class logits.
        head_inputs = self.base_model.fc.in_features
        hidden = nn.Linear(head_inputs, size_inner)
        logits_layer = nn.Linear(size_inner, num_classes)
        self.base_model.fc = nn.Sequential(hidden, nn.ReLU(), logits_layer)

        # Make the head trainable explicitly, whatever was frozen above.
        self.base_model.fc.requires_grad_(True)

    def forward(self, x):
        # Tile the single input channel to the 3 channels the stem expects.
        three_channel = x.repeat(1, 3, 1, 1)
        return self.base_model(three_channel)


In [79]:
import torch.optim as optim

def make_model(learning_rate=0.01, size_inner=256, backbone="resnet18"):
    """Build a frozen-backbone CryResNet plus an AdamW optimizer for its head.

    Only parameters that still require gradients are handed to the optimizer.
    Returns ``(model, optimizer)``.
    """
    net = CryResNet(
        num_classes=num_classes,
        backbone=backbone,
        pretrained=True,
        freeze_backbone=True,
        size_inner=size_inner,
    )
    net = net.to(device)

    trainable_params = [param for param in net.parameters() if param.requires_grad]
    opt = optim.AdamW(trainable_params, lr=learning_rate, weight_decay=1e-4)
    return net, opt


In [82]:
# Inner-layer sweep: train one model per hidden size at a fixed learning rate.
inner_sizes = [64, 128, 256, 512]
learning_rate = 0.01
inner_results = []

for size_inner in inner_sizes:
    print(f"\n=== inner={size_inner} ===")
    # Reseed per configuration, as the learning-rate sweep does; otherwise
    # each size starts from a different RNG state (head initialisation,
    # shuffling, augmentation) and the runs are not comparable.
    set_seed(42)
    model, optimizer = make_model(learning_rate=learning_rate, size_inner=size_inner)

    criterion = nn.CrossEntropyLoss(weight=class_weights.to(device))
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.5, patience=2)

    # Every configuration writes to the same default checkpoint path, so the
    # file on disk always belongs to the most recently trained size.
    best_val_acc, best_epoch, best_path = train_and_evaluate(
        model=model,
        optimizer=optimizer,
        scheduler=scheduler,
        train_loader=train_loader,
        val_loader=val_loader,
        criterion=criterion,
        num_epochs=10,
        device=device
    )

    print(f"Best val acc={best_val_acc:.4f} at epoch {best_epoch} | saved: {best_path}")
    inner_results.append((size_inner, best_val_acc, best_epoch))

# Side-by-side summary, mirroring the learning-rate sweep.
print("\nSummary (best val acc per inner size):")
for size, acc, ep in inner_results:
    print(f"inner={size:<5} best_val_acc={acc:.4f} at epoch {ep}")



=== inner=64 ===
Epoch 1/10
  Train Loss: 3.0428, Train Acc: 0.1176
  Val   Loss: 2.0387, Val   Acc: 0.0291
Epoch 2/10
  Train Loss: 2.0787, Train Acc: 0.0556
  Val   Loss: 2.0371, Val   Acc: 0.0583
Epoch 3/10
  Train Loss: 2.0816, Train Acc: 0.0523
  Val   Loss: 2.0643, Val   Acc: 0.0583
Epoch 4/10
  Train Loss: 2.0611, Train Acc: 0.0458
  Val   Loss: 2.0735, Val   Acc: 0.0388
Epoch 5/10
  Train Loss: 2.0871, Train Acc: 0.0425
  Val   Loss: 2.0744, Val   Acc: 0.0388
Epoch 6/10
  Train Loss: 2.0783, Train Acc: 0.0359
  Val   Loss: 2.0750, Val   Acc: 0.0388
Epoch 7/10
  Train Loss: 2.0897, Train Acc: 0.0392
  Val   Loss: 2.0499, Val   Acc: 0.0485
Epoch 8/10
  Train Loss: 2.0614, Train Acc: 0.0359
  Val   Loss: 1.9832, Val   Acc: 0.0388
Epoch 9/10
  Train Loss: 2.0561, Train Acc: 0.0359
  Val   Loss: 1.9700, Val   Acc: 0.0485
Epoch 10/10
  Train Loss: 2.0533, Train Acc: 0.0392
  Val   Loss: 2.0832, Val   Acc: 0.3301
Best val acc=0.3301 at epoch 10 | saved: models/best.pt

=== inner=128 

In [None]:
# Reload best model later
# The checkpoint is the dict written by train_and_evaluate, with keys
# "epoch", "model_state_dict", "optimizer_state_dict" and "best_val_acc".
# NOTE(review): `model` must already be built with the same architecture
# (backbone and head) as the run that wrote this file — confirm before loading.

# map_location moves the stored tensors onto the current device.
ckpt = torch.load("models/best.pt", map_location=device)
model.load_state_dict(ckpt["model_state_dict"])
print("Loaded best checkpoint from epoch", ckpt["epoch"], "val_acc", ckpt["best_val_acc"])


### Dropout Regularization

### Using the model (single prediction)

This predicts from a spectrogram already in your .npy format.

In [None]:
import json

# If you saved label_map earlier (recommended):
# with open("data/splits/label_map.json") as f:
#     label_info = json.load(f)
# id2label = {int(k): v for k, v in label_info["id2label"].items()}

# Quick fallback: map ids to the entries of data/raw in sorted order.
# Hidden entries (".DS_Store", ".ipynb_checkpoints", ...) are skipped;
# a single one of them would shift every class id that sorts after it.
# NOTE(review): assumes label ids were assigned in this same sorted order
# during preprocessing — confirm, or load the saved label map instead.
id2label = dict(
    enumerate(
        sorted(name for name in os.listdir("data/raw") if not name.startswith("."))
    )
)

@torch.no_grad()
def predict_one(spec_np):
    """Classify a single spectrogram stored in the notebook's .npy layout.

    Args:
        spec_np: numpy array with shape (1, n_mels, time) OR (n_mels, time).

    Returns:
        (label, confidence, probs): the predicted class name, its softmax
        probability as a float, and the full probability vector.

    Raises:
        ValueError: if ``spec_np`` is neither 2-D nor 3-D.
    """
    if spec_np.ndim == 2:
        spec_np = spec_np[np.newaxis, :, :]
    if spec_np.ndim != 3:
        raise ValueError(
            f"spec_np must have shape (1, n_mels, time) or (n_mels, time), got {spec_np.shape}"
        )

    # Apply the same preprocessing as validation (resize to 224x224); feeding
    # the raw spectrogram would give the model inputs at a different scale
    # from the ones it was trained and validated on.
    spec = val_transforms(torch.tensor(spec_np, dtype=torch.float32))
    x = spec.unsqueeze(0).to(device)  # (1, 1, H, W)

    # Predict in eval mode (dropout off, batch-norm running statistics), then
    # restore whatever mode the model was in before the call.
    was_training = model.training
    model.eval()
    try:
        logits = model(x)
    finally:
        model.train(was_training)

    probs = torch.softmax(logits, dim=1).cpu().numpy()[0]
    pred_id = int(np.argmax(probs))
    return id2label[pred_id], float(probs[pred_id]), probs

# Smoke test: take the first test example without its channel axis and predict.
label, conf, probs = predict_one(X_test[0, 0])
label, conf
