In [1]:
import warnings
warnings.filterwarnings("ignore")

In [2]:
!pip install librosa



In [3]:
import matplotlib.pyplot as plt
import librosa
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
import numpy as np
import soundfile as sf
from sklearn.model_selection import train_test_split
from keras.callbacks import EarlyStopping

## Speech Denoising Using Deep Learning

In [5]:
# STFT analysis parameters shared by the clean and noisy training signals.
N_FFT = 1024       # FFT size -> N_FFT // 2 + 1 = 513 frequency bins per frame
HOP_LENGTH = 512   # samples between successive frames

# Clean training speech and its complex spectrogram.
s, sr = librosa.load('data/train_clean_male.wav', sr=None)
S = librosa.stft(s, n_fft=N_FFT, hop_length=HOP_LENGTH)

# Noisy training speech and its complex spectrogram.
# NOTE: `sr` is rebound here; the two files are presumably recorded at the same rate.
sn, sr = librosa.load('data/train_dirty_male.wav', sr=None)
X = librosa.stft(sn, n_fft=N_FFT, hop_length=HOP_LENGTH)

# Magnitude spectrograms |S| and |X|, laid out as (frequency bins, frames).
S_mag = np.abs(S)   # clean speech magnitudes
X_mag = np.abs(X)   # noisy speech magnitudes

# Transpose so that every row is one frame: (num_frames, 513).
X_input = X_mag.T   # noisy input magnitudes
S_target = S_mag.T  # clean target magnitudes

# The network maps noisy frame i to clean frame i, so both arrays must line up exactly.
assert X_input.shape == S_target.shape, (
    f"clean/noisy spectrogram shapes differ: {S_target.shape} vs {X_input.shape}"
)
assert X_input.shape[1] == N_FFT // 2 + 1, "unexpected number of frequency bins"

In [6]:
# Noisy utterance used as validation data during training.
test_sn, sr_test = librosa.load('data/test_x_01.wav', sr=None)
X_test = librosa.stft(test_sn, n_fft=1024, hop_length=512)

# Split the noisy spectrogram into magnitude |X_test| and phase angle.
X_test_mag, X_test_phase = np.abs(X_test), np.angle(X_test)

# Frame-major magnitudes, one row per frame: (num_frames, 513).
X_test_input = np.transpose(X_test_mag)

# Unit-modulus phase factor exp(j * phase) of the noisy spectrogram.
X_test_phase_combined = np.exp(X_test_phase * 1j)

# Clean reference for the same test item: waveform plus frame-major magnitudes.
ground_truth_clean, sr_ground_truth = librosa.load('data/test_s_01.wav', sr=None)
Y_test = librosa.stft(ground_truth_clean, n_fft=1024, hop_length=512)
Y_test_mag = np.abs(Y_test)
Y_test_input = np.transpose(Y_test_mag)

In [8]:
def compute_snr(ground_truth, recovered_signal):
    """Return the signal-to-noise ratio, in dB, of a recovered signal.

    SNR = 10 * log10( sum(s^2) / sum((s - s_hat)^2) )

    Args:
        ground_truth: Reference (clean) signal; any array-like of real numbers.
        recovered_signal: Estimate of the reference, broadcast-compatible with
            ``ground_truth`` (callers in this notebook trim both to equal length).

    Returns:
        The SNR in decibels as a NumPy float64 scalar.
    """
    # Work in float64 so integer PCM input cannot overflow when squared and
    # plain Python sequences are accepted as well.
    reference = np.asarray(ground_truth, dtype=np.float64)
    estimate = np.asarray(recovered_signal, dtype=np.float64)

    # The tiny offsets keep both the ratio and the logarithm finite for
    # all-zero references and for perfect reconstructions.
    signal_power = np.sum(reference ** 2) + 1e-20
    noise_power = np.sum((reference - estimate) ** 2) + 1e-20
    return 10 * np.log10(signal_power / noise_power)

In [9]:
from IPython.display import Audio, display

class SNRBestModelCheckpoint(EarlyStopping):
    """Early stopping and best-weights restore driven by time-domain SNR.

    After every epoch the model predicts clean magnitudes for a validation
    utterance, the waveform is resynthesised with the noisy phase, and its SNR
    against the clean reference is computed. Training stops once the SNR has
    not improved for ``patience`` epochs. When training ends, the best weights
    are restored and reconstructions of test_01 and ``data/test_x_02.wav`` are
    written to disk and played back.

    Args:
        X_test_input: Model input for the validation utterance (frame-major
            magnitudes; with a trailing channel axis for the CNN).
        X_test: Complex STFT of the noisy validation utterance.
        X_test_mag: Magnitude of ``X_test``.
        ground_truth_clean: Clean time-domain reference for the SNR.
        sr_test: Sample rate used when saving / playing the reconstruction.
        model_name: Tag used in output file names ("DNN", "CNN", ...).
        patience: Epochs without SNR improvement before stopping.
        min_delta: Minimum SNR gain (dB) that counts as an improvement.
        **kwargs: Forwarded to ``EarlyStopping``.
    """

    def __init__(self, X_test_input, X_test, X_test_mag, ground_truth_clean, sr_test, model_name, patience=10, min_delta=0.0, **kwargs):
        super(SNRBestModelCheckpoint, self).__init__(patience=patience, min_delta=min_delta, **kwargs)
        self.X_test_input = X_test_input
        self.X_test = X_test
        self.X_test_mag = X_test_mag
        self.ground_truth_clean = ground_truth_clean
        self.sr_test = sr_test
        self.best_snr = -np.inf
        self.best_weights = None
        self.model_name = model_name
        # Keep our own threshold: the parent may flip the sign of its
        # `min_delta` for "lower is better" monitors, whereas SNR is
        # "higher is better".
        self.snr_min_delta = abs(min_delta)

    def on_train_begin(self, logs=None):
        """Reset the SNR tracking state so the callback can be reused across fits."""
        super(SNRBestModelCheckpoint, self).on_train_begin(logs)
        self.wait = 0
        self.best_snr = -np.inf
        self.best_weights = None

    def _reconstruct(self, model_input, noisy_stft, noisy_mag):
        """Predict magnitudes for ``model_input`` and resynthesise audio with the noisy phase."""
        predicted_magnitudes = self.model.predict(model_input)
        # Replace exact zeros so the phase extraction never divides by zero.
        noisy_mag_nonzero = np.where(noisy_mag == 0, 1e-20, noisy_mag)
        # S_hat = (X / |X|) ⊙ |S_hat|
        S_hat = (noisy_stft / noisy_mag_nonzero) * np.abs(predicted_magnitudes.T)
        # Inverse STFT to get the time-domain signal
        return librosa.istft(S_hat, hop_length=512)

    def on_epoch_end(self, epoch, logs=None):
        """Score the current weights by validation SNR and update the patience counter."""
        clean_speech_pred_test = self._reconstruct(self.X_test_input, self.X_test, self.X_test_mag)

        # The resynthesised signal can differ in length from the reference; compare the overlap.
        min_length = min(len(self.ground_truth_clean), len(clean_speech_pred_test))
        ground_truth_clean_trim = self.ground_truth_clean[:min_length]
        clean_speech_pred_test_trim = clean_speech_pred_test[:min_length]
        snr = compute_snr(ground_truth_clean_trim, clean_speech_pred_test_trim)

        print(f"Epoch {epoch + 1}, SNR: {snr:.4f} dB")
        # Check if the SNR has improved (a NaN SNR never counts as an improvement)
        if snr > self.best_snr + self.snr_min_delta:
            print(f"SNR improved from {self.best_snr:.4f} to {snr:.4f}")
            self.best_snr = snr
            self.wait = 0  # Reset the patience counter
            self.best_weights = self.model.get_weights()  # Save the best model weights
        else:
            self.wait += 1
            print(f"SNR did not improve. Best SNR: {self.best_snr:.4f}. Patience: {self.wait}/{self.patience}")
            if self.wait >= self.patience:
                self.model.stop_training = True
                print(f"Early stopping triggered. Best SNR: {self.best_snr:.4f} dB")

    def on_train_end(self, logs=None):
        """Restore the best weights, then save and play the test_01 / test_02 reconstructions."""
        if self.best_weights is None:
            # No epoch ever improved the SNR (e.g. zero epochs or NaN SNR throughout).
            print("No SNR improvement was recorded; keeping the current model weights.")
        else:
            print(f"Restoring model weights from the epoch with the best SNR: {self.best_snr:.4f} dB")
            self.model.set_weights(self.best_weights)

        # Generate and save the best reconstruction
        best_clean_speech_pred_test = self._reconstruct(self.X_test_input, self.X_test, self.X_test_mag)
        sf.write(f'test_01_best_snr_reconstruction_{self.model_name}.wav', best_clean_speech_pred_test, self.sr_test)
        print(f"Saved best SNR reconstruction to 'test_01_best_snr_reconstruction_{self.model_name}.wav'")

        # Play the best reconstruction audio in Jupyter
        print("Playing test_01_best_snr_reconstruction:")
        display(Audio(best_clean_speech_pred_test, rate=self.sr_test))

        # Generate and save the reconstruction for test_02
        test_02_x, sr_test_2 = librosa.load('data/test_x_02.wav', sr=None)
        X_test_2 = librosa.stft(test_02_x, n_fft=1024, hop_length=512)
        X_test_2_mag = np.abs(X_test_2)

        X_test_2_input = X_test_2_mag.T
        # Give test_02 a channel axis whenever the validation input has one,
        # not only when the model happens to be tagged "CNN".
        if self.model_name == "CNN" or np.ndim(self.X_test_input) == 3:
            X_test_2_input = X_test_2_input[..., np.newaxis]

        clean_speech_pred_test_2 = self._reconstruct(X_test_2_input, X_test_2, X_test_2_mag)
        sf.write(f'test_s_02_recons_{self.model_name}.wav', clean_speech_pred_test_2, sr_test_2)
        print(f"Saved test_02 reconstruction to 'test_s_02_recons_{self.model_name}.wav'")

        # Play the test_02 reconstruction audio in Jupyter
        print("Playing test_s_02_recons:")
        display(Audio(clean_speech_pred_test_2, rate=sr_test_2))

    def get_best_snr(self):
        """Return the best validation SNR (dB) seen during the most recent fit."""
        return self.best_snr

In [38]:
# Fully connected denoiser: one 513-bin noisy magnitude frame in,
# one 513-bin clean magnitude frame out.
model = models.Sequential()

# Explicit Input object; `InputLayer(input_shape=...)` is deprecated in Keras 3.
model.add(layers.Input(shape=(513,)))

# Two bias-free linear projections (no activation).
model.add(layers.Dense(513, use_bias=False))
model.add(layers.Dense(513, use_bias=False))

# NOTE: alternative architectures (deeper ReLU / BatchNormalization / Dropout
# stacks, leaky-ReLU output layers) and an ExponentialDecay learning-rate
# schedule were previously kept here as commented-out code.

# ReLU output keeps the predicted magnitudes non-negative.
model.add(layers.Dense(513, activation='relu', kernel_initializer='he_normal'))

model.compile(optimizer=optimizers.Adam(learning_rate=0.0001), loss='mean_squared_error')

In [39]:
# SNR-based early stopping for the fully connected model, scored on test_01.
snr_early_stopping = SNRBestModelCheckpoint(
    X_test_input=X_test_input,
    X_test=X_test,
    X_test_mag=X_test_mag,
    ground_truth_clean=ground_truth_clean,
    sr_test=sr_test,
    model_name="DNN",
    patience=100,
)

# Train on noisy -> clean magnitude frames; the callback decides when to stop.
model.fit(
    X_input,
    S_target,
    batch_size=100,
    epochs=1000,
    validation_data=(X_test_input, Y_test_input),
    callbacks=[snr_early_stopping],
)

Epoch 1/1000
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
Epoch 1, SNR: 0.7424 dB
SNR improved from -inf to 0.7424
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 40ms/step - loss: 0.1367 - val_loss: 0.2790
Epoch 2/1000
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
Epoch 2, SNR: 1.6828 dB
SNR improved from 0.7424 to 1.6828
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - loss: 0.0686 - val_loss: 0.2212
Epoch 3/1000
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
Epoch 3, SNR: 2.3497 dB
SNR improved from 1.6828 to 2.3497
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - loss: 0.0476 - val_loss: 0.1878
Epoch 4/1000
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
Epoch 4, SNR: 2.8316 dB
SNR improved from 2.3497 to 2.8316
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - loss: 0.0331 - val_loss: 0.16

[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step 
Saved test_02 reconstruction to 'test_s_02_recons_DNN.wav'
Playing test_s_02_recons:


<keras.src.callbacks.history.History at 0x7c86084c3df0>

In [11]:
# Baseline for comparison: SNR of the unprocessed noisy signal against the clean reference.
min_length = min(map(len, (ground_truth_clean, test_sn)))
clean_signal, noisy_signal = ground_truth_clean[:min_length], test_sn[:min_length]
snr_noisy = compute_snr(clean_signal, noisy_signal)
print(f'SNR of the noisy signal: {snr_noisy:.2f} dB')

SNR of the noisy signal: 13.23 dB


- I am using SNR for early stopping.
- The best SNR is around 13.38 dB, which surpasses the SNR of the noisy signal (13.23 dB).
- The denoising of test_x_02 is saved under **test_s_02_recons_DNN**.

## Speech Denoising Using 1D CNN

In [12]:
# Append a trailing channel axis so every frame has the (513, 1) layout used by the Conv1D model.
X_input_reshaped = np.expand_dims(X_input, axis=-1)        # (num_samples, 513, 1)
S_target_reshaped = np.expand_dims(S_target, axis=-1)      # (num_samples, 513, 1)
X_test_input_cnn = np.expand_dims(X_test_input, axis=-1)   # (num_test_samples, 513, 1)
Y_test_input_cnn = np.expand_dims(Y_test_input, axis=-1)   # (num_test_samples, 513, 1)

In [36]:
# 1D CNN denoiser: convolve along the frequency axis of a single frame,
# then map the flattened features back to 513 magnitude bins.
cnn_model = models.Sequential()

# Explicit Input object instead of passing `input_shape=` to the first layer,
# which Keras 3 discourages.
cnn_model.add(layers.Input(shape=(513, 1)))

# Two strided convolutions; with padding='same' and strides=2 each one
# halves the frequency axis (513 -> 257 -> 129 steps).
cnn_model.add(layers.Conv1D(filters=16, kernel_size=8, strides=2, padding='same', activation='relu'))
cnn_model.add(layers.Conv1D(filters=16, kernel_size=8, strides=2, padding='same', activation='relu'))

cnn_model.add(layers.Flatten())

# ReLU output keeps the predicted magnitudes non-negative.
cnn_model.add(layers.Dense(513, activation='relu'))

cnn_model.compile(optimizer=optimizers.Adam(learning_rate=0.0001), loss='mean_squared_error')

In [37]:
# SNR-based early stopping for the CNN; the validation input carries the extra channel axis.
snr_early_stopping = SNRBestModelCheckpoint(
    X_test_input=X_test_input_cnn,
    X_test=X_test,
    X_test_mag=X_test_mag,
    ground_truth_clean=ground_truth_clean,
    sr_test=sr_test,
    model_name="CNN",
    patience=100,
)

# Train on channel-expanded noisy -> clean magnitude frames.
cnn_model.fit(
    X_input_reshaped,
    S_target_reshaped,
    batch_size=100,
    epochs=1000,
    validation_data=(X_test_input_cnn, Y_test_input_cnn),
    callbacks=[snr_early_stopping],
)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 75ms/step - loss: 0.0969 - val_loss: 0.2527
Epoch 2/1000
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step 
Epoch 2, SNR: 1.0574 dB
SNR improved from 0.4980 to 1.0574
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step - loss: 0.0893 - val_loss: 0.2262
Epoch 3/1000
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step 
Epoch 3, SNR: 1.5703 dB
SNR improved from 1.0574 to 1.5703
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step - loss: 0.0820 - val_loss: 0.2071
Epoch 4/1000
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
Epoch 4, SNR: 1.9305 dB
SNR improved from 1.5703 to 1.9305
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 9ms/step - loss: 0.0692 - val_loss: 0.1966
Epoch 5/1000
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[

[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step
Saved test_02 reconstruction to 'test_s_02_recons_CNN.wav'
Playing test_s_02_recons:


<keras.src.callbacks.history.History at 0x7c86d006c9a0>

- The SNR using the CNN model is around 13.46 dB, which is better than both the fully connected network (13.38 dB) and the noisy signal (13.23 dB).
- The denoising of test_x_02 is saved under **test_s_02_recons_CNN**.