<a href="https://colab.research.google.com/github/Alokik-29/basic_transformer/blob/main/Basic_transforme.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [86]:
!nvidia-smi -L || echo "No GPU found (CPU will still work, just slower)"
!pip -q install torch torchaudio --upgrade

GPU 0: Tesla T4 (UUID: GPU-2782cd5a-198a-ff3d-8db9-8ce2334d4d17)


In [116]:
import os, math, random
from dataclasses import dataclass
from pathlib import Path
from typing import List, Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import torchaudio
from torchaudio import transforms as T

# Reproducibility
def set_seed(seed=42):
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
set_seed(42)

# Device selection
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Config dataclass
@dataclass
class CFG:
    sample_rate: int = 16000        # 16kHz audio
    target_secs: float = 1.0        # clip duration (1 second)
    n_mels: int = 64
    n_fft: int = 1024
    hop_length: int = 256
    f_min: int = 20
    f_max: int = 7600
    d_model: int = 128
    n_heads: int = 4
    n_layers: int = 2
    batch_size: int = 128
    lr: float = 1e-3
    epochs: int = 15

# Precompute Mel-Spectrograms
import os
import torch
from tqdm import tqdm

precompute_dir = "precomputed_mels"
os.makedirs(precompute_dir, exist_ok=True)

for idx, item in enumerate(tqdm(train_dataset + val_dataset + test_dataset)):
    # Adjust based on your dataset element structure
    waveform = item[0]           # already a tensor
    label = item[1]

    mel_path = os.path.join(precompute_dir, f"{idx}.pt")

    if not os.path.exists(mel_path):
        mel = mel_transform(waveform)
        torch.save({"mel": mel, "label": label_dict[label]}, mel_path)



AcceleratorError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [99]:
# Download Google Speech Commands dataset
train_dataset = torchaudio.datasets.SPEECHCOMMANDS(
    root="./", download=True, subset="training"
)
val_dataset = torchaudio.datasets.SPEECHCOMMANDS(
    root="./", download=True, subset="validation"
)
test_dataset = torchaudio.datasets.SPEECHCOMMANDS(
    root="./", download=True, subset="testing"
)

print("Train size:", len(train_dataset))
print("Validation size:", len(val_dataset))
print("Test size:", len(test_dataset))

# Take one sample
waveform, sample_rate, label, *_ = train_dataset[0]
print("Waveform shape:", waveform.shape)   # [1, samples]
print("Label:", label)
print("Sample rate:", sample_rate)

# Play the audio
import IPython.display as ipd
ipd.Audio(waveform.numpy(), rate=sample_rate)


Train size: 84843
Validation size: 9981
Test size: 11005
Waveform shape: torch.Size([1, 16000])
Label: backward
Sample rate: 16000


In [100]:
# Audio transform: waveform -> Mel Spectrogram
mel_transform = T.MelSpectrogram(
    sample_rate=CFG.sample_rate,
    n_fft=CFG.n_fft,
    hop_length=CFG.hop_length,
    n_mels=CFG.n_mels,
    f_min=CFG.f_min,
    f_max=CFG.f_max
)

# Convert one example
waveform, sample_rate, label, *_ = train_dataset[0]
mel_spec = mel_transform(waveform)

print("Mel spectrogram shape:", mel_spec.shape)  # [n_mels, time]


Mel spectrogram shape: torch.Size([1, 64, 63])


In [101]:
class SpeechCommandsDataset(Dataset):
    def __init__(self, base_dataset, transform, class_to_idx=None, target_secs=1.0):
        self.base = base_dataset
        self.transform = transform
        self.target_len = int(CFG.sample_rate * target_secs)

        # Build label dictionary if not provided
        if class_to_idx is None:
            labels = sorted(set(dat[2] for dat in base_dataset))
            self.class_to_idx = {c: i for i, c in enumerate(labels)}
        else:
            self.class_to_idx = class_to_idx

    def __len__(self):
        return len(self.base)

    def __getitem__(self, idx):
        waveform, sr, label, *_ = self.base[idx]

        # Ensure fixed duration (1 sec) by padding or truncating
        if waveform.size(1) < self.target_len:
            pad_len = self.target_len - waveform.size(1)
            waveform = torch.nn.functional.pad(waveform, (0, pad_len))
        else:
            waveform = waveform[:, :self.target_len]

        # Convert to Mel Spectrogram
        mel_spec = self.transform(waveform)   # [n_mels, time]

        # Normalize
        mel_spec = (mel_spec - mel_spec.mean()) / (mel_spec.std() + 1e-5)

        label_idx = self.class_to_idx[label]
        return mel_spec, label_idx


In [103]:
import time
from torch.utils.data import DataLoader, Dataset
import os
import torch
from sklearn.model_selection import train_test_split

# -----------------------------
# Dataset for precomputed Mel files
# -----------------------------
class PrecomputedMelDataset(Dataset):
    def __init__(self, pt_files: list):
        self.pt_files = pt_files

    def __len__(self):
        return len(self.pt_files)

    def __getitem__(self, idx):
        data = torch.load(self.pt_files[idx])
        mel = data["mel"]        # (n_mels, time)
        label = data["label"]
        return mel, label

# -----------------------------
# Get all precomputed files
# -----------------------------
all_files = sorted([os.path.join("precomputed_mels", f)
                    for f in os.listdir("precomputed_mels") if f.endswith(".pt")])

# Split into train/val/test (e.g., 80/10/10)
train_files, temp_files = train_test_split(all_files, test_size=0.2, random_state=42)
val_files, test_files   = train_test_split(temp_files, test_size=0.5, random_state=42)

print(f"Train files: {len(train_files)}, Val files: {len(val_files)}, Test files: {len(test_files)}")

# -----------------------------
# Wrap datasets
# -----------------------------
train_ds = PrecomputedMelDataset(train_files)
val_ds   = PrecomputedMelDataset(val_files)
test_ds  = PrecomputedMelDataset(test_files)

# -----------------------------
# DataLoaders (optimized)
# -----------------------------
train_loader = DataLoader(train_ds, batch_size=CFG.batch_size, shuffle=True,
                          num_workers=4, pin_memory=True, prefetch_factor=2, persistent_workers=True)
val_loader = DataLoader(val_ds, batch_size=CFG.batch_size, shuffle=False,
                        num_workers=4, pin_memory=True, prefetch_factor=2, persistent_workers=True)
test_loader = DataLoader(test_ds, batch_size=CFG.batch_size, shuffle=False,
                         num_workers=4, pin_memory=True, prefetch_factor=2, persistent_workers=True)

# -----------------------------
# Benchmark DataLoader
# -----------------------------
n_test_batches = 20
loader_iter = iter(train_loader)

# Warmup first batch (spawn workers)
_ = next(loader_iter)

start = time.time()
for i, (xb, yb) in enumerate(loader_iter):
    if i >= n_test_batches:
        break
end = time.time()

time_per_batch = (end - start) / n_test_batches
num_batches = len(train_loader)
epoch_time = time_per_batch * num_batches

print(f"Approx. {time_per_batch:.3f} sec per batch")
print(f"Approx. {epoch_time/60:.2f} minutes per epoch")


Train files: 84663, Val files: 10583, Test files: 10583




RuntimeError: Caught RuntimeError in DataLoader worker process 0.
Original Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/_utils/worker.py", line 349, in _worker_loop
    data = fetcher.fetch(index)  # type: ignore[possibly-undefined]
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/_utils/fetch.py", line 55, in fetch
    return self.collate_fn(data)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/_utils/collate.py", line 398, in default_collate
    return collate(batch, collate_fn_map=default_collate_fn_map)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/_utils/collate.py", line 212, in collate
    collate(samples, collate_fn_map=collate_fn_map)
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/_utils/collate.py", line 155, in collate
    return collate_fn_map[elem_type](batch, collate_fn_map=collate_fn_map)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/_utils/collate.py", line 271, in collate_tensor_fn
    out = elem.new(storage).resize_(len(batch), *list(elem.size()))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: Trying to resize storage that is not resizable


In [104]:
import torch
import torch.nn as nn
import math

class AudioTransformer(nn.Module):
    def __init__(self, num_classes, d_model=128, nhead=4, num_layers=2, max_len=512):
        super().__init__()
        self.d_model = d_model
        self.num_classes = num_classes

        # lazy projection initialized in forward once we know n_mels
        self.input_proj = None

        # Transformer encoder
        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=256, dropout=0.1, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(enc_layer, num_layers=num_layers)

        # Learnable positional encoding (simple & robust)
        self.pos_encoding = nn.Parameter(torch.zeros(1, max_len, d_model))

        # Classifier head
        self.fc = nn.Linear(d_model, num_classes)

        # Init
        nn.init.trunc_normal_(self.pos_encoding, std=0.02)
        nn.init.xavier_uniform_(self.fc.weight)
        nn.init.zeros_(self.fc.bias)

    def forward(self, x):
        # x: (B, 1, n_mels, T) or (B, n_mels, T). We accept both.
        if x.dim() == 4:
            x = x.squeeze(1)           # -> (B, n_mels, T)
        # Now (B, n_mels, T)
        x = x.transpose(1, 2)          # -> (B, T, n_mels)

        # Build projection lazily: n_mels -> d_model
        if self.input_proj is None:
            in_dim = x.size(-1)        # usually 64
            self.input_proj = nn.Linear(in_dim, self.d_model).to(x.device)
            nn.init.xavier_uniform_(self.input_proj.weight)
            nn.init.zeros_(self.input_proj.bias)

        # Project to d_model
        x = self.input_proj(x)         # (B, T, d_model)

        # Add positional encoding (trim/crop to sequence length)
        T = x.size(1)
        x = x + self.pos_encoding[:, :T, :]

        # Transformer encoder
        x = self.transformer(x)        # (B, T, d_model)

        # Temporal pooling
        x = x.mean(dim=1)              # (B, d_model)

        # Classifier
        out = self.fc(x)               # (B, num_classes)
        return out

In [105]:
num_classes = len(label_dict)  # from earlier cell
model = AudioTransformer(
    num_classes=num_classes,
    d_model=CFG.d_model,
    nhead=CFG.n_heads,
    num_layers=CFG.n_layers,
    max_len=512
).to(device)
# Loss & optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=CFG.lr)

# Accuracy helper
def accuracy(preds, labels):
    return (preds.argmax(dim=1) == labels).float().mean().item()

# Training & validation loops
def train_one_epoch(model, loader, optimizer, criterion):
    model.train()
    total_loss, total_acc = 0.0, 0.0
    for xb, yb in loader:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        out = model(xb)
        loss = criterion(out, yb)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        total_acc += accuracy(out, yb)
    return total_loss / len(loader), total_acc / len(loader)

def validate(model, loader, criterion):
    model.eval()
    total_loss, total_acc = 0.0, 0.0
    with torch.no_grad():
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            out = model(xb)
            loss = criterion(out, yb)
            total_loss += loss.item()
            total_acc += accuracy(out, yb)
    return total_loss / len(loader), total_acc / len(loader)


In [111]:
train_loader = DataLoader(
    train_ds,
    batch_size=CFG.batch_size,
    shuffle=True,
    num_workers=4,
    pin_memory=True,
    collate_fn=collate_fn  # <-- add this
)

val_loader = DataLoader(
    val_ds,
    batch_size=CFG.batch_size,
    shuffle=False,
    num_workers=4,
    pin_memory=True,
    collate_fn=collate_fn  # <-- add this
)

for xb, yb in train_loader:
    print("Input batch shape:", xb.shape)  # (batch, seq_len, n_mels)
    print("Label shape:", yb.shape)
    break

Input batch shape: torch.Size([128, 1, 64, 63])
Label shape: torch.Size([128])


In [114]:
num_classes = len(label_dict)
all_labels = []

for _, y in train_ds:
    all_labels.append(y)

all_labels = torch.tensor(all_labels)
print("Num classes:", num_classes)
print("Min label:", all_labels.min().item())
print("Max label:", all_labels.max().item())


Num classes: 35
Min label: 16000
Max label: 16000


In [113]:
%env CUDA_LAUNCH_BLOCKING=1
# Training loop
epochs = CFG.epochs
for epoch in range(epochs):
    train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, criterion)
    val_loss, val_acc = validate(model, val_loader, criterion)
    print(f"Epoch [{epoch+1}/{epochs}]")
    print(f"  Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
    print(f"  Val   Loss: {val_loss:.4f} | Val   Acc: {val_acc:.4f}")
    print("-" * 40)

env: CUDA_LAUNCH_BLOCKING=1


AcceleratorError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.
