Peak Normalization

In [1]:
import os
import librosa
import numpy as np
import soundfile as sf
from tqdm import tqdm

def normalize_audio_file(input_path, output_path, target_peak=0.98):

    try:
        # Load the audio file, preserving its original sample rate
        audio, sr = librosa.load(input_path, sr=None)

        # Skip silent files
        if np.max(np.abs(audio)) == 0:
            # Just copy the silent file to the destination
            sf.write(output_path, audio, sr)
            return "Silent"

        # Perform peak normalization
        peak = np.max(np.abs(audio))
        scaling_factor = target_peak / peak
        normalized_audio = audio * scaling_factor
        
        # Save the normalized audio
        sf.write(output_path, normalized_audio, sr)
        
        return "Success"
        
    except Exception as e:
        return f"Error: {e}"

def process_directory(input_dir, output_dir, target_peak=0.98):
   
    # --- 1. Find all audio files to process ---
    audio_files = []
    # os.walk will go through the input_dir and all its subdirectories
    for root, _, files in os.walk(input_dir):
        for filename in files:
            # Process only .wav files (adjust if you have other formats)
            if filename.lower().endswith('.wav'):
                audio_files.append(os.path.join(root, filename))

    if not audio_files:
        print("No .wav files found in the specified input directory. Exiting.")
        return

    print(f"Found {len(audio_files)} .wav files to process.")

    # --- 2. Process files with a progress bar ---
    # tqdm creates the visual progress bar
    for input_path in tqdm(audio_files, desc="Normalizing audio files"):
        # Create the corresponding output path, preserving sub-folder structure
        relative_path = os.path.relpath(input_path, input_dir)
        output_path = os.path.join(output_dir, relative_path)
        
        # Ensure the output sub-directory exists
        output_sub_dir = os.path.dirname(output_path)
        os.makedirs(output_sub_dir, exist_ok=True)
        
        # Normalize the individual file
        status = normalize_audio_file(input_path, output_path, target_peak)
        
        if status != "Success" and status != "Silent":
            # Optional: Print errors for files that failed
            print(f"Could not process {input_path}. Reason: {status}")

# --- Main execution block ---
if __name__ == "__main__":
    # ---  USER CONFIGURATION ---
    # 1. Set the path to your original SVD dataset
    INPUT_FOLDER = "Datasets/"
    
    # 2. Set the path where the new normalized dataset will be saved
    OUTPUT_FOLDER = "DatasetNormalized/"
    # --- END CONFIGURATION ---

    print("Starting audio normalization process...")
    print(f"Input Directory:  {INPUT_FOLDER}")
    print(f"Output Directory: {OUTPUT_FOLDER}")
    
    process_directory(INPUT_FOLDER, OUTPUT_FOLDER)
    
    print("\nNormalization process complete!")
    print(f"Normalized files are saved in: {OUTPUT_FOLDER}")

Starting audio normalization process...
Input Directory:  Datasets/
Output Directory: DatasetNormalized/
Found 864 .wav files to process.


Normalizing audio files: 100%|██████████| 864/864 [00:02<00:00, 375.98it/s]


Normalization process complete!
Normalized files are saved in: DatasetNormalized/





Silence Removal

In [2]:
import os
import librosa
import numpy as np
import soundfile as sf
from tqdm import tqdm
import warnings

# Optional: Suppress librosa warnings
warnings.filterwarnings('ignore', category=UserWarning, module='librosa')

print("Libraries imported successfully.")

Libraries imported successfully.


In [3]:
# ---  USER CONFIGURATION ---

# 1. Set the path to your NORMALIZED dataset (the output from the previous step)
INPUT_FOLDER = "DatasetNormalized/"

# 2. Set the path where the new trimmed (silence-removed) dataset will be saved
OUTPUT_FOLDER = "DatasetTrimmed/"

# 3. Set the silence threshold in decibels (dB) below the peak amplitude.
#    A higher value (e.g., 40) is less aggressive.
#    A lower value (e.g., 20) is more aggressive.
#    30 is a good starting point.
SILENCE_THRESHOLD_DB = 30

# --- END CONFIGURATION ---

# Create the output directory to make sure it exists
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

print(f"Input Directory (normalized files): '{INPUT_FOLDER}'")
print(f"Output Directory (trimmed files):   '{OUTPUT_FOLDER}'")
print(f"Silence Threshold: {SILENCE_THRESHOLD_DB} dB")

Input Directory (normalized files): 'DatasetNormalized/'
Output Directory (trimmed files):   'DatasetTrimmed/'
Silence Threshold: 30 dB


In [4]:
def trim_silence_from_file(input_path, output_path, top_db):
   
    try:
        # Load the audio file
        audio, sr = librosa.load(input_path, sr=None)

        # librosa.effects.split returns the start and end indices of non-silent chunks
        non_silent_intervals = librosa.effects.split(audio, top_db=top_db)

        # Concatenate the non-silent audio chunks together
        trimmed_audio = np.concatenate([audio[start:end] for start, end in non_silent_intervals])
        
        # Handle cases where the file becomes empty after trimming
        if len(trimmed_audio) == 0:
            # Save a tiny amount of silence to avoid creating an empty file
            trimmed_audio = np.zeros(1, dtype=np.float32)
            sf.write(output_path, trimmed_audio, sr)
            return "Silent_File"
        
        # Save the trimmed audio
        sf.write(output_path, trimmed_audio, sr)
        return "Success"
        
    except Exception as e:
        return f"Error: {e}"

print("Silence removal function defined.")

Silence removal function defined.


In [5]:
# --- 1. Find all audio files to process ---
audio_files_to_process = []
for root, _, files in os.walk(INPUT_FOLDER):
    for filename in files:
        if filename.lower().endswith('.wav'):
            full_path = os.path.join(root, filename)
            audio_files_to_process.append(full_path)

if not audio_files_to_process:
    print(f"Warning: No .wav files were found in '{INPUT_FOLDER}'. Please check the path.")
else:
    print(f"Found {len(audio_files_to_process)} .wav files. Starting silence removal...")

    # --- 2. Process files with a progress bar ---
    for input_path in tqdm(audio_files_to_process, desc="Trimming silence"):
        # Create the corresponding output path, preserving the sub-folder structure
        relative_path = os.path.relpath(input_path, INPUT_FOLDER)
        output_path = os.path.join(OUTPUT_FOLDER, relative_path)
        
        # Ensure the output sub-directory exists before saving
        output_sub_dir = os.path.dirname(output_path)
        os.makedirs(output_sub_dir, exist_ok=True)
        
        # Trim silence from the individual file
        status = trim_silence_from_file(input_path, output_path, SILENCE_THRESHOLD_DB)
        
        if status.startswith("Error"):
            print(f"Could not process {input_path}. Reason: {status}")

    print("\n------------------------------------")
    print("Silence removal process complete!")
    print(f"All trimmed files have been saved in: '{OUTPUT_FOLDER}'")
    print("------------------------------------")

Found 864 .wav files. Starting silence removal...


Trimming silence: 100%|██████████| 864/864 [00:09<00:00, 88.27it/s]


------------------------------------
Silence removal process complete!
All trimmed files have been saved in: 'DatasetTrimmed/'
------------------------------------





Merge small Classes together

In [6]:
import os
import shutil
from tqdm import tqdm  # <-- THIS IS THE ONLY LINE THAT HAS BEEN CHANGED
import warnings

# Optional: Suppress warnings
warnings.filterwarnings('ignore')

# --------------------------------------------------------------------------
#                           USER CONFIGURATION
# --------------------------------------------------------------------------
# ---  Please edit the variables below ---

# 1. Set the path to your trimmed dataset folder.
SOURCE_DATASET_FOLDER = "DatasetTrimmed"

# 2. Set the path where the new, auto-merged dataset will be saved.
DESTINATION_DATASET_FOLDER = "Dataset_AutoMerged"

# 3. !! IMPORTANT: Set your merging threshold.
#    Any class with FEWER files than this number will be automatically merged.
MERGE_THRESHOLD = 50 

# 4. Give a name to the new, combined class folder for the small classes.
NEW_CLASS_NAME = "Other_Pathologies"

# --------------------------------------------------------------------------
#                         MAIN PROCESSING SCRIPT
# --------------------------------------------------------------------------

# --- Phase 1: Analyze the Dataset ---
print("--- Phase 1: Analyzing Dataset ---")

class_counts = {}
try:
    source_classes = [d for d in os.listdir(SOURCE_DATASET_FOLDER) if os.path.isdir(os.path.join(SOURCE_DATASET_FOLDER, d))]
    if not source_classes:
        raise FileNotFoundError(f"No subdirectories found in '{SOURCE_DATASET_FOLDER}'.")

    for class_name in source_classes:
        class_path = os.path.join(SOURCE_DATASET_FOLDER, class_name)
        num_files = len([f for f in os.listdir(class_path) if f.lower().endswith('.wav')])
        class_counts[class_name] = num_files

    print("\n[INFO] File counts per class:")
    sorted_counts = sorted(class_counts.items(), key=lambda item: item[1], reverse=True)
    for name, count in sorted_counts:
        print(f"  - {name}: {count} files")

    classes_to_merge = [name for name, count in class_counts.items() if count < MERGE_THRESHOLD]
    classes_to_keep = [name for name, count in class_counts.items() if count >= MERGE_THRESHOLD]

    print("\n[INFO] Reorganization Plan:")
    print(f"Threshold set to: {MERGE_THRESHOLD} files.")
    if classes_to_merge:
        print(f"The following {len(classes_to_merge)} classes will be MERGED into '{NEW_CLASS_NAME}':")
        print(f"  {classes_to_merge}")
    else:
        print("No classes fall below the threshold. All classes will be copied as-is.")
    
    if classes_to_keep:
         print(f"The following {len(classes_to_keep)} classes will be KEPT SEPARATE:")
         print(f"  {classes_to_keep}")

    # --- Phase 2: Execute the Reorganization ---
    print("\n--- Phase 2: Executing Reorganization ---")
    os.makedirs(DESTINATION_DATASET_FOLDER, exist_ok=True)
    
    for class_name in tqdm(source_classes, desc="Reorganizing classes"):
        source_class_path = os.path.join(SOURCE_DATASET_FOLDER, class_name)
        
        if class_name in classes_to_merge:
            destination_class_path = os.path.join(DESTINATION_DATASET_FOLDER, NEW_CLASS_NAME)
        else:
            destination_class_path = os.path.join(DESTINATION_DATASET_FOLDER, class_name)
            
        os.makedirs(destination_class_path, exist_ok=True)
        
        files_to_copy = [f for f in os.listdir(source_class_path) if f.lower().endswith('.wav')]
        for filename in files_to_copy:
            source_file_path = os.path.join(source_class_path, filename)
            destination_file_path = os.path.join(destination_class_path, filename)
            shutil.copy2(source_file_path, destination_file_path)

    print("\n" + "=" * 50)
    print("      Automated reorganization complete!      ")
    print(f"The final dataset is ready for use in:\n'{DESTINATION_DATASET_FOLDER}'")
    print("=" * 50)

except Exception as e:
    print(f"\nAn error occurred: {e}")
    print("Please check your folder paths and ensure the source directory is structured correctly.")

--- Phase 1: Analyzing Dataset ---

[INFO] File counts per class:
  - Dysarthia: 130 files
  - Dysphonie: 130 files
  - Laryngitis: 130 files
  - parkinson: 130 files
  - spasmodische_dysphonie: 130 files
  - Vox senilis: 130 files
  - Laryngozele: 84 files

[INFO] Reorganization Plan:
Threshold set to: 50 files.
No classes fall below the threshold. All classes will be copied as-is.
The following 7 classes will be KEPT SEPARATE:
  ['Dysarthia', 'Dysphonie', 'Laryngitis', 'Laryngozele', 'parkinson', 'spasmodische_dysphonie', 'Vox senilis']

--- Phase 2: Executing Reorganization ---


Reorganizing classes: 100%|██████████| 7/7 [00:00<00:00, 32.95it/s]


      Automated reorganization complete!      
The final dataset is ready for use in:
'Dataset_AutoMerged'





Split Dataset 

In [7]:
import os
import shutil
import numpy as np
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import warnings

# Optional: Suppress warnings
warnings.filterwarnings('ignore')

# --------------------------------------------------------------------------
#                           USER CONFIGURATION
# --------------------------------------------------------------------------
# ---  Please edit the variables below ---

# 1. Set the path to your final, merged, and preprocessed dataset.
#    This should be the output from the previous merging step.
SOURCE_FOLDER = "Dataset_AutoMerged"

# 2. Set the path where the final split dataset will be created.
DESTINATION_FOLDER = "Dataset_For_Training"

# 3. Define the split ratios. These MUST sum to 1.0.
TRAIN_SIZE = 0.8  # 80%
VAL_SIZE = 0.1    # 10%
TEST_SIZE = 0.1   # 10%

# 4. Set a random state for reproducibility of the split.
RANDOM_STATE = 42

# --------------------------------------------------------------------------
#                         MAIN PROCESSING SCRIPT
# --------------------------------------------------------------------------

# --- 1. Validation and Setup ---
print("--- Phase 1: Validating Configuration and Collecting Files ---")

if not np.isclose(TRAIN_SIZE + VAL_SIZE + TEST_SIZE, 1.0):
    print(f"Error: Split ratios must sum to 1.0. Current sum is {TRAIN_SIZE + VAL_SIZE + TEST_SIZE}")
else:
    # Create the main destination directory
    os.makedirs(DESTINATION_FOLDER, exist_ok=True)
    
    # --- 2. Collect all file paths and their corresponding labels ---
    filepaths = []
    labels = []
    
    classes = [d for d in os.listdir(SOURCE_FOLDER) if os.path.isdir(os.path.join(SOURCE_FOLDER, d))]
    print(f"Found {len(classes)} classes: {classes}")

    for cls in classes:
        class_path = os.path.join(SOURCE_FOLDER, cls)
        files = os.listdir(class_path)
        for file in files:
            if file.lower().endswith('.wav'):
                filepaths.append(os.path.join(class_path, file))
                labels.append(cls)

    print(f"Collected a total of {len(filepaths)} files.")

    # --- 3. Perform the stratified split ---
    print("\n--- Phase 2: Performing Stratified Split ---")
    
    # First split: separate out the training set
    X_train, X_temp, y_train, y_temp = train_test_split(
        filepaths, labels,
        train_size=TRAIN_SIZE,
        stratify=labels,
        random_state=RANDOM_STATE
    )

    # Second split: split the remainder into validation and test sets
    # Calculate the ratio for the second split
    relative_test_size = TEST_SIZE / (VAL_SIZE + TEST_SIZE)
    
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp,
        test_size=relative_test_size,
        stratify=y_temp,
        random_state=RANDOM_STATE
    )

    print("Split successful. Planned distribution:")
    print(f"  - Training samples:   {len(X_train)}")
    print(f"  - Validation samples: {len(X_val)}")
    print(f"  - Test samples:       {len(X_test)}")

    # --- 4. Copy files to the new directory structure ---
    print("\n--- Phase 3: Copying Files to Destination ---")

    def copy_files(file_list, destination_name):
        destination_path = os.path.join(DESTINATION_FOLDER, destination_name)
        os.makedirs(destination_path, exist_ok=True)
        
        for file_path in tqdm(file_list, desc=f"Copying to {destination_name}"):
            class_name = os.path.basename(os.path.dirname(file_path))
            destination_class_path = os.path.join(destination_path, class_name)
            os.makedirs(destination_class_path, exist_ok=True)
            shutil.copy2(file_path, destination_class_path)

    copy_files(X_train, "train")
    copy_files(X_val, "validation")
    copy_files(X_test, "test")

    # --- 5. Final Verification ---
    print("\n--- Phase 4: Verifying Final File Counts ---")
    final_counts = {}
    for split in ["train", "validation", "test"]:
        split_path = os.path.join(DESTINATION_FOLDER, split)
        final_counts[split] = {}
        for cls in os.listdir(split_path):
            count = len(os.listdir(os.path.join(split_path, cls)))
            final_counts[split][cls] = count
    
    print("Final distribution of files:")
    for split, class_data in final_counts.items():
        print(f"\n{split.capitalize()} Set:")
        total = 0
        for cls, count in sorted(class_data.items()):
            print(f"  - {cls}: {count} files")
            total += count
        print(f"  ------------------\n  Total: {total} files")

    print("\n" + "=" * 50)
    print("      Dataset splitting process complete!      ")
    print(f"The final dataset is ready for training in:\n'{DESTINATION_FOLDER}'")
    print("=" * 50)

--- Phase 1: Validating Configuration and Collecting Files ---
Found 7 classes: ['Dysarthia', 'Dysphonie', 'Laryngitis', 'Laryngozele', 'parkinson', 'spasmodische_dysphonie', 'Vox senilis']
Collected a total of 864 files.

--- Phase 2: Performing Stratified Split ---
Split successful. Planned distribution:
  - Training samples:   691
  - Validation samples: 86
  - Test samples:       87

--- Phase 3: Copying Files to Destination ---


Copying to train: 100%|██████████| 691/691 [00:00<00:00, 2845.41it/s]
Copying to validation: 100%|██████████| 86/86 [00:00<00:00, 3232.89it/s]
Copying to test: 100%|██████████| 87/87 [00:00<00:00, 3412.33it/s]


--- Phase 4: Verifying Final File Counts ---
Final distribution of files:

Train Set:
  - Dysarthia: 403 files
  - Dysphonie: 410 files
  - Laryngitis: 409 files
  - Laryngozele: 260 files
  - Vox senilis: 403 files
  - parkinson: 409 files
  - spasmodische_dysphonie: 412 files
  ------------------
  Total: 2706 files

Validation Set:
  - Dysarthia: 13 files
  - Dysphonie: 13 files
  - Laryngitis: 13 files
  - Laryngozele: 8 files
  - Vox senilis: 13 files
  - parkinson: 13 files
  - spasmodische_dysphonie: 13 files
  ------------------
  Total: 86 files

Test Set:
  - Dysarthia: 13 files
  - Dysphonie: 13 files
  - Laryngitis: 13 files
  - Laryngozele: 9 files
  - Vox senilis: 13 files
  - parkinson: 13 files
  - spasmodische_dysphonie: 13 files
  ------------------
  Total: 87 files

      Dataset splitting process complete!      
The final dataset is ready for training in:
'Dataset_For_Training'





Augment Train Folder

In [8]:
import os
import librosa
import numpy as np
import soundfile as sf
from tqdm import tqdm
import random
import warnings
from scipy.signal import butter, lfilter, convolve

# Optional: Suppress warnings from librosa, etc.
warnings.filterwarnings('ignore')

# --------------------------------------------------------------------------
#                           USER CONFIGURATION
# --------------------------------------------------------------------------
# --- Please edit the variables below ---

# 1. Set the path to the 'train' subfolder of your dataset.
TRAIN_DATA_FOLDER = "Dataset_For_Training/train"

# 2. Set the path to a folder containing Room Impulse Response (RIR) WAV files for reverb.
#    You can download free, high-quality RIRs here: https://www.openair.hosted.york.ac.uk/
#    Place a few .wav files from the dataset in the folder specified below.
#    If you don't want to use reverb, leave the folder path empty: ""
RIR_FOLDER = "rir_filters"

# 3. Set how many augmented versions to create for EACH original audio file.
NUM_AUGMENTATIONS_PER_FILE = 3

# 4. Set the maximum number of augmentations to chain together in a single pipeline.
#    A value of 3 means each new file will have 1, 2, or 3 random effects applied.
MAX_AUGMENTATIONS_IN_PIPELINE = 3

# --- Define the parameters for each augmentation type ---
NOISE_FACTOR_RANGE = (0.005, 0.02)
PITCH_STEPS_RANGE = (-4, 4)
STRETCH_RATE_RANGE = (0.8, 1.2)
SHIFT_MAX_FRACTION = 0.25
LOW_PASS_CUTOFF_RANGE = (1000, 4000) # In Hz
HIGH_PASS_CUTOFF_RANGE = (200, 750)  # In Hz

# --------------------------------------------------------------------------
#                         AUGMENTATION FUNCTIONS
# --------------------------------------------------------------------------

def add_noise(audio, sr, noise_factor):
    noise = np.random.randn(len(audio))
    return audio + noise_factor * noise

def pitch_shift(audio, sr, n_steps):
    return librosa.effects.pitch_shift(y=audio, sr=sr, n_steps=n_steps)

def time_stretch(audio, sr, rate):
    return librosa.effects.time_stretch(y=audio, rate=rate)

def time_shift(audio, sr, max_fraction):
    shift_range = int(len(audio) * max_fraction)
    shift_amount = random.randint(-shift_range, shift_range)
    return np.roll(audio, shift_amount)
    
def apply_reverb(audio, sr, rir_audio):
    if len(rir_audio) > len(audio):
        rir_audio = rir_audio[:len(audio)] # Truncate RIR if it's longer
    rir_normalized = rir_audio / np.max(np.abs(rir_audio))
    return convolve(audio, rir_normalized, mode='same')

# --- FIX APPLIED HERE ---
def dynamic_compress(audio, sr):
    """
    Clips the audio to [-1, 1] before applying mu-law companding. This
    prevents errors when prior augmentations push the signal out of range.
    The companding is then expanded back to float.
    """
    # Clip audio to the required range for mu-law compression
    audio_clipped = np.clip(audio, -1.0, 1.0)
    
    # Apply mu-law and expand back to float
    mu_encoded = librosa.mu_compress(audio_clipped, mu=255)
    return librosa.mu_expand(mu_encoded, mu=255)
    
def apply_filter(audio, sr, cutoff, kind='lowpass', order=5):
    nyq = 0.5 * sr
    normal_cutoff = cutoff / nyq
    b, a = butter(order, normal_cutoff, btype=kind, analog=False)
    return lfilter(b, a, audio)

# --------------------------------------------------------------------------
#                          MAIN PROCESSING SCRIPT
# --------------------------------------------------------------------------
# (The rest of the script is unchanged and remains correct)

print("--- Data Augmentation Script Initializing ---")

# 1. Pre-load RIR files for reverb
rir_audios = []
if RIR_FOLDER and os.path.exists(RIR_FOLDER):
    print(f"Loading RIR files from: {RIR_FOLDER}")
    for rir_filename in os.listdir(RIR_FOLDER):
        if rir_filename.lower().endswith('.wav'):
            rir_path = os.path.join(RIR_FOLDER, rir_filename)
            try:
                rir_audio, rir_sr = librosa.load(rir_path, sr=None, dtype=np.float32)
                rir_audios.append(rir_audio)
            except Exception as e:
                print(f"Warning: Could not load RIR file {rir_path}. Error: {e}")
    if not rir_audios:
        print("Warning: RIR folder specified but no .wav files could be loaded. Reverb will be disabled.")
    else:
        print(f"Successfully loaded {len(rir_audios)} RIR files.")
else:
    print("Info: RIR folder not found or not specified. Reverb augmentation will be disabled.")

# 2. Collect all original audio files
print("\n--- Phase 1: Collecting Original Training Files ---")
original_filepaths = []
for root, _, files in os.walk(TRAIN_DATA_FOLDER):
    for filename in files:
        if filename.lower().endswith('.wav') and '_aug_' not in filename:
            original_filepaths.append(os.path.join(root, filename))

if not original_filepaths:
    print(f"Error: No original .wav files found in '{TRAIN_DATA_FOLDER}'.")
    exit()
print(f"Found {len(original_filepaths)} original audio files to augment.")

# 3. Apply Augmentation Pipelines
print("\n--- Phase 2: Applying Augmentation Pipelines ---")
augmentations_applied = 0
augmentation_functions = {
    'noise': add_noise,
    'pitch': pitch_shift,
    'stretch': time_stretch,
    'shift': lambda audio, sr: time_shift(audio, sr, SHIFT_MAX_FRACTION),
    'compress': dynamic_compress,
    'lowpass': lambda audio, sr: apply_filter(audio, sr, random.randint(*LOW_PASS_CUTOFF_RANGE), 'lowpass'),
    'highpass': lambda audio, sr: apply_filter(audio, sr, random.randint(*HIGH_PASS_CUTOFF_RANGE), 'highpass'),
}
if rir_audios:
    augmentation_functions['reverb'] = lambda audio, sr: apply_reverb(audio, sr, random.choice(rir_audios))
function_pool = list(augmentation_functions.items())

for filepath in tqdm(original_filepaths, desc="Augmenting files"):
    try:
        audio, sr = librosa.load(filepath, sr=None, dtype=np.float32)
        for i in range(NUM_AUGMENTATIONS_PER_FILE):
            augmented_audio = audio.copy()
            num_effects_to_apply = random.randint(1, min(MAX_AUGMENTATIONS_IN_PIPELINE, len(function_pool)))
            effects_to_apply = random.sample(function_pool, num_effects_to_apply)
            applied_names = []
            for aug_name, aug_func in effects_to_apply:
                applied_names.append(aug_name)
                if aug_name == 'noise':
                    noise_factor = random.uniform(*NOISE_FACTOR_RANGE)
                    augmented_audio = aug_func(augmented_audio, sr, noise_factor)
                elif aug_name == 'pitch':
                    n_steps = random.choice([s for s in range(PITCH_STEPS_RANGE[0], PITCH_STEPS_RANGE[1] + 1) if s != 0])
                    augmented_audio = aug_func(augmented_audio, sr, n_steps)
                elif aug_name == 'stretch':
                    rate = random.uniform(*STRETCH_RATE_RANGE)
                    while np.isclose(rate, 1.0):
                        rate = random.uniform(*STRETCH_RATE_RANGE)
                    augmented_audio = aug_func(augmented_audio, sr, rate)
                else: 
                    augmented_audio = aug_func(augmented_audio, sr)
            base_name = os.path.basename(filepath).split('.wav')[0]
            effects_str = '_'.join(sorted(applied_names))
            new_filename = f"{base_name}_aug_{i+1}_{effects_str}.wav"
            output_path = os.path.join(os.path.dirname(filepath), new_filename)
            augmented_audio_fp32 = augmented_audio.astype(np.float32)
            sf.write(output_path, augmented_audio_fp32, sr)
            augmentations_applied += 1
    except Exception as e:
        print(f"\nWarning: Could not process file {filepath}. Error: {e}")

print("\n" + "="*50)
print("       Data augmentation complete! ✨")
print(f"Successfully created {augmentations_applied} new training files.")
print(f"Your training folders now contain a diverse mix of augmented data.")
print("="*50)

--- Data Augmentation Script Initializing ---
Info: RIR folder not found or not specified. Reverb augmentation will be disabled.

--- Phase 1: Collecting Original Training Files ---
Found 691 original audio files to augment.

--- Phase 2: Applying Augmentation Pipelines ---


Augmenting files: 100%|██████████| 691/691 [00:19<00:00, 35.10it/s]


       Data augmentation complete! ✨
Successfully created 2073 new training files.
Your training folders now contain a diverse mix of augmented data.





In [9]:
%pip install tqdm

Note: you may need to restart the kernel to use updated packages.


In [12]:
import os
import librosa
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import warnings

# Suppress common UserWarning from librosa
warnings.filterwarnings('ignore', category=UserWarning)

# -------------------------------------------------------------------
# 1. CONFIGURATION
# -------------------------------------------------------------------
INPUT_AUDIO_DIR = "Dataset_For_Training/"
OUTPUT_IMAGE_DIR = "melspectrograms_dataset/"

# Audio Parameters
SAMPLE_RATE = 22050
DURATION = None  # Set to specific seconds if you want fixed-length clips
OFFSET = 0.0     # Start reading after this time (seconds)

# Spectrogram Parameters
N_FFT = 2048
HOP_LENGTH = 512
N_MELS = 256
FMAX = 8000

# Image settings
FIG_SIZE = (5, 5)
DPI = 100  # Control image resolution
COLORMAP = 'viridis'  # Matplotlib colormap for visualization

# -------------------------------------------------------------------
# 2. HELPER FUNCTIONS
# -------------------------------------------------------------------
def normalize_channel(channel):
    """Normalizes a single spectrogram channel to the 0-255 range for image saving."""
    if np.min(channel) == np.max(channel):
        return np.zeros_like(channel, dtype=np.uint8)
    
    scaled = (channel - np.min(channel)) / (np.max(channel) - np.min(channel))
    return (scaled * 255).astype(np.uint8)

def get_audio_files(directory):
    """Get all audio files from directory recursively."""
    audio_extensions = ('.wav', '.flac', '.mp3', '.m4a', '.aac', '.ogg')
    audio_files = []
    for dirpath, _, filenames in os.walk(directory):
        for filename in filenames:
            if filename.lower().endswith(audio_extensions):
                audio_files.append(os.path.join(dirpath, filename))
    return audio_files

# -------------------------------------------------------------------
# 3. CONVERSION PROCESS
# -------------------------------------------------------------------
def main():
    print("Starting conversion from .wav audio to 3-channel mel-spectrogram .png images...")
    print(f"Input directory: {INPUT_AUDIO_DIR}")
    print(f"Output directory: {OUTPUT_IMAGE_DIR}")
    
    os.makedirs(OUTPUT_IMAGE_DIR, exist_ok=True)
    
    audio_files = get_audio_files(INPUT_AUDIO_DIR)
    
    if not audio_files:
        print(f"No audio files found in {INPUT_AUDIO_DIR}")
        return
    
    print(f"Found {len(audio_files)} audio files to process")
    
    successful_conversions = 0
    failed_conversions = 0
    
    for input_filepath in tqdm(audio_files, desc="Converting files"):
        try:
            # Create corresponding output directory structure
            relative_path = os.path.relpath(os.path.dirname(input_filepath), INPUT_AUDIO_DIR)
            output_dir_path = os.path.join(OUTPUT_IMAGE_DIR, relative_path)
            os.makedirs(output_dir_path, exist_ok=True)

            base_name, _ = os.path.splitext(os.path.basename(input_filepath))
            output_filepath = os.path.join(output_dir_path, f"{base_name}.png")

            # Load audio with optional duration and offset
            y, sr = librosa.load(
                input_filepath, 
                sr=SAMPLE_RATE, 
                duration=DURATION,
                offset=OFFSET
            )

            # Generate Mel Spectrogram
            S = librosa.feature.melspectrogram(
                y=y, sr=sr, n_fft=N_FFT, hop_length=HOP_LENGTH, 
                n_mels=N_MELS, fmax=FMAX
            )
            S_dB = librosa.power_to_db(S, ref=np.max)

            # Calculate Deltas (the rate of change)
            delta_S_dB = librosa.feature.delta(S_dB)
            delta2_S_dB = librosa.feature.delta(S_dB, order=2)
            
            # Normalize each channel to the 0-255 range
            S_dB_norm = normalize_channel(S_dB)
            delta_S_dB_norm = normalize_channel(delta_S_dB)
            delta2_S_dB_norm = normalize_channel(delta2_S_dB)
            
            # Stack them into a 3-channel array (height, width, 3)
            stacked_image_data = np.stack([S_dB_norm, delta_S_dB_norm, delta2_S_dB_norm], axis=-1)

            # Save the 3-channel spectrogram as a borderless image
            plt.figure(figsize=FIG_SIZE, dpi=DPI)
            plt.imshow(stacked_image_data, aspect='auto', cmap=COLORMAP)
            plt.axis('off')
            plt.tight_layout(pad=0)
            plt.savefig(output_filepath, bbox_inches='tight', pad_inches=0, dpi=DPI)
            plt.close()
            
            successful_conversions += 1

        except Exception as e:
            print(f"\n-!> ERROR processing {input_filepath}: {e}")
            failed_conversions += 1
            continue

    print("\n" + "="*50)
    print("CONVERSION SUMMARY")
    print("="*50)
    print(f"Successful conversions: {successful_conversions}")
    print(f"Failed conversions: {failed_conversions}")
    print(f"Total files processed: {len(audio_files)}")
    print(f"3-channel mel-spectrograms saved in: {OUTPUT_IMAGE_DIR}")

if __name__ == "__main__":
    main()

Starting conversion from .wav audio to 3-channel mel-spectrogram .png images...
Input directory: Dataset_For_Training/
Output directory: melspectrograms_dataset/
Found 4908 audio files to process


Converting files: 100%|██████████| 4908/4908 [05:16<00:00, 15.51it/s]


CONVERSION SUMMARY
Successful conversions: 4908
Failed conversions: 0
Total files processed: 4908
3-channel mel-spectrograms saved in: melspectrograms_dataset/



