In [22]:
import os
import numpy as np
import librosa
import python_speech_features
from os import listdir
from os.path import isdir, join
import random
from collections import defaultdict, Counter
from sklearn.model_selection import KFold

## Audio Feature Extraction Functions

Utility functions for processing audio files and extracting MFCC (Mel-Frequency Cepstral Coefficients) features.

### `signal_to_mfcc()`
Converts raw audio signal to MFCC features using specified parameters:
- **Window length:** 25ms
- **Window step:** 20ms
- **Number of coefficients:** 13
- **Frequency range:** 300-4000 Hz (optimized for speech)
- **Returns:** Transposed MFCC matrix (features × time)

### `calc_mfcc()`
Loads audio file and extracts MFCC features:
- Loads audio at 8kHz sample rate using librosa
- Applies `signal_to_mfcc()` for feature extraction

### `chunk_audio_signal()`
Splits long audio into fixed-duration chunks (default: 1 second):
- Prevents variable-length inputs
- Each chunk processed independently
- **Returns:** List of MFCC features per chunk

### `extract_features()`
Batch feature extraction for dataset:
- Processes multiple audio files
- Filters to fixed MFCC length (default: 50 frames)
- **Returns:** Features (x), labels (y), and speaker IDs
- **Input:** List of tuples: `(folder, filename, speaker_id, label)`

**Purpose:** Standardizes audio preprocessing pipeline for consistent model input across all experiments.

In [27]:
def signal_to_mfcc(signal, sample_rate=8000, num_mfcc=13):
    """Convert audio signal to MFCC features."""
    mfccs = python_speech_features.base.mfcc(
        signal,
        samplerate=sample_rate,
        winlen=0.025,
        winstep=0.020,
        numcep=num_mfcc,
        nfilt=20,
        nfft=256,
        lowfreq=300,
        highfreq=4000,
        preemph=0.0,
        ceplifter=0,
        appendEnergy=True,
        winfunc=np.hanning
    )
    return mfccs.transpose()


def calc_mfcc(audio_path, sample_rate=8000, num_mfcc=13):
    """Extract MFCC features from audio file."""
    signal, _ = librosa.load(audio_path, sr=sample_rate)
    return signal_to_mfcc(signal, sample_rate, num_mfcc)


def chunk_audio_signal(signal, sample_rate=8000, chunk_duration=1.0):
    """
    Chunk audio signal into fixed-duration segments.
    Returns list of MFCC features for each chunk.
    """
    chunk_length = int(sample_rate * chunk_duration)
    
    chunks = []
    for i in range(0, len(signal) - chunk_length + 1, chunk_length):
        chunk = signal[i:i + chunk_length]
        mfccs = signal_to_mfcc(chunk, sample_rate)
        chunks.append(mfccs)
    
    return chunks


def extract_features(dataset_path, file_data, len_mfcc=50):
    """
    Extract features for list of files with speaker and label info.
    file_data: list of (target_folder, filename, speaker_id, label)
    Returns: x, y, speaker_ids
    """
    x, y, speaker_ids = [], [], []
    for target_folder, filename, speaker_id, label in file_data:
        path = join(dataset_path, target_folder, filename)
        mfccs = calc_mfcc(path)
        if mfccs.shape[1] == len_mfcc:
            x.append(mfccs)
            y.append(label)
            speaker_ids.append(speaker_id)
    return x, y, speaker_ids

## Preprocess Background Noise Files

Special preprocessing for background noise audio files to handle their variable length.

**Problem:** Background noise files are much longer than command utterances, causing class imbalance.

**Solution:** 
1. Chunk each background noise file into 1-second segments
2. Treat each chunk as an independent sample
3. Assign unique virtual speaker ID to each chunk (e.g., `bg_chunk_0`, `bg_chunk_1`)

**Process:**
- Loads all files from `_background_noise_` folder
- Splits each file into fixed-duration chunks
- Filters chunks to match required MFCC length (50 frames)
- Creates synthetic samples with virtual speaker IDs

**Returns:** List of tuples: `(target, virtual_speaker_id, mfccs, label)`

**Why this matters:**
- Increases background noise samples for balanced training
- Prevents overfitting on limited background noise files
- Each chunk represents unique acoustic context

**Output:** Prints number of chunks created from original background noise files.

In [28]:
def preprocess_background_noise(dataset_path, data_by_speaker_target, bg_label_index, len_mfcc=50):
    """
    Chunk all background noise files upfront and create virtual samples.
    Each chunk is treated as an independent sample with unique ID.
    Returns: List of (virtual_speaker_id, mfccs, label) for each chunk
    """
    bg_keys = [(target, spk) for target, spk in data_by_speaker_target.keys() 
               if target == '_background_noise_']
    
    if not bg_keys:
        return []
    
    bg_chunks = []
    chunk_counter = 0
    
    print(f"Preprocessing {len(bg_keys)} background noise files...")
    
    for target, speaker_id in bg_keys:
        for filename, label, _ in data_by_speaker_target[(target, speaker_id)]:
            path = join(dataset_path, target, filename)
            signal, _ = librosa.load(path, sr=8000)
            
            mfcc_chunks = chunk_audio_signal(signal)
            
            for mfccs in mfcc_chunks:
                if mfccs.shape[1] == len_mfcc:
                    virtual_speaker_id = f"bg_chunk_{chunk_counter}"
                    bg_chunks.append((target, virtual_speaker_id, mfccs, label))
                    chunk_counter += 1
    
    print(f"Created {len(bg_chunks)} background noise chunks from {len(bg_keys)} files")
    return bg_chunks

In [29]:
def load_speech_data(dataset_path):
    """
    Load speech command data organized by (target, speaker_id).
    Returns: target_list, data_by_speaker_target, bg_label_index
    """
    target_list = [name for name in listdir(dataset_path) 
                   if isdir(join(dataset_path, name)) and name != '.ipynb_checkpoints']
    
    data_by_speaker_target = defaultdict(list)
    bg_label_index = None
    
    for target_idx, target in enumerate(target_list):
        files = [f for f in listdir(join(dataset_path, target)) if f.endswith('.wav')]
        
        is_background = (target == '_background_noise_')
        if is_background:
            bg_label_index = target_idx
        
        for file in files:
            speaker_id = file.split('_')[0]
            data_by_speaker_target[(target, speaker_id)].append((file, target_idx, is_background))
    
    return target_list, data_by_speaker_target, bg_label_index

In [32]:
def create_k_folds(data_by_speaker_target, bg_chunks, n_splits=10, val_ratio=0.1, seed=42):
    """
    Create K-fold splits respecting speaker IDs.
    Background noise chunks are treated as individual speakers.
    """
    non_bg_keys = [(target, spk) for target, spk in data_by_speaker_target.keys() 
                   if target != '_background_noise_']
    
    bg_speaker_keys = [(chunk[0], chunk[1]) for chunk in bg_chunks] 
    
    all_speaker_keys = non_bg_keys + bg_speaker_keys
    
    random.seed(seed)
    random.shuffle(all_speaker_keys)
    
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=seed)
    folds = []
    
    for fold_idx, (train_val_idx, test_idx) in enumerate(kf.split(all_speaker_keys)):
        train_val_keys = [all_speaker_keys[i] for i in train_val_idx]
        test_keys = [all_speaker_keys[i] for i in test_idx]
        
        val_size = int(len(train_val_keys) * val_ratio)
        val_keys = train_val_keys[:val_size]
        train_keys = train_val_keys[val_size:]
        
        folds.append((train_keys, val_keys, test_keys))
        print(f"Fold {fold_idx + 1}/{n_splits} created")
    
    return folds

In [34]:
def extract_partition(dataset_path, data_by_speaker_target, bg_chunks_dict, speaker_keys, len_mfcc=50):
    """
    Extract features for a given partition with speaker ID tracking.
    Handles both regular files and pre-chunked background noise.
    Returns: x, y, speaker_ids
    """
    x, y, speaker_ids = [], [], []
    
    for target, speaker_id in speaker_keys:
        if speaker_id in bg_chunks_dict:
            mfccs, label = bg_chunks_dict[speaker_id]
            x.append(mfccs)
            y.append(label)
            speaker_ids.append(speaker_id)
        else:
            for filename, label, is_background in data_by_speaker_target[(target, speaker_id)]:
                if not is_background:  
                    path = join(dataset_path, target, filename)
                    mfccs = calc_mfcc(path)
                    if mfccs.shape[1] == len_mfcc:
                        x.append(mfccs)
                        y.append(label)
                        speaker_ids.append(speaker_id)
    
    return x, y, speaker_ids

In [35]:
def create_finetuning_data(x_test, y_test, speaker_ids_test, bg_label_index, ratio=0.3, seed=42):
    """
    Extract 30% of test data for fine-tuning.
    Groups by (speaker_id, label) to ensure same speaker appears in both sets.
    Background noise chunks are EXCLUDED from fine-tuning.
    """
    random.seed(seed)

    bg_indices = []
    regular_indices = []
    
    for i, (y, speaker) in enumerate(zip(y_test, speaker_ids_test)):
        if y == bg_label_index or speaker.startswith('bg_chunk_'):
            bg_indices.append(i)
        else:
            regular_indices.append(i)

    speaker_label_data = defaultdict(list)
    for idx in regular_indices:
        speaker = speaker_ids_test[idx]
        label = y_test[idx]
        speaker_label_data[(speaker, label)].append(idx)
    
    finetune_indices = set()

    for (speaker, label), indices in speaker_label_data.items():
        if len(indices) >= 2:
            num_to_take = max(1, int(len(indices) * ratio))
            selected = random.sample(indices, num_to_take)
            finetune_indices.update(selected)

    x_finetune = [x_test[i] for i in finetune_indices]
    y_finetune = [y_test[i] for i in finetune_indices]
    speaker_finetune = [speaker_ids_test[i] for i in finetune_indices]

    final_test_indices = (set(regular_indices) - finetune_indices) | set(bg_indices)
    
    x_remaining_test = [x_test[i] for i in final_test_indices]
    y_remaining_test = [y_test[i] for i in final_test_indices]
    speaker_remaining = [speaker_ids_test[i] for i in final_test_indices]
    
    return x_finetune, y_finetune, speaker_finetune, x_remaining_test, y_remaining_test, speaker_remaining

In [36]:
def save_fold(fold_idx, x_train, y_train, speaker_train, x_val, y_val, speaker_val,
              x_test, y_test, speaker_test, x_finetune, y_finetune, speaker_finetune,
              x_final_test, y_final_test, speaker_final_test, output_dir):
    """Save a single fold with all partitions including speaker IDs."""
    os.makedirs(output_dir, exist_ok=True)
    
    fold_file = join(output_dir, f"fold_{fold_idx + 1}.npz")
    np.savez(
        fold_file,
        x_train_fold=x_train,
        y_train_fold=y_train,
        speaker_train_fold=speaker_train,
        x_val_fold=x_val,
        y_val_fold=y_val,
        speaker_val_fold=speaker_val,
        x_test_fold=x_test,
        y_test_fold=y_test,
        speaker_test_fold=speaker_test,
        x_finetune_train=x_finetune,
        y_finetune_train=y_finetune,
        speaker_finetune_train=speaker_finetune,
        x_final_test=x_final_test,
        y_final_test=y_final_test,
        speaker_final_test=speaker_final_test
    )
    print(f"Fold {fold_idx + 1} saved to {fold_file}")

In [37]:
def create_and_save_folds(dataset_path, output_dir, n_splits=10, val_ratio=0.1, 
                          finetune_ratio=0.3, seed=42):
    """Complete pipeline: load data, create folds, extract fine-tuning data, and save."""
    print("Loading data...")
    target_list, data_by_speaker_target, bg_label_index = load_speech_data(dataset_path)
    print(f"Loaded {len(target_list)} targets")
    
    bg_chunks = preprocess_background_noise(dataset_path, data_by_speaker_target, bg_label_index)
    
    bg_chunks_dict = {chunk[1]: (chunk[2], chunk[3]) for chunk in bg_chunks} 
    
    if bg_label_index is not None:
        print(f"Background noise label index: {bg_label_index}")
    else:
        print("Warning: No background noise found")
    
    print(f"\nCreating {n_splits} folds...")
    folds = create_k_folds(data_by_speaker_target, bg_chunks, n_splits, val_ratio, seed)
    
    print("\nExtracting features and creating partitions...")
    for fold_idx, (train_keys, val_keys, test_keys) in enumerate(folds):
        print(f"\nProcessing Fold {fold_idx + 1}/{n_splits}")
        
        x_train, y_train, speaker_train = extract_partition(dataset_path, data_by_speaker_target, bg_chunks_dict, train_keys)
        x_val, y_val, speaker_val = extract_partition(dataset_path, data_by_speaker_target, bg_chunks_dict, val_keys)
        x_test, y_test, speaker_test = extract_partition(dataset_path, data_by_speaker_target, bg_chunks_dict, test_keys)
        
        x_finetune, y_finetune, speaker_finetune, x_final_test, y_final_test, speaker_final_test = create_finetuning_data(
            x_test, y_test, speaker_test, bg_label_index, finetune_ratio, seed + fold_idx
        )
        
        print(f"  Train: {len(x_train)}, Val: {len(x_val)}, Test: {len(x_test)}")
        print(f"  Fine-tune: {len(x_finetune)}, Final Test: {len(x_final_test)}")
        
        save_fold(fold_idx, x_train, y_train, speaker_train, x_val, y_val, speaker_val,
                  x_test, y_test, speaker_test, x_finetune, y_finetune, speaker_finetune,
                  x_final_test, y_final_test, speaker_final_test, output_dir)
    
    print(f"\nAll folds saved to {output_dir}")

## Create and Save K-Fold Data Partitions

Generate 10-fold cross-validation splits with speaker-independent partitioning and save to disk.

**Dataset Configuration:**
- **Dataset path:** Google Speech Commands v0.02
- **Number of folds:** 10
- **Validation split:** 10% of training data per fold
- **Fine-tuning split:** 30% of training data reserved for fine-tuning
- **Random seed:** 42 (for reproducibility)

**Splitting Strategy:**
- **Speaker-independent:** No speaker appears in both train and test sets
- **Stratified:** Maintains class distribution across folds
- **Special handling:** Background noise chunks treated as separate samples

**Output Files:** 
Creates `fold_1.npz` through `fold_10.npz` in `new_Data_particions/` directory.

**Each .npz file contains:**

In [38]:
dataset_path = "path/to/speech_commands_v0.02"
output_dir = "new_Data_particions"

create_and_save_folds(
    dataset_path=dataset_path,
    output_dir=output_dir,
    n_splits=10,
    val_ratio=0.1,
    finetune_ratio=0.3,
    seed=42
)

Loading data...
Loaded 36 targets
Preprocessing 6 background noise files...
Created 398 background noise chunks from 6 files
Background noise label index: 5

Creating 10 folds...
Fold 1/10 created
Fold 2/10 created
Fold 3/10 created
Fold 4/10 created
Fold 5/10 created
Fold 6/10 created
Fold 7/10 created
Fold 8/10 created
Fold 9/10 created
Fold 10/10 created

Extracting features and creating partitions...

Processing Fold 1/10
  Train: 77912, Val: 8253, Test: 9637
  Fine-tune: 2116, Final Test: 7521
Fold 1 saved to new_Data_particions/fold_1.npz

Processing Fold 2/10
  Train: 77821, Val: 8256, Test: 9725
  Fine-tune: 2201, Final Test: 7524
Fold 2 saved to new_Data_particions/fold_2.npz

Processing Fold 3/10
  Train: 77973, Val: 8263, Test: 9566
  Fine-tune: 2161, Final Test: 7405
Fold 3 saved to new_Data_particions/fold_3.npz

Processing Fold 4/10
  Train: 77820, Val: 8281, Test: 9701
  Fine-tune: 2151, Final Test: 7550
Fold 4 saved to new_Data_particions/fold_4.npz

Processing Fold 5/1

In [39]:
data = np.load('new_Data_particions/fold_1.npz')  

print("Arrays inside the .npz file:")
print(data.files)

Arrays inside the .npz file:
['x_train_fold', 'y_train_fold', 'speaker_train_fold', 'x_val_fold', 'y_val_fold', 'speaker_val_fold', 'x_test_fold', 'y_test_fold', 'speaker_test_fold', 'x_finetune_train', 'y_finetune_train', 'speaker_finetune_train', 'x_final_test', 'y_final_test', 'speaker_final_test']


In [23]:
def load_fold(fold_path):
    """Load a single fold file."""
    return np.load(fold_path, allow_pickle=True)


def analyze_fold_distribution(fold_data, target_list=None):
    """
    Analyze and print distribution of targets across all partitions.
    
    Args:
        fold_data: Loaded .npz file
        target_list: Optional list of target names for better readability
    """
    partitions = {
        'Training': ('y_train_fold', 'speaker_train_fold'),
        'Validation': ('y_val_fold', 'speaker_val_fold'),
        'Test (Full)': ('y_test_fold', 'speaker_test_fold'),
        'Fine-tune Train': ('y_finetune_train', 'speaker_finetune_train'),
        'Final Test': ('y_final_test', 'speaker_final_test')
    }
    
    print("="*80)
    print("FOLD DATA DISTRIBUTION ANALYSIS")
    print("="*80)
    
    for partition_name, (y_key, speaker_key) in partitions.items():
        y_data = fold_data[y_key]
        speaker_data = fold_data[speaker_key]
        
        print(f"\n{partition_name.upper()}")
        print("-"*80)
        print(f"Total samples: {len(y_data)}")
        print(f"Unique speakers: {len(set(speaker_data))}")
        
        # Count by target
        target_counts = Counter(y_data)
        print(f"\nDistribution by target (label):")
        for target_idx in sorted(target_counts.keys()):
            target_name = target_list[target_idx] if target_list and target_idx < len(target_list) else f"Label_{target_idx}"
            count = target_counts[target_idx]
            print(f"  {target_name:30s}: {count:5d} samples")
    
    print("\n" + "="*80)

In [43]:
def compare_partitions(fold_data):
    """Compare original test vs fine-tune + final test."""
    y_test_full = fold_data['y_test_fold']
    y_finetune = fold_data['y_finetune_train']
    y_final_test = fold_data['y_final_test']
    
    print("\n" + "="*80)
    print("PARTITION SPLIT VERIFICATION")
    print("="*80)
    print(f"Original Test Set:     {len(y_test_full)} samples")
    print(f"Fine-tune Train:       {len(y_finetune)} samples ({len(y_finetune)/len(y_test_full)*100:.1f}%)")
    print(f"Final Test:            {len(y_final_test)} samples ({len(y_final_test)/len(y_test_full)*100:.1f}%)")
    print(f"Sum (Finetune + Final): {len(y_finetune) + len(y_final_test)} samples")
    
    if len(y_finetune) + len(y_final_test) == len(y_test_full):
        print("Split is correct - no data loss")
    else:
        print("WARNING: Data mismatch!")

In [41]:
def check_speaker_overlap(fold_data):
    """Check if same speakers appear in fine-tune and final test (should be YES)."""
    speaker_finetune = set(fold_data['speaker_finetune_train'])
    speaker_final_test = set(fold_data['speaker_final_test'])
    
    overlap = speaker_finetune & speaker_final_test
    
    print("\n" + "="*80)
    print("SPEAKER OVERLAP ANALYSIS (Fine-tune ↔ Final Test)")
    print("="*80)
    print(f"Speakers in Fine-tune Train: {len(speaker_finetune)}")
    print(f"Speakers in Final Test:      {len(speaker_final_test)}")
    print(f"Overlapping speakers:        {len(overlap)}")

In [44]:
fold_path = "new_Data_particions/fold_1.npz"
fold_data = load_fold(fold_path)

target_list = ['stop', 'up', 'learn', 'bird', 'follow', '_background_noise_', 'wow', 'on', 'marvin', 'tree', 'no', 'dog', 'happy', 'off', 'down', 'six', 'sheila', 'bed', 'seven', 'visual', 'four', 'right', 'five', 'cat', 'house', 'left', 'go', 'eight', 'forward', 'one', 'yes', 'two', 'backward', 'nine', 'three', 'zero'] 

analyze_fold_distribution(fold_data, target_list)
compare_partitions(fold_data)
check_speaker_overlap(fold_data)

FOLD DATA DISTRIBUTION ANALYSIS

TRAINING
--------------------------------------------------------------------------------
Total samples: 77912
Unique speakers: 2765

Distribution by target (label):
  stop                          :  2916 samples
  up                            :  2614 samples
  learn                         :  1152 samples
  bird                          :  1449 samples
  follow                        :  1219 samples
  _background_noise_            :   314 samples
  wow                           :  1449 samples
  on                            :  2834 samples
  marvin                        :  1454 samples
  tree                          :  1162 samples
  no                            :  2911 samples
  dog                           :  1473 samples
  happy                         :  1415 samples
  off                           :  2799 samples
  down                          :  2937 samples
  six                           :  2895 samples
  sheila                        :