In [3]:

import os
import math
import logging
import warnings
import torch
import torchaudio
import torch.nn.functional as F
import librosa
import numpy as np
import pandas as pd
import tensorflow as tf
import whisper
from joblib import load
from utils import ClinicalFeatureExtractor, EnhancedDementiaCNNBiLSTM
from eval_utils import (
    load_ground_truth, 
    perform_error_analysis, 
    save_results, 
    compare_models
)
# Suppress every Python warning for the rest of the session. This keeps the
# cell output short, but it also hides deprecation notices raised by the
# libraries imported above.
warnings.filterwarnings('ignore')
# Module-level logger; AndyInference.process_file reports extraction
# failures through it.
logger = logging.getLogger(__name__)

### **Config**

In [4]:
# Audio processing configuration
AUDIO_CONFIG = {
    'sr': 16000,            # target sample rate in Hz
    'n_mels': 70,           # number of mel bins requested from the FBank extractor
    'chunk_length': 5.0,    # in seconds
    'chunk_overlap': 2.0    # in seconds
}

# Model paths (relative to the notebook's working directory)
MODEL_PATHS = {
    'bilstm': 'mybest_model.pth',
    'andy': 'models/model.tflite',
    'liam_model': 'models/tfidf_logistic.joblib',
    'liam_vectorizer': 'models/tfidf_vectorizer.joblib'
}

# Data paths
# The test-audio directory defaults to the original author's local drive.
# Set the ADRESS_TEST_AUDIO_DIR environment variable to run the notebook on
# another machine without editing this cell.
DATA_PATHS = {
    'test_audio_dir': os.environ.get(
        'ADRESS_TEST_AUDIO_DIR',
        "D:/2025/ADReSS-2020/ADReSS-IS2020-test/ADReSS-IS2020-data/test/Full_wave_enhanced_audio"
    ),
    'ground_truth': 'test_results.txt'
}

# Clinical feature dimension for BiLSTM
CLINICAL_FEATURE_DIM = 18

# Risk interpretation thresholds (probability cut-offs)
RISK_THRESHOLDS = {
    'high': 0.7,
    'moderate': 0.5
}

### **Utils**

In [5]:
def load_audio(file_path, target_sr=16000):
    """Read an audio file, average its channels and resample it if required.

    Args:
        file_path: Path handed to ``torchaudio.load``.
        target_sr: Sample rate (Hz) the returned signal should have.

    Returns:
        The waveform averaged over dim 0, resampled to ``target_sr`` only
        when the file's own sample rate differs from it.
    """
    audio, native_sr = torchaudio.load(file_path)
    # Collapse dim 0 (presumably the channel axis of torchaudio.load's output).
    mono = torch.mean(audio, dim=0)
    if native_sr == target_sr:
        return mono
    to_target = torchaudio.transforms.Resample(orig_freq=native_sr, new_freq=target_sr)
    return to_target(mono)


def extract_fbank(waveform, config):
    """Compute Kaldi-style filterbank features and standardise each column.

    Args:
        waveform: 1-D signal; a leading dimension is added before it is
            passed to ``torchaudio.compliance.kaldi.fbank``.
        config: Mapping providing ``'n_mels'`` and ``'sr'``.

    Returns:
        The feature matrix with every column shifted to zero mean and divided
        by its standard deviation (plus 1e-6 to avoid division by zero).
    """
    features = torchaudio.compliance.kaldi.fbank(
        waveform.unsqueeze(0),
        num_mel_bins=config['n_mels'],
        sample_frequency=config['sr']
    )
    column_mean = features.mean(dim=0)
    column_scale = features.std(dim=0) + 1e-6
    return (features - column_mean) / column_scale


def extract_clinical_features(waveform):
    """Turn the clinical feature dictionary of a waveform into a float tensor.

    A fresh ``ClinicalFeatureExtractor`` computes a name -> value mapping.
    Values are emitted in sorted-name order so the layout is stable, and any
    infinite or NaN entry is replaced by 0.0.

    Returns:
        A 1-D ``torch.float32`` tensor with one entry per feature name.
    """
    def _finite_or_zero(number):
        # Non-finite measurements are neutralised rather than propagated.
        return 0.0 if (np.isinf(number) or np.isnan(number)) else number

    features_by_name = ClinicalFeatureExtractor().extract_all_features(waveform)
    ordered_values = [
        _finite_or_zero(features_by_name[name])
        for name in sorted(features_by_name)
    ]
    return torch.tensor(ordered_values, dtype=torch.float32)


def chunk_fbank(fbank, config):
    """Split a (frames, features) matrix into fixed-length overlapping chunks.

    The chunk and overlap sizes are converted from seconds to frames using the
    feature frame rate. Kaldi-style FBank advances by a fixed time step per
    frame (10 ms by default in ``torchaudio.compliance.kaldi.fbank``, which is
    what ``extract_fbank`` uses), so the frame rate is ``1000 / frame_shift_ms``
    and does not depend on the sample rate. With the default 10 ms shift this
    gives the same 100 frames per second the previous ``sr / 160`` formula
    produced at 16 kHz.

    Args:
        fbank: 2-D tensor whose first dimension indexes frames.
        config: Mapping with ``'chunk_length'`` and ``'chunk_overlap'`` in
            seconds, and optionally ``'frame_shift_ms'`` (default 10.0).

    Returns:
        Tensor of shape (num_chunks, chunk_frames, features). A trailing
        chunk shorter than ``chunk_frames`` is zero-padded at the end.

    Raises:
        ValueError: If the chunk length is not positive, the overlap is not
            smaller than the chunk length, or ``fbank`` has no frames.
    """
    frame_shift_ms = config.get('frame_shift_ms', 10.0)
    frames_per_second = 1000.0 / frame_shift_ms
    chunk_frames = int(config['chunk_length'] * frames_per_second)
    overlap_frames = int(config['chunk_overlap'] * frames_per_second)
    stride = chunk_frames - overlap_frames

    # A non-positive stride would either make range() fail or yield no chunks,
    # which previously surfaced as an obscure torch.stack error.
    if chunk_frames <= 0 or stride <= 0:
        raise ValueError(
            "chunk_length must be positive and larger than chunk_overlap "
            f"(got {chunk_frames} chunk frames, {overlap_frames} overlap frames)"
        )

    n_frames = fbank.shape[0]
    if n_frames == 0:
        raise ValueError("Cannot chunk an empty feature matrix (0 frames)")

    chunks = []
    for start in range(0, n_frames, stride):
        chunk = fbank[start:start + chunk_frames]
        missing = chunk_frames - chunk.shape[0]
        if missing > 0:
            # Pad only the frame axis, at the end, so all chunks stack cleanly.
            chunk = F.pad(chunk, (0, 0, 0, missing))
        chunks.append(chunk)
    return torch.stack(chunks)


def extract_mfcc_from_file(audio_file_path, num_mfcc=13, n_fft=2048, hop_length=512, num_segments=10):
    """
    Build the MFCC segments that Andy's model consumes from one audio file.

    The file is loaded at 22050 Hz and forced to exactly 30 seconds (zero
    padded at the end when shorter, truncated when longer). The clip is then
    cut into ``num_segments`` equal pieces and MFCCs are computed per piece.

    Returns a list of arrays shaped (time, num_mfcc). A piece is dropped when
    its frame count differs from ``ceil(samples_per_segment / hop_length)``,
    so the list may hold fewer than ``num_segments`` entries.
    """
    target_sr = 22050
    clip_seconds = 30
    clip_samples = target_sr * clip_seconds

    audio, loaded_sr = librosa.load(audio_file_path, sr=target_sr)

    # Force the clip to its nominal length.
    shortfall = clip_samples - len(audio)
    if shortfall > 0:
        audio = np.pad(audio, (0, shortfall), mode='constant')
    else:
        audio = audio[:clip_samples]

    segment_len = int(clip_samples / num_segments)
    expected_frames = math.ceil(segment_len / hop_length)

    kept = []
    for index in range(num_segments):
        begin = index * segment_len
        frames = librosa.feature.mfcc(
            y=audio[begin:begin + segment_len],
            sr=loaded_sr,
            n_mfcc=num_mfcc,
            n_fft=n_fft,
            hop_length=hop_length,
        ).T  # shape: (time, num_mfcc)
        if len(frames) != expected_frames:
            continue
        kept.append(frames)

    return kept

### **BiLSTM Model implementation and inference**

In [6]:
class BiLSTMInference:
    """Inference and benchmarking wrapper around ``EnhancedDementiaCNNBiLSTM``.

    The model is loaded once at construction. ``process_file`` scores a single
    recording and ``benchmark_on_dataset`` scores every recording listed in a
    ground-truth table.
    """

    # Column layout of the frame returned by benchmark_on_dataset. Declared
    # explicitly so the frame keeps its schema even when no file succeeds.
    RESULT_COLUMNS = ['ID', 'GroundTruth', 'PredictedLabel', 'PredictedProbability']

    def __init__(self, model_path, device, clinical_feature_dim=18):
        """Store the device and load the model weights from ``model_path``."""
        self.device = device
        self.model = self._load_model(model_path, clinical_feature_dim)

    def _load_model(self, model_path, clinical_feature_dim):
        """Load the enhanced model with clinical features.

        Raises:
            FileNotFoundError: Re-raised (after a printed notice) when the
                checkpoint file does not exist.
        """
        try:
            model = EnhancedDementiaCNNBiLSTM(
                use_clinical_features=True,
                clinical_feature_dim=clinical_feature_dim
            ).to(self.device)
            # NOTE(review): weights_only=False lets torch.load unpickle
            # arbitrary objects, so only trusted checkpoints must be used
            # here. The result is fed straight to load_state_dict, so
            # weights_only=True may well suffice - confirm against the file.
            checkpoint = torch.load(model_path, map_location=self.device, weights_only=False)
            model.load_state_dict(checkpoint)
            print("✓ Enhanced BiLSTM model with clinical features loaded successfully")
            return model
        except FileNotFoundError:
            print("⚠ BiLSTM model not found")
            raise

    def _run_inference(self, chunks, clinical_features, threshold=0.5):
        """Run inference with both acoustic and clinical features.

        The per-chunk model outputs are averaged first and the sigmoid is
        applied to that average.

        Args:
            chunks: Batch of acoustic chunks (first dimension = chunk index).
            clinical_features: 1-D clinical feature vector for the recording.
            threshold: Probability at or above which the label is 1.

        Returns:
            ``(probability, pred_label)`` with ``pred_label`` in {0, 1}.
        """
        self.model.eval()
        chunks = chunks.to(self.device)

        # Every chunk of a recording shares the same clinical vector.
        num_chunks = len(chunks)
        clinical_features_repeated = clinical_features.unsqueeze(0).repeat(num_chunks, 1).to(self.device)

        with torch.no_grad():
            outputs = self.model(chunks, clinical_features_repeated)

        avg_output = outputs.mean(dim=0)
        probability = torch.sigmoid(avg_output).item()
        pred_label = 1 if probability >= threshold else 0
        return probability, pred_label

    def process_file(self, file_path, config):
        """Score one audio file and return ``(probability, pred_label)``."""
        # Load audio and extract features
        waveform = load_audio(file_path, target_sr=config['sr'])
        fbank = extract_fbank(waveform, config)
        clinical_features = extract_clinical_features(waveform)
        chunks = chunk_fbank(fbank, config)

        # Run inference
        return self._run_inference(chunks, clinical_features)

    def benchmark_on_dataset(self, test_audio_dir, ground_truth_df, config):
        """Score every recording listed in ``ground_truth_df``.

        Args:
            test_audio_dir: Directory holding ``<ID>.wav`` files.
            ground_truth_df: Frame with ``'ID'`` and ``'Label'`` columns.
            config: Audio configuration forwarded to ``process_file``.

        Returns:
            DataFrame with the columns in ``RESULT_COLUMNS``, one row per file
            that was processed successfully. Files that raise are reported on
            stdout and skipped, so the frame may be empty but always has the
            expected columns.
        """
        results = []

        for _, row in ground_truth_df.iterrows():
            file_id = row['ID'].strip()
            gt_label = int(row['Label'])
            # os.path.join matches how the sibling inference classes build
            # paths and tolerates a trailing separator on the directory.
            audio_file = os.path.join(test_audio_dir, f"{file_id}.wav")

            try:
                prob, pred_label = self.process_file(audio_file, config)
            except Exception as e:
                # Best effort: one unreadable file must not abort the run.
                print(f"Error processing {file_id}: {e}")
                continue

            results.append({
                'ID': file_id,
                'GroundTruth': gt_label,
                'PredictedLabel': pred_label,
                'PredictedProbability': prob
            })

        return pd.DataFrame(results, columns=self.RESULT_COLUMNS)

### **Andy's TFLite Model implementation and inference**

In [7]:
class AndyInference:
    """Inference and benchmarking wrapper for Andy's TFLite MFCC classifier."""

    # Column layout of the per-file predictions. Declared explicitly so the
    # merge in benchmark_on_dataset still works when nothing was predicted.
    RESULT_COLUMNS = ['ID', 'predicted_class', 'predicted_probability']

    def __init__(self, model_path):
        """Remember the model path; the interpreter is built on first use."""
        self.model_path = model_path
        # NOTE(review): this index order is the opposite of LiamInference's
        # mapping ({0: "control", 1: "dementia"}) - confirm it matches the
        # label encoding this model was trained with.
        self.mapping = {0: "dementia", 1: "control"}
        self._interpreter = None

    def _get_interpreter(self):
        """Return the TFLite interpreter, creating and allocating it once.

        Reusing one interpreter avoids re-reading the model file and
        re-allocating tensors for every audio file.
        """
        interpreter = getattr(self, '_interpreter', None)
        if interpreter is None:
            interpreter = tf.lite.Interpreter(model_path=self.model_path)
            interpreter.allocate_tensors()
            self._interpreter = interpreter
        return interpreter

    def _predict(self, interpreter, X, input_details, output_details):
        """Run inference on a single MFCC segment and return its output row."""
        # Add the leading batch dimension and cast to float32 before feeding.
        X = X[np.newaxis, ...].astype(np.float32)
        interpreter.set_tensor(input_details[0]['index'], X)
        interpreter.invoke()
        prediction = interpreter.get_tensor(output_details[0]['index'])
        return prediction[0]

    def process_file(self, audio_file_path, num_segments=10):
        """Classify one audio file.

        The per-segment model outputs are averaged and the arg-max of that
        average is the prediction.

        Returns:
            ``(predicted_class, predicted_probability)`` where the probability
            is a percentage (0-100), or ``(None, None)`` when no MFCC segment
            could be extracted.
        """
        mfcc_segments = extract_mfcc_from_file(audio_file_path, num_segments=num_segments)
        if not mfcc_segments:
            logger.error(f"No MFCC data extracted from: {audio_file_path}")
            return None, None

        interpreter = self._get_interpreter()
        input_details = interpreter.get_input_details()
        output_details = interpreter.get_output_details()

        predictions_list = [
            self._predict(interpreter, segment, input_details, output_details)
            for segment in mfcc_segments
        ]

        avg_prediction = np.mean(predictions_list, axis=0)
        predicted_index = int(np.argmax(avg_prediction))
        predicted_class = self.mapping.get(predicted_index, "Unknown")
        predicted_probability = avg_prediction[predicted_index] * 100

        return predicted_class, predicted_probability

    def benchmark_on_dataset(self, test_audio_dir, ground_truth_df):
        """Classify every ``.wav`` file in a directory and join with the labels.

        Args:
            test_audio_dir: Directory scanned for ``.wav`` files.
            ground_truth_df: Frame with ``'ID'`` and ``'Label'`` columns
                (labels 0/1 are rewritten to 'control'/'dementia' on a copy).

        Returns:
            Inner join of the ground truth and the predictions on ``'ID'``.
            The frame is empty, but keeps its columns, when no file yields a
            prediction.
        """
        results = []

        # Sorted so the row order does not depend on the file system.
        for file in sorted(os.listdir(test_audio_dir)):
            if not file.endswith('.wav'):
                continue
            file_path = os.path.join(test_audio_dir, file)
            predicted_class, predicted_probability = self.process_file(file_path, num_segments=10)
            if predicted_class is None:
                continue

            results.append({
                'ID': os.path.splitext(file)[0],
                'predicted_class': predicted_class,
                'predicted_probability': round(predicted_probability, 2)
            })

        # Explicit columns: without them an empty result list produces a frame
        # with no 'ID' column and the strip/merge below raises KeyError.
        results_df = pd.DataFrame(results, columns=self.RESULT_COLUMNS)

        # Merge with ground truth
        ground_truth_df = ground_truth_df.copy()
        ground_truth_df['Label'] = ground_truth_df['Label'].replace({0: 'control', 1: 'dementia'})
        ground_truth_df['ID'] = ground_truth_df['ID'].str.strip()
        results_df['ID'] = results_df['ID'].str.strip()

        merged_df = pd.merge(ground_truth_df, results_df, on='ID', how='inner')
        return merged_df

### **Liam's Text Classification Model implementation and inference**

In [8]:
class LiamInference:
    """Transcribe audio with Whisper and classify the transcript's text."""

    # Column layout of the per-file predictions. Declared explicitly so the
    # merge in benchmark_on_dataset still works when nothing was predicted.
    RESULT_COLUMNS = ['ID', 'predicted_class', 'predicted_probability', 'transcription']

    def __init__(self, model_path, vectorizer_path, whisper_model_size="base"):
        """Load the classifier, its vectorizer and the Whisper model."""
        # NOTE(review): joblib.load unpickles the files, so only trusted
        # model artefacts should be passed in.
        self.model = load(model_path)
        self.vectorizer = load(vectorizer_path)
        self.whisper_model = whisper.load_model(whisper_model_size)
        self.mapping = {0: "control", 1: "dementia"}

    def _classify_text(self, input_text):
        """Clean the text, vectorize it and classify it.

        Returns:
            ``(predicted_label, probability)`` where the probability is the
            model's probability for the predicted label as a percentage
            rounded to two decimals.
        """
        # Lower-case and keep only letters and whitespace.
        cleaned_input = ''.join(
            char for char in input_text.lower() if char.isalpha() or char.isspace()
        )
        input_vectorized = self.vectorizer.transform([cleaned_input])
        prediction = self.model.predict(input_vectorized)
        predict_proba = self.model.predict_proba(input_vectorized)
        # The predicted label is used directly as the probability column
        # index (this relies on the labels being 0..n-1).
        probability = round(predict_proba[0][prediction[0]] * 100, 2)
        return prediction[0], probability

    def _transcribe_audio(self, audio_path):
        """Return the Whisper transcription text for the given audio file."""
        result = self.whisper_model.transcribe(audio_path)
        return result["text"]

    def process_file(self, audio_file_path):
        """Transcribe and classify one audio file.

        Returns:
            ``(predicted_class, probability, transcription)``.
        """
        transcription = self._transcribe_audio(audio_file_path)
        pred, prob = self._classify_text(transcription)
        predicted_class = self.mapping.get(pred, "Unknown")
        return predicted_class, prob, transcription

    def benchmark_on_dataset(self, test_audio_dir, ground_truth_df):
        """Classify every ``.wav`` file in a directory and join with the labels.

        Args:
            test_audio_dir: Directory scanned for ``.wav`` files.
            ground_truth_df: Frame with ``'ID'`` and ``'Label'`` columns
                (labels 0/1 are rewritten to 'control'/'dementia' on a copy).

        Returns:
            Inner join of the ground truth and the predictions on ``'ID'``.
            The frame is empty, but keeps its columns, when the directory
            holds no ``.wav`` file.
        """
        predictions = []

        # Sorted so the row order does not depend on the file system.
        for file in sorted(os.listdir(test_audio_dir)):
            if not file.endswith('.wav'):
                continue
            file_path = os.path.join(test_audio_dir, file)
            predicted_class, prob, transcription = self.process_file(file_path)
            predictions.append({
                "ID": os.path.splitext(file)[0].strip(),
                "predicted_class": predicted_class,
                "predicted_probability": prob,
                "transcription": transcription
            })

        # Explicit columns: without them an empty prediction list produces a
        # frame with no 'ID' column and the strip/merge below raises KeyError.
        results_df = pd.DataFrame(predictions, columns=self.RESULT_COLUMNS)

        # Merge with ground truth
        ground_truth_df = ground_truth_df.copy()
        ground_truth_df['Label'] = ground_truth_df['Label'].replace({0: 'control', 1: 'dementia'})
        ground_truth_df['ID'] = ground_truth_df['ID'].str.strip()
        results_df['ID'] = results_df['ID'].str.strip()

        merged_df = pd.merge(ground_truth_df, results_df, on='ID', how='inner')
        return merged_df

### **Benchmark on the Test Set (ADReSS 2020)**

In [9]:
def _benchmark_model(display_name, report_name, output_csv, run_benchmark, truth_col, pred_col):
    """Benchmark one model, then save and analyse its predictions.

    Args:
        display_name: Name used in the progress and failure messages.
        report_name: Name passed to ``save_results`` / ``perform_error_analysis``.
        output_csv: File name the predictions are saved under.
        run_benchmark: Zero-argument callable that builds the model and
            returns its results frame. It is called inside the ``try`` so
            model-loading errors are handled like inference errors.
        truth_col: Name of the ground-truth column in the results frame.
        pred_col: Name of the prediction column in the results frame.

    Returns:
        The results frame, or ``None`` if any step raised.
    """
    print(f"\n🔄 Testing {display_name} Model...")
    try:
        results = run_benchmark()
        save_results(results, output_csv, report_name)
        perform_error_analysis(results, report_name, truth_col, pred_col)
        return results
    except Exception as e:
        # Best effort: a failing model is reported and recorded as None so
        # the remaining models are still benchmarked.
        print(f"❌ {display_name} model failed: {e}")
        return None


def main():
    """Main benchmarking pipeline.

    Loads the ground truth, benchmarks the BiLSTM, Andy and Liam models in
    that order, and writes a comparison when at least one of them succeeded.
    """
    print("🚀 Starting Model Benchmarking Pipeline")
    print("="*60)

    # Setup device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Load ground truth
    print("📊 Loading ground truth data...")
    ground_truth_df = load_ground_truth(DATA_PATHS['ground_truth'])
    print(f"✓ Loaded {len(ground_truth_df)} test samples")

    # Per-model results; None marks a model whose benchmark failed.
    all_results = {}

    all_results['bilstm'] = _benchmark_model(
        "BiLSTM", "BiLSTM", "bilstm_predictions.csv",
        lambda: BiLSTMInference(
            MODEL_PATHS['bilstm'],
            device,
            CLINICAL_FEATURE_DIM
        ).benchmark_on_dataset(
            DATA_PATHS['test_audio_dir'],
            ground_truth_df,
            AUDIO_CONFIG
        ),
        'GroundTruth', 'PredictedLabel'
    )

    all_results['andy'] = _benchmark_model(
        "Andy's", "Andy", "andy_predictions.csv",
        lambda: AndyInference(MODEL_PATHS['andy']).benchmark_on_dataset(
            DATA_PATHS['test_audio_dir'],
            ground_truth_df
        ),
        'Label', 'predicted_class'
    )

    all_results['liam'] = _benchmark_model(
        "Liam's", "Liam", "liam_predictions.csv",
        lambda: LiamInference(
            MODEL_PATHS['liam_model'],
            MODEL_PATHS['liam_vectorizer']
        ).benchmark_on_dataset(
            DATA_PATHS['test_audio_dir'],
            ground_truth_df
        ),
        'Label', 'predicted_class'
    )

    # ==================== COMPARISON ====================
    # NOTE(review): compare_models receives None for every model that failed;
    # confirm it tolerates None inputs.
    if any(result is not None for result in all_results.values()):
        comparison_df = compare_models(
            all_results['bilstm'],
            all_results['andy'],
            all_results['liam']
        )
        save_results(comparison_df, "model_comparison.csv", "Comparison")

    print("\n✅ Benchmarking Complete!")


# Entry point: run the full benchmark when this code is executed directly.
# Inside a notebook cell __name__ is also "__main__", so running the cell
# starts the pipeline.
if __name__ == "__main__":
    main()

🚀 Starting Model Benchmarking Pipeline
Using device: cpu
📊 Loading ground truth data...
✓ Loaded 48 test samples

🔄 Testing BiLSTM Model...
✓ Enhanced BiLSTM model with clinical features loaded successfully
✓ BiLSTM predictions saved to bilstm_predictions.csv

BiLSTM Model Error Analysis
Overall Accuracy: 77.08%

Confusion Matrix:
[[19  5]
 [ 6 18]]

Classification Report:
              precision    recall  f1-score   support

           0       0.76      0.79      0.78        24
           1       0.78      0.75      0.77        24

    accuracy                           0.77        48
   macro avg       0.77      0.77      0.77        48
weighted avg       0.77      0.77      0.77        48


False Positives: 5
Files wrongly classified as positive:
['S170' 'S177' 'S178' 'S197' 'S199']

False Negatives: 6
Files wrongly classified as negative:
['S164' 'S167' 'S181' 'S187' 'S200' 'S205']

🔄 Testing Andy's Model...
✓ Andy predictions saved to andy_predictions.csv

Andy Model Error Analys