In [4]:
import os
import numpy as np
import pandas as pd
import librosa
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, BatchNormalization, ReLU, Reshape, Bidirectional, LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import StratifiedGroupKFold
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report
from tqdm import tqdm

# --- CONFIGURATION ---
DATASET_DIR = 'dataset' 
SAVE_DIR = 'results_dual_pipeline'
os.makedirs(SAVE_DIR, exist_ok=True)

SAMPLE_RATE = 16000
FRAME_LENGTH = 0.025
FRAME_STRIDE = 0.010
N_MFCC = 13
N_COCHLEAGRAM_BANDS = 64
F_MIN = 100
F_MAX = 2500
MAX_PAD_LEN = 500
BATCH_SIZE = 32
EPOCHS = 30

def get_label(crackle, wheeze):
    if crackle == 0 and wheeze == 0: return 'Normal'
    if crackle == 1 and wheeze == 0: return 'Crackle'
    if crackle == 0 and wheeze == 1: return 'Wheeze'
    return 'Wheeze'

# --- 1. AUGMENTATION & FEATURE EXTRACTION ---

def augment_audio(y, sr):
    """
    Generates 3 augmented versions of the original audio segment.
    """
    augmented_versions = []
    
    # A. Time Stretch
    try:
        rate = np.random.uniform(0.8, 1.2)
        y_stretch = librosa.effects.time_stretch(y, rate=rate)
        # Fix length back to original
        if len(y_stretch) > len(y):
            y_stretch = y_stretch[:len(y)]
        else:
            y_stretch = np.pad(y_stretch, (0, len(y) - len(y_stretch)))
        augmented_versions.append(y_stretch)
    except:
        pass # Skip if stretch fails on tiny clips
    
    # B. Pitch Shift
    try:
        steps = np.random.uniform(-2, 2)
        y_shift = librosa.effects.pitch_shift(y, sr=sr, n_steps=steps)
        augmented_versions.append(y_shift)
    except:
        pass

    # C. Noise Injection
    try:
        noise_amp = 0.005 * np.random.uniform() * np.amax(y)
        y_noise = y.astype('float64') + noise_amp * np.random.normal(size=y.shape[0])
        augmented_versions.append(y_noise)
    except:
        pass
    
    return augmented_versions

def extract_dual_features(y, sr):
    """
    Extracts Stacked MFCCs + Cochleogram from a raw audio array (y).
    """
    # Padding/Truncating
    target_len = int(MAX_PAD_LEN * (FRAME_STRIDE * SAMPLE_RATE))
    if len(y) < target_len:
        y = np.pad(y, (0, target_len - len(y)))
    else:
        y = y[:target_len]

    # Pipeline A: MFCCs
    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=N_MFCC, n_fft=int(FRAME_LENGTH*sr), hop_length=int(FRAME_STRIDE*sr))
    delta_mfcc = librosa.feature.delta(mfcc)
    delta2_mfcc = librosa.feature.delta(mfcc, order=2)
    mfcc_combined = np.concatenate((mfcc, delta_mfcc, delta2_mfcc), axis=0)

    # Pipeline B: Cochleogram (Mel-Spectrogram)
    cochleogram = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=N_COCHLEAGRAM_BANDS, fmin=F_MIN, fmax=F_MAX, n_fft=int(FRAME_LENGTH*sr), hop_length=int(FRAME_STRIDE*sr))
    cochleogram = librosa.power_to_db(cochleogram, ref=np.max)

    # Fusion
    min_time = min(mfcc_combined.shape[1], cochleogram.shape[1])
    mfcc_combined = mfcc_combined[:, :min_time]
    cochleogram = cochleogram[:, :min_time]
    fused_features = np.concatenate((mfcc_combined, cochleogram), axis=0)
    
    # Pad to MAX_PAD_LEN (Time Axis)
    if fused_features.shape[1] < MAX_PAD_LEN:
        pad_width = MAX_PAD_LEN - fused_features.shape[1]
        fused_features = np.pad(fused_features, ((0,0), (0, pad_width)), mode='constant')
    else:
        fused_features = fused_features[:, :MAX_PAD_LEN]
        
    # Z-Score Normalization
    mean = np.mean(fused_features)
    std = np.std(fused_features)
    fused_features = (fused_features - mean) / (std + 1e-6)
    
    return fused_features

def prepare_dataset(dataset_dir):
    features_list = []
    labels_list = []
    groups_list = []
    
    txt_files = [f for f in os.listdir(dataset_dir) if f.endswith('.txt')]
    print(f"Processing {len(txt_files)} files with augmentation...")
    
    for txt_file in tqdm(txt_files):
        base_name = txt_file.split('.')[0]
        wav_file = os.path.join(dataset_dir, base_name + '.wav')
        if not os.path.exists(wav_file): continue
        
        # Load Full Audio Once
        try:
            full_audio, sr = librosa.load(wav_file, sr=SAMPLE_RATE)
        except Exception as e:
            print(f"Failed to load {wav_file}: {e}")
            continue

        df_ann = pd.read_csv(os.path.join(dataset_dir, txt_file), sep='\t', header=None, names=['start', 'end', 'crackle', 'wheeze'])
        
        for _, row in df_ann.iterrows():
            if (row['end'] - row['start']) < 0.2: continue 
            
            start_sample = int(row['start'] * sr)
            end_sample = int(row['end'] * sr)
            
            # Boundary Check
            if end_sample > len(full_audio): end_sample = len(full_audio)
            if start_sample >= end_sample: continue

            # Extract Segment
            y_segment = full_audio[start_sample:end_sample]
            label = get_label(row['crackle'], row['wheeze'])
            
            try:
                # 1. Original (Corrected Call)
                feat = extract_dual_features(y_segment, sr)
                features_list.append(feat)
                labels_list.append(label)
                groups_list.append(base_name)
                
                # 2. Augmented (Corrected Call)
                augmented_segments = augment_audio(y_segment, sr)
                for aug_y in augmented_segments:
                    aug_feat = extract_dual_features(aug_y, sr)
                    features_list.append(aug_feat)
                    labels_list.append(label)
                    groups_list.append(base_name)
                    
            except Exception as e:
                print(f"Error processing segment in {base_name}: {e}")
                
    return np.array(features_list), np.array(labels_list), np.array(groups_list)

# --- 2. BUILD MODEL ---
def build_hybrid_model(input_shape, num_classes):
    inputs = Input(shape=input_shape)
    
    # CNN
    x = Conv2D(32, kernel_size=(3, 3), padding='same')(inputs)
    x = BatchNormalization()(x)
    x = ReLU()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    
    x = Conv2D(64, kernel_size=(3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    x = ReLU()(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    
    # Reshape for LSTM: (Batch, Time, Features)
    # New Time = Original Time / 4 (due to 2 MaxPools)
    # New Feat = Original Feat / 4 * Filters
    target_shape = (x.shape[2], x.shape[1] * x.shape[3]) # (Time, Freq*Filters)
    x = Reshape(target_shape)(x)
    
    # BiLSTM
    x = Bidirectional(LSTM(64, return_sequences=True))(x)
    x = Bidirectional(LSTM(32))(x)
    
    # Dense
    x = Dense(64, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(num_classes, activation='softmax')(x)
    
    model = Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer=Adam(learning_rate=0.001),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model

# --- 3. EXECUTION ---
if __name__ == "__main__":
    print("--- 1. Extracting Dual Features (With Augmentation) ---")
    X, y_raw, groups = prepare_dataset(DATASET_DIR)
    
    # Safety Check
    if len(X) == 0:
        print("CRITICAL: No data loaded. Check paths.")
        exit()

    # Encode Labels
    le = LabelEncoder()
    y_encoded = le.fit_transform(y_raw)
    y_categorical = to_categorical(y_encoded)
    
    # Reshape X for CNN: (Batch, Height, Width, Channels)
    # Height=103, Width=500
    X = X[..., np.newaxis]
    
    print(f"Data Shape: {X.shape}")
    print(f"Labels: {le.classes_}")
    
    # Grouped CV
    kfold = StratifiedGroupKFold(n_splits=5)
    
    fold_acc = []
    
    print("\n--- 2. Starting 5-Fold Grouped Cross Validation ---")
    
    for fold, (train_idx, val_idx) in enumerate(kfold.split(X, y_encoded, groups=groups)):
        print(f"\nTraining Fold {fold+1}...")
        
        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y_categorical[train_idx], y_categorical[val_idx]
        
        model = build_hybrid_model(input_shape=(X.shape[1], X.shape[2], 1), num_classes=len(le.classes_))
        
        model.fit(X_train, y_train, validation_data=(X_val, y_val),
                  epochs=EPOCHS, batch_size=BATCH_SIZE, verbose=1)
        
        # Evaluate
        val_loss, val_acc = model.evaluate(X_val, y_val, verbose=0)
        fold_acc.append(val_acc)
        print(f"Fold {fold+1} Accuracy: {val_acc:.4f}")

    print("\n--- Cross-Validation Results ---")
    print(f"Average Accuracy: {np.mean(fold_acc):.4f}")
    
    model.save(os.path.join(SAVE_DIR, 'dual_pipeline_model_final.h5'))

--- 1. Extracting Dual Features (With Augmentation) ---
Processing 193 files with augmentation...


100%|██████████| 193/193 [00:35<00:00,  5.42it/s]


Data Shape: (3520, 103, 500, 1)
Labels: ['Crackle' 'Normal' 'Wheeze']

--- 2. Starting 5-Fold Grouped Cross Validation ---

Training Fold 1...
Epoch 1/30
[1m88/88[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m50s[0m 533ms/step - accuracy: 0.5132 - loss: 1.0208 - val_accuracy: 0.5085 - val_loss: 1.0875
Epoch 2/30
[1m88/88[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 540ms/step - accuracy: 0.5060 - loss: 1.0050 - val_accuracy: 0.5085 - val_loss: 1.0720
Epoch 3/30
[1m88/88[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m47s[0m 535ms/step - accuracy: 0.5231 - loss: 0.9827 - val_accuracy: 0.5056 - val_loss: 1.0713
Epoch 4/30
[1m88/88[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 540ms/step - accuracy: 0.5455 - loss: 0.9738 - val_accuracy: 0.5085 - val_loss: 1.1246
Epoch 5/30
[1m88/88[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m47s[0m 529ms/step - accuracy: 0.5569 - loss: 0.9652 - val_accuracy: 0.4718 - val_loss: 1.0330
Epoch 6/30
[1m88/88[0m [32m━━━━━━━━━━━━



Fold 5 Accuracy: 0.6221

--- Cross-Validation Results ---
Average Accuracy: 0.5658
