In [21]:
import tensorflow as tf
import numpy as np
import librosa
import librosa.display
import os
import matplotlib.pyplot as plt
from tensorflow.keras import layers, models
import soundfile as sf
from typing import Tuple, Optional

In [22]:
def load_audio_file(file_path: str, sr: int = 16000) -> np.ndarray:
    """
    Load an audio file with a specified sample rate.
    
    Args:
        file_path (str): Path to the audio file.
        sr (int): Sample rate for loading the audio.
    
    Returns:
        np.ndarray: Loaded audio signal.
    """
    audio, _ = librosa.load(file_path, sr=sr)
    return audio

def audio_to_spectrogram(audio: np.ndarray, n_fft: int = 1024, hop_length: int = 512) -> np.ndarray:
    """
    Convert audio signal to a spectrogram.
    
    Args:
        audio (np.ndarray): Audio signal.
        n_fft (int): Number of FFT components.
        hop_length (int): Number of samples between successive frames.
    
    Returns:
        np.ndarray: Spectrogram of the audio.
    """
    stft = librosa.stft(audio, n_fft=n_fft, hop_length=hop_length)
    spectrogram = np.abs(stft)
    return spectrogram

def spectrogram_to_audio(spectrogram: np.ndarray, hop_length: int = 512) -> np.ndarray:
    """
    Convert a spectrogram back to an audio signal.
    
    Args:
        spectrogram (np.ndarray): Spectrogram.
        hop_length (int): Number of samples between successive frames.
    
    Returns:
        np.ndarray: Reconstructed audio signal.
    """
    stft_reconstructed = librosa.istft(spectrogram, hop_length=hop_length)
    return stft_reconstructed


In [23]:
def load_data(
    noise_path: str,
    clean_path: str,
    noisy_path: str,
    limit: Optional[int] = None,
    fixed_length: int = 300
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Load noise, clean, and noisy audio data and convert them to spectrograms with a fixed length.
    
    Args:
        noise_path (str): Path to noise recordings.
        clean_path (str): Path to clean recordings.
        noisy_path (str): Path to noisy recordings.
        limit (Optional[int]): Limit on the number of files to load.
        fixed_length (int): Fixed time dimension length for spectrograms.
    
    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray]: Arrays of noise, clean, and noisy spectrograms.
    """
    noise_files = sorted([os.path.join(noise_path, f) for f in os.listdir(noise_path) if f.endswith('.wav')])[:limit]
    clean_files = sorted([os.path.join(clean_path, f) for f in os.listdir(clean_path) if f.endswith('.flac')])[:limit]
    noisy_files = sorted([os.path.join(noisy_path, f) for f in os.listdir(noisy_path) if f.endswith('.mp3')])[:limit]
    
    def pad_or_truncate(spectrogram: np.ndarray, target_length: int) -> np.ndarray:
        """
        Pad or truncate a spectrogram to ensure a consistent length along the time axis.
        
        Args:
            spectrogram (np.ndarray): Input spectrogram.
            target_length (int): Desired length along the time axis.
        
        Returns:
            np.ndarray: Padded or truncated spectrogram.
        """
        if spectrogram.shape[1] > target_length:
            return spectrogram[:, :target_length]
        else:
            padding = target_length - spectrogram.shape[1]
            return np.pad(spectrogram, ((0, 0), (0, padding)), mode='constant')
    
    # Convert audio to spectrograms and fix length
    noise_spectrograms = [pad_or_truncate(audio_to_spectrogram(load_audio_file(f)), fixed_length) for f in noise_files]
    clean_spectrograms = [pad_or_truncate(audio_to_spectrogram(load_audio_file(f)), fixed_length) for f in clean_files]
    noisy_spectrograms = [pad_or_truncate(audio_to_spectrogram(load_audio_file(f)), fixed_length) for f in noisy_files]
    
    # Convert lists to numpy arrays with consistent shapes
    return np.array(noise_spectrograms), np.array(clean_spectrograms), np.array(noisy_spectrograms)


In [None]:
def build_denoising_cnn(input_shape: Tuple[int, int, int]) -> tf.keras.Model:
    """
    Define a CNN model for audio noise reduction with precise shape matching adjustments.
    
    Args:
        input_shape (Tuple[int, int, int]): Shape of the input spectrogram (height, width, channels).
    
    Returns:
        tf.keras.Model: Compiled denoising CNN model.
    """
    model = models.Sequential([
        layers.Input(shape=input_shape),
        
        # Encoder
        layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
        layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
        layers.MaxPooling2D((2, 2), padding='same'),  # Reduce size by half
        
        layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
        layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
        layers.MaxPooling2D((2, 2), padding='same'),  # Reduce size by half again
        
        layers.Conv2D(128, (3, 3), activation='relu', padding='same'),
        layers.Conv2D(128, (3, 3), activation='relu', padding='same'),
        layers.MaxPooling2D((2, 2), padding='same'),  # Reduce size by half once more
        
        # Decoder
        layers.Conv2DTranspose(128, (3, 3), activation='relu', padding='same'),
        layers.Conv2DTranspose(128, (3, 3), activation='relu', padding='same'),
        layers.UpSampling2D((2, 2)),  # Restore original size
        
        layers.Conv2DTranspose(64, (3, 3), activation='relu', padding='same'),
        layers.Conv2DTranspose(64, (3, 3), activation='relu', padding='same'),
        layers.UpSampling2D((2, 2)),  # Restore size further
        
        layers.Conv2DTranspose(32, (3, 3), activation='relu', padding='same'),
        layers.Conv2DTranspose(32, (3, 3), activation='relu', padding='same'),
        layers.UpSampling2D((2, 2)),  # Final upsampling to match input size
        
        # Output layer
        layers.Conv2D(1, (3, 3), activation='sigmoid', padding='same')
    ])
    
    # Confirm model output shape
    print("Expected input shape:", input_shape)
    print("Model output shape:", model.output_shape)
    
    model.compile(optimizer='adam', loss='mse')
    return model


In [25]:
# Set paths for training and validation data
train_noise_path = '../../../data/audios/english/train/noise'
train_clean_path = '../../../data/audios/english/train/clean_trim'
train_noisy_path = '../../../data/audios/english/train/blended_trim'

val_noise_path = '../../../data/audios/english/validation/noise'
val_clean_path = '../../../data/audios/english/validation/clean_trim'
val_noisy_path = '../../../data/audios/english/validation/blended_trim'

# Load training and validation data (limit as needed)
train_noise, train_clean, train_noisy = load_data(train_noise_path, train_clean_path, train_noisy_path, limit=500, fixed_length=300)
val_noise, val_clean, val_noisy = load_data(val_noise_path, val_clean_path, val_noisy_path, limit=100, fixed_length=300)

# Add channel dimension for CNN input
train_noisy = train_noisy[..., np.newaxis]
train_clean = train_clean[..., np.newaxis]
val_noisy = val_noisy[..., np.newaxis]
val_clean = val_clean[..., np.newaxis]

# Debugging statements to inspect shapes
print("Training data shapes:")
print("train_noisy:", train_noisy.shape)
print("train_clean:", train_clean.shape)
print("Validation data shapes:")
print("val_noisy:", val_noisy.shape)
print("val_clean:", val_clean.shape)

# Build and train model
input_shape = train_noisy.shape[1:]  # Use shape of one sample
model = build_denoising_cnn(input_shape)
history = model.fit(train_noisy, train_clean, validation_data=(val_noisy, val_clean), epochs=20, batch_size=8)


Training data shapes:
train_noisy: (100, 513, 300, 1)
train_clean: (100, 513, 300, 1)
Validation data shapes:
val_noisy: (100, 513, 300, 1)
val_clean: (100, 513, 300, 1)
Expected input shape: (513, 300, 1)
Model output shape: (None, 520, 304, 1)
Epoch 1/20


ValueError: Dimensions must be equal, but are 513 and 520 for '{{node compile_loss/mse/sub}} = Sub[T=DT_FLOAT](data_1, sequential_4_1/conv2d_25_1/Sigmoid)' with input shapes: [?,513,300,1], [?,520,304,1].

In [None]:
# Save the model to both .h5 and .keras formats
model.save('ML-DAN_v3.0.h5')
model.save('ML-DAN_v3.0.keras')


In [None]:
def denoise_audio(model: tf.keras.Model, noisy_audio_path: str, output_path: str) -> None:
    """
    Apply noise reduction on a noisy audio file and save the cleaned audio.
    
    Args:
        model (tf.keras.Model): Trained noise reduction model.
        noisy_audio_path (str): Path to the noisy audio file.
        output_path (str): Path where the denoised audio will be saved.
    """
    # Load and convert noisy audio to spectrogram
    noisy_audio = load_audio_file(noisy_audio_path)
    noisy_spectrogram = audio_to_spectrogram(noisy_audio)
    
    # Add batch and channel dimension for model prediction
    noisy_spectrogram = noisy_spectrogram[np.newaxis, ..., np.newaxis]
    
    # Predict clean spectrogram
    predicted_clean_spectrogram = model.predict(noisy_spectrogram)[0, ..., 0]
    
    # Convert predicted spectrogram back to audio
    cleaned_audio = spectrogram_to_audio(predicted_clean_spectrogram)
    
    # Save the cleaned audio
    sf.write(output_path, cleaned_audio, 16000)

# Example usage of the denoising function
denoise_audio(model, 'path_to_noisy_audio.mp3', 'path_to_cleaned_output.wav')
