In [7]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.cuda.amp import autocast, GradScaler
from pathlib import Path
from PIL import Image
import torchvision.transforms as transforms
from sklearn.model_selection import StratifiedGroupKFold
from sklearn.metrics import f1_score, precision_score, recall_score, confusion_matrix, classification_report
import os
import warnings
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.auto import tqdm
warnings.filterwarnings('ignore')

# --- Reproducibility ---------------------------------------------------------
# Seed NumPy and PyTorch once, before any model or data object is created.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    # Seed every visible CUDA device and switch cuDNN to deterministic mode
    # with autotuning (benchmark) disabled.
    torch.cuda.manual_seed_all(SEED)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

# --- Device discovery --------------------------------------------------------
n_gpus = torch.cuda.device_count()
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Device: {device}")
print(f"Available GPUs: {n_gpus}")
# The loop body simply never runs when no GPU is visible.
for i in range(n_gpus):
    print(f"  GPU {i}: {torch.cuda.get_device_name(i)}")
    print(f"  Memory: {torch.cuda.get_device_properties(i).total_memory / 1e9:.2f} GB")

Device: cpu
Available GPUs: 0


In [8]:
class LayerNorm2d(nn.Module):
    """Layer normalisation over the channel axis of a channels-first tensor.

    Each spatial position of a (B, C, H, W) input is normalised across its C
    channel values, then scaled and shifted by learnable per-channel
    parameters.

    Args:
        num_channels: size of the channel dimension (C).
        eps: constant added to the variance before the square root.
    """
    def __init__(self, num_channels, eps=1e-6):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(num_channels))
        self.bias = nn.Parameter(torch.zeros(num_channels))
        self.eps = eps

    def forward(self, x):
        # Statistics are taken along dim 1 (channels) and kept broadcastable.
        channel_mean = x.mean(1, keepdim=True)
        centered = x - channel_mean
        channel_var = centered.pow(2).mean(1, keepdim=True)
        normalized = centered / torch.sqrt(channel_var + self.eps)
        # Reshape (C,) parameters to (C, 1, 1) so they broadcast over H and W.
        scale = self.weight[:, None, None]
        shift = self.bias[:, None, None]
        return scale * normalized + shift


class ConvNeXtBlock(nn.Module):
    """Residual ConvNeXt block.

    Pipeline: 7x7 depthwise conv -> LayerNorm2d -> 1x1 conv (dim -> 4*dim)
    -> GELU -> 1x1 conv (4*dim -> dim) -> optional per-channel layer scale
    -> drop path, with the result added back onto the block input.

    Args:
        dim: number of input (and output) channels.
        drop_path: stochastic-depth probability; values <= 0 disable it.
        layer_scale_init_value: initial value of the learnable per-channel
            scale; values <= 0 disable layer scale entirely.
    """
    def __init__(self, dim, drop_path=0., layer_scale_init_value=1e-6):
        super().__init__()
        hidden_dim = 4 * dim
        # groups=dim makes the 7x7 convolution depthwise (one filter per channel).
        self.dwconv = nn.Conv2d(dim, dim, kernel_size=7, padding=3, groups=dim)
        self.norm = LayerNorm2d(dim)
        self.pwconv1 = nn.Conv2d(dim, hidden_dim, kernel_size=1)
        self.act = nn.GELU()
        self.pwconv2 = nn.Conv2d(hidden_dim, dim, kernel_size=1)

        if layer_scale_init_value > 0:
            self.gamma = nn.Parameter(layer_scale_init_value * torch.ones(dim))
        else:
            self.gamma = None

        if drop_path > 0.:
            self.drop_path = DropPath(drop_path)
        else:
            self.drop_path = nn.Identity()

    def forward(self, x):
        residual = x
        out = self.dwconv(x)
        out = self.norm(out)
        out = self.pwconv2(self.act(self.pwconv1(out)))
        if self.gamma is not None:
            # (C,) -> (C, 1, 1) so the scale broadcasts over H and W.
            out = self.gamma[:, None, None] * out
        return residual + self.drop_path(out)


class DropPath(nn.Module):
    """Drop paths (stochastic depth) per sample, for the main path of residual blocks.

    During training each sample in the batch is zeroed with probability
    ``drop_prob``; surviving samples are scaled by ``1 / (1 - drop_prob)``.
    In eval mode, or when ``drop_prob`` is 0 or ``None``, the input is
    returned unchanged.

    Args:
        drop_prob: probability of dropping a sample, in [0, 1]. ``None``
            (the default) is treated as 0, i.e. nothing is dropped.

    Raises:
        ValueError: if ``drop_prob`` is given and lies outside [0, 1].
    """
    def __init__(self, drop_prob=None):
        super().__init__()
        if drop_prob is not None and not 0. <= drop_prob <= 1.:
            raise ValueError(f"drop_prob must be in [0, 1], got {drop_prob}")
        self.drop_prob = drop_prob

    def forward(self, x):
        # The default drop_prob=None used to reach `1 - None` in training mode.
        drop_prob = 0. if self.drop_prob is None else self.drop_prob
        if drop_prob == 0. or not self.training:
            return x
        keep_prob = 1 - drop_prob
        if keep_prob <= 0.:
            # Everything is dropped; avoid the 0/0 -> NaN of x.div(0) * 0.
            return torch.zeros_like(x)
        # One random draw per sample, broadcast over all remaining dimensions.
        mask_shape = (x.shape[0],) + (1,) * (x.ndim - 1)
        keep_mask = keep_prob + torch.rand(mask_shape, dtype=x.dtype, device=x.device)
        keep_mask.floor_()  # Binarize: 1 with probability keep_prob, else 0
        return x.div(keep_prob) * keep_mask


class ConvNeXt(nn.Module):
    """ConvNeXt architecture for image classification.

    The network is a 4x4/stride-4 convolutional stem followed by one stage per
    entry of ``depths``. Every stage after the first starts with a
    LayerNorm2d + 2x2/stride-2 convolution that downsamples and changes the
    channel count. The head applies a final LayerNorm2d, global average
    pooling and a linear classifier.

    Args:
        in_chans: Number of input image channels (default: 3)
        num_classes: Number of classes for classification head (default: 5)
        depths: Number of blocks at each stage (default: (3, 3, 9, 3) for Tiny)
        dims: Feature dimensions at each stage (default: (96, 192, 384, 768) for Tiny)
        drop_path_rate: Stochastic depth rate (default: 0.)
        layer_scale_init_value: Init value for Layer Scale (default: 1e-6)

    Raises:
        ValueError: if ``depths`` is empty or ``depths`` and ``dims`` differ
            in length.
    """
    def __init__(self, in_chans=3, num_classes=5,
                 depths=(3, 3, 9, 3), dims=(96, 192, 384, 768),
                 drop_path_rate=0., layer_scale_init_value=1e-6):
        super().__init__()

        # Accept any sequence (lists keep working) and validate it up front,
        # rather than silently assuming exactly four stages.
        depths = list(depths)
        dims = list(dims)
        if not depths or len(depths) != len(dims):
            raise ValueError(
                f"depths and dims must be non-empty and of equal length, "
                f"got {len(depths)} and {len(dims)}"
            )

        # Stem: 4x4 conv with stride 4
        self.stem = nn.Sequential(
            nn.Conv2d(in_chans, dims[0], kernel_size=4, stride=4),
            LayerNorm2d(dims[0])
        )

        # Build stages
        self.stages = nn.ModuleList()
        # Drop-path probability grows linearly from 0 to drop_path_rate over
        # all blocks of the network, in order.
        dp_rates = [x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))]
        cur = 0  # index into dp_rates of the first block of the current stage

        for i in range(len(depths)):
            # Downsampling layer (except first stage)
            if i > 0:
                downsample = nn.Sequential(
                    LayerNorm2d(dims[i-1]),
                    nn.Conv2d(dims[i-1], dims[i], kernel_size=2, stride=2)
                )
            else:
                downsample = nn.Identity()

            # Stage blocks; the downsample module stays at index 0 of the
            # Sequential so parameter names are stages.<i>.<j+1>.* for block j.
            stage = nn.Sequential(
                downsample,
                *[ConvNeXtBlock(dim=dims[i], drop_path=dp_rates[cur + j],
                               layer_scale_init_value=layer_scale_init_value)
                  for j in range(depths[i])]
            )
            self.stages.append(stage)
            cur += depths[i]

        # Head
        self.norm = LayerNorm2d(dims[-1])
        self.head = nn.Linear(dims[-1], num_classes)

        self.apply(self._init_weights)

    def _init_weights(self, m):
        """Truncated-normal init (std=0.02) for conv/linear weights; zero biases."""
        if isinstance(m, (nn.Conv2d, nn.Linear)):
            nn.init.trunc_normal_(m.weight, std=0.02)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Return class logits of shape (B, num_classes) for a (B, C, H, W) input."""
        x = self.stem(x)
        for stage in self.stages:
            x = stage(x)
        # The final norm is applied to the spatial feature map, before pooling.
        x = self.norm(x)
        x = x.mean([-2, -1])  # Global average pooling
        x = self.head(x)
        return x


def _load_imagenet_weights(model, torchvision_builder, display_name):
    """Best-effort copy of torchvision ImageNet weights into ``model``.

    Tensors are copied only when both the parameter name and the shape match
    and the name does not contain 'head' (the classifier is never copied).
    Any failure is reported and swallowed, leaving ``model`` with its random
    initialisation so the caller can still train from scratch.

    Args:
        model: the ConvNeXt instance to initialise in place.
        torchvision_builder: name of the constructor in ``torchvision.models``
            (e.g. 'convnext_tiny').
        display_name: human-readable variant name used in the log messages.

    Returns:
        True if at least one tensor was copied, False otherwise.
    """
    try:
        # Load ImageNet pretrained weights from torchvision
        import torchvision.models as models
        pretrained_model = getattr(models, torchvision_builder)(weights='IMAGENET1K_V1')

        # Copy all weights except final classifier
        model_dict = model.state_dict()
        pretrained_dict = {k: v for k, v in pretrained_model.state_dict().items()
                           if k in model_dict and 'head' not in k
                           and v.shape == model_dict[k].shape}
        if not pretrained_dict:
            # NOTE(review): torchvision's ConvNeXt appears to name its
            # parameters features.* / classifier.*, which never match the
            # stem.* / stages.* names used here. Confirm, and add an explicit
            # key mapping if pretrained initialisation is actually needed.
            raise RuntimeError(
                "no parameter in the torchvision checkpoint matches this model "
                "by name and shape"
            )
        model_dict.update(pretrained_dict)
        model.load_state_dict(model_dict)
        print(f"✓ Loaded ImageNet pretrained weights for {display_name}")
        print(f"  ({len(pretrained_dict)}/{len(model_dict)} tensors copied)")
        return True
    except Exception as e:
        # Deliberately best-effort: a missing torchvision, no network access
        # or an incompatible checkpoint must not abort model construction.
        print(f"⚠ Could not load pretrained weights: {e}")
        print("  Training from scratch...")
        return False


def convnext_tiny(num_classes=5, drop_path_rate=0., pretrained=False):
    """ConvNeXt-Tiny model (depths 3-3-9-3, dims 96-192-384-768).

    Args:
        num_classes: size of the classification head.
        drop_path_rate: maximum stochastic-depth rate.
        pretrained: if True, try to initialise from torchvision ImageNet weights.
    """
    model = ConvNeXt(depths=[3, 3, 9, 3], dims=[96, 192, 384, 768],
                     num_classes=num_classes, drop_path_rate=drop_path_rate)
    if pretrained:
        _load_imagenet_weights(model, 'convnext_tiny', 'ConvNeXt-Tiny')
    return model


def convnext_small(num_classes=5, drop_path_rate=0., pretrained=False):
    """ConvNeXt-Small model (depths 3-3-27-3, dims 96-192-384-768).

    Args:
        num_classes: size of the classification head.
        drop_path_rate: maximum stochastic-depth rate.
        pretrained: if True, try to initialise from torchvision ImageNet weights.
    """
    model = ConvNeXt(depths=[3, 3, 27, 3], dims=[96, 192, 384, 768],
                     num_classes=num_classes, drop_path_rate=drop_path_rate)
    if pretrained:
        _load_imagenet_weights(model, 'convnext_small', 'ConvNeXt-Small')
    return model


def convnext_base(num_classes=5, drop_path_rate=0., pretrained=False):
    """ConvNeXt-Base model (depths 3-3-27-3, dims 128-256-512-1024).

    Args:
        num_classes: size of the classification head.
        drop_path_rate: maximum stochastic-depth rate.
        pretrained: if True, try to initialise from torchvision ImageNet weights.
    """
    model = ConvNeXt(depths=[3, 3, 27, 3], dims=[128, 256, 512, 1024],
                     num_classes=num_classes, drop_path_rate=drop_path_rate)
    if pretrained:
        _load_imagenet_weights(model, 'convnext_base', 'ConvNeXt-Base')
    return model


def convnext_large(num_classes=5, drop_path_rate=0., pretrained=False):
    """ConvNeXt-Large model (depths 3-3-27-3, dims 192-384-768-1536).

    Args:
        num_classes: size of the classification head.
        drop_path_rate: maximum stochastic-depth rate.
        pretrained: if True, try to initialise from torchvision ImageNet weights.
    """
    model = ConvNeXt(depths=[3, 3, 27, 3], dims=[192, 384, 768, 1536],
                     num_classes=num_classes, drop_path_rate=drop_path_rate)
    if pretrained:
        _load_imagenet_weights(model, 'convnext_large', 'ConvNeXt-Large')
    return model


# Confirmation line so the cell output shows that the definitions above ran.
print("ConvNeXt models defined (Tiny, Small, Base, Large)")

ConvNeXt models defined (Tiny, Small, Base, Large)


# Inference on Test Dataset

In [9]:
class TestConfig:
    """Static settings for test-set inference: paths, task names and input sizes."""

    # --- Model input / batching ---
    IMAGE_SIZE = 224
    BATCH_SIZE = 32
    NUM_CLASSES = 5

    # --- Filesystem locations (relative to the working directory) ---
    MODEL_PATH = Path('best_convnext_model(1).pth')
    TEST_DATA_PATH = Path('SAND_Challenge_task1_test_dataset/task1/test')
    OUTPUT_CSV = Path('submission.csv')

    # --- Recording tasks; each name is also a sub-folder of TEST_DATA_PATH ---
    PHONATION_TASKS = ['phonationA', 'phonationE', 'phonationI', 'phonationO', 'phonationU']
    RHYTHM_TASKS = ['rhythmPA', 'rhythmTA', 'rhythmKA']
    ALL_TASKS = [*PHONATION_TASKS, *RHYTHM_TASKS]

# Single shared configuration instance used by the inference cells below.
test_cfg = TestConfig()
# Echo the key paths so the cell output records which artefacts were used.
print(
    f"Model: {test_cfg.MODEL_PATH}",
    f"Test data: {test_cfg.TEST_DATA_PATH}",
    f"Output: {test_cfg.OUTPUT_CSV}",
    sep="\n",
)

Model: best_convnext_model(1).pth
Test data: SAND_Challenge_task1_test_dataset\task1\test
Output: submission.csv


In [10]:
import librosa
import librosa.display

# Audio front-end settings for the DiffRes mel-spectrogram features.
SR_DIFFRES = 22050          # sample rate audio is resampled to on load
N_FFT_DIFFRES = 2048        # FFT window size
HOP_LENGTH_DIFFRES = 256    # hop between successive frames, in samples
N_MELS_DIFFRES = 256        # number of mel bands

# Per-task (fmin, fmax) pair handed to the mel filterbank. Every task
# currently shares the same band; the mapping exists so a task can be given
# its own range later without touching the extraction code.
TASK_FREQ_RANGES = dict.fromkeys(
    (
        'phonationA',
        'phonationE',
        'phonationI',
        'phonationO',
        'phonationU',
        'rhythmPA',
        'rhythmTA',
        'rhythmKA',
    ),
    (50, 8000),
)

def extract_diffres_mel_spectrogram(y, sr, task_name):
    """Compute a dB-scaled mel power spectrogram for one recording.

    Args:
        y: audio samples, passed straight to librosa.
        sr: sample rate of ``y``.
        task_name: key into TASK_FREQ_RANGES selecting the (fmin, fmax) band;
            unknown names fall back to (50, 8000).

    Returns:
        2-D array of dB values relative to the spectrogram's own maximum.
        NaNs are replaced by -80.0, and an all-zero result is replaced by a
        constant -80.0 array.
    """
    low_freq, high_freq = TASK_FREQ_RANGES.get(task_name, (50, 8000))

    mel_power = librosa.feature.melspectrogram(
        y=y,
        sr=sr,
        n_fft=N_FFT_DIFFRES,
        hop_length=HOP_LENGTH_DIFFRES,
        n_mels=N_MELS_DIFFRES,
        fmin=low_freq,
        fmax=high_freq,
        power=2.0,
    )
    mel_db = librosa.power_to_db(mel_power, ref=np.max)

    # Only sanitise when a NaN is actually present.
    if np.isnan(mel_db).any():
        mel_db = np.nan_to_num(mel_db, nan=-80.0)

    # A spectrogram that is 0 dB everywhere carries no contrast; replace it
    # with a flat -80 dB floor.
    is_flat = np.abs(mel_db).max() < 1e-6
    return np.full_like(mel_db, -80.0) if is_flat else mel_db

def extract_diffres_features(y, sr, task_name, include_deltas=True):
    """Extract DiffRes-ready features with high temporal resolution.

    Args:
        y: audio samples.
        sr: sample rate of ``y``.
        task_name: task key forwarded to extract_diffres_mel_spectrogram.
        include_deltas: when False, return only the 2-D dB mel spectrogram.

    Returns:
        The 2-D dB mel spectrogram if ``include_deltas`` is False; otherwise a
        3-D array stacking [spectrogram, first-order delta, second-order
        delta] along axis 0. A delta that cannot be computed is replaced by
        zeros of the same shape.
    """
    S_db = extract_diffres_mel_spectrogram(y, sr, task_name)

    if not include_deltas:
        return S_db

    def _delta_or_zeros(order):
        """Delta of the given order, or zeros if librosa rejects the input."""
        try:
            return librosa.feature.delta(S_db, order=order)
        except Exception:
            # Best-effort fallback kept from the original (presumably for
            # clips with too few frames for the delta filter). Catching
            # Exception instead of a bare `except:` lets KeyboardInterrupt
            # and SystemExit propagate.
            return np.zeros_like(S_db)

    return np.stack([S_db, _delta_or_zeros(1), _delta_or_zeros(2)], axis=0)

def normalize_spectrogram(spec, method='per_sample'):
    """Rescale a spectrogram array so samples have a consistent value range.

    Args:
        spec: NumPy array to normalise.
        method: 'per_sample' for z-score normalisation using the array's own
            mean and standard deviation; 'minmax' for scaling to [0, 1]; any
            other value returns ``spec`` unchanged.

    Returns:
        The normalised array. For a (near-)constant input, 'per_sample'
        returns the mean-centred array and 'minmax' returns zeros.
    """
    if method == 'per_sample':
        # Z-score; skip the division when the spread is effectively zero.
        centered = spec - spec.mean()
        spread = spec.std()
        return centered / spread if spread > 1e-8 else centered

    if method == 'minmax':
        lowest = spec.min()
        value_range = spec.max() - lowest
        if value_range > 1e-8:
            return (spec - lowest) / value_range
        return np.zeros_like(spec)

    # Unknown method: pass the input through untouched.
    return spec

def extract_mel_spectrogram_diffres(audio_path, task_name, normalization='per_sample'):
    """Extract DiffRes mel spectrogram from audio file and return as PIL Image.

    The audio is loaded at SR_DIFFRES, converted to the 3-channel feature
    stack (dB mel spectrogram, delta, delta-delta), normalised channel by
    channel, and packed into an RGB image (R = spectrogram, G = delta,
    B = delta-delta).

    Args:
        audio_path: path of the audio file to load.
        task_name: task key used to select the mel frequency range.
        normalization: method passed to normalize_spectrogram for every
            channel. Defaults to 'per_sample', the previous hard-coded value.

    Returns:
        PIL.Image.Image in RGB mode.

    Notes:
        NOTE(review): with 'per_sample' the channels are z-scores, not values
        in [0, 1], so ``* 255`` followed by the uint8 cast goes out of range
        (negative and >255 values). The result of such a cast is
        platform-dependent. The default is left unchanged because the
        checkpoint used for inference was presumably trained on images
        produced this way; confirm against the training pipeline. Passing
        ``normalization='minmax'`` yields channels in [0, 1] and a
        well-defined 0..255 image.
    """
    y, sr = librosa.load(audio_path, sr=SR_DIFFRES)

    features = extract_diffres_features(y, sr, task_name, include_deltas=True)

    # Normalise each channel independently so the three feature planes do not
    # share statistics.
    if features.ndim == 3:
        for i in range(features.shape[0]):
            features[i] = normalize_spectrogram(features[i], method=normalization)
    else:
        features = normalize_spectrogram(features, method=normalization)

    if features.ndim == 3 and features.shape[0] >= 3:
        # Channels-first (3, H, W) -> channels-last (H, W, 3) for PIL.
        rgb = np.stack([features[0], features[1], features[2]], axis=-1)
        img_array = (rgb * 255).astype(np.uint8)
        img = Image.fromarray(img_array, mode='RGB')
    else:
        # Fallback for a single-plane result: render it as grayscale and
        # replicate to RGB. Covers both the <3-channel 3-D case and a 2-D array.
        plane = features[0] if features.ndim == 3 else features
        img_array = (plane * 255).astype(np.uint8)
        img = Image.fromarray(img_array, mode='L').convert('RGB')

    return img

# Confirmation line so the cell output shows that the definitions above ran.
print("DiffRes preprocessing functions defined")

DiffRes preprocessing functions defined


In [11]:
class TestDataset(Dataset):
    """Patient-level test dataset: one item is all recordings of one patient.

    The constructor scans ``<test_data_path>/<task>/*.wav`` for every task.
    The patient id is the part of the file name before the first underscore.

    Args:
        test_data_path: root folder containing one sub-folder per task.
        transform: optional callable applied to each PIL spectrogram image;
            it must return tensors, since the images are stacked with
            ``torch.stack``.
        tasks: iterable of task (sub-folder) names to scan. Defaults to
            ``test_cfg.ALL_TASKS``.

    Attributes:
        samples: list of dicts with keys 'id', 'task' and 'path'.
        patient_ids: sorted list of the unique patient ids found.
    """
    def __init__(self, test_data_path, transform=None, tasks=None):
        self.test_data_path = Path(test_data_path)
        self.transform = transform
        self.samples = []
        # patient id -> that patient's sample dicts, in discovery order.
        # Built once so __getitem__ does not rescan self.samples on each call.
        self._samples_by_patient = {}

        if tasks is None:
            tasks = test_cfg.ALL_TASKS

        for task in tasks:
            task_folder = self.test_data_path / task
            if not task_folder.exists():
                continue
            # sorted(): glob order depends on the filesystem; sorting keeps
            # the sample order reproducible across machines.
            for wav_file in sorted(task_folder.glob('*.wav')):
                patient_id = wav_file.stem.split('_')[0]
                sample = {
                    'id': patient_id,
                    'task': task,
                    'path': wav_file
                }
                self.samples.append(sample)
                self._samples_by_patient.setdefault(patient_id, []).append(sample)

        self.patient_ids = sorted(self._samples_by_patient)
        print(f"Found {len(self.patient_ids)} unique patients")
        print(f"Total audio files: {len(self.samples)}")

    def __len__(self):
        """Number of patients (not number of audio files)."""
        return len(self.patient_ids)

    def __getitem__(self, idx):
        """Return ``(patient_id, images)`` with one stacked image per recording."""
        patient_id = self.patient_ids[idx]

        images = []
        for sample in self._samples_by_patient[patient_id]:
            mel_image = extract_mel_spectrogram_diffres(sample['path'], sample['task'])
            if self.transform:
                mel_image = self.transform(mel_image)
            images.append(mel_image)

        images = torch.stack(images)
        return patient_id, images

# Deterministic test-time preprocessing (no augmentation): resize to the
# configured square input size, convert the PIL image to a tensor, then
# normalise each channel.
# NOTE(review): the mean/std values are the commonly used ImageNet channel
# statistics; presumably they match what the checkpoint was trained with.
# Confirm against the training notebook.
test_transform = transforms.Compose([
    transforms.Resize((test_cfg.IMAGE_SIZE, test_cfg.IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Confirmation line so the cell output shows that the definitions above ran.
print("Test dataset class defined")

Test dataset class defined


In [12]:
def load_model(model_path, device):
    """Build a ConvNeXt-Base classifier and restore its weights from a checkpoint.

    The checkpoint may be either a bare state dict or a dict that holds the
    state dict under 'model_state_dict' (optionally with 'epoch' and
    'best_f1' entries, which are printed when present). A leading 'module.'
    on parameter names is stripped before loading.

    Args:
        model_path: path to the checkpoint file.
        device: torch device the weights are mapped to and the model moved to.

    Returns:
        The model on ``device``, in eval mode.
    """
    # NOTE(review): torch.load is pickle-based; only load checkpoints from a
    # trusted source (consider weights_only=True where the installed PyTorch
    # version supports it).
    checkpoint = torch.load(model_path, map_location=device)

    model = convnext_base(
        num_classes=test_cfg.NUM_CLASSES,
        drop_path_rate=0.1,
        pretrained=False,
    )

    has_metadata = isinstance(checkpoint, dict)
    if has_metadata and 'model_state_dict' in checkpoint:
        raw_weights = checkpoint['model_state_dict']
    else:
        raw_weights = checkpoint

    # Drop a leading 'module.' from parameter names (presumably left by a
    # DataParallel-style wrapper at training time).
    prefix = 'module.'
    weights = {
        (name[len(prefix):] if name.startswith(prefix) else name): tensor
        for name, tensor in raw_weights.items()
    }

    model.load_state_dict(weights)
    model.to(device)
    model.eval()

    print(f"Model loaded from {model_path}")

    # Optional training metadata stored alongside the weights.
    if has_metadata:
        if 'epoch' in checkpoint:
            print(f"Trained for {checkpoint['epoch']} epochs")
        if 'best_f1' in checkpoint:
            print(f"Best F1 score: {checkpoint['best_f1']:.4f}")

    return model

# Restore the trained classifier once; later cells reuse `model`.
model = load_model(test_cfg.MODEL_PATH, device)
print("Model ready for inference")

Model loaded from best_convnext_model(1).pth
Model ready for inference


In [13]:
# Each dataset item is already a stack of one patient's spectrogram images,
# so the loader uses batch_size=1 (one patient per step) and keeps the
# dataset's sorted patient order (shuffle=False).
test_dataset = TestDataset(test_cfg.TEST_DATA_PATH, transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, num_workers=0)

print(f"Test dataset: {len(test_dataset)} patients")

Found 67 unique patients
Total audio files: 536
Test dataset: 67 patients


In [14]:
def predict_test_data(model, test_loader, device):
    """Run patient-level inference on the test data.

    For every patient the model scores all of that patient's images in one
    forward pass. The per-image softmax probabilities are averaged and the
    arg-max of the average is reported as a 1-based class label.

    Args:
        model: classifier returning logits of shape (num_images, num_classes).
        test_loader: loader yielding ``(patient_id, images)`` with batch size
            1, where ``images`` has a leading batch dimension of 1.
        device: torch device the images are moved to.

    Returns:
        List of dicts with keys 'ID' (the patient id with 'ID' removed) and
        'CLASS' (int, 1-based).
    """
    predictions = []

    # Inference must not depend on the caller having called model.eval():
    # switch to eval mode here and restore the previous mode afterwards.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for patient_id, images in tqdm(test_loader, desc="Predicting"):
                # (1, N, C, H, W) -> (N, C, H, W): the patient's N images
                # become the batch for a single forward pass.
                images = images.squeeze(0).to(device)

                outputs = model(images)
                probs = torch.softmax(outputs, dim=1)
                avg_probs = probs.mean(dim=0)  # average over the patient's images
                pred_class = avg_probs.argmax().item() + 1  # 0-based index -> 1-based label

                predictions.append({
                    'ID': patient_id[0].replace('ID', ''),
                    'CLASS': pred_class
                })
    finally:
        model.train(was_training)

    return predictions

# Score every test patient; `predictions` is a list of {'ID', 'CLASS'} dicts.
predictions = predict_test_data(model, test_loader, device)
print(f"Predictions completed for {len(predictions)} patients")

Predicting:   0%|          | 0/67 [00:00<?, ?it/s]

Predictions completed for 67 patients


In [15]:
# Assemble the submission table: one row per patient, ordered by ID string,
# with a fresh 0..n-1 index.
submission_df = (
    pd.DataFrame(predictions)
    .sort_values('ID')
    .reset_index(drop=True)
)
submission_df.to_csv(test_cfg.OUTPUT_CSV, index=False)

print(f"Submission saved to {test_cfg.OUTPUT_CSV}")
print(submission_df.head(10))

Submission saved to submission.csv
    ID  CLASS
0  004      2
1  011      2
2  014      3
3  019      2
4  020      3
5  022      3
6  031      3
7  032      3
8  039      2
9  043      2


In [16]:
# Sanity check: distribution of predicted classes, ordered by class label.
print(submission_df['CLASS'].value_counts().sort_index())
print(f"\nTotal predictions: {len(submission_df)}")

CLASS
2    39
3    28
Name: count, dtype: int64

Total predictions: 67
