In [3]:
#one level up into project folder
import os
#os.chdir("..")

#print("Current working directory:", os.getcwd())

In [26]:
import os
import sys
import glob
import numpy as np
from PIL import Image

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import torchvision.transforms as T

import mlflow
import dagshub

from visioninfantnet.utils.ml_utils.metric.classification_metric import (
    get_classification_score,
)
from visioninfantnet.exception.exception import VisionInfantNetException

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device
print(f"Using device: {device}")

Using device: cuda


In [5]:
from dotenv import load_dotenv
load_dotenv()
mlflow_uri = os.getenv("MLFLOW_TRACKING_URI")
mlflow.set_tracking_uri(mlflow_uri)
dagshub.init(repo_owner='arunps12', repo_name='VisionInfantNet', mlflow=True)

import mlflow
with mlflow.start_run():
  mlflow.log_param('parameter name', 'value')
  mlflow.log_metric('metric name', 1)

üèÉ View run receptive-duck-897 at: https://dagshub.com/arunps12/VisionInfantNet.mlflow/#/experiments/0/runs/eb73d4a856de40f5afcde1a29d8b3a20
üß™ View experiment at: https://dagshub.com/arunps12/VisionInfantNet.mlflow/#/experiments/0


In [6]:
#Paths and data loading helper

ARTIFACT_ROOT = os.getenv("ARTIFACT_ROOT")

TRAIN_IMG_DIR = os.path.join(ARTIFACT_ROOT, "data_transformation", "spectrograms", "train")
VAL_IMG_DIR   = os.path.join(ARTIFACT_ROOT, "data_transformation", "spectrograms", "valid")

TRAIN_LABEL_NPY = os.path.join(ARTIFACT_ROOT, "data_transformation", "features", "train_labels.npy")
VAL_LABEL_NPY   = os.path.join(ARTIFACT_ROOT, "data_transformation", "features", "valid_labels.npy")

print(TRAIN_IMG_DIR)
print(VAL_IMG_DIR)
print(TRAIN_LABEL_NPY)
print(VAL_LABEL_NPY)


/itf-fi-ml/home/arunps/Projects/VisionInfantNet/artifacts/12_02_2025_10_27_46/data_transformation/spectrograms/train
/itf-fi-ml/home/arunps/Projects/VisionInfantNet/artifacts/12_02_2025_10_27_46/data_transformation/spectrograms/valid
/itf-fi-ml/home/arunps/Projects/VisionInfantNet/artifacts/12_02_2025_10_27_46/data_transformation/features/train_labels.npy
/itf-fi-ml/home/arunps/Projects/VisionInfantNet/artifacts/12_02_2025_10_27_46/data_transformation/features/valid_labels.npy


In [7]:
# Load labels and image paths
train_labels = np.load(TRAIN_LABEL_NPY)
val_labels   = np.load(VAL_LABEL_NPY)

train_image_paths = sorted(glob.glob(os.path.join(TRAIN_IMG_DIR, "*.png")))
val_image_paths   = sorted(glob.glob(os.path.join(VAL_IMG_DIR, "*.png")))

print(len(train_image_paths), len(train_labels))
print(len(val_image_paths), len(val_labels))

num_classes = len(np.unique(train_labels))
num_classes


3600 3600
3580 3580


5

In [27]:
import numpy as np

unique_labels = sorted(set(train_labels) | set(val_labels))
label_to_idx = {lab: i for i, lab in enumerate(unique_labels)}
idx_to_label = {i: lab for lab, i in label_to_idx.items()}

print("Label mapping:", label_to_idx)
num_classes = len(unique_labels)
print("num_classes:", num_classes)


Label mapping: {np.str_('Canonical'): 0, np.str_('Crying'): 1, np.str_('Junk'): 2, np.str_('Laughing'): 3, np.str_('Non-canonical'): 4}
num_classes: 5


In [28]:
train_labels_idx = np.array([label_to_idx[l] for l in train_labels])
val_labels_idx   = np.array([label_to_idx[l] for l in val_labels])
print(train_labels_idx[:10])
print(val_labels_idx[:10])

[1 1 1 1 1 1 1 1 1 1]
[1 1 1 1 1 1 1 1 1 1]


In [29]:
import numpy as np
import torch

# counts per class in TRAIN set
class_counts = np.bincount(train_labels_idx, minlength=num_classes)
print("Class counts:", class_counts)

# inverse-frequency weights ‚Üí higher for minority classes
class_weights = 1.0 / (class_counts + 1e-6)
print("Class weights (per class):", class_weights)

# per-sample weights
sample_weights = class_weights[train_labels_idx]     # shape: [num_train_samples]
sample_weights_tensor = torch.from_numpy(sample_weights).float()
print("Sample weights shape:", sample_weights_tensor.shape)
print("Sample weights (first 10):", sample_weights_tensor[:10])

Class counts: [ 444  243 1430   46 1437]
Class weights (per class): [0.00225225 0.00411523 0.0006993  0.02173913 0.00069589]
Sample weights shape: torch.Size([3600])
Sample weights (first 10): tensor([0.0041, 0.0041, 0.0041, 0.0041, 0.0041, 0.0041, 0.0041, 0.0041, 0.0041,
        0.0041])


In [30]:
import random
import torch

def _get_ft_axes(spec: torch.Tensor):
    """
    For image/spectrogram tensors shaped [C,H,W] or [H,W].
    Treat H as 'freq' and W as 'time'.
    """
    if spec.ndim == 3:   # [C, H, W]
        return 1, 2
    elif spec.ndim == 2: # [H, W]
        return 0, 1
    else:
        raise ValueError(f"Unexpected tensor shape: {spec.shape}")

def random_time_mask(spec, max_mask_pct=0.1, num_masks=1):
    """
    Mask along the time axis (width).
    """
    F_axis, T_axis = _get_ft_axes(spec)
    _, T = (spec.shape[F_axis], spec.shape[T_axis])
    out = spec.clone()
    max_mask = int(T * max_mask_pct)
    if max_mask < 1:
        return out
    for _ in range(num_masks):
        t = random.randint(0, max_mask)
        t0 = random.randint(0, max(0, T - t))
        idx = torch.arange(t0, t0 + t, device=spec.device)
        out.index_fill_(T_axis, idx, 0.0)
    return out

def random_freq_mask(spec, max_mask_pct=0.1, num_masks=1):
    """
    Mask along the frequency axis (height).
    """
    F_axis, T_axis = _get_ft_axes(spec)
    F_dim, _ = (spec.shape[F_axis], spec.shape[T_axis])
    out = spec.clone()
    max_mask = int(F_dim * max_mask_pct)
    if max_mask < 1:
        return out
    for _ in range(num_masks):
        f = random.randint(0, max_mask)
        f0 = random.randint(0, max(0, F_dim - f))
        idx = torch.arange(f0, f0 + f, device=spec.device)
        out.index_fill_(F_axis, idx, 0.0)
    return out

def random_time_shift(spec, max_shift_pct=0.1):
    """
    Roll along time axis.
    """
    F_axis, T_axis = _get_ft_axes(spec)
    _, T = (spec.shape[F_axis], spec.shape[T_axis])
    max_shift = int(T * max_shift_pct)
    if max_shift < 1:
        return spec
    shift = random.randint(-max_shift, max_shift)
    return torch.roll(spec, shifts=shift, dims=T_axis)

def random_gain(spec, min_gain=0.8, max_gain=1.2):
    """
    Multiply tensor by a random gain factor.
    """
    gain = random.uniform(min_gain, max_gain)
    return spec * gain

def apply_augmentations(spec, label, aug_cfg: dict):
    """
    Apply a combination of augmentations defined in aug_cfg to spec.
    """
    if aug_cfg.get("time_mask", False):
        spec = random_time_mask(
            spec,
            max_mask_pct=aug_cfg.get("time_mask_pct", 0.1),
            num_masks=aug_cfg.get("time_mask_num", 1),
        )

    if aug_cfg.get("freq_mask", False):
        spec = random_freq_mask(
            spec,
            max_mask_pct=aug_cfg.get("freq_mask_pct", 0.1),
            num_masks=aug_cfg.get("freq_mask_num", 1),
        )

    if aug_cfg.get("time_shift", False):
        spec = random_time_shift(
            spec,
            max_shift_pct=aug_cfg.get("time_shift_pct", 0.1),
        )

    if aug_cfg.get("gain", False):
        spec = random_gain(
            spec,
            min_gain=aug_cfg.get("gain_min", 0.8),
            max_gain=aug_cfg.get("gain_max", 1.2),
        )

    return spec


In [31]:
import torch
from torch.utils.data import Dataset
from torchvision import transforms as T
from PIL import Image

class SpectrogramImageDataset(Dataset):
    def __init__(self, image_paths, labels, train: bool = True, aug_cfg: dict = None):
        self.image_paths = image_paths
        self.labels = labels
        self.train = train
        self.aug_cfg = aug_cfg or {}

        base_transforms = [
            T.Resize((224, 224)),
            T.ToTensor(),              # -> [1, H, W], values [0,1]
        ]
        self.transforms = T.Compose(base_transforms)

        # Normalize to [-1,1] per channel
        self.normalize = T.Normalize(
            mean=[0.5, 0.5, 0.5],
            std=[0.5, 0.5, 0.5],
        )

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        path = self.image_paths[idx]
        label = int(self.labels[idx])

        # Load grayscale PNG (0..255)
        img = Image.open(path).convert("L")

        # Apply transforms ‚Üí [1,224,224] in [0,1]
        img = self.transforms(img)

        # Convert grayscale -> RGB by repeating channels ‚Üí [3,224,224]
        img = img.repeat(3, 1, 1)

        # Apply augmentations only for training
        if self.train and self.aug_cfg:
            img = apply_augmentations(img, label, self.aug_cfg)

        # Normalize to [-1,1]
        img = self.normalize(img)

        return img, torch.tensor(label, dtype=torch.long)


In [32]:
import numpy as np

def mixup_batch(inputs, labels, alpha=0.4):
    """
    MixUp for one batch.
    Returns mixed_inputs, targets_a, targets_b, lam.
    """
    if alpha <= 0:
        return inputs, labels, labels, 1.0

    lam = np.random.beta(alpha, alpha)
    batch_size = inputs.size(0)
    index = torch.randperm(batch_size, device=inputs.device)

    mixed_inputs = lam * inputs + (1 - lam) * inputs[index]
    targets_a, targets_b = labels, labels[index]
    return mixed_inputs, targets_a, targets_b, lam


In [33]:
class ConvBlock(nn.Module):
    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.block = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),  # downsample by /2
        )

    def forward(self, x):
        return self.block(x)


class BaseCNNSpectrogram(nn.Module):
    def __init__(
        self,
        num_classes: int,
        base_channels: int = 32,
        num_blocks: int = 3,
    ):
        super().__init__()

        channels = [3] + [base_channels * (2 ** i) for i in range(num_blocks)]
        conv_blocks = []
        for i in range(num_blocks):
            conv_blocks.append(ConvBlock(channels[i], channels[i+1]))
        self.conv = nn.Sequential(*conv_blocks)

        last_channels = channels[-1]
        spatial_size = 224 // (2 ** num_blocks)  # after MaxPool(2) num_blocks times
        self.flatten_dim = last_channels * spatial_size * spatial_size

        # Single DNN classification layer: Flatten -> Linear -> logits
        self.fc = nn.Linear(self.flatten_dim, num_classes)

    def forward(self, x):
        x = self.conv(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x


In [34]:
import torch.nn.init as init
import torch.nn as nn

def init_cnn_weights(m, init_method="pytorch_default"):
    """
    Apply chosen initialization method to Conv and Linear layers.
    For 'pytorch_default' we do nothing.
    """
    if not isinstance(m, (nn.Conv2d, nn.Linear)):
        return

    if init_method == "pytorch_default":
        return  # keep PyTorch default

    if init_method == "kaiming_normal":
        init.kaiming_normal_(m.weight, nonlinearity="relu")
    elif init_method == "kaiming_uniform":
        init.kaiming_uniform_(m.weight, nonlinearity="relu")
    elif init_method == "xavier_normal":
        init.xavier_normal_(m.weight)
    elif init_method == "xavier_uniform":
        init.xavier_uniform_(m.weight)

    if m.bias is not None:
        nn.init.zeros_(m.bias)


In [38]:
BATCH_SIZE = 64
LR = 1e-3
WEIGHT_DECAY = 1e-4
EPOCHS = 15

NUM_BLOCKS = 5
BASE_CHANNELS = 64
INIT_METHOD = "pytorch_default"

MLFLOW_EXPERIMENT = "cnn_exp4_augment_ablation"
mlflow.set_experiment(MLFLOW_EXPERIMENT)
print("Using MLflow experiment:", MLFLOW_EXPERIMENT)

def train_one_cnn_experiment(
    num_blocks: int,
    base_channels: int,
    train_dataset,
    val_dataset,
    num_classes: int,
    batch_size: int = 64,
    lr: float = 1e-3,
    weight_decay: float = 1e-4,
    epochs: int = 15,
    run_name: str = None,
    MODEL_DIR: str = "saved_models",
    init_method: str = "pytorch_default",
    use_mixup: bool = False,
    mixup_alpha: float = 0.4,
    aug_cfg_name: str = "none",
    use_weighted_sampler: bool = False,
    sample_weights: torch.Tensor = None,
):
    if use_weighted_sampler and sample_weights is not None:
        sampler = WeightedRandomSampler(
            weights=sample_weights,
            num_samples=len(sample_weights),
            replacement=True,
        )
        train_loader = DataLoader(
            train_dataset,
            batch_size=batch_size,
            sampler=sampler,
            num_workers=4,
            pin_memory=True,
        )
    else:
        train_loader = DataLoader(
            train_dataset,
            batch_size=batch_size,
            shuffle=True,
            num_workers=4,
            pin_memory=True,
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=4,
        pin_memory=True,
    )

    model = BaseCNNSpectrogram(
        num_classes=num_classes,
        base_channels=base_channels,
        num_blocks=num_blocks,
    ).to(device)

    model.apply(lambda m: init_cnn_weights(m, init_method=init_method))

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(
        model.parameters(),
        lr=lr,
        weight_decay=weight_decay,
    )

    if run_name is None:
        run_name = f"cnn_blocks{num_blocks}_base{base_channels}_{init_method}_{aug_cfg_name}"

    best_val_uar = 0.0
    best_epoch = -1

    os.makedirs(MODEL_DIR, exist_ok=True)

    with mlflow.start_run(run_name=run_name):
        mlflow.log_param("num_blocks", num_blocks)
        mlflow.log_param("base_channels", base_channels)
        mlflow.log_param("batch_size", batch_size)
        mlflow.log_param("lr", lr)
        mlflow.log_param("weight_decay", weight_decay)
        mlflow.log_param("epochs", epochs)
        mlflow.log_param("model_type", "BaseCNN_flatten_linear")
        mlflow.log_param("init_method", init_method)
        mlflow.log_param("use_mixup", use_mixup)
        mlflow.log_param("mixup_alpha", mixup_alpha)
        mlflow.log_param("aug_cfg_name", aug_cfg_name)

        for epoch in range(epochs):
            # ----- TRAIN -----
            model.train()
            train_loss = 0.0
            train_correct = 0
            train_total = 0

            for imgs, labels in train_loader:
                imgs = imgs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                if use_mixup:
                    imgs_mixed, targets_a, targets_b, lam = mixup_batch(
                        imgs, labels, alpha=mixup_alpha
                    )
                    outputs = model(imgs_mixed)
                    loss = lam * criterion(outputs, targets_a) + \
                           (1 - lam) * criterion(outputs, targets_b)
                else:
                    outputs = model(imgs)
                    loss = criterion(outputs, labels)

                loss.backward()
                optimizer.step()

                train_loss += loss.item() * imgs.size(0)
                preds = outputs.argmax(dim=1)
                train_correct += (preds == labels).sum().item()
                train_total += labels.size(0)

            train_loss /= train_total
            train_acc = train_correct / train_total

            # ----- VALIDATION -----
            model.eval()
            val_loss = 0.0
            val_correct = 0
            val_total = 0
            all_true, all_pred = [], []

            with torch.no_grad():
                for imgs, labels in val_loader:
                    imgs = imgs.to(device)
                    labels = labels.to(device)

                    outputs = model(imgs)
                    loss = criterion(outputs, labels)

                    val_loss += loss.item() * imgs.size(0)
                    preds = outputs.argmax(dim=1)

                    val_correct += (preds == labels).sum().item()
                    val_total += labels.size(0)

                    all_true.extend(labels.cpu().numpy().tolist())
                    all_pred.extend(preds.cpu().numpy().tolist())

            val_loss /= val_total
            val_acc = val_correct / val_total

            metrics = get_classification_score(
                y_true=all_true,
                y_pred=all_pred,
                average="weighted",
            )

            mlflow.log_metric("train_loss", train_loss, step=epoch)
            mlflow.log_metric("train_acc", train_acc, step=epoch)
            mlflow.log_metric("val_loss", val_loss, step=epoch)
            mlflow.log_metric("val_acc", val_acc, step=epoch)
            mlflow.log_metric("val_f1", metrics.f1_score, step=epoch)
            mlflow.log_metric("val_precision", metrics.precision_score, step=epoch)
            mlflow.log_metric("val_recall", metrics.recall_score, step=epoch)
            mlflow.log_metric("val_uar", metrics.uar, step=epoch)

            if metrics.uar > best_val_uar:
                best_val_uar = metrics.uar
                best_epoch = epoch
                model_path = os.path.join(
                    MODEL_DIR,
                    f"best_model_blocks{num_blocks}_base{base_channels}_init_{init_method}_aug_{aug_cfg_name}.pt"
                )
                torch.save(model.state_dict(), model_path)
                mlflow.log_artifact(model_path)

            print(
                f"[{run_name}] Epoch {epoch+1}/{epochs} | "
                f"Train Loss: {train_loss:.4f} Acc: {train_acc:.3f} | "
                f"Val Loss: {val_loss:.4f} Acc: {val_acc:.3f} | "
                f"F1: {metrics.f1_score:.3f} UAR: {metrics.uar:.3f}"
            )

        mlflow.log_metric("best_val_uar", best_val_uar)
        mlflow.log_param("best_epoch", best_epoch)

    return model, best_val_uar


Using MLflow experiment: cnn_exp4_augment_ablation


In [36]:
FULL_AUG_CFG = {
    "time_mask": True,
    "time_mask_pct": 0.1,
    "time_mask_num": 2,
    "freq_mask": True,
    "freq_mask_pct": 0.1,
    "freq_mask_num": 2,
    "time_shift": True,
    "time_shift_pct": 0.1,
    "gain": True,
    "gain_min": 0.8,
    "gain_max": 1.2,
}

AUG_EXPERIMENTS = {
    
    "all_aug": {
        "aug_cfg": FULL_AUG_CFG,
        "use_mixup": True,
    },
    "no_time_mask": {
        "aug_cfg": {**FULL_AUG_CFG, "time_mask": False},
        "use_mixup": True,
    },
    "no_freq_mask": {
        "aug_cfg": {**FULL_AUG_CFG, "freq_mask": False},
        "use_mixup": True,
    },
    "no_time_shift": {
        "aug_cfg": {**FULL_AUG_CFG, "time_shift": False},
        "use_mixup": True,
    },
    "no_gain": {
        "aug_cfg": {**FULL_AUG_CFG, "gain": False},
        "use_mixup": True,
    },
    "no_mixup": {
        "aug_cfg": FULL_AUG_CFG,
        "use_mixup": False,
    },

    

    
    "no_aug": {
        "aug_cfg": {},      
        "use_mixup": False,
    },

    
    "mixup_only": {
        "aug_cfg": {
            "time_mask": False,
            "freq_mask": False,
            "time_shift": False,
            "gain": False,
        },
        "use_mixup": True,
    },
}


In [39]:
aug_results = {}

for name, cfg in AUG_EXPERIMENTS.items():
    print(f"\n=== Running augmentation config: {name} ===")

    train_dataset = SpectrogramImageDataset(
        train_image_paths,
        train_labels_idx,
        train=True,
        aug_cfg=cfg["aug_cfg"],
    )

    val_dataset = SpectrogramImageDataset(
        val_image_paths,
        val_labels_idx,
        train=False,
        aug_cfg=None,
    )

    run_name = f"aug_{name}_blocks{NUM_BLOCKS}_base{BASE_CHANNELS}"

    model, best_uar = train_one_cnn_experiment(
        num_blocks=NUM_BLOCKS,
        base_channels=BASE_CHANNELS,
        train_dataset=train_dataset,
        val_dataset=val_dataset,
        num_classes=num_classes,
        batch_size=BATCH_SIZE,
        lr=LR,
        weight_decay=WEIGHT_DECAY,
        epochs=EPOCHS,
        run_name=run_name,
        MODEL_DIR="saved_models",
        init_method=INIT_METHOD,
        use_mixup=cfg["use_mixup"],
        mixup_alpha=0.4,
        aug_cfg_name=name,
        use_weighted_sampler=True,                 
        sample_weights=sample_weights_tensor, 
    )

    aug_results[name] = best_uar
    print(f"--> {name}: best UAR={best_uar:.4f}")
    del model
    torch.cuda.empty_cache()

print("\n*** Augmentation ablation summary ***")
for name, uar in aug_results.items():
    print(f"{name:15s} -> UAR={uar:.4f}")

best_cfg = max(aug_results, key=aug_results.get)
print(f"\n>>> Best augmentation config = {best_cfg} with UAR={aug_results[best_cfg]:.4f}")



=== Running augmentation config: all_aug ===
[aug_all_aug_blocks5_base64] Epoch 1/15 | Train Loss: 9.6731 Acc: 0.198 | Val Loss: 2.7912 Acc: 0.296 | F1: 0.305 UAR: 0.208
[aug_all_aug_blocks5_base64] Epoch 2/15 | Train Loss: 2.4970 Acc: 0.213 | Val Loss: 7.0259 Acc: 0.048 | F1: 0.063 UAR: 0.183
[aug_all_aug_blocks5_base64] Epoch 3/15 | Train Loss: 2.9495 Acc: 0.215 | Val Loss: 3.0683 Acc: 0.178 | F1: 0.166 UAR: 0.217
[aug_all_aug_blocks5_base64] Epoch 4/15 | Train Loss: 2.5504 Acc: 0.213 | Val Loss: 4.0270 Acc: 0.079 | F1: 0.093 UAR: 0.219
[aug_all_aug_blocks5_base64] Epoch 5/15 | Train Loss: 2.8584 Acc: 0.205 | Val Loss: 4.4925 Acc: 0.267 | F1: 0.254 UAR: 0.234
[aug_all_aug_blocks5_base64] Epoch 6/15 | Train Loss: 2.2459 Acc: 0.231 | Val Loss: 1.4509 Acc: 0.318 | F1: 0.306 UAR: 0.231
[aug_all_aug_blocks5_base64] Epoch 7/15 | Train Loss: 2.1179 Acc: 0.233 | Val Loss: 1.6644 Acc: 0.334 | F1: 0.345 UAR: 0.233
[aug_all_aug_blocks5_base64] Epoch 8/15 | Train Loss: 2.0060 Acc: 0.220 | Val L