In [1]:
import pandas as pd
# Load the clip metadata table; later cells read its 'name' column to build audio file paths.
df = pd.read_csv('data/Animal_Sound.csv')

In [2]:

# Derive each clip's on-disk path from its file name.
df['path'] = df['name'].apply(lambda x: f"data/sounds/{x}")
# Use the clip at index label 3 as the running example for the cells below.
# .loc does the row/column lookup in one step instead of chained indexing.
audio_path = df.loc[3, 'path']
audio_path

'data/sounds/Lion_12.wav'

In [3]:
import numpy as np
import librosa
from scipy.signal import resample

def extract_and_stack_spectrograms(s, sr, window_sizes, T_target=None, F_target=None):
    """
    Build a multi-resolution spectrogram tensor from one waveform.

    One STFT magnitude is computed per window size (hop = window // 4). Each is
    resampled along time to a shared frame count, cropped or zero-padded along
    frequency to a shared bin count, and the results are stacked.

    Parameters:
    - s: np.ndarray, audio waveform
    - sr: int, sample rate (part of the signature; not referenced in the body)
    - window_sizes: list of int, FFT window sizes, one per resolution
    - T_target: int or None, output time frames; a falsy value selects the
      largest frame count among the resolutions
    - F_target: int or None, output frequency bins; a falsy value selects the
      smallest bin count among the resolutions

    Returns:
    - np.ndarray of shape (K, T_target, F_target) with K = len(window_sizes)
    """
    # One (frames, bins) magnitude matrix per FFT window size.
    magnitudes = [
        np.abs(librosa.stft(s, n_fft=n_fft, hop_length=n_fft // 4, win_length=n_fft)).T
        for n_fft in window_sizes
    ]

    # Fall back to data-driven sizes when the caller gave none.
    n_frames = T_target or max(mag.shape[0] for mag in magnitudes)
    n_bins = F_target or min(mag.shape[1] for mag in magnitudes)

    def _fit(mag):
        # Fourier-domain resampling along time, then crop / right-pad the bins.
        aligned = resample(mag, n_frames, axis=0)
        width = aligned.shape[1]
        if width > n_bins:
            return aligned[:, :n_bins]
        return np.pad(aligned, ((0, 0), (0, n_bins - width)))

    return np.stack([_fit(mag) for mag in magnitudes], axis=0)


In [4]:
import librosa
import torch

# Load the example clip at a 22.05 kHz sample rate
s, sr = librosa.load(audio_path, sr=22050)

# FFT window sizes, one per temporal resolution of the multi-resolution stack
window_sizes = [256, 512, 1024]

xMR = extract_and_stack_spectrograms(s, sr, window_sizes)

print(xMR.shape)  # (K=3, T, F)


(3, 2723, 129)


In [5]:
import scipy.ndimage
def extract_xMR_xMRMF(file_path,
                      sr=22050,
                      window_sizes=[32, 64, 128],
                      hop_length=256,
                      target_T=None,
                      target_F=128):
    """
    Compute multi-resolution dB spectrograms of an audio file, with and without
    mel filtering, resized onto one common (frequency, time) grid.

    For every FFT size in `window_sizes` the STFT magnitude (in dB) and a mel
    spectrogram (in dB, with as many mel bands as the STFT has bins) are
    computed. Both are then linearly interpolated to `target_F` x `target_T`.

    Parameters:
    - file_path: path of the audio file, handed to librosa.load
    - sr: int, sample rate requested from librosa.load
    - window_sizes: list of int, FFT sizes, one per resolution (K entries)
    - hop_length: int, hop shared by every resolution
    - target_T: int or None, output time frames; None selects the largest
      frame count among the resolutions
    - target_F: int, output frequency bins

    Returns:
    - xMR: np.ndarray (T, K, F, 1), raw-FFT dB spectrograms
    - xMRMF: np.ndarray (T, K, F, 2), channel 0 = mel, channel 1 = raw FFT

    NOTE(review): the default hop (256) is larger than every default FFT size
    (32/64/128), so consecutive frames do not overlap — confirm this is intended.
    """
    waveform, _ = librosa.load(file_path, sr=sr)

    fft_db_specs = []
    mel_db_specs = []
    for n_fft in window_sizes:
        magnitude = np.abs(
            librosa.stft(waveform, n_fft=n_fft, hop_length=hop_length, window='hann')
        )
        fft_db = librosa.amplitude_to_db(magnitude, ref=np.max)

        # The mel bank is sized to the STFT bin count so both views start with equal heights.
        mel_power = librosa.feature.melspectrogram(
            y=waveform, sr=sr, n_fft=n_fft, hop_length=hop_length,
            n_mels=fft_db.shape[0], window='hann',
        )

        fft_db_specs.append(fft_db)
        mel_db_specs.append(librosa.power_to_db(mel_power, ref=np.max))

    # Shared time length: caller-supplied, else the longest resolution.
    if target_T is None:
        n_frames = max(spec.shape[1] for spec in fft_db_specs)
    else:
        n_frames = target_T

    def _to_grid(spec):
        # Bilinear resize of an (F_i, T_i) image to (target_F, n_frames), plus a channel axis.
        factors = (target_F / spec.shape[0], n_frames / spec.shape[1])
        return scipy.ndimage.zoom(spec, factors, order=1)[..., np.newaxis]

    # Stack as (F, K, T, 1) and reorder to (T, K, F, 1).
    xMR = np.stack([_to_grid(spec) for spec in fft_db_specs], axis=1).transpose(2, 1, 0, 3)
    mel = np.stack([_to_grid(spec) for spec in mel_db_specs], axis=1).transpose(2, 1, 0, 3)

    # Mel goes first on the last axis, raw FFT second.
    xMRMF = np.concatenate([mel, xMR], axis=-1)

    return xMR, xMRMF


In [6]:
import librosa
import torch

# Extract both feature stacks for the example clip using the function's default settings
xMR, xMRMF = extract_xMR_xMRMF(audio_path)


xMRMF.shape  # Should be (T, K, F, 2) where K is number of resolutions, T is time frames, F is frequency bins

  mel_basis = filters.mel(sr=sr, n_fft=n_fft, **kwargs)


(681, 3, 128, 2)

In [7]:
import numpy as np
import torch

def prepare_xMRMF_for_patching(xMRMF_np, batch_first=True):
    """
    Fold the resolution and filter axes of an xMRMF array into one channel axis
    and wrap the result as a float32 tensor with a singleton batch axis.

    Args:
        xMRMF_np: numpy array of shape (T, K, F, C); C is 2 for mel + FFT
        batch_first: True returns (1, T, F, R); False returns (T, 1, F, R)

    Returns:
        torch.Tensor with R = K * C channels, where channel k * C + c holds
        resolution k, filter c
    """
    n_frames, n_res, n_bins, n_filters = xMRMF_np.shape

    # Move (K, C) to the end so a C-order reshape merges them into one axis.
    channels_last = xMRMF_np.transpose(0, 2, 1, 3)
    merged = channels_last.reshape(n_frames, n_bins, n_res * n_filters)

    # Leading singleton axis = batch of one clip.
    batched = torch.tensor(merged, dtype=torch.float32)[None, ...]

    return batched if batch_first else batched.permute(1, 0, 2, 3)


In [8]:
# Merge the example clip's resolution and filter axes into channels and add a batch axis
xMRMF_tensor = prepare_xMRMF_for_patching(xMRMF, batch_first=True)

In [9]:
xMRMF_tensor.shape  # (batch, T, F, R) with R = K * 2

torch.Size([1, 681, 128, 6])

In [10]:
import os
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from tqdm import tqdm

class DatasetLoader:
    """
    Read the clip metadata CSV, derive file paths and class labels, and turn
    the clips into stacked xMRMF feature tensors split into train / validation.

    The CSV must have a 'name' column holding audio file names. The label of a
    clip is the lower-cased part of its name before the first underscore.
    """

    def __init__(self, csv_path, sr=22050, window_sizes=[32, 64, 128],
                 target_T=128, target_F=128, test_size=0.2, random_state=42,
                 sounds_dir="data/sounds"):
        """
        Args:
            csv_path: path of the metadata CSV
            sr, window_sizes, target_T, target_F: forwarded to extract_xMR_xMRMF
            test_size, random_state: forwarded to train_test_split
            sounds_dir: directory prefix joined with each file name to form its path
        """
        self.df = pd.read_csv(csv_path)
        # Derive columns from this loader's own frame, never from a notebook-level
        # `df`, so the class works on a fresh kernel and with any CSV.
        names = self.df['name']
        self.df['path'] = names.apply(lambda x: f"{sounds_dir}/{x}")
        self.df['label'] = names.str.lower().str.split("_").str[0]
        self.sr = sr
        # Copy so the shared default list (or a caller's list) is never aliased.
        self.window_sizes = list(window_sizes)
        self.target_T = target_T
        self.target_F = target_F
        self.test_size = test_size
        self.random_state = random_state
        # Sorted labels give a stable label -> class-index mapping.
        self.label_to_index = {label: i for i, label in enumerate(sorted(self.df['label'].unique()))}

    def extract_features_and_labels(self, paths, labels):
        """
        Extract one xMRMF tensor per clip.

        Clips that fail are reported with a printed warning and skipped.

        Returns:
            (features, targets): features stacked as (N, T, F, R), targets as (N,)

        Raises:
            RuntimeError: if no clip could be processed.
        """
        data = []
        targets = []

        for path, label in tqdm(zip(paths, labels), total=len(paths), desc="Extracting xMRMF features"):
            try:
                # Resolve the target first: a failure must skip the clip entirely
                # so features and targets stay aligned.
                target = self.label_to_index[label]
                _, xMRMF_np = extract_xMR_xMRMF(path,
                                                sr=self.sr,
                                                window_sizes=self.window_sizes,
                                                target_T=self.target_T,
                                                target_F=self.target_F)
                xMRMF_tensor = prepare_xMRMF_for_patching(xMRMF_np)  # (1, T, F, R)
            except Exception as e:
                print(f"Warning: Failed to process {path}: {e}")
                continue

            data.append(xMRMF_tensor.squeeze(0))  # Remove batch dim: (T, F, R)
            targets.append(target)

        if not data:
            raise RuntimeError("No clip could be processed; cannot build a feature tensor.")

        return torch.stack(data), torch.tensor(targets)

    def load_dataset(self):
        """
        Split the metadata (stratified by label) and extract features for both parts.

        Returns:
            train_dataset: [features, targets] for the training split
            val_dataset: [features, targets] for the validation split
            input_dim: per-sample feature shape (T, F, R)
            n_classes: number of distinct labels
        """
        train_df, val_df = train_test_split(self.df, test_size=self.test_size, random_state=self.random_state, stratify=self.df['label'])

        train_data, train_labels = self.extract_features_and_labels(train_df['path'], train_df['label'])
        val_data, val_labels = self.extract_features_and_labels(val_df['path'], val_df['label'])

        train_dataset = [train_data, train_labels]
        val_dataset = [val_data, val_labels]

        input_dim = train_data.shape[1:]  # (T, F, R)
        n_classes = len(self.label_to_index)

        return train_dataset, val_dataset, input_dim, n_classes


In [11]:
# data = DatasetLoader('data/Animal_Sound.csv')
# train_dataset, val_dataset, input_dim, n_classes = data.load_dataset()
# print(f"Train dataset shape: {train_dataset[0].shape}, Labels shape: {train_dataset[1].shape}")
# print(f"Validation dataset shape: {val_dataset[0].shape}, Labels shape: {val_dataset[1].shape}")
# print(f"Input dimension: {input_dim}, Number of classes: {n_classes}")

In [12]:
import torch
import torch.nn as nn

class PatchEmbed(nn.Module):
    """Cut a (B, T, F, R) feature map into non-overlapping patches and embed each one.

    A convolution whose stride equals its kernel size performs the cut and the
    linear projection in a single step.
    """

    def __init__(self, patch_size=(8, 8), in_channels=6, embed_dim=96):
        super().__init__()
        self.proj = nn.Conv2d(
            in_channels,
            embed_dim,
            kernel_size=patch_size,
            stride=patch_size,
        )

    def forward(self, x):
        """Map x of shape (B, T, F, R) to patch tokens of shape (B, N_patches, embed_dim)."""
        channels_first = x.permute(0, 3, 1, 2)        # (B, R, T, F)
        patch_grid = self.proj(channels_first)        # (B, embed_dim, T', F')
        flat = torch.flatten(patch_grid, start_dim=2)  # (B, embed_dim, T' * F')
        return flat.permute(0, 2, 1)

class PositionalEncoding3D(nn.Module):
    """Learned positional signal built from time, frequency and resolution embeddings.

    Time and frequency embeddings are summed over the (T, F) patch grid, joined
    with a per-token resolution embedding, and projected back to embed_dim.
    """

    def __init__(self, T, F, R, embed_dim):
        super().__init__()
        self.time_emb = nn.Parameter(torch.randn(T, embed_dim))
        self.freq_emb = nn.Parameter(torch.randn(F, embed_dim))
        self.res_emb = nn.Parameter(torch.randn(R, embed_dim))
        self.proj = nn.Linear(2 * embed_dim, embed_dim)

    def forward(self, x, R_idx):
        """
        Add the positional signal to tokens.

        x: (B, N, D) tokens; R_idx: integer index into the resolution table,
        one entry per token (length N). Only the first N grid positions are used.
        """
        _, n_tokens, dim = x.shape

        # grid[t, f] = time_emb[t] + freq_emb[f], flattened with time as the slow axis.
        grid = self.time_emb[:, None, :] + self.freq_emb[None, :, :]
        grid_tokens = grid.reshape(-1, dim)[:n_tokens]

        joined = torch.cat((grid_tokens, self.res_emb[R_idx]), dim=-1)  # (N, 2D)
        offsets = self.proj(joined)                                     # (N, D)

        # The same offsets are broadcast over the batch axis.
        return x + offsets[None]

class AcousticAttentionBlock(nn.Module):
    """Transformer block with two parallel self-attention branches (mel and FFT).

    Each branch sees the token sequence with the other filter type's tokens set
    to zero. Zeroed tokens stay in the sequence; no key padding mask is passed to
    the attention modules. Both branch outputs are added to the input, normalised,
    and followed by a residual feed-forward layer.
    """

    def __init__(self, embed_dim, num_heads):
        super().__init__()
        heads_per_branch = num_heads // 2  # the head budget is split between the branches
        self.mha_mel = nn.MultiheadAttention(embed_dim, heads_per_branch, batch_first=True)
        self.mha_fft = nn.MultiheadAttention(embed_dim, heads_per_branch, batch_first=True)
        self.norm = nn.LayerNorm(embed_dim)
        hidden_dim = embed_dim * 4
        self.ffn = nn.Sequential(
            nn.Linear(embed_dim, hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, embed_dim),
        )

    def forward(self, x, filter_type_mask):
        """x: (B, N, D) tokens; filter_type_mask: (B, N) with 0 = mel, 1 = fft."""
        is_fft = (filter_type_mask == 1).unsqueeze(-1)
        is_mel = (filter_type_mask == 0).unsqueeze(-1)

        # Out-of-place masking: x itself is left untouched for the residual sum.
        mel_tokens = x.masked_fill(is_fft, 0)
        fft_tokens = x.masked_fill(is_mel, 0)

        mel_out = self.mha_mel(mel_tokens, mel_tokens, mel_tokens)[0]
        fft_out = self.mha_fft(fft_tokens, fft_tokens, fft_tokens)[0]

        normed = self.norm(x + mel_out + fft_out)
        return normed + self.ffn(normed)

class AcousticTransformer(nn.Module):
    """Patch-based classifier over (B, T, F, R) multi-resolution features.

    Pipeline: 8x8 patch embedding, positional encoding, a stack of
    AcousticAttentionBlock layers, mean pooling over patches, then a
    LayerNorm + Linear classification head.
    """

    def __init__(self, T=128, F=128, R=6, embed_dim=96, num_heads=4, num_layers=4, num_classes=13):
        super().__init__()
        patch = (8, 8)
        self.patch_embed = PatchEmbed(patch_size=patch, in_channels=R, embed_dim=embed_dim)
        # The positional grid matches the patch grid produced by the embedding.
        self.pos_enc = PositionalEncoding3D(T // patch[0], F // patch[1], R, embed_dim)

        self.blocks = nn.ModuleList()
        for _ in range(num_layers):
            self.blocks.append(AcousticAttentionBlock(embed_dim, num_heads))

        self.cls_head = nn.Sequential(
            nn.LayerNorm(embed_dim),
            nn.Linear(embed_dim, num_classes),
        )

    def forward(self, x):
        """Return class logits of shape (B, num_classes) for x of shape (B, T, F, R)."""
        batch, _, _, n_res = x.shape
        tokens = self.patch_embed(x)  # (B, N, D)
        n_tokens = tokens.size(1)

        # Placeholder resolution ids: 0..R-1 repeated cyclically over the tokens.
        cycles = (n_tokens // n_res) + 1
        R_idx = torch.arange(0, n_res).repeat(cycles)[:n_tokens].to(x.device)

        # Placeholder filter mask derived from id parity (mel=0, fft=1), same for every sample.
        filter_mask = (R_idx % 2).unsqueeze(0).repeat(batch, 1)

        tokens = self.pos_enc(tokens, R_idx)
        for blk in self.blocks:
            tokens = blk(tokens, filter_mask)

        pooled = tokens.mean(dim=1)  # global average over patches
        return self.cls_head(pooled)


In [19]:
from torch.utils.data import DataLoader, TensorDataset
import torch
import torch.nn as nn
import torch.optim as optim

# Load dataset: reads the CSV, splits it, and extracts features for every clip up front.
loader = DatasetLoader(csv_path='data/Animal_Sound.csv')
train_data, val_data, input_dim, num_classes = loader.load_dataset()

# Wrap the [features, labels] pairs into TensorDatasets
train_dataset = TensorDataset(train_data[0], train_data[1])  # [data, labels]
val_dataset = TensorDataset(val_data[0], val_data[1])

# Create DataLoaders (only the training set is shuffled)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16)

# Use the GPU when one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize model; input_dim is the per-sample feature shape (T, F, R)
model = AcousticTransformer(
    T=input_dim[0],
    F=input_dim[1],
    R=input_dim[2],
    num_classes=num_classes
).to(device)

# Optimizer & loss
optimizer = optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

# Training loop
# NOTE(review): no torch seed is set in the visible cells, so weight init and
# batch order presumably differ between runs — confirm if reproducibility matters.
for epoch in range(70):
    model.train()
    total_loss = 0
    for x, y in train_loader:
        x, y = x.to(device), y.to(device)

        optimizer.zero_grad()
        output = model(x)
        loss = criterion(output, y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Report the mean batch loss for the epoch
    print(f"Epoch {epoch + 1}: Train Loss = {total_loss / len(train_loader):.4f}")


  mel_basis = filters.mel(sr=sr, n_fft=n_fft, **kwargs)
Extracting xMRMF features: 100%|██████████| 520/520 [00:02<00:00, 223.58it/s]
Extracting xMRMF features: 100%|██████████| 130/130 [00:00<00:00, 169.27it/s]


Epoch 1: Train Loss = 2.6406
Epoch 2: Train Loss = 2.5692
Epoch 3: Train Loss = 2.5348
Epoch 4: Train Loss = 2.5242
Epoch 5: Train Loss = 2.4825
Epoch 6: Train Loss = 2.4156
Epoch 7: Train Loss = 2.2216
Epoch 8: Train Loss = 2.1723
Epoch 9: Train Loss = 1.9923
Epoch 10: Train Loss = 1.9342
Epoch 11: Train Loss = 1.8029
Epoch 12: Train Loss = 1.8051
Epoch 13: Train Loss = 1.8540
Epoch 14: Train Loss = 1.7603
Epoch 15: Train Loss = 1.6826
Epoch 16: Train Loss = 1.5986
Epoch 17: Train Loss = 1.7506
Epoch 18: Train Loss = 1.5985
Epoch 19: Train Loss = 1.5784
Epoch 20: Train Loss = 1.4928
Epoch 21: Train Loss = 1.4699
Epoch 22: Train Loss = 1.3910
Epoch 23: Train Loss = 1.3293
Epoch 24: Train Loss = 1.4455
Epoch 25: Train Loss = 1.2997
Epoch 26: Train Loss = 1.2864
Epoch 27: Train Loss = 1.2060
Epoch 28: Train Loss = 1.1737
Epoch 29: Train Loss = 1.1524
Epoch 30: Train Loss = 1.1485
Epoch 31: Train Loss = 1.1772
Epoch 32: Train Loss = 1.1105
Epoch 33: Train Loss = 1.0440
Epoch 34: Train Los

In [20]:
# Evaluate: count correct top-1 predictions over the validation batches
model.eval()
correct = 0
total = 0
with torch.no_grad():  # inference only, no gradient bookkeeping
    for x, y in val_loader:
        x = x.to(device)
        y = y.to(device)
        output = model(x)
        preds = output.argmax(dim=1)
        correct += int((preds == y).sum())
        total += y.shape[0]

# Accuracy as a percentage of validation samples
acc = correct / total * 100
print(f"Validation Accuracy: {acc:.2f}%")

Validation Accuracy: 67.69%
