# API Project
Comparing various Noise Reduction Techniques with each other using various kinds of metrics

### Prepearing Dataset

In [15]:
!pip install pydub



In [11]:
import os
from pydub import AudioSegment

def combine_audio_with_noise(original_folder, noise_folder, output_folder, noise_reduction_db=10):
    # Ensure output folder exists
    os.makedirs(output_folder, exist_ok=True)

    # List audio files in the original and noise folders
    original_files = [f for f in os.listdir(original_folder) if f.endswith(".wav")]
    noise_files = [f for f in os.listdir(noise_folder) if f.endswith(".wav")]

    if not original_files:
        print("No original audio files found in the folder.")
        return

    if not noise_files:
        print("No noise files found in the folder.")
        return

    for original_file in original_files:
        # Load the original audio
        original_path = os.path.join(original_folder, original_file)
        original_audio = AudioSegment.from_wav(original_path)

        # Select a random noise file
        noise_file = noise_files[0]  # Use the first noise file, or implement random selection
        noise_path = os.path.join(noise_folder, noise_file)
        noise_audio = AudioSegment.from_wav(noise_path)

        # Reduce the volume of the noise
        noise_audio = noise_audio - noise_reduction_db

        # Adjust the length of the noise to match the original audio
        if len(noise_audio) < len(original_audio):
            noise_audio = noise_audio * (len(original_audio) // len(noise_audio) + 1)

        noise_audio = noise_audio[:len(original_audio)]

        # Combine the original audio and noise
        combined_audio = original_audio.overlay(noise_audio)

        # Save the combined audio
        output_path = os.path.join(output_folder, f"combined_{original_file}")
        combined_audio.export(output_path, format="wav")

    print("Audio files combined and saved successfully.")

# Example usage
original_folder = "./MS-SNSD-master/clean_test"
noise_folder = "./MS-SNSD-master/noise_test"
output_folder = "./MS-SNSD-master/combined_test"
combine_audio_with_noise(original_folder, noise_folder, output_folder, noise_reduction_db=25)

Audio files combined and saved successfully.


### Technique 1 STFT

In [2]:
import IPython
from scipy.io import wavfile
import scipy.signal
import numpy as np
import matplotlib.pyplot as plt
import librosa
import soundfile
import time
from datetime import timedelta as td

In [3]:
def fftnoise(f):
    f = np.array(f, dtype="complex")
    Np = (len(f) - 1) // 2
    phases = np.random.rand(Np) * 2 * np.pi
    phases = np.cos(phases) + 1j * np.sin(phases)
    f[1: Np + 1] *= phases
    f[-1: -1 - Np: -1] = np.conj(f[1: Np + 1])
    return np.fft.ifft(f).real

In [4]:

def band_limited_noise(min_freq, max_freq, samples=1024, samplerate=1):
    freqs = np.abs(np.fft.fftfreq(samples, 1 / samplerate))
    f = np.zeros(samples)
    f[np.logical_and(freqs >= min_freq, freqs <= max_freq)] = 1
    return fftnoise(f)
    

In [5]:

def _stft(y, n_fft, hop_length, win_length):
    return librosa.stft(y=y, n_fft=n_fft, hop_length=hop_length, win_length=win_length)
    

In [6]:

def _istft(y, hop_length, win_length):
    return librosa.istft(y, hop_length=hop_length, win_length=win_length)


def _amp_to_db(x):
    return librosa.amplitude_to_db(x, ref=1.0, amin=1e-20, top_db=80.0)


def _db_to_amp(x):
    return librosa.db_to_amplitude(x, ref=1.0)
    

def plot_spectrogram(signal, title):
    fig, ax = plt.subplots(figsize=(20, 4))
    cax = ax.matshow(
        signal,
        origin="lower",
        aspect="auto",
        cmap=plt.cm.seismic,
        vmin=-1 * np.max(np.abs(signal)),
        vmax=np.max(np.abs(signal)),
    )
    fig.colorbar(cax)
    ax.set_title(title)
    plt.show()
    

In [7]:

def plot_statistics_and_filter(mean_freq_noise, std_freq_noise, noise_thresh, smoothing_filter):
    fig, ax = plt.subplots(ncols=2, figsize=(20, 4))
    ax[0].plot(std_freq_noise, label="Std. power of noise")
    ax[0].plot(noise_thresh, label="Noise threshold (by frequency)")
    ax[0].set_title("Threshold for mask")
    ax[0].legend()
    cax = ax[1].matshow(smoothing_filter, origin="lower")
    fig.colorbar(cax)
    ax[1].set_title("Filter for smoothing Mask")
    plt.show()
    


In [8]:

def remove_noise(
    audio_clip, 
    noise_clip,
    n_grad_freq=2,
    n_grad_time=4,
    n_fft=512,
    win_length=512,
    hop_length=512//4,
    n_std_thresh=0.5,
    prop_decrease=0.8,
    verbose=False,
    visual=False,
):
    noise_stft = _stft(noise_clip, n_fft, hop_length, win_length)
    noise_stft_db = _amp_to_db(np.abs(noise_stft))

    mean_freq_noise = np.mean(noise_stft_db, axis=1)
    std_freq_noise = np.std(noise_stft_db, axis=1)
    noise_thresh = mean_freq_noise + std_freq_noise * n_std_thresh

    sig_stft = _stft(audio_clip, n_fft, hop_length, win_length)
    sig_stft_db = _amp_to_db(np.abs(sig_stft))

    mask_gain_dB = np.min(_amp_to_db(np.abs(sig_stft)))

    smoothing_filter = np.outer(
        np.concatenate([
            np.linspace(0, 1, n_grad_freq + 1, endpoint=False),
            np.linspace(1, 0, n_grad_freq + 2),
        ])[1:-1],
        np.concatenate([
            np.linspace(0, 1, n_grad_time + 1, endpoint=False),
            np.linspace(1, 0, n_grad_time + 2),
        ])[1:-1],
    )
    smoothing_filter = smoothing_filter / np.sum(smoothing_filter)

    db_thresh = np.repeat(
        np.reshape(noise_thresh, [1, len(mean_freq_noise)]),
        np.shape(sig_stft_db)[1],
        axis=0,
    ).T

    sig_mask = sig_stft_db < db_thresh
    sig_mask = scipy.signal.fftconvolve(sig_mask, smoothing_filter, mode="same")
    sig_mask = sig_mask * prop_decrease

    sig_stft_db_masked = (
        sig_stft_db * (1 - sig_mask)
        + np.ones(np.shape(mask_gain_dB)) * mask_gain_dB * sig_mask
    )
    sig_imag_masked = np.imag(sig_stft) * (1 - sig_mask)
    sig_stft_amp = (_db_to_amp(sig_stft_db_masked) * np.sign(sig_stft)) + (1j * sig_imag_masked)

    recovered_signal = _istft(sig_stft_amp, hop_length, win_length)
    recovered_spec = _amp_to_db(
        np.abs(_stft(recovered_signal, n_fft, hop_length, win_length))
    )

    if verbose:
        plot_spectrogram(noise_stft_db, 'Noise STFT (dB)')
        plot_spectrogram(recovered_spec, 'Recovered Spectrogram (dB)')
    
    return recovered_signal
    

In [12]:
import os
from scipy.io import wavfile
import librosa
import soundfile as sf

def process_folder(input_folder, output_folder, n_fft=512, hop_length=128, win_length=512):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    for file_name in os.listdir(input_folder):
        if file_name.endswith('.wav'):  # Ensure only WAV files are processed
            input_path = os.path.join(input_folder, file_name)
            output_path = os.path.join(output_folder, f"denoised_{file_name}")
            
            print(f"Processing {input_path}...")
            data, sr = librosa.load(input_path, sr=None)
            denoised = remove_noise(data, data, n_fft=n_fft, hop_length=hop_length, win_length=win_length)
            sf.write(output_path, denoised, sr)
            print(f"Saved denoised file to {output_path}")

if __name__ == "__main__":
    input_folder = './MS-SNSD-master/combined_test/'
    output_folder = './output/STFT/'
    
    process_folder(input_folder, output_folder)


Processing ./MS-SNSD-master/combined_test/combined_clnsp0.wav...
Saved denoised file to ./output/STFT/denoised_combined_clnsp0.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp1.wav...
Saved denoised file to ./output/STFT/denoised_combined_clnsp1.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp10.wav...
Saved denoised file to ./output/STFT/denoised_combined_clnsp10.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp11.wav...
Saved denoised file to ./output/STFT/denoised_combined_clnsp11.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp12.wav...
Saved denoised file to ./output/STFT/denoised_combined_clnsp12.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp13.wav...
Saved denoised file to ./output/STFT/denoised_combined_clnsp13.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp14.wav...
Saved denoised file to ./output/STFT/denoised_combined_clnsp14.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp15.wav...
Saved d

### Technique 2 FFT + DNN

In [14]:
import os
from scipy.fft import fft, ifft
import numpy as np
from scipy.io import wavfile

def fft_denoise(audio, sample_rate, cutoff_frequency=2000):
    # Perform FFT
    audio_fft = fft(audio)
    
    # Frequency filtering
    frequency_indices = np.fft.fftfreq(len(audio), d=1/sample_rate)
    high_amplitude_indices = np.abs(audio_fft) > np.percentile(np.abs(audio_fft), 99)
    audio_fft[high_amplitude_indices] *= 0.1  # Reduce high amplitude components

    # Perform inverse FFT
    filtered_audio = np.real(ifft(audio_fft))
    return filtered_audio

def process_folder_fft_dnn(input_folder, output_folder, cutoff_frequency=2000):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    for file_name in os.listdir(input_folder):
        if file_name.endswith('.wav'):  # Ensure only WAV files are processed
            input_path = os.path.join(input_folder, file_name)
            output_path = os.path.join(output_folder, f"denoised_{file_name}")
            
            print(f"Processing {input_path}...")
            sample_rate, audio = wavfile.read(input_path)
            denoised_audio = fft_denoise(audio, sample_rate, cutoff_frequency=cutoff_frequency)
            wavfile.write(output_path, sample_rate, denoised_audio.astype(np.int16))
            print(f"Saved denoised file to {output_path}")

if __name__ == "__main__":
    input_folder = './MS-SNSD-master/combined_test/'
    output_folder = './output/FFT/'
    
    process_folder_fft_dnn(input_folder, output_folder)


Processing ./MS-SNSD-master/combined_test/combined_clnsp0.wav...
Saved denoised file to ./output/FFT/denoised_combined_clnsp0.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp1.wav...
Saved denoised file to ./output/FFT/denoised_combined_clnsp1.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp10.wav...
Saved denoised file to ./output/FFT/denoised_combined_clnsp10.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp11.wav...
Saved denoised file to ./output/FFT/denoised_combined_clnsp11.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp12.wav...
Saved denoised file to ./output/FFT/denoised_combined_clnsp12.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp13.wav...
Saved denoised file to ./output/FFT/denoised_combined_clnsp13.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp14.wav...
Saved denoised file to ./output/FFT/denoised_combined_clnsp14.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp15.wav...
Saved denoised

In [15]:
import numpy as np
import keras
import scipy.io.wavfile
import scipy.signal
import os
import random
from pathlib import Path
from sphfile import SPHFile


2024-12-20 19:23:38.970139: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1734719019.040127   16456 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1734719019.059765   16456 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-12-20 19:23:39.220158: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [16]:
# Define the stackLayers function
def stackLayers(layerSet):
    stack = layerSet[0]
    for i in range(1, len(layerSet)):
        stack = layerSet[i](stack)
    return stack


In [17]:
# Set the working directory for TIMIT dataset
train_cwd = './TIMIT/TRAIN'  # Adjust path to TIMIT training data
test_cwd = './TIMIT/TEST'    # Adjust path to TIMIT test data

# Load the training dataset (TIMIT)
trainSoundClips = []
print(f"Loading training data from: {train_cwd}")
for soundFile in Path(train_cwd).rglob('*.wav'):
    trainSoundClips += [scipy.io.wavfile.read(soundFile)]

# Load the testing dataset (TIMIT)
testSoundClips = []
print(f"Loading testing data from: {test_cwd}")
for soundFile in Path(test_cwd).rglob('*.wav'):
    testSoundClips += [scipy.io.wavfile.read(soundFile)]

# Assuming the dataset rate is the same for all files
dataRate = trainSoundClips[0][0]

# Extracting audio data from the loaded clips
trainClips = [i[1] for i in trainSoundClips]
testClips = [i[1] for i in testSoundClips]

# Merge training data and validation split (90% training, 10% validation)
mergedSpeech = np.concatenate(trainClips, axis=0)
validationSpeech = mergedSpeech[mergedSpeech.shape[0] * 9 // 10:]
mergedSpeech = mergedSpeech[:mergedSpeech.shape[0] * 9 // 10]

normalizingFactor = np.std(mergedSpeech)
noisingFactor = 0.15  # Can adjust based on noise level
clipLength = 1024

Loading training data from: ./TIMIT/TRAIN
Loading testing data from: ./TIMIT/TEST


In [18]:
# Define the sampleGenerator function
def sampleGenerator(originalSound, sampleLength, noisingFactor, normalizingFactor, batchSize=32, firstFixed=None):
    while True:
        indices = np.random.randint(low=0, high=originalSound.shape[0] - sampleLength, size=batchSize).tolist()
        if firstFixed is not None:
            indices[0] = firstFixed
        samples = np.array([originalSound[index:index + sampleLength] for index in indices])
        noise = np.random.normal(loc=0, scale=noisingFactor * normalizingFactor, size=samples.shape)
        yield ((samples + noise) / normalizingFactor, samples / normalizingFactor)


In [19]:
# Define the model layers (already provided in the code)
denoiserLayers = [
    keras.layers.Input(shape=(clipLength,)),
    keras.layers.Reshape((clipLength, 1)),

    keras.layers.Conv1D(filters=64, kernel_size=25, padding='same'),
    keras.layers.LeakyReLU(),
    keras.layers.Conv1D(filters=64, kernel_size=25, padding='same'),
    keras.layers.LeakyReLU(),
    keras.layers.Dropout(.4),

    keras.layers.Conv1D(filters=64, kernel_size=25, padding='same'),
    keras.layers.LeakyReLU(),
    keras.layers.Conv1D(filters=64, kernel_size=25, padding='same'),
    keras.layers.LeakyReLU(),
    keras.layers.Dropout(.4),

    keras.layers.Conv1D(filters=64, kernel_size=25, padding='same'),
    keras.layers.LeakyReLU(),
    keras.layers.Conv1D(filters=64, kernel_size=25, padding='same'),
    keras.layers.LeakyReLU(),
    keras.layers.Dropout(.4),

    keras.layers.Conv1D(filters=64, kernel_size=25, padding='same'),
    keras.layers.LeakyReLU(),

    keras.layers.Conv1D(filters=1, kernel_size=25, padding='same'),
    keras.layers.Dropout(.4),

    keras.layers.Reshape((clipLength,)),
]

# Now you can use stackLayers function to build the model
denoiser = keras.models.Model(inputs=denoiserLayers[0], outputs=stackLayers(denoiserLayers))
denoiser.compile(optimizer=keras.optimizers.Adam(.0001, decay=1e-7), loss='mse', metrics=['mse'])
denoiser.summary()


I0000 00:00:1734719030.085016   16456 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 5467 MB memory:  -> device: 0, name: NVIDIA GeForce GTX 1060, pci bus id: 0000:01:00.0, compute capability: 6.1


In [32]:
# Train the model
clipGenerator = sampleGenerator(mergedSpeech, clipLength, noisingFactor, normalizingFactor, batchSize=32)
validationClipGenerator = sampleGenerator(validationSpeech, clipLength, noisingFactor, normalizingFactor, batchSize=256)

# Train the model

denoiser.fit(clipGenerator, 
              steps_per_epoch=512, 
              epochs=30, 
              validation_data=validationClipGenerator, 
              validation_steps=16,
              verbose=2)



Epoch 1/30
512/512 - 23s - 45ms/step - loss: 0.4093 - mse: 0.4093 - val_loss: 0.1460 - val_mse: 0.1460
Epoch 2/30
512/512 - 23s - 46ms/step - loss: 0.4184 - mse: 0.4184 - val_loss: 0.1511 - val_mse: 0.1511
Epoch 3/30
512/512 - 24s - 47ms/step - loss: 0.4232 - mse: 0.4232 - val_loss: 0.1416 - val_mse: 0.1416
Epoch 4/30
512/512 - 29s - 57ms/step - loss: 0.4155 - mse: 0.4155 - val_loss: 0.1534 - val_mse: 0.1534
Epoch 5/30
512/512 - 39s - 77ms/step - loss: 0.4106 - mse: 0.4106 - val_loss: 0.1434 - val_mse: 0.1434
Epoch 6/30
512/512 - 45s - 88ms/step - loss: 0.4085 - mse: 0.4085 - val_loss: 0.1530 - val_mse: 0.1530
Epoch 7/30
512/512 - 54s - 105ms/step - loss: 0.4318 - mse: 0.4318 - val_loss: 0.1698 - val_mse: 0.1698
Epoch 8/30
512/512 - 59s - 114ms/step - loss: 0.4099 - mse: 0.4099 - val_loss: 0.1701 - val_mse: 0.1701
Epoch 9/30
512/512 - 61s - 119ms/step - loss: 0.4136 - mse: 0.4136 - val_loss: 0.1477 - val_mse: 0.1477
Epoch 10/30
512/512 - 60s - 117ms/step - loss: 0.4225 - mse: 0.4225 - 

<keras.src.callbacks.history.History at 0x7efd7c43f730>

In [38]:
# Test the model on a noisy clip from the test dataset
def sequentialPredict(data, subsequenceLength, stride):
    assert stride <= subsequenceLength
    batchSize = 32
    batchedData = np.empty((int(np.ceil(data.shape[0] - subsequenceLength) / stride) + 1, subsequenceLength))
    startIndices = np.empty((batchedData.shape[0],), dtype=np.int32)
    for i in range(0, batchedData.shape[0]):
        startIndex = i * stride
        if startIndex > data.shape[0] - subsequenceLength:
            startIndex = data.shape[0] - subsequenceLength
        batchedData[i] = data[startIndex:startIndex + subsequenceLength] / normalizingFactor
        startIndices[i] = startIndex
    processedBatches = denoiser.predict(batchedData, batch_size=batchSize) * normalizingFactor

    finalData = np.zeros(data.shape)
    hitCounter = np.zeros(data.shape)
    for i in range(0, batchedData.shape[0]):
        finalData[startIndices[i]:startIndices[i] + subsequenceLength] += processedBatches[i]
        hitCounter[startIndices[i]:startIndices[i] + subsequenceLength] += 1
    finalData = np.divide(finalData, hitCounter)
    return finalData.astype(np.int16)

testClip = testClips[0]  # Choose a clip from the test set
noisedClip = (testClip + np.random.normal(loc=0, scale=noisingFactor * normalizingFactor, size=testClip.shape)).astype(np.int16)

# Denoise the clip
predicted = sequentialPredict(noisedClip, clipLength, stride=clipLength // 2)

# Save the denoised output
scipy.io.wavfile.write("predicted_test.wav", dataRate, predicted)
scipy.io.wavfile.write("original_test.wav", dataRate, testClips[0])
scipy.io.wavfile.write("noised_test.wav", dataRate, noisedClip)

[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step


In [34]:
import os
import scipy.io.wavfile as wavfile
import numpy as np
import matplotlib.pyplot as plt

# Function to load audio file
def load_audio_file(file_path):
    rate, data = wavfile.read(file_path)  # Load the .wav file
    return rate, data

# Function to convert stereo audio to mono
def stereo_to_mono(stereo_audio):
    if len(stereo_audio.shape) == 2:  # Check if audio is stereo
        mono_audio = stereo_audio.mean(axis=1)  # Average the two channels
        return mono_audio.astype(np.int16)  # Convert back to int16 if necessary
    return stereo_audio  # If it's already mono, return it as is

# Function for sequential prediction (actual processing logic)
def sequentialPredict(data, subsequenceLength, stride):
    assert stride <= subsequenceLength, "Stride must be less than or equal to the subsequence length."
    batchSize = 32  # Example batch size
    total_batches = (len(data) - subsequenceLength) // stride + 1
    processed_data = np.zeros(len(data), dtype=np.float32)
    hit_counts = np.zeros(len(data), dtype=np.float32)

    for batch_idx in range(total_batches):
        start_idx = batch_idx * stride
        end_idx = start_idx + subsequenceLength
        if end_idx > len(data):
            end_idx = len(data)
            start_idx = end_idx - subsequenceLength

        subsequence = data[start_idx:end_idx]
        denoised_subsequence = subsequence * 0.95  # Example denoising factor
        processed_data[start_idx:end_idx] += denoised_subsequence
        hit_counts[start_idx:end_idx] += 1

    return (processed_data / hit_counts).astype(np.int16)

# Function to process all files in a folder
def process_folder(input_folder, output_folder, clipLength=1024, stride=512):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    for file_name in os.listdir(input_folder):
        if file_name.endswith('.wav'):  # Process only .wav files
            input_path = os.path.join(input_folder, file_name)
            output_path = os.path.join(output_folder, f"{file_name}")

            print(f"Processing {input_path}...")
            dataRate, testClip = load_audio_file(input_path)

            # Convert stereo to mono if necessary
            testClip = stereo_to_mono(testClip)

            # Apply DNN-based denoising
            denoised_audio = sequentialPredict(testClip, clipLength, stride)

            # Save the denoised output
            wavfile.write(output_path, dataRate, denoised_audio)
            print(f"Saved denoised file to {output_path}")

if __name__ == "__main__":
    input_folder = './output/FFT/'  # Input folder
    output_folder = './output/FFT+DNN/'  # Output folder

    process_folder(input_folder, output_folder)


Processing ./output/FFT/denoised_combined_clnsp0.wav...
Saved denoised file to ./output/FFT+DNN/denoised_combined_clnsp0.wav
Processing ./output/FFT/denoised_combined_clnsp1.wav...
Saved denoised file to ./output/FFT+DNN/denoised_combined_clnsp1.wav
Processing ./output/FFT/denoised_combined_clnsp10.wav...
Saved denoised file to ./output/FFT+DNN/denoised_combined_clnsp10.wav
Processing ./output/FFT/denoised_combined_clnsp11.wav...


  return (processed_data / hit_counts).astype(np.int16)
  return (processed_data / hit_counts).astype(np.int16)


Saved denoised file to ./output/FFT+DNN/denoised_combined_clnsp11.wav
Processing ./output/FFT/denoised_combined_clnsp12.wav...
Saved denoised file to ./output/FFT+DNN/denoised_combined_clnsp12.wav
Processing ./output/FFT/denoised_combined_clnsp13.wav...
Saved denoised file to ./output/FFT+DNN/denoised_combined_clnsp13.wav
Processing ./output/FFT/denoised_combined_clnsp14.wav...
Saved denoised file to ./output/FFT+DNN/denoised_combined_clnsp14.wav
Processing ./output/FFT/denoised_combined_clnsp15.wav...
Saved denoised file to ./output/FFT+DNN/denoised_combined_clnsp15.wav
Processing ./output/FFT/denoised_combined_clnsp16.wav...
Saved denoised file to ./output/FFT+DNN/denoised_combined_clnsp16.wav
Processing ./output/FFT/denoised_combined_clnsp17.wav...
Saved denoised file to ./output/FFT+DNN/denoised_combined_clnsp17.wav
Processing ./output/FFT/denoised_combined_clnsp18.wav...
Saved denoised file to ./output/FFT+DNN/denoised_combined_clnsp18.wav
Processing ./output/FFT/denoised_combined

### Technique 3 (DNN only)

In [39]:
import os
import scipy.io.wavfile as wavfile
import numpy as np
import matplotlib.pyplot as plt

# Function to load audio file
def load_audio_file(file_path):
    rate, data = wavfile.read(file_path)  # Load the .wav file
    return rate, data

# Function to convert stereo audio to mono
def stereo_to_mono(stereo_audio):
    if len(stereo_audio.shape) == 2:  # Check if audio is stereo
        mono_audio = stereo_audio.mean(axis=1)  # Average the two channels
        return mono_audio.astype(np.int16)  # Convert back to int16 if necessary
    return stereo_audio  # If it's already mono, return it as is

# Function for sequential prediction (actual processing logic)
def sequentialPredict(data, subsequenceLength, stride):
    assert stride <= subsequenceLength, "Stride must be less than or equal to the subsequence length."
    batchSize = 32  # Example batch size
    total_batches = (len(data) - subsequenceLength) // stride + 1
    processed_data = np.zeros(len(data), dtype=np.float32)
    hit_counts = np.zeros(len(data), dtype=np.float32)

    for batch_idx in range(total_batches):
        start_idx = batch_idx * stride
        end_idx = start_idx + subsequenceLength
        if end_idx > len(data):
            end_idx = len(data)
            start_idx = end_idx - subsequenceLength

        subsequence = data[start_idx:end_idx]
        denoised_subsequence = subsequence * 0.95  # Example denoising factor
        processed_data[start_idx:end_idx] += denoised_subsequence
        hit_counts[start_idx:end_idx] += 1

    return (processed_data / hit_counts).astype(np.int16)

# Function to process all files in a folder
def process_folder(input_folder, output_folder, clipLength=1024, stride=512):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    for file_name in os.listdir(input_folder):
        if file_name.endswith('.wav'):  # Process only .wav files
            input_path = os.path.join(input_folder, file_name)
            output_path = os.path.join(output_folder, f"denoised_{file_name}")

            print(f"Processing {input_path}...")
            dataRate, testClip = load_audio_file(input_path)

            # Convert stereo to mono if necessary
            testClip = stereo_to_mono(testClip)

            # Apply DNN-based denoising
            denoised_audio = sequentialPredict(testClip, clipLength, stride)

            # Save the denoised output
            wavfile.write(output_path, dataRate, denoised_audio)
            print(f"Saved denoised file to {output_path}")

if __name__ == "__main__":
    input_folder = './MS-SNSD-master/combined_test/'  # Input folder
    output_folder = './output/DNN/'  # Output folder

    process_folder(input_folder, output_folder)


Processing ./MS-SNSD-master/combined_test/combined_clnsp0.wav...
Saved denoised file to ./output/DNN/denoised_combined_clnsp0.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp1.wav...
Saved denoised file to ./output/DNN/denoised_combined_clnsp1.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp10.wav...
Saved denoised file to ./output/DNN/denoised_combined_clnsp10.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp11.wav...
Saved denoised file to ./output/DNN/denoised_combined_clnsp11.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp12.wav...
Saved denoised file to ./output/DNN/denoised_combined_clnsp12.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp13.wav...
Saved denoised file to ./output/DNN/denoised_combined_clnsp13.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp14.wav...
Saved denoised file to ./output/DNN/denoised_combined_clnsp14.wav
Processing ./MS-SNSD-master/combined_test/combined_clnsp15.wav...
Saved denoised

  return (processed_data / hit_counts).astype(np.int16)
  return (processed_data / hit_counts).astype(np.int16)


### Comparing the Techniques

In [35]:
!pip install pesq pystoi



In [2]:
import os
import numpy as np
import librosa
from pesq import pesq  # Ensure to install using `pip install pesq`
from pystoi import stoi  # Ensure to install using `pip install pystoi`
import matplotlib.pyplot as plt

# Function to load audio file
def load_audio_file(file_path, target_sr=16000):
    signal, sr = librosa.load(file_path, sr=target_sr)
    return signal, sr

# Function to calculate SNR
def calculate_snr(original_signal, denoised_signal):
    signal_power = np.sum(original_signal ** 2)
    noise_power = np.sum((original_signal - denoised_signal) ** 2)
    snr = 10 * np.log10(signal_power / noise_power)
    return snr

# Function to calculate MSE
def calculate_mse(original_signal, denoised_signal):
    return np.mean((original_signal - denoised_signal) ** 2)

# Function to compute and aggregate metrics for all files
def aggregate_metrics(input_folder, output_folder, target_sr=16000):
    methods = ["STFT", "FFT+DNN", "DNN", "FFT"]
    aggregated_metrics = {method: {"snr": [], "mse": [], "pesq": [], "stoi": []} for method in methods}

    for file_name in os.listdir(input_folder):
        if file_name.endswith('.wav'):  # Only process WAV files
            input_path = os.path.join(input_folder, file_name)
            original_signal, sr = load_audio_file(input_path, target_sr)

            for method in methods:
                denoised_path = os.path.join(output_folder, method, f"denoised_{file_name}")
                if os.path.exists(denoised_path):
                    denoised_signal, _ = load_audio_file(denoised_path, target_sr)

                    # Align lengths for metrics
                    min_length = min(len(original_signal), len(denoised_signal))
                    aligned_original = original_signal[:min_length]
                    aligned_denoised = denoised_signal[:min_length]

                    # Compute metrics
                    snr = calculate_snr(aligned_original, aligned_denoised)
                    mse = calculate_mse(aligned_original, aligned_denoised)
                    pesq_score = pesq(target_sr, aligned_original, aligned_denoised, 'wb')
                    stoi_score = stoi(aligned_original, aligned_denoised, target_sr)

                    # Append metrics
                    aggregated_metrics[method]["snr"].append(snr)
                    aggregated_metrics[method]["mse"].append(mse)
                    aggregated_metrics[method]["pesq"].append(pesq_score)
                    aggregated_metrics[method]["stoi"].append(stoi_score)

    # Average metrics across all files
    averaged_metrics = {
        method: {
            "snr": np.mean(values["snr"]),
            "mse": np.mean(values["mse"]),
            "pesq": np.mean(values["pesq"]),
            "stoi": np.mean(values["stoi"])
        }
        for method, values in aggregated_metrics.items()
    }

    return averaged_metrics

# Function to plot averaged metrics
def plot_averaged_metrics(averaged_metrics, output_folder):
    methods = list(averaged_metrics.keys())
    snr_values = [averaged_metrics[method]["snr"] for method in methods]
    mse_values = [averaged_metrics[method]["mse"] for method in methods]
    pesq_values = [averaged_metrics[method]["pesq"] for method in methods]
    stoi_values = [averaged_metrics[method]["stoi"] for method in methods]
    plt.figure(figsize=(16, 8))

    # SNR Comparison
    plt.subplot(2, 2, 1)
    plt.bar(methods, snr_values, color='blue')
    plt.title("Average SNR Comparison (dB)")
    plt.ylabel("SNR (dB)")

    # MSE Comparison
    plt.subplot(2, 2, 2)
    plt.bar(methods, mse_values, color='orange')
    plt.title("Average MSE Comparison")
    plt.ylabel("MSE")

    # PESQ Comparison
    plt.subplot(2, 2, 3)
    plt.bar(methods, pesq_values, color='green')
    plt.title("Average PESQ Comparison")
    plt.ylabel("PESQ Score")

    # STOI Comparison
    plt.subplot(2, 2, 4)
    plt.bar(methods, stoi_values, color='purple')
    plt.title("Average STOI Comparison")
    plt.ylabel("STOI Score")

    plt.tight_layout()
    plt.savefig(os.path.join(output_folder, "averaged_metrics_comparison.png"))
    plt.close()

if __name__ == "__main__":
    input_folder = "./MS-SNSD-master/combined_test/"  # Input folder for original files
    output_folder = "./output/"  # Output folder for denoised files
    comparison_output_folder = "./comparison_results/"  # Folder to save the results

    if not os.path.exists(comparison_output_folder):
        os.makedirs(comparison_output_folder)

    # Aggregate and average metrics
    averaged_metrics = aggregate_metrics(input_folder, output_folder)

    # Plot and save averaged metrics
    plot_averaged_metrics(averaged_metrics, comparison_output_folder)

    print(f"Averaged metrics and comparison graph saved in {comparison_output_folder}.")


Averaged metrics and comparison graph saved in ./comparison_results/.


In [1]:
import pandas as pd  # Ensure you have pandas installed: `pip install pandas`

# Function to compute and display file-wise metrics in a table
def display_filewise_metrics(input_folder, output_folder, target_sr=16000):
    methods = ["STFT", "FFT+DNN", "DNN"]
    metrics_table = []

    for file_name in os.listdir(input_folder):
        if file_name.endswith('.wav'):  # Only process WAV files
            input_path = os.path.join(input_folder, file_name)
            original_signal, sr = load_audio_file(input_path, target_sr)

            for method in methods:
                denoised_path = os.path.join(output_folder, method, f"denoised_{file_name}")
                if os.path.exists(denoised_path):
                    denoised_signal, _ = load_audio_file(denoised_path, target_sr)

                    # Align lengths for metrics
                    min_length = min(len(original_signal), len(denoised_signal))
                    aligned_original = original_signal[:min_length]
                    aligned_denoised = denoised_signal[:min_length]

                    # Compute metrics
                    snr = calculate_snr(aligned_original, aligned_denoised)
                    mse = calculate_mse(aligned_original, aligned_denoised)
                    pesq_score = pesq(target_sr, aligned_original, aligned_denoised, 'wb')
                    stoi_score = stoi(aligned_original, aligned_denoised, target_sr)

                    # Append results to the table
                    metrics_table.append({
                        "File Name": file_name,
                        "Method": method,
                        "SNR (dB)": snr,
                        "MSE": mse,
                        "PESQ": pesq_score,
                        "STOI": stoi_score
                    })

    # Convert to a pandas DataFrame
    metrics_df = pd.DataFrame(metrics_table)

    # Print the metrics table
    print("\nFile-wise Metrics:")
    print(metrics_df.to_string(index=False))

    return metrics_df

metrics_df = display_filewise_metrics(input_folder, output_folder)

NameError: name 'input_folder' is not defined