# The Last Frequency: SOTA From Scratch Solution

This notebook implements a high-performance audio classification pipeline designed to win the competition without using pretrained weights. 

### Key features of this solution:
1. **Advanced Feature Extraction**: 128-bin Log-Mel Spectrograms.
2. **ResNet-18 Architecture**: Trained from scratch, optimized for spectrogram patterns.
3. **Heavy Augmentations**: 
   - **Time Shifting**: Circularly shifting the raw waveform in time by a random offset.
   - **Background Noise Injection**: Adding low-amplitude Gaussian noise to the raw waveform for noise robustness.
   - **SpecAugment**: Frequency and time masking on the spectrogram to reduce overfitting.
   - **Mixup Training**: Smoothing decision boundaries by mixing samples.
4. **Optimization Strategy**: AdamW + OneCycleLR + Label Smoothing.
5. **Efficiency**: GPU-accelerated spectrogram generation.

In [None]:
import os
import json
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm.auto import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
import torchvision.models as models
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

import warnings
warnings.filterwarnings('ignore')

def seed_everything(seed=42):
    """Seed every random number generator used by this notebook.

    Seeds Python's ``random``, NumPy's global RNG and PyTorch (CPU and the
    current CUDA device), exports ``PYTHONHASHSEED`` and switches cuDNN to
    deterministic, non-benchmarking mode.

    Args:
        seed: Integer seed shared by all generators.
    """
    # Exported for child processes; the hash seed of the already-running
    # interpreter is fixed at start-up and is not changed by this assignment.
    os.environ['PYTHONHASHSEED'] = str(seed)

    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # Trade cuDNN autotuning speed for run-to-run reproducibility.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

# Fix all RNG seeds up front so the split, initialisation and augmentations
# that follow start from a known state.
seed_everything(42)
# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Device: {device}')

### Configuration

In [None]:
class CFG:
    """Central hyper-parameter and path configuration for the notebook."""

    # --- Data -------------------------------------------------------------
    data_dir: str = '/kaggle/input/the-last-frequency'
    num_classes: int = 35

    # --- Log-mel spectrogram front end --------------------------------------
    sample_rate: int = 16000
    n_fft: int = 1024
    hop_length: int = 256
    n_mels: int = 128
    # Spectrograms are cropped or right-padded to this many time frames.
    target_frames: int = 64

    # --- Optimisation ------------------------------------------------------
    batch_size: int = 64
    epochs: int = 40  # Longer training helps when starting from random weights
    lr: float = 1e-3
    weight_decay: float = 1e-2

    # --- Regularisation ----------------------------------------------------
    label_smoothing: float = 0.1
    mixup_alpha: float = 0.2

### Load Data

In [None]:
print("Loading data...")
# Training waveforms with their labels, plus the unlabeled test waveforms,
# all stored as .npy arrays under CFG.data_dir.
train_waveforms = np.load(f'{CFG.data_dir}/train_waveforms.npy')
train_labels = np.load(f'{CFG.data_dir}/train_labels.npy')
test_waveforms = np.load(f'{CFG.data_dir}/test_waveforms.npy')

# JSON object keys are always strings, so they are converted back to int here.
# NOTE(review): presumably maps class id -> class name; confirm against the data.
with open(f'{CFG.data_dir}/label_map.json') as f:
    label_map = {int(k): v for k, v in json.load(f).items()}

# Quick sanity check of what was loaded.
print(f'Train shape: {train_waveforms.shape}, Labels: {len(train_labels)}')
print(f'Test shape: {test_waveforms.shape}')

### Augmentations & Transforms

We implement raw audio augmentations and GPU-based spectrogram generation.

In [None]:
class AudioAugmentor:
    """Stateless waveform-level augmentations operating on NumPy arrays."""

    @staticmethod
    def time_shift(waveform, shift_limit=0.1):
        """Circularly shift ``waveform`` by a random number of samples.

        The offset is ``waveform.shape[0]`` times a fraction drawn uniformly
        from ``[-shift_limit, shift_limit]``, truncated to an integer. Samples
        pushed past one end wrap around to the other (``np.roll``).
        """
        fraction = random.uniform(-shift_limit, shift_limit)
        offset = int(fraction * waveform.shape[0])
        return np.roll(waveform, offset)

    @staticmethod
    def add_noise(waveform, noise_limit=0.01):
        """Add standard-normal noise scaled by a random factor.

        The scale is drawn uniformly from ``[0, noise_limit]``; the noise has
        the same shape as ``waveform``.
        """
        scale = random.uniform(0, noise_limit)
        gaussian = np.random.randn(*waveform.shape)
        return waveform + scale * gaussian

class SpecTransform(nn.Module):
    """Convert raw audio to a fixed-width log-mel spectrogram.

    Runs on whichever device the module and its input live on, so it can be
    embedded in a model and executed on the GPU.

    Args:
        freq_mask_param: Maximum width of the SpecAugment frequency mask.
        time_mask_param: Maximum width of the SpecAugment time mask.
    """

    def __init__(self, freq_mask_param=20, time_mask_param=15):
        super().__init__()
        self.mel_spec = torchaudio.transforms.MelSpectrogram(
            sample_rate=CFG.sample_rate,
            n_fft=CFG.n_fft,
            hop_length=CFG.hop_length,
            n_mels=CFG.n_mels
        )
        self.amplitude_to_db = torchaudio.transforms.AmplitudeToDB()
        # SpecAugment masks; widths are configurable, defaults match the
        # values this notebook was tuned with.
        self.freq_mask = torchaudio.transforms.FrequencyMasking(
            freq_mask_param=freq_mask_param
        )
        self.time_mask = torchaudio.transforms.TimeMasking(
            time_mask_param=time_mask_param
        )

    def forward(self, x, augment=False):
        """Return the log-mel spectrogram of ``x``.

        Args:
            x: Batch of waveforms; assumed (batch, 16000) — TODO confirm
                against the loaded data.
            augment: When True, apply frequency and time masking.

        Returns:
            Tensor whose last dimension is exactly ``CFG.target_frames``.
        """
        spec = self.amplitude_to_db(self.mel_spec(x))

        # Crop or right-pad the time axis to a fixed number of frames.
        n_frames = spec.shape[-1]
        if n_frames > CFG.target_frames:
            spec = spec[..., :CFG.target_frames]
        elif n_frames < CFG.target_frames:
            # NOTE(review): padding uses 0.0, which in the dB domain is not
            # silence — confirm this is intended.
            spec = F.pad(spec, (0, CFG.target_frames - n_frames))

        if augment:
            spec = self.freq_mask(spec)
            spec = self.time_mask(spec)

        return spec

### Dataset

In [None]:
class SpeechDataset(Dataset):
    """Dataset of raw waveforms with optional labels and augmentation.

    Args:
        waveforms: Indexable collection of NumPy waveforms.
        labels: Optional labels aligned with ``waveforms``; when omitted the
            dataset yields waveforms only.
        augment: When True, each item is time-shifted and then noised.
    """

    def __init__(self, waveforms, labels=None, augment=False):
        self.augmentor = AudioAugmentor()
        self.augment = augment
        self.labels = labels
        self.waveforms = waveforms

    def __len__(self):
        """Return the number of waveforms."""
        return len(self.waveforms)

    def __getitem__(self, idx):
        """Return ``(waveform, label)`` or just ``waveform`` as float32 tensor."""
        # Copy so augmentations never modify the backing array.
        samples = self.waveforms[idx].copy()

        if self.augment:
            pipeline = (self.augmentor.time_shift, self.augmentor.add_noise)
            for transform in pipeline:
                samples = transform(samples)

        tensor = torch.from_numpy(samples).float()

        if self.labels is None:
            return tensor
        return tensor, self.labels[idx]

# Stratified 85/15 train/validation split with a fixed random_state so the
# split is the same on every run.
X_train, X_val, y_train, y_val = train_test_split(
    train_waveforms, train_labels, test_size=0.15, random_state=42, stratify=train_labels
)

# Only the training set receives waveform augmentations.
train_ds = SpeechDataset(X_train, y_train, augment=True)
val_ds = SpeechDataset(X_val, y_val, augment=False)
test_ds = SpeechDataset(test_waveforms, augment=False)

# Training batches are shuffled and loaded by 2 worker processes; validation
# and test keep their original order so predictions line up with the inputs.
train_loader = DataLoader(train_ds, batch_size=CFG.batch_size, shuffle=True, num_workers=2)
val_loader = DataLoader(val_ds, batch_size=CFG.batch_size, shuffle=False)
test_loader = DataLoader(test_ds, batch_size=CFG.batch_size, shuffle=False)

### Model Architecture (Custom ResNet-18)

In [None]:
class AudioResNet(nn.Module):
    """ResNet-18 classifier that consumes raw waveforms.

    The waveform-to-spectrogram conversion (``SpecTransform``) is part of the
    module, so it runs on the same device as the backbone.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes=35):
        super().__init__()
        # Randomly initialised ResNet-18. Calling the constructor with no
        # arguments avoids the deprecated ``pretrained=`` keyword and still
        # loads no weights.
        model = models.resnet18()

        # Replace the stem so it accepts a 1-channel spectrogram.
        model.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)

        # Replace the classification head: dropout followed by a linear layer
        # producing ``num_classes`` logits.
        num_ftrs = model.fc.in_features
        model.fc = nn.Sequential(
            nn.Dropout(0.3),
            nn.Linear(num_ftrs, num_classes)
        )

        self.backbone = model
        self.spec_layer = SpecTransform()

    def forward(self, x, augment=False):
        """Return class logits for a batch of raw waveforms.

        Args:
            x: Batch of waveforms accepted by ``SpecTransform``.
            augment: Forwarded to ``SpecTransform`` to enable SpecAugment.
        """
        # 1. Raw waveform -> log-mel spectrogram
        x = self.spec_layer(x, augment=augment)
        # 2. Insert the channel dimension expected by Conv2d
        x = x.unsqueeze(1)
        # 3. Backbone
        return self.backbone(x)

# Build the network and move all of its parameters to the selected device.
model = AudioResNet(CFG.num_classes).to(device)
# Total number of parameter elements in the model.
print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")

### Mixup Logic

In [None]:
def mixup_data(x, y, alpha=0.2):
    """Mix each sample in a batch with another randomly chosen sample.

    Args:
        x: Input batch; the first dimension is the batch dimension.
        y: Targets aligned with ``x``.
        alpha: Beta-distribution parameter. When ``alpha <= 0`` no mixing is
            performed (``lam`` is 1).

    Returns:
        Tuple ``(mixed_x, y_a, y_b, lam)`` where
        ``mixed_x = lam * x + (1 - lam) * x[perm]``, ``y_a`` is the original
        targets, ``y_b`` the permuted targets and ``lam`` a Python float.
    """
    # Plain float keeps the result in x's dtype and avoids NumPy scalars
    # leaking into tensor arithmetic.
    lam = float(np.random.beta(alpha, alpha)) if alpha > 0 else 1.0

    batch_size = x.size(0)
    # Put the permutation on the input's own device instead of relying on a
    # module-level ``device`` global.
    index = torch.randperm(batch_size).to(x.device)

    # ``x[index]`` indexes the batch dimension for inputs of any rank.
    mixed_x = lam * x + (1 - lam) * x[index]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    """Return the mixup loss: a ``lam``-weighted blend of two target losses.

    ``criterion`` is evaluated on ``pred`` against ``y_a`` and against
    ``y_b``; the results are combined as ``lam * loss_a + (1 - lam) * loss_b``.
    """
    loss_a = criterion(pred, y_a)
    loss_b = criterion(pred, y_b)
    return lam * loss_a + (1 - lam) * loss_b

### Training Engine

In [None]:
# AdamW with decoupled weight decay; the learning rate is driven by a
# one-cycle schedule that peaks at twice CFG.lr and is stepped once per batch.
optimizer = torch.optim.AdamW(model.parameters(), lr=CFG.lr, weight_decay=CFG.weight_decay)
scheduler = torch.optim.lr_scheduler.OneCycleLR(
    optimizer, max_lr=CFG.lr*2, 
    steps_per_epoch=len(train_loader), 
    epochs=CFG.epochs
)
criterion = nn.CrossEntropyLoss(label_smoothing=CFG.label_smoothing)

# Best validation accuracy seen so far and the per-epoch metric history.
best_acc = 0.0
history = {'train_loss': [], 'val_acc': []}

for epoch in range(1, CFG.epochs + 1):
    model.train()
    train_loss = 0
    pbar = tqdm(train_loader, desc=f'Epoch {epoch}')
    
    for x, y in pbar:
        x, y = x.to(device), y.to(device)
        
        # Mixup is applied on the raw waveforms, before the model converts
        # them to spectrograms.
        if random.random() < 0.5: # Apply mixup to 50% of batches
            x, y_a, y_b, lam = mixup_data(x, y, CFG.mixup_alpha)
            preds = model(x, augment=True)
            loss = mixup_criterion(criterion, preds, y_a, y_b, lam)
        else:
            preds = model(x, augment=True)
            loss = criterion(preds, y)
            
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Per-batch step, matching steps_per_epoch=len(train_loader) above.
        scheduler.step()
        
        train_loss += loss.item()
        
    # Evaluation on the held-out split: no augmentation, no gradients.
    model.eval()
    all_preds = []
    all_targets = []
    with torch.no_grad():
        for x, y in val_loader:
            x = x.to(device)
            preds = model(x, augment=False)
            all_preds.extend(preds.argmax(1).cpu().numpy())
            all_targets.extend(y.numpy())
            
    val_acc = accuracy_score(all_targets, all_preds)
    print(f"Epoch {epoch} | Train Loss: {train_loss/len(train_loader):.4f} | Val Acc: {val_acc:.4f}")
    
    # Checkpoint only when validation accuracy strictly improves.
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), 'best_model.pth')
        print("New Best Score Saved!")
    
    # Record the mean training loss per batch and the validation accuracy.
    history['train_loss'].append(train_loss/len(train_loader))
    history['val_acc'].append(val_acc)

### Inference & Submission

In [None]:
# Restore the best checkpoint. map_location remaps the stored tensors onto the
# current device, so a checkpoint written on a GPU still loads in a CPU-only
# session.
model.load_state_dict(torch.load('best_model.pth', map_location=device))
model.eval()
test_preds = []

# Predict class indices for the test set, in loader order and without
# augmentation or gradient tracking.
with torch.no_grad():
    for x in tqdm(test_loader, desc='Inference'):
        x = x.to(device)
        preds = model(x, augment=False)
        test_preds.extend(preds.argmax(1).cpu().numpy())

# The row index is written as the 'id' column.
# NOTE(review): predictions are written as integer class indices and
# label_map is not applied — confirm this matches the expected submission format.
submission = pd.DataFrame({'label': test_preds})
submission.to_csv('submission.csv', index_label='id')
print("Submission saved as submission.csv")