# Audio Preprocessing Experiments

This notebook is used for testing and prototyping preprocessing logic 
before integrating it into the main `src/` Python modules.

**Note:** This is not part of the production code. 
Some code cells may be exploratory or not well-documented.

In [24]:
import tensorflow as tf 
import keras
import librosa
import numpy as np
import matplotlib.pyplot as plt
import IPython.display as dis

In [2]:
# Root directory handed to the dataset loader below.
# NOTE(review): absolute, machine-specific path — anyone else running this
# notebook has to edit it before the loader cell works.
file_path = "C:/voice-speaker-binary-classifier/data/"

In [3]:
# Load the dataset with labels inferred from the directory structure.
# Labels are integers; every clip is requested with a length of 48000 samples.
# NOTE(review): sampling_rate is left commented out, yet later cells pass
# sr=16000 to the augmentation and mel helpers — confirm the files on disk
# really are sampled at 16 kHz.
dataset = keras.utils.audio_dataset_from_directory(
    directory=file_path,
    labels="inferred",
    label_mode="int",
    # sampling_rate=16000,
    shuffle=True,
    output_sequence_length=48000,
    seed=1337
)

Found 863 files belonging to 2 classes.


In [4]:
# Split into a training part (first 80% of the dataset elements) and a
# held-out part (the rest).

dataset_size = dataset.cardinality().numpy()  # presumably a count of batches, not files — confirm
train_size = int(0.8 * dataset_size)

# NOTE(review): `dataset` was built with shuffle=True. If the loader reshuffles
# on every iteration, take()/skip() are not guaranteed to stay disjoint across
# separate passes — confirm, or split inside the loader with
# validation_split/subset instead.
train_ds = dataset.take(train_size)
test_ds = dataset.skip(train_size)

In [5]:
def add_noise(audio_batch, noise_factor=None):
    """
    Add random Gaussian noise to an audio signal.

    Args:
        audio_batch (np.ndarray): Audio waveform(s) of any shape, for example
            (batch, samples, channels). Values are expected in [-1, 1].
        noise_factor (float, optional): Multiplier applied to unit-variance
            Gaussian noise. When None, a single value is drawn uniformly from
            [0.001, 0.01) and used for the whole array.

    Returns:
        np.ndarray: Noisy audio with the same shape as the input, clipped to
        [-1, 1].
    """
    audio_batch = np.asarray(audio_batch)

    # Draw the noise with the input's own shape so 1-D clips and 2-D batches
    # work as well as the 3-D (batch, samples, channels) layout.
    noise = np.random.randn(*audio_batch.shape)

    if noise_factor is None:
        noise_factor = np.random.uniform(0.001, 0.01)

    augmented_audio = audio_batch + noise_factor * noise
    # Clip to maintain [-1, 1] range for audio
    return np.clip(augmented_audio, -1.0, 1.0)

In [6]:
def pitch_shift(audio, sr, n_steps=None):
    """
    Pitch shift every clip of a batch by a number of semitones.

    Args:
        audio (np.ndarray): Batch of waveforms with the batch on axis 0. Each
            clip is flattened to 1-D before it is shifted.
        sr (int): Sampling rate.
        n_steps (int or sequence of int, optional): Semitones to shift
            (positive or negative). A single value is applied to every clip;
            a sequence supplies one value per clip. When None, one value per
            clip is drawn from the integers -3..3.

    Returns:
        np.ndarray: Pitch shifted audio of shape (batch, samples). Any
        trailing channel axis of the input is not kept.
    """
    batch_size = audio.shape[0]

    if n_steps is None:
        n_steps = np.random.randint(-3, 4, size=batch_size)
    elif np.ndim(n_steps) == 0:
        # A single shift, as the signature documents, applies to every clip.
        n_steps = np.full(batch_size, n_steps)

    return np.stack([
        librosa.effects.pitch_shift(clip.flatten(), sr=sr, n_steps=int(step))
        for clip, step in zip(audio, n_steps)
    ])

In [7]:
def amplitude_scaling(audio, scale=None):
    """
    Scale the amplitude (volume) of every clip of a batch.

    Args:
        audio (np.ndarray): Batch of waveforms with the batch on axis 0.
        scale (float or sequence of float, optional): Gain factor. A single
            value is applied to every clip; a sequence supplies one factor
            per clip. When None, one factor per clip is drawn uniformly from
            [0.6, 1.2).

    Returns:
        np.ndarray: Amplitude scaled audio with the same shape as the input,
        clipped to [-1, 1].
    """
    batch_size = audio.shape[0]

    if scale is None:
        scale = np.random.uniform(0.6, 1.2, size=batch_size)
    elif np.ndim(scale) == 0:
        # A single gain, as the signature documents, applies to every clip.
        scale = np.full(batch_size, scale)

    return np.stack([
        np.clip(clip * factor, -1.0, 1.0)
        for clip, factor in zip(audio, scale)
    ])

In [8]:
def random_augment(audio, sr):
    """
    Apply a random subset of augmentations to a batch of audio waveforms.

    Noise, pitch shift and amplitude scaling are each selected independently
    with probability 0.5 and applied in that order. Every selected
    augmentation works on the output of the previous one, so the effects
    combine. An augmentation that raises is reported with a printed warning
    and skipped; the result of the preceding steps is kept.

    Args:
        audio (np.ndarray): Original audio waveforms, batch on axis 0
            (called with (batch, samples, 1) arrays in this notebook).
        sr (int): Sample rate

    Returns:
        augmented_audio (np.ndarray): Augmented copy of the input, returned in
        the input's shape whenever the element count is unchanged.
    """

    augmented = audio.copy()

    # (name, probability, log message, transform)
    augmentations = [
        ("noise", 0.5, "Calling noise func", lambda x: add_noise(x)),
        ("pitch", 0.5, "Calling pitch func", lambda x: pitch_shift(x, sr)),
        ("amplitude", 0.5, "Calling amplitude func", lambda x: amplitude_scaling(x)),
    ]

    # Randomly apply augmentations based on probabilities
    for name, p, message, func in augmentations:
        if np.random.random() < p:
            try:
                print(message)
                # Feed the running result, not the untouched input; otherwise
                # each augmentation would discard the ones applied before it.
                augmented = func(augmented)
            except Exception as e:
                print(f"Warning: augmentation {name} failed with error {e}")

    # pitch_shift returns (batch, samples) without the channel axis; restore
    # the caller's layout so the output shape does not depend on the draw.
    if augmented.shape != audio.shape and augmented.size == audio.size:
        augmented = augmented.reshape(audio.shape)

    return augmented

In [9]:
def vtlp(melspectrogram, alpha=0.9):
    """
    Apply Vocal Tract Length Perturbation (VTLP) by warping the frequency
    axis of a mel spectrogram with a piecewise-linear map.

    Output bin ``i`` is copied from a source bin of the input. Up to a
    boundary placed at 40% of the bins the source is ``alpha * i``; above it
    a second linear segment starting at ``alpha * boundary`` is used. Source
    positions are truncated to integers and capped at the last bin.

    Args:
        melspectrogram (np.ndarray): Mel spectrogram with frequency on axis 0
            (freq x time, optionally with further trailing axes).
        alpha (float): Warping factor ~0.9-1.1, controls frequency scaling.

    Returns:
        np.ndarray: Warped mel spectrogram, same shape and dtype as the input.
    """
    n_bins = melspectrogram.shape[0]
    boundary = int(n_bins * 0.4)  # switch point between the two linear segments

    def source_bin(target):
        """Return the input bin that feeds output bin ``target``."""
        if target > boundary:
            position = (n_bins - alpha * boundary) / (n_bins - boundary) * (target - boundary) + alpha * boundary
        else:
            position = alpha * target
        return min(int(position), n_bins - 1)

    warped_mel = np.zeros_like(melspectrogram)
    for target in range(n_bins):
        warped_mel[target, :] = melspectrogram[source_bin(target), :]
    return warped_mel

In [10]:
def waveform_to_mel(audio, sr=16000, n_mels=64, n_fft=1024, hop_length=256):
    """
    Convert a batch of waveforms to normalised log-mel spectrograms.

    Each clip is squeezed, turned into a mel power spectrogram and converted
    to decibels relative to its own maximum. The stacked result gets a
    trailing channel axis and is min-max scaled using the minimum and maximum
    of the whole batch. Progress is printed every 10 clips.

    Args:
        audio (np.ndarray): Batch of waveforms, batch on axis 0.
        sr (int): Sampling rate.
        n_mels (int): Number of mel bins.
        n_fft (int): FFT window size.
        hop_length (int): Hop length for STFT.

    Returns:
        np.ndarray: Mel spectrograms of shape (batch, n_mels, time, 1).
    """
    total = audio.shape[0]
    spectrograms = []

    for index in range(total):
        power = librosa.feature.melspectrogram(
            y=audio[index].squeeze(),
            sr=sr,
            n_mels=n_mels,
            n_fft=n_fft,
            hop_length=hop_length,
        )
        # Decibel scale, referenced to this clip's own peak.
        spectrograms.append(librosa.power_to_db(power, ref=np.max))

        if index % 10 == 0:
            print(f"Converted {index} samples to mel-spectrogram, remaining {total-index}")

    stacked = np.array(spectrograms)
    print(f"Convertion completed!")

    # Add the channel axis, then scale with batch-wide extrema; the epsilon
    # guards against a zero range.
    stacked = stacked[..., np.newaxis]
    lowest = stacked.min()
    highest = stacked.max()
    return (stacked - lowest) / (highest - lowest + 1e-9)

In [11]:
def time_frequency_masking(mel_spectrogram, time_mask_param=(5, 20), freq_mask_param=(3, 12)):
    """
    Apply SpecAugment style time and frequency masking on a mel spectrogram.

    A time mask is applied with probability 0.5. After that, with
    probability 0.5 a frequency mask is applied; otherwise a (further) time
    mask is applied instead. Masked regions are set to 0 and may have zero
    width.

    Args:
        mel_spectrogram (np.ndarray): Mel spectrogram of shape
            (freq, time, channels).
        time_mask_param (tuple of int): (low, high) range from which the
            exclusive upper bound of the time-mask width is drawn.
        freq_mask_param (tuple of int): (low, high) range from which the
            exclusive upper bound of the frequency-mask width is drawn.

    Returns:
        np.ndarray: Masked copy of the mel spectrogram.
    """
    masked = mel_spectrogram.copy()
    n_freq, n_time, _ = masked.shape

    # Drawn once and shared by both possible time masks.
    max_time_width = np.random.randint(*time_mask_param)

    def blank_time_span():
        """Zero a random run of time frames across all frequency bins."""
        width = np.random.randint(0, max_time_width)
        start = np.random.randint(0, max(1, n_time - width))
        masked[:, start:start + width] = 0

    # Time masking
    if np.random.rand() < 0.5:
        blank_time_span()

    # Frequency masking, or a time mask when it is not selected
    if np.random.rand() >= 0.5:
        blank_time_span()
    else:
        max_freq_width = np.random.randint(*freq_mask_param)
        width = np.random.randint(0, max_freq_width)
        start = np.random.randint(0, max(1, n_freq - width))
        masked[start:start + width, :] = 0

    return masked

In [12]:
def pas(mel):
    """
    Randomly augment every mel spectrogram of a batch.

    For each sample, time/frequency masking and then VTLP are each applied
    with probability 0.5, the second working on the output of the first. An
    augmentation that raises is reported with a printed warning and skipped
    for that sample.

    Args:
        mel (np.ndarray): Batch of mel spectrograms, batch on axis 0.

    Returns:
        np.ndarray: Stacked augmented samples. One-dimensional samples gain a
        trailing axis of size 1.
    """

    def announced(message, transform):
        """Wrap ``transform`` so that ``message`` is printed before it runs."""
        def run(x):
            print(message)
            return transform(x)
        return run

    # (name, probability, transform); the lambdas defer the name lookup to
    # call time, inside the try block below.
    augmentations = [
        ("masking", 0.5, announced("Calling masking func", lambda x: time_frequency_masking(x))),
        ("vtlp", 0.5, announced("Calling vtlp func", lambda x: vtlp(x))),
    ]

    results = []
    for index, sample in enumerate(mel):
        current = sample.copy()

        for name, probability, transform in augmentations:
            if np.random.random() >= probability:
                continue
            try:
                current = transform(current)
            except Exception as e:
                print(f"Warning: augmentation {name} failed for sample {index} with error {e}")

        # Guarantee at least two axes per sample before stacking.
        if current.ndim == 1:
            current = np.expand_dims(current, axis=-1)

        results.append(current)

    return np.stack(results)

In [13]:
# Pull the training split into memory, one list entry per dataset element.
x_list = []
y_list = []

for x_batch, y_batch in train_ds.as_numpy_iterator():
    x_list.append(x_batch)
    y_list.append(y_batch)

# Stacking the collected elements adds a leading axis; this assumes every
# element has the same shape (i.e. no smaller final batch) — TODO confirm.
# NOTE(review): x_train / y_train are rebound by the shuffle cell further
# down, so this cell must be re-run before re-running the augmentation cells.
x_train = np.array(x_list)
y_train = np.array(y_list)

In [None]:
# Number of independently augmented copies of the training set to generate.
num_aug = 4

# One stacked array per copy. Each entry of x_train is augmented as a whole;
# the reshape puts the result back into (batch, 48000, 1) because pitch
# shifting returns the clips without the channel axis.
augg = [
    np.stack([
        random_augment(batch, sr=16000).reshape(x_train.shape[1], 48000, 1)
        for batch in x_train
    ])
    for _ in range(num_aug)
]
aug = np.stack(augg)

Calling noise func
Calling pitch func
Calling amplitude func
Calling noise func
Calling amplitude func
Calling noise func
Calling pitch func
Calling amplitude func
Calling amplitude func
Calling noise func
Calling pitch func
Calling amplitude func
Calling pitch func
Calling amplitude func
Calling amplitude func
Calling amplitude func
Calling noise func
Calling pitch func
Calling amplitude func
Calling noise func
Calling pitch func
Calling amplitude func
Calling amplitude func
Calling noise func
Calling pitch func
Calling amplitude func
Calling amplitude func
Calling pitch func
Calling amplitude func
Calling noise func
Calling pitch func
Calling noise func
Calling pitch func
Calling amplitude func
Calling amplitude func
Calling pitch func
Calling noise func
Calling amplitude func
Calling noise func
Calling noise func
Calling amplitude func
Calling amplitude func
Calling amplitude func
Calling amplitude func
Calling noise func
Calling pitch func
Calling amplitude func
Calling noise func


In [15]:
# Collapse all leading axes so that each row is one clip of shape (48000, 1).
x_train_flat = x_train.reshape(-1, 48000, 1)
aug_flat = aug.reshape(-1, 48000, 1)

In [None]:
# Labels for the originals followed by `num_aug` augmented copies, in the same
# order as the clips concatenated below. The labels are flattened first:
# np.tile on a 2-D (num_batches, batch) array repeats along the last axis only,
# which would leave them misaligned with the flattened clips.
all_samples_y = np.tile(y_train.reshape(-1), num_aug + 1) # +1 for the original sample
all_samples_x = np.concatenate([x_train_flat, aug_flat], axis=0)

# Every clip needs exactly one label.
assert all_samples_x.shape[0] == all_samples_y.shape[0]

In [23]:
# Shuffle clips and labels with one shared permutation so pairs stay aligned.
# NOTE(review): the permutation is unseeded, so the order differs per run.
indices = np.arange(all_samples_x.shape[0])
np.random.shuffle(indices)

# NOTE(review): this rebinds x_train / y_train to the combined, shuffled set,
# so the cells above cannot be re-run without re-collecting the batches first.
x_train = all_samples_x[indices]
y_train = all_samples_y[indices]
print(x_train.shape, y_train.shape)

(3360, 48000, 1) (3360,)


In [None]:
# Convert every waveform to a normalised mel spectrogram, then build a second,
# randomly augmented version of the whole set (same sample order).
mel_spectro = waveform_to_mel(x_train, sr=16000)
aug_mel = pas(mel_spectro)

Converted 0 samples to mel-spectrogram, remaining 3360
Converted 10 samples to mel-spectrogram, remaining 3350
Converted 20 samples to mel-spectrogram, remaining 3340
Converted 30 samples to mel-spectrogram, remaining 3330
Converted 40 samples to mel-spectrogram, remaining 3320
Converted 50 samples to mel-spectrogram, remaining 3310
Converted 60 samples to mel-spectrogram, remaining 3300
Converted 70 samples to mel-spectrogram, remaining 3290
Converted 80 samples to mel-spectrogram, remaining 3280
Converted 90 samples to mel-spectrogram, remaining 3270
Converted 100 samples to mel-spectrogram, remaining 3260
Converted 110 samples to mel-spectrogram, remaining 3250
Converted 120 samples to mel-spectrogram, remaining 3240
Converted 130 samples to mel-spectrogram, remaining 3230
Converted 140 samples to mel-spectrogram, remaining 3220
Converted 150 samples to mel-spectrogram, remaining 3210
Converted 160 samples to mel-spectrogram, remaining 3200
Converted 170 samples to mel-spectrogram, 

In [None]:
# Bring both sets into (N, 64, 188, 1).
# NOTE(review): 64 matches the default n_mels of waveform_to_mel; 188 is
# presumably the frame count for 48000 samples at hop_length=256 — confirm if
# either setting changes.
x_train_flat_mel = mel_spectro.reshape(-1, 64, 188, 1)
aug_flat_mel = aug_mel.reshape(-1, 64, 188, 1)

In [None]:
# Originals first, augmented spectrograms second.
all_samples_mel_x = np.concatenate([x_train_flat_mel, aug_flat_mel], axis=0)
# Labels repeated twice: one original set plus one augmented copy. This relies
# on y_train being 1-D and in the same order as mel_spectro.
all_samples_mel_y = np.tile(y_train, 1 + 1)

In [None]:
# Shuffle spectrograms and labels with one shared permutation so pairs stay
# aligned. NOTE(review): unseeded, so the order differs per run.
indices_mel = np.arange(all_samples_mel_x.shape[0])
np.random.shuffle(indices_mel)

x_train_mel = all_samples_mel_x[indices_mel]
y_train_mel = all_samples_mel_y[indices_mel]

(6720, 64, 188, 1)