# Bethy: Self-contained Kaggle Notebook

End-to-end breath sound classification without importing local `.py` files. All code (feature extraction, dataset, model, training, evaluation, prediction) lives in this notebook. Update the dataset path cell below if your Kaggle dataset name differs.

In [None]:
# Environment check and optional installs
import os, sys, subprocess, json
# The presence of a '/kaggle' path is used as the marker for a Kaggle kernel.
is_kaggle = os.path.exists('/kaggle')
print(f"Running on Kaggle: {is_kaggle}")
print(sys.version)
if is_kaggle:
    # Best-effort install: check=False means a pip failure does not stop the notebook.
    pkgs = ['pyyaml', 'tensorboard']
    pip_install = [sys.executable, '-m', 'pip', 'install', '-q']
    subprocess.run(pip_install + pkgs, check=False)
    print('Dependencies checked/installed')

## Configuration

Configure paths and hyperparameters. The cell below auto-detects a likely dataset path under `/kaggle/input`. Adjust if needed.

In [None]:
import yaml, pprint, itertools
import torch
from pathlib import Path

# Detect dataset path - looks for folders containing .wav and .txt files
candidate_dirs = [
    Path('/kaggle/input/respiratory-sound-database/Respiratory_Sound_Database/Respiratory_Sound_Database/audio_and_txt_files'),
    Path('/kaggle/input/respiratory-sound-database/Respiratory_Sound_Database/Respiratory_Sound_Database'),
    Path('/kaggle/input/respiratory-sound-database/audio_and_txt_files'),
    Path('/kaggle/input/respiratory-sound-database'),
    Path('/kaggle/input/icbhi-respiratory-sound-database'),
    Path('/kaggle/input/icbhi'),
    Path('./data/icbhi')
]


def find_wav_dir(candidates):
    """Return the first usable audio directory among ``candidates``.

    Candidates are tried in order. For the first one that holds any ``.wav``
    file (at any depth) the result is ``(directory, n_wavs)``:

    * ``directory`` is the candidate itself when it directly contains
      ``.wav`` files, otherwise the sub-directory holding the most of them.
      The dataset class only looks in a few fixed, non-recursive locations,
      so returning a root whose audio is nested deeper would later fail with
      "No .wav files found".
    * ``n_wavs`` is the number of ``.wav`` files directly inside ``directory``.

    Returns ``(None, 0)`` when no candidate contains audio.
    """
    for root in candidates:
        if not root.is_dir():
            continue
        # rglob already includes files directly under ``root``; combining
        # glob('*.wav') with glob('**/*.wav') would count those twice.
        wav_files = sorted(root.rglob('*.wav'))
        if not wav_files:
            continue
        per_dir = {}
        for wav in wav_files:
            per_dir[wav.parent] = per_dir.get(wav.parent, 0) + 1
        # Sorted input + insertion-ordered dict make tie-breaking deterministic.
        best = root if root in per_dir else max(per_dir, key=per_dir.get)
        return best, per_dir[best]
    return None, 0


data_dir = None
found_dir, n_wavs = find_wav_dir(candidate_dirs)
if found_dir is not None:
    data_dir = str(found_dir)
    print(f"Found {n_wavs} wav files in: {found_dir}")

if data_dir is None:
    data_dir = str(candidate_dirs[-1])  # fallback placeholder
    print(f"⚠️ No wav files found! Using fallback: {data_dir}")
    print("Available input directories:")
    if Path('/kaggle/input').exists():
        for p in Path('/kaggle/input').iterdir():
            print(f"  - {p}")

# Single nested mapping holding every tunable value used by the cells below.
config = dict(
    # Architecture of the CNN -> BiLSTM -> attention classifier.
    model=dict(
        cnn_channels=[64, 128, 256],
        rnn_hidden_size=256,
        rnn_num_layers=2,
        attention_dim=128,
        dropout=0.5,
        num_classes=4,
    ),
    # Keyword arguments forwarded verbatim to FeatureExtractor(**config['features']).
    features=dict(
        sample_rate=16000,
        n_fft=2048,
        hop_length=512,
        n_mels=128,
        n_mfcc=40,
        duration=8.0,
    ),
    training=dict(
        batch_size=32,
        num_epochs=50,
        learning_rate=1e-4,
        weight_decay=1e-4,
        patience=10,
        num_workers=2,
    ),
    augmentation=dict(
        time_stretch=True,
        pitch_shift=True,
        noise_injection=True,
        mixup=False,
    ),
    # Outputs go under /kaggle/working on Kaggle, next to the notebook otherwise.
    paths=dict(
        data_dir=data_dir,
        checkpoint_dir='/kaggle/working/checkpoints' if is_kaggle else './checkpoints',
        log_dir='/kaggle/working/logs' if is_kaggle else './logs',
    ),
    device='cuda' if torch.cuda.is_available() else 'cpu',
    seed=42,
)

# Short summary banner followed by the training hyperparameters.
print(
    f"\n{'='*50}",
    f"Data directory: {config['paths']['data_dir']}",
    f"Device: {config['device']}",
    f"{'='*50}\n",
    sep='\n',
)
pprint.pprint(config['training'])

## Imports and utilities

Core imports plus a helper to set random seeds for reproducibility.

In [None]:
import random
import numpy as np
import torch
import torchaudio
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from typing import List, Dict, Tuple, Optional

def set_seed(seed: int = 42):
    """Seed Python, NumPy and PyTorch RNGs and request deterministic cuDNN.

    Args:
        seed: Value passed to ``random.seed``, ``np.random.seed``,
            ``torch.manual_seed`` and ``torch.cuda.manual_seed_all``.
    """
    seeders = (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all)
    for seeder in seeders:
        seeder(seed)
    # Deterministic kernels on, auto-tuner off.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Seed every RNG once, up front, with the seed stored in the config cell.
set_seed(config['seed'])

## Feature extraction

Mel-spectrograms + MFCCs using torchaudio; normalize to zero mean/unit variance and pad/trim to 8 seconds.

In [None]:
class FeatureExtractor:
    """Load audio and turn it into log-mel and MFCC features via torchaudio.

    Audio is resampled to ``sample_rate``, mixed down to one channel and
    padded with trailing zeros or truncated to exactly
    ``int(sample_rate * duration)`` samples before features are computed.
    """

    def __init__(self, sample_rate=16000, n_fft=2048, hop_length=512, n_mels=128, n_mfcc=40, duration=8.0):
        self.sample_rate = sample_rate
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.n_mels = n_mels
        self.n_mfcc = n_mfcc
        self.duration = duration
        # Fixed number of samples every waveform is padded/trimmed to.
        self.target_length = int(sample_rate * duration)

        # Both transforms share the same STFT / mel filterbank settings.
        stft_settings = {'n_fft': n_fft, 'hop_length': hop_length, 'n_mels': n_mels}
        self.mel_spectrogram = torchaudio.transforms.MelSpectrogram(
            sample_rate=sample_rate, **stft_settings
        )
        self.mfcc_transform = torchaudio.transforms.MFCC(
            sample_rate=sample_rate, n_mfcc=n_mfcc, melkwargs=dict(stft_settings)
        )

    def load_audio(self, audio_path: str) -> torch.Tensor:
        """Read ``audio_path`` and return a mono waveform of ``target_length`` samples."""
        waveform, native_sr = torchaudio.load(audio_path)
        if native_sr != self.sample_rate:
            resampler = torchaudio.transforms.Resample(native_sr, self.sample_rate)
            waveform = resampler(waveform)
        # Average the channels (dim 0) when the file has more than one.
        if waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0, keepdim=True)
        n_samples = waveform.shape[1]
        if n_samples >= self.target_length:
            return waveform[:, :self.target_length]
        missing = self.target_length - n_samples
        return torch.nn.functional.pad(waveform, (0, missing))

    def extract_mel_spectrogram(self, waveform: torch.Tensor) -> torch.Tensor:
        """Return the natural log of the mel spectrogram (offset by 1e-9 to avoid log(0))."""
        return torch.log(self.mel_spectrogram(waveform) + 1e-9)

    def extract_mfcc(self, waveform: torch.Tensor) -> torch.Tensor:
        """Return the MFCCs of ``waveform``."""
        return self.mfcc_transform(waveform)

    def extract_features(self, audio_path: str) -> dict:
        """Load a file and return its ``waveform``, ``mel_spectrogram`` and ``mfcc``."""
        waveform = self.load_audio(audio_path)
        return {
            'waveform': waveform,
            'mel_spectrogram': self.extract_mel_spectrogram(waveform),
            'mfcc': self.extract_mfcc(waveform),
        }

    def normalize(self, features: torch.Tensor) -> torch.Tensor:
        """Standardize with the global mean and std of the whole tensor."""
        centered = features - features.mean()
        return centered / (features.std() + 1e-9)

def compute_deltas(features: torch.Tensor, width: int = 9) -> torch.Tensor:
    """Difference between the sums of the next and previous ``width`` frames.

    For every index ``t`` along the last axis with ``width <= t < T - width``
    the output is ``(sum(x[t+1 : t+width+1]) - sum(x[t-width : t])) / (2 * width)``.
    The first and last ``width`` positions are left at zero.
    """
    n_frames = features.shape[-1]
    scale = 2 * width
    deltas = torch.zeros_like(features)
    for frame in range(width, n_frames - width):
        ahead = features[..., frame + 1:frame + width + 1].sum(dim=-1)
        behind = features[..., frame - width:frame].sum(dim=-1)
        deltas[..., frame] = (ahead - behind) / scale
    return deltas

## Augmentation

Simple waveform noise/gain and SpecAugment masks for mel-spectrograms.

In [None]:
import random
class AudioAugmentation:
    """Random waveform-level augmentations: additive Gaussian noise and gain."""

    def __init__(self, sample_rate=16000, noise_factor=0.005, gain_range=(0.5,1.5)):
        self.sample_rate = sample_rate
        self.noise_factor = noise_factor
        self.gain_range = gain_range

    def add_noise(self, waveform):
        """Add standard-normal noise scaled by ``noise_factor``."""
        noise = torch.randn_like(waveform)
        return waveform + noise * self.noise_factor

    def random_gain(self, waveform):
        """Multiply by a gain drawn uniformly from ``gain_range``."""
        low, high = self.gain_range
        return waveform * random.uniform(low, high)

    def apply_random(self, waveform):
        """Apply noise, then gain, each independently with probability 0.5."""
        for transform in (self.add_noise, self.random_gain):
            if random.random() > 0.5:
                waveform = transform(waveform)
        return waveform
class SpecAugment:
    """Apply ``num_masks`` rounds of frequency masking followed by time masking."""

    def __init__(self, freq_mask_param=30, time_mask_param=40, num_masks=2):
        self.freq_mask = torchaudio.transforms.FrequencyMasking(freq_mask_param)
        self.time_mask = torchaudio.transforms.TimeMasking(time_mask_param)
        self.num_masks = num_masks

    def __call__(self, spectrogram: torch.Tensor) -> torch.Tensor:
        """Return ``spectrogram`` after all masking rounds."""
        # One round = frequency mask then time mask; repeat the pair num_masks times.
        mask_sequence = [self.freq_mask, self.time_mask] * self.num_masks
        masked = spectrogram
        for mask in mask_sequence:
            masked = mask(masked)
        return masked

## Dataset and dataloaders

ICBHI patient-wise split (60/20/20). Extract features, apply optional augmentations, return combined mel+MFCC tensor.

In [None]:
class ICBHIDataset(Dataset):
    """
    ICBHI dataset loader that yields one sample per annotated respiratory cycle.

    Dataset structure:
    - Each audio file (*.wav) has a corresponding annotation file (*.txt) with same name
    - Annotation format per line: start_time end_time is_crackle is_wheeze
      (the ICBHI 2017 annotation files list crackles before wheezes)
    - Values are space/tab separated

    Samples are split patient-wise (60/20/20 over the sorted patient ids).
    ``__getitem__`` returns ``(features, label, meta)`` where ``features`` is the
    normalized log-mel spectrogram concatenated (along dim 1) with the
    normalized MFCCs of the cycle's own audio segment.
    """
    CLASS_MAPPING = {(0,0):0, (1,0):1, (0,1):2, (1,1):3}  # (wheeze, crackle) -> label
    CLASS_NAMES = ['normal', 'wheeze', 'crackle', 'both']

    def __init__(self, data_dir: str, split='train', feature_extractor=None, augment=False, config=None):
        """
        Args:
            data_dir: Directory that contains (or whose known sub-folders contain) the .wav/.txt pairs.
            split: 'train', 'val', or anything else for the test partition.
            feature_extractor: Object providing ``sample_rate``, ``target_length``,
                ``extract_mel_spectrogram``, ``extract_mfcc`` and ``normalize``.
                Built from ``config['features']`` when omitted.
            augment: Enable augmentation; only honoured for the 'train' split.
            config: Notebook configuration dict.
        """
        self.data_dir = Path(data_dir)
        self.split = split
        self.augment = augment and split == 'train'
        self.feature_extractor = feature_extractor or FeatureExtractor(**config['features'])
        self.config = config
        if self.augment:
            self.audio_aug = AudioAugmentation(sample_rate=self.feature_extractor.sample_rate)
            self.spec_aug = SpecAugment()
        self.samples = self._load_annotations()
        self.class_weights = self._compute_class_weights()

    def _find_audio_files(self):
        """Find all .wav files in the dataset directory (searches common locations)"""
        audio_files = []
        search_paths = [
            self.data_dir,
            self.data_dir / 'audio',
            self.data_dir / 'audio_and_txt_files',
            self.data_dir / 'Respiratory_Sound_Database' / 'audio_and_txt_files',
        ]
        for search_path in search_paths:
            if search_path.exists():
                audio_files.extend(search_path.glob('*.wav'))
        # Sorted so the sample order (and therefore seeded shuffling) is identical
        # across runs; plain set iteration order varies with hash randomisation.
        return sorted(set(audio_files))

    def _load_annotations(self):
        """Load annotations from individual .txt files (same name as .wav files).

        Returns the samples of ``self.split`` only. Raises ``FileNotFoundError``
        when no audio is found and ``ValueError`` when no annotation line parses.
        """
        samples = []
        audio_files = self._find_audio_files()

        if not audio_files:
            raise FileNotFoundError(f"No .wav files found in {self.data_dir}. Check your dataset path.")

        print(f"Found {len(audio_files)} audio files")

        for audio_path in audio_files:
            txt_path = audio_path.with_suffix('.txt')
            if not txt_path.exists():
                continue

            filename = audio_path.stem

            # Format: start_time end_time is_crackle is_wheeze
            try:
                with open(txt_path, 'r') as f:
                    for line_num, line in enumerate(f):
                        parts = line.split()
                        if len(parts) < 4:
                            continue  # blank or malformed line

                        start_time = float(parts[0])
                        end_time = float(parts[1])
                        # ICBHI column order: crackles first, then wheezes.
                        is_crackle = int(parts[2])
                        is_wheeze = int(parts[3])

                        if end_time <= start_time:
                            continue  # empty cycle: nothing to crop

                        label = self.CLASS_MAPPING[(is_wheeze, is_crackle)]

                        samples.append({
                            'audio_path': audio_path,
                            'filename': filename,
                            'start_time': start_time,
                            'end_time': end_time,
                            'crackles': is_crackle,
                            'wheezes': is_wheeze,
                            'label': label,
                            'cycle_id': f"{filename}_{line_num}"
                        })
            except (OSError, ValueError, KeyError) as e:
                # Cycles parsed before the bad line are kept; the rest of the file is skipped.
                print(f"Warning: Error reading {txt_path}: {e}")
                continue

        if not samples:
            raise ValueError("No valid samples found. Check annotation format: 'start_time end_time is_crackle is_wheeze'")

        print(f"Loaded {len(samples)} respiratory cycles from {len(audio_files)} files")
        return self._split_patientwise(samples)

    def _split_patientwise(self, samples):
        """Split by patient ID (first part of filename before underscore)"""
        patient_ids = sorted({s['filename'].split('_')[0] for s in samples})
        n_train = int(0.6 * len(patient_ids))
        n_val = int(0.2 * len(patient_ids))
        train_p = set(patient_ids[:n_train])
        val_p = set(patient_ids[n_train:n_train + n_val])
        test_p = set(patient_ids[n_train + n_val:])

        if self.split == 'train':
            pool = train_p
        elif self.split == 'val':
            pool = val_p
        else:
            pool = test_p

        filtered = [s for s in samples if s['filename'].split('_')[0] in pool]
        print(f"{self.split} split: {len(filtered)} samples from {len(pool)} patients")
        return filtered

    def _compute_class_weights(self):
        """Inverse-frequency class weights: ``total / (n_classes * count)`` per class."""
        if not self.samples:
            return torch.ones(4)
        labels = [s['label'] for s in self.samples]
        counts = np.bincount(labels, minlength=4)
        total = len(labels)
        weights = total / (len(counts) * counts + 1e-6)
        return torch.FloatTensor(weights)

    @staticmethod
    def _crop_cycle(waveform, sample_rate, start_time, end_time):
        """Slice ``waveform`` (last axis = samples) to ``[start_time, end_time)`` seconds.

        Bounds are clamped to the available samples, so the result may be
        shorter than requested or empty.
        """
        n_samples = waveform.shape[-1]
        first = min(max(int(round(start_time * sample_rate)), 0), n_samples)
        last = min(max(int(round(end_time * sample_rate)), first), n_samples)
        return waveform[..., first:last]

    def _load_cycle(self, sample):
        """Return the mono, resampled, fixed-length waveform of one respiratory cycle."""
        # Function-scope import keeps this class usable regardless of cell import order.
        import torchaudio

        fe = self.feature_extractor
        # torchaudio.load returns a (channels, samples) tensor and the file's sample rate.
        waveform, native_sr = torchaudio.load(str(sample['audio_path']))
        # Crop before resampling so only the cycle itself is resampled.
        waveform = self._crop_cycle(waveform, native_sr, sample['start_time'], sample['end_time'])
        if waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0, keepdim=True)
        if native_sr != fe.sample_rate and waveform.shape[-1] > 0:
            waveform = torchaudio.functional.resample(waveform, native_sr, fe.sample_rate)
        n_samples = waveform.shape[-1]
        if n_samples < fe.target_length:
            waveform = torch.nn.functional.pad(waveform, (0, fe.target_length - n_samples))
        else:
            waveform = waveform[:, :fe.target_length]
        return waveform

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(combined_features, label, meta)`` for respiratory cycle ``idx``."""
        s = self.samples[idx]
        fe = self.feature_extractor
        # Use the annotated cycle only; loading the file head would give every
        # cycle of a recording identical features under different labels.
        waveform = self._load_cycle(s)

        if self.augment:
            waveform = self.audio_aug.apply_random(waveform)

        # Both feature types are computed from the same (possibly augmented) waveform.
        mel = fe.extract_mel_spectrogram(waveform)
        mfcc = fe.extract_mfcc(waveform)
        if self.augment:
            mel = self.spec_aug(mel)

        mel = fe.normalize(mel)
        mfcc = fe.normalize(mfcc)
        combined = torch.cat([mel, mfcc], dim=1)

        label = s['label']
        meta = {
            'filename': s['filename'],
            'audio_path': str(s['audio_path']),
            'crackles': s['crackles'],
            'wheezes': s['wheezes'],
            'start_time': s['start_time'],
            'end_time': s['end_time']
        }
        return combined, label, meta


def get_dataloaders(config):
    """Build the train/val/test datasets and their DataLoaders.

    All three datasets share one ``FeatureExtractor``. Only the train dataset
    may augment (controlled by ``config['augmentation']['noise_injection']``)
    and only the train loader shuffles.

    Returns:
        ``(train_ds, val_ds, test_ds, train_loader, val_loader, test_loader)``.
    """
    data_dir = config['paths']['data_dir']
    extractor = FeatureExtractor(**config['features'])
    augment_by_split = {
        'train': config['augmentation']['noise_injection'],
        'val': False,
        'test': False,
    }
    datasets = {
        split: ICBHIDataset(data_dir, split, extractor, augment=augment, config=config)
        for split, augment in augment_by_split.items()
    }

    train_cfg = config['training']
    loaders = {
        split: DataLoader(
            dataset,
            batch_size=train_cfg['batch_size'],
            shuffle=(split == 'train'),
            num_workers=train_cfg['num_workers'],
            pin_memory=True,
        )
        for split, dataset in datasets.items()
    }

    return (
        datasets['train'], datasets['val'], datasets['test'],
        loaders['train'], loaders['val'], loaders['test'],
    )

## Metrics and early stopping

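Compute accuracy, macro- and weighted-averaged precision/recall/F1, the confusion matrix, macro specificity, and an ICBHI-style score (the mean of macro recall and macro specificity); a per-class text report is available separately. Also defines a simple early-stopping helper.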

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
class MetricsCalculator:
    """Classification metrics for the breath-sound classes.

    ``calculate`` returns a dict of summary metrics; ``report`` returns
    scikit-learn's per-class text report.
    """
    CLASS_NAMES = ['normal','wheeze','crackle','both']

    def __init__(self, num_classes=4):
        self.num_classes = num_classes

    def _specificity(self, cm):
        """Per-class one-vs-rest specificity ``TN / (TN + FP)`` from confusion matrix ``cm``."""
        spec = np.zeros(self.num_classes)
        total = cm.sum()
        for i in range(self.num_classes):
            # Everything outside row i and column i is a true negative for class i.
            tn = total - cm[i,:].sum() - cm[:,i].sum() + cm[i,i]
            fp = cm[:,i].sum() - cm[i,i]
            spec[i] = tn / (tn + fp + 1e-9)  # epsilon guards the 0/0 case
        return spec

    def calculate(self, y_true, y_pred, y_proba=None):
        """Return accuracy, macro/weighted precision/recall/F1, confusion matrix,
        macro specificity and ``icbhi_score`` (mean of macro recall and macro specificity).

        ``y_proba`` is accepted for interface compatibility and is not used.
        """
        metrics = {}
        metrics['accuracy'] = accuracy_score(y_true, y_pred)
        for avg in ('macro', 'weighted'):
            metrics[f'precision_{avg}'] = precision_score(y_true, y_pred, average=avg, zero_division=0)
            metrics[f'recall_{avg}'] = recall_score(y_true, y_pred, average=avg, zero_division=0)
            metrics[f'f1_{avg}'] = f1_score(y_true, y_pred, average=avg, zero_division=0)
        cm = confusion_matrix(y_true, y_pred, labels=list(range(self.num_classes)))
        metrics['confusion_matrix'] = cm
        spec = self._specificity(cm)
        metrics['specificity_macro'] = spec.mean()
        metrics['icbhi_score'] = (metrics['recall_macro'] + metrics['specificity_macro']) / 2
        return metrics

    def report(self, y_true, y_pred):
        """Return scikit-learn's classification report with one row per class."""
        # Passing labels explicitly keeps target_names aligned even when a class
        # is missing from both y_true and y_pred; without it scikit-learn raises
        # because the number of observed classes differs from len(target_names).
        labels = list(range(self.num_classes))
        return classification_report(
            y_true, y_pred,
            labels=labels,
            target_names=self.CLASS_NAMES[:self.num_classes],
            zero_division=0,
        )
class EarlyStopping:
    """Signal a stop after ``patience`` consecutive calls without improvement.

    With ``mode='max'`` a value improves when it exceeds ``best + min_delta``;
    with any other mode when it falls below ``best - min_delta``.
    """

    def __init__(self, patience=10, min_delta=0.0, mode='max'):
        self.patience = patience
        self.min_delta = min_delta
        self.mode = mode
        self.best = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, current):
        """Record ``current`` and return True once training should stop."""
        # The very first value just initialises the reference point.
        if self.best is None:
            self.best = current
            return False

        if self.mode == 'max':
            improved = current > self.best + self.min_delta
        else:
            improved = current < self.best - self.min_delta

        if improved:
            self.best = current
            self.counter = 0
            return False

        self.counter += 1
        if self.counter < self.patience:
            return False
        self.early_stop = True
        return True

## Model: Hybrid CNN-RNN-Attention

CNN blocks on spectrograms → BiLSTM → attention → MLP classifier.

In [None]:
class AttentionLayer(nn.Module):
    """Additive attention pooling over dim 1 of a ``(batch, steps, hidden_dim)`` tensor."""

    def __init__(self, hidden_dim, attention_dim):
        super().__init__()
        # Linear -> tanh -> Linear produces one unnormalised score per step.
        self.att = nn.Sequential(
            nn.Linear(hidden_dim, attention_dim),
            nn.Tanh(),
            nn.Linear(attention_dim, 1),
        )

    def forward(self, x):
        """Return ``(context, weights)``: the weighted sum over steps and the softmax weights."""
        step_scores = self.att(x).squeeze(-1)
        weights = F.softmax(step_scores, dim=1)
        # (batch, 1, steps) @ (batch, steps, hidden) -> (batch, 1, hidden)
        pooled = torch.bmm(weights.unsqueeze(1), x)
        return pooled.squeeze(1), weights
class CNNBlock(nn.Module):
    """Two 3x3 conv + batch-norm + ReLU stages, then 2x2 max-pooling and channel dropout."""

    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.conv1 = nn.Conv2d(in_ch, out_ch, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(out_ch)
        self.conv2 = nn.Conv2d(out_ch, out_ch, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(out_ch)
        self.pool = nn.MaxPool2d(2,2)
        self.drop = nn.Dropout2d(0.25)

    def forward(self, x):
        """Map ``(batch, in_ch, H, W)`` to ``(batch, out_ch, H // 2, W // 2)``."""
        stages = ((self.conv1, self.bn1), (self.conv2, self.bn2))
        for conv, norm in stages:
            x = F.relu(norm(conv(x)))
        return self.drop(self.pool(x))
class HybridCNNRNNAttention(nn.Module):
    """CNN feature extractor -> bidirectional LSTM -> attention pooling -> MLP classifier.

    Input is a ``(batch, input_channels, freq, time)`` tensor. Each CNN block
    halves both ``freq`` and ``time``; the remaining frequency bins are then
    averaged so the LSTM always receives ``cnn_channels[-1]`` features per
    time step, whatever the number of input frequency bins.

    Args:
        input_channels: Channels of the input tensor.
        cnn_channels: Output channels of the successive CNN blocks (never mutated).
        rnn_hidden_size: Hidden size of each LSTM direction.
        rnn_num_layers: Number of stacked LSTM layers.
        attention_dim: Hidden width of the attention scorer.
        num_classes: Number of output logits.
        dropout: Dropout used in the classifier and between LSTM layers.
    """

    def __init__(self, input_channels=1, cnn_channels=[64,128,256], rnn_hidden_size=256, rnn_num_layers=2, attention_dim=128, num_classes=4, dropout=0.5):
        super().__init__()
        layers = []
        in_ch = input_channels
        for out_ch in cnn_channels:
            layers.append(CNNBlock(in_ch, out_ch))
            in_ch = out_ch
        self.cnn_blocks = nn.ModuleList(layers)
        self.cnn_output_channels = cnn_channels[-1]
        self.lstm = nn.LSTM(
            input_size=self.cnn_output_channels,
            hidden_size=rnn_hidden_size,
            num_layers=rnn_num_layers,
            batch_first=True,
            # nn.LSTM only applies dropout between layers, so it needs more than one.
            dropout=dropout if rnn_num_layers>1 else 0,
            bidirectional=True,
        )
        self.attention = AttentionLayer(hidden_dim=rnn_hidden_size*2, attention_dim=attention_dim)
        self.classifier = nn.Sequential(
            nn.Linear(rnn_hidden_size*2,512), nn.ReLU(), nn.Dropout(dropout),
            nn.Linear(512,256), nn.ReLU(), nn.Dropout(dropout),
            nn.Linear(256,num_classes),
        )

    def forward(self, x):
        """Return ``(logits, attention_weights)`` for a batch of spectrogram-like inputs."""
        for block in self.cnn_blocks:
            x = block(x)
        # x is (batch, channels, freq', time'). Flattening freq' * channels into the
        # feature axis would give the LSTM freq' times more features than its
        # input_size (= channels) and raise a shape error whenever freq' > 1.
        # Averaging over frequency keeps the feature size at `channels`; for
        # freq' == 1 it is identical to the plain reshape.
        x = x.mean(dim=2)          # (batch, channels, time')
        x = x.permute(0, 2, 1)     # (batch, time', channels)
        lstm_out,_ = self.lstm(x)
        context, attn = self.attention(lstm_out)
        logits = self.classifier(context)
        return logits, attn

    def predict_proba(self, x):
        """Return class probabilities (softmax over the logits)."""
        logits, _ = self.forward(x)
        return F.softmax(logits, dim=1)
def create_model(config, device):
    """Instantiate ``HybridCNNRNNAttention`` from ``config['model']`` and move it to ``device``."""
    model_cfg = config['model']
    hyper_keys = ('cnn_channels', 'rnn_hidden_size', 'rnn_num_layers', 'attention_dim', 'num_classes', 'dropout')
    hyper_params = {key: model_cfg[key] for key in hyper_keys}
    # The network always receives a single-channel input.
    return HybridCNNRNNAttention(input_channels=1, **hyper_params).to(device)

## Training loop

Epoch-wise training/validation with class-weighted loss, LR scheduler, checkpointing, and early stopping.

In [None]:
def train_epoch(model, loader, criterion, optimizer, device, epoch):
    """Run one optimisation pass over ``loader``.

    ``loader`` yields ``(features, labels, meta)`` and ``model`` returns
    ``(logits, attention)``.

    Returns:
        ``(mean_batch_loss, accuracy, labels, predictions)`` with the last two
        as NumPy arrays covering every sample seen.
    """
    model.train()
    running_loss = 0.0
    seen_preds, seen_labels = [], []
    progress = tqdm(loader, desc=f"Epoch {epoch} [Train]", leave=False)
    for features, labels, _ in progress:
        features = features.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        logits, _ = model(features)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        running_loss += batch_loss
        seen_preds.extend(logits.argmax(dim=1).cpu().numpy())
        seen_labels.extend(labels.cpu().numpy())
        progress.set_postfix(loss=f"{batch_loss:.4f}")

    y_true = np.array(seen_labels)
    y_pred = np.array(seen_preds)
    mean_loss = running_loss / len(loader)
    accuracy = np.mean(y_pred == y_true)
    return mean_loss, accuracy, y_true, y_pred

def validate_epoch(model, loader, criterion, device, epoch):
    """Evaluate ``model`` on ``loader`` without gradient tracking.

    Returns:
        ``(mean_batch_loss, accuracy, labels, predictions, probabilities)``
        with the last three as NumPy arrays.
    """
    model.eval()
    running_loss = 0.0
    seen_preds, seen_labels, seen_probas = [], [], []
    progress = tqdm(loader, desc=f"Epoch {epoch} [Val]", leave=False)
    with torch.no_grad():
        for features, labels, _ in progress:
            features = features.to(device)
            labels = labels.to(device)
            logits, attn = model(features)

            batch_loss = criterion(logits, labels).item()
            running_loss += batch_loss

            seen_probas.extend(torch.softmax(logits, dim=1).cpu().numpy())
            seen_preds.extend(logits.argmax(dim=1).cpu().numpy())
            seen_labels.extend(labels.cpu().numpy())
            progress.set_postfix(loss=f"{batch_loss:.4f}")

    y_true = np.array(seen_labels)
    y_pred = np.array(seen_preds)
    mean_loss = running_loss / len(loader)
    accuracy = np.mean(y_pred == y_true)
    return mean_loss, accuracy, y_true, y_pred, np.array(seen_probas)

def _plain_metrics(value):
    """Recursively convert NumPy arrays/scalars inside ``value`` to built-in Python types."""
    if isinstance(value, np.ndarray):
        return value.tolist()
    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, dict):
        return {key: _plain_metrics(item) for key, item in value.items()}
    if isinstance(value, list):
        return [_plain_metrics(item) for item in value]
    if isinstance(value, tuple):
        return tuple(_plain_metrics(item) for item in value)
    return value


def save_checkpoint(model, optimizer, epoch, metrics, path, is_best=False):
    """Write a training checkpoint to ``path``.

    The checkpoint is a dict with ``epoch``, ``model_state_dict``,
    ``optimizer_state_dict`` and ``metrics``. When ``is_best`` is true a second
    copy is written next to it as ``<stem>_best.pth``.

    ``metrics`` is stored with NumPy arrays and scalars converted to lists and
    Python numbers (the caller's dict is left untouched). Pickled NumPy objects
    are rejected by ``torch.load(..., weights_only=True)``, which is the default
    in recent PyTorch releases, so storing them would make the checkpoint
    unloadable with a plain ``torch.load(path)``.
    """
    state = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'metrics': _plain_metrics(metrics),
    }
    torch.save(state, path)
    if is_best:
        best_path = Path(path).with_name(Path(path).stem + '_best.pth')
        torch.save(state, best_path)

def train(config):
    """Train the model described by ``config`` and checkpoint after every epoch.

    Uses class-weighted cross-entropy, Adam, a ReduceLROnPlateau schedule driven
    by validation loss, and early stopping on the validation ICBHI score.

    Returns:
        ``(model, history, (train_loader, val_loader, test_loader))`` where
        ``history`` maps metric names to per-epoch lists.
    """
    train_cfg = config['training']
    path_cfg = config['paths']
    device = torch.device(config['device'])

    for dir_key in ('checkpoint_dir', 'log_dir'):
        Path(path_cfg[dir_key]).mkdir(parents=True, exist_ok=True)

    train_ds, val_ds, test_ds, train_loader, val_loader, test_loader = get_dataloaders(config)
    model = create_model(config, device)

    # Loss is weighted by the class weights computed on the training split.
    criterion = nn.CrossEntropyLoss(weight=train_ds.class_weights.to(device))
    optimizer = torch.optim.Adam(
        model.parameters(),
        lr=train_cfg['learning_rate'],
        weight_decay=train_cfg['weight_decay'],
    )
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)
    early_stop = EarlyStopping(patience=train_cfg['patience'], mode='max')
    metrics_calc = MetricsCalculator()

    history = {key: [] for key in ('train_loss', 'val_loss', 'train_acc', 'val_acc', 'val_icbhi')}
    best_icbhi = -1
    checkpoint_dir = Path(path_cfg['checkpoint_dir'])

    for epoch in range(1, train_cfg['num_epochs'] + 1):
        tr_loss, tr_acc, tr_y, tr_p = train_epoch(model, train_loader, criterion, optimizer, device, epoch)
        va_loss, va_acc, va_y, va_p, va_proba = validate_epoch(model, val_loader, criterion, device, epoch)
        va_metrics = metrics_calc.calculate(va_y, va_p, va_proba)
        icbhi = va_metrics['icbhi_score']
        scheduler.step(va_loss)

        epoch_stats = {
            'train_loss': tr_loss, 'val_loss': va_loss,
            'train_acc': tr_acc, 'val_acc': va_acc,
            'val_icbhi': icbhi,
        }
        for key, value in epoch_stats.items():
            history[key].append(value)
        print(f"Epoch {epoch}: train_loss={tr_loss:.4f} val_loss={va_loss:.4f} val_acc={va_acc:.4f} icbhi={icbhi:.4f}")

        ckpt_path = str(checkpoint_dir / f'checkpoint_epoch_{epoch}.pth')
        is_best = icbhi > best_icbhi
        if is_best:
            best_icbhi = icbhi
        save_checkpoint(model, optimizer, epoch, va_metrics, ckpt_path, is_best=is_best)

        if early_stop(icbhi):
            print(f"Early stopping at epoch {epoch}")
            break

    print(f"Best ICBHI score: {best_icbhi:.4f}")
    return model, history, (train_loader, val_loader, test_loader)

## Evaluation helper

Evaluate a trained model on a dataloader; returns metrics, predictions, probabilities.

In [None]:
def evaluate_model(model, loader, device, metrics_calc):
    """Run ``model`` over ``loader`` and collect predictions plus diagnostics.

    Returns a dict with ``metrics`` and ``report`` (from ``metrics_calc``),
    the ``labels``, ``preds`` and ``probas`` arrays, and per-sample lists of
    ``attention`` weights and input ``spectrograms`` (channel 0 of the features).
    """
    model.eval()
    preds, labels_seen, probas, attention, spectrograms = [], [], [], [], []
    with torch.no_grad():
        for features, labels, _ in tqdm(loader, desc='Eval', leave=False):
            features = features.to(device)
            logits, attn = model(features)
            preds.extend(logits.argmax(dim=1).cpu().numpy())
            # Labels never leave the CPU here, so no device round-trip is needed.
            labels_seen.extend(labels.numpy())
            probas.extend(torch.softmax(logits, dim=1).cpu().numpy())
            attention.extend(attn.cpu().numpy())
            spectrograms.extend(features[:, 0, :, :].cpu().numpy())

    y_true = np.array(labels_seen)
    y_pred = np.array(preds)
    y_proba = np.array(probas)
    return {
        'metrics': metrics_calc.calculate(y_true, y_pred, y_proba),
        'report': metrics_calc.report(y_true, y_pred),
        'labels': y_true,
        'preds': y_pred,
        'probas': y_proba,
        'attention': attention,
        'spectrograms': spectrograms,
    }

## Prediction helper

Convenience class for single-file and batch predictions with probabilities.

In [None]:
class BreathSoundPredictor:
    """Load a trained checkpoint and classify audio files.

    ``predict_single`` handles one path; ``predict_batch`` handles a list of
    paths and records an ``error`` entry for every file that fails to load.
    """

    def __init__(self, model_path: str, config: dict, device: torch.device):
        self.device = device
        self.config = config
        self.class_names = ICBHIDataset.CLASS_NAMES
        self.fe = FeatureExtractor(**config['features'])
        self.model = create_model(config, device)
        checkpoint = torch.load(model_path, map_location=device)
        # Accept both full training checkpoints and bare state dicts.
        if 'model_state_dict' in checkpoint:
            weights = checkpoint['model_state_dict']
        else:
            weights = checkpoint
        self.model.load_state_dict(weights)
        self.model.eval()

    def _prepare(self, audio_path: str):
        """Return the model input for one file: normalized mel and MFCC joined on dim 1, with a batch axis."""
        feats = self.fe.extract_features(audio_path)
        parts = [self.fe.normalize(feats[key]) for key in ('mel_spectrogram', 'mfcc')]
        return torch.cat(parts, dim=1).unsqueeze(0).to(self.device)

    def _describe(self, probas_row):
        """Turn one row of class probabilities into the common result fields."""
        pred_idx = int(probas_row.argmax().item())
        return {
            'predicted_class': self.class_names[pred_idx],
            'predicted_index': pred_idx,
            'confidence': float(probas_row[pred_idx].item()),
            'probabilities': {name: float(probas_row[i].item()) for i, name in enumerate(self.class_names)},
        }

    def predict_single(self, audio_path: str):
        """Classify one file; the result also carries the attention weights."""
        with torch.no_grad():
            logits, attn = self.model(self._prepare(audio_path))
            probas = torch.softmax(logits, dim=1)[0]
            result = self._describe(probas)
            result['attention'] = attn[0].cpu().numpy()
            return result

    def predict_batch(self, audio_paths: list, batch_size: int = 8):
        """Classify many files, ``batch_size`` at a time.

        Within each chunk, error entries for unreadable files are appended
        before the predictions of the readable ones.
        """
        results = []
        for offset in range(0, len(audio_paths), batch_size):
            prepared = []  # (path, tensor) pairs that loaded successfully
            for path in audio_paths[offset:offset + batch_size]:
                try:
                    prepared.append((path, self._prepare(path)))
                except Exception as e:
                    results.append({'audio_path': path, 'error': str(e)})
            if not prepared:
                continue
            x = torch.cat([tensor for _, tensor in prepared], dim=0)
            with torch.no_grad():
                logits, attn = self.model(x)
                probas = torch.softmax(logits, dim=1)
            for row, (path, _) in enumerate(prepared):
                results.append({'audio_path': path, **self._describe(probas[row])})
        return results

## Quickstart: train (optional)

Uncomment and run to train. For a fast smoke test, lower epochs and batch size. Training can be slow on CPU; enable GPU in Kaggle settings.

In [None]:
# Uncomment to run a short training session
## config['training']['num_epochs'] = 3
## config['training']['batch_size'] = 16
## model, history, loaders = train(config)
## train_loader, val_loader, test_loader = loaders
## print('Training done')

## Quickstart: evaluate a checkpoint

Point `best_ckpt_path` to your saved checkpoint (e.g., `/kaggle/working/checkpoints/checkpoint_epoch_XX_best.pth`).

In [None]:
# Example evaluation: point best_ckpt_path at a checkpoint written by train().
# Files named '<stem>_best.pth' are the copies saved on epochs that improved the validation ICBHI score.
best_ckpt_path = '/kaggle/working/checkpoints/checkpoint_epoch_1_best.pth'  # change me
## Uncomment the block below to evaluate that checkpoint on the test split.
## if Path(best_ckpt_path).exists():
##     _, _, _, train_loader, val_loader, test_loader = get_dataloaders(config)
##     model = create_model(config, torch.device(config['device']))
##     ckpt = torch.load(best_ckpt_path, map_location=torch.device(config['device']))
##     state = ckpt['model_state_dict'] if 'model_state_dict' in ckpt else ckpt
##     model.load_state_dict(state)
##     metrics_calc = MetricsCalculator()
##     results = evaluate_model(model, test_loader, torch.device(config['device']), metrics_calc)
##     print(results['report'])
## else:
##     print('Checkpoint not found:', best_ckpt_path)

## Quickstart: single prediction

Use `BreathSoundPredictor` to classify a single file with `predict_single`, or a list of file paths with `predict_batch`.

In [None]:
# Example prediction (set your paths)
## predictor = BreathSoundPredictor(model_path=best_ckpt_path, config=config, device=torch.device(config['device']))
## audio_path = '/kaggle/input/your-audio.wav'
## if Path(audio_path).exists():
##     result = predictor.predict_single(audio_path)
##     print(result)
## else:
##     print('Audio file not found:', audio_path)