In [168]:
from typing import Type 
import librosa
import numpy as np
import sys
import matplotlib.pyplot as plt
import soundfile as sf
import IPython.display as ipd
import glob
import os


In [169]:
def helper_fun(snr):
    alpha = 0
    if -5.0 <= snr <= 20:
        # 4 is alpha_0 in the paper, 3/20 is 1/s(value is given in the paper)
        # from paper, using varied alpha value, audio distortion can be reduced
        alpha = 4 - snr * 3 / 20
    # when snr less than -5, we use 5(5 is given by paper) 
    elif snr < -5.0:
        alpha = 5
    # when snr greater than 20, we use 1(from paper, the greater the snr, the larger the value, and when snr is grater than 2, we use 1)
    elif snr > 20:
        alpha = 1
 
    return alpha

def calculate_overlap_sample_num(length_of_frame, overlap_percent):
    # The number of overlapping samples in each audio frame was calculated. 
    # This was obtained by multiplying the frame length length_of_frame by the overlap percentage overlap_percent and dividing by 100.
    # we set the window overlap
    return int(np.floor(length_of_frame * overlap_percent / 100))

def moving_average(x, N=5):
    return np.convolve(x, np.ones((N,))/N, mode='same')

def calculate_average_noise_power(win, length_of_frame, noise_mean, NFFT, x):
    # The loop assumes that the first 5 frames of the audio are noisy (or silent) and 
    # uses these frames to calculate the average of the noise power spectrum.
    
    # Noise_estimated was obtained by transforming each window-functionalised frame to the 
    # frequency domain via FFT and accumulating their amplitudes, then averaging the results of these 5 frames.

    # estimation of noise is obtained by two steps:
    # (1)-average the noise spectra from several frames of "silence". 
    j = 0
    for k in range(0, 5):
        temp = np.multiply(win, x[j:j+length_of_frame])
        noise_mean = np.add(noise_mean, np.abs(np.fft.fft(temp, NFFT)))
        j = j + length_of_frame
    noise_estimated = noise_mean / 5
   
    # (2)-using moving averages as a frequency smoothing method
    noise_estimated = moving_average(noise_estimated, N=5)
    return noise_estimated

In [170]:
def power_spectral_subtraction(x, sr): 
    # for example，if sample rate is 8000HZ, then audio frame with should have 240 samples 30ms (30 * 8000 / 1000)
    # From paper, the value 30 here is the frame size, if the frame is shorter than 20, it results in roughness
    # and increase the frame size can decrease the musical noise
    length_of_frame = int(np.floor(30 * sr / 1000))
    # In order to use the Fast Fourier Transform (FFT), we want the length of the frame to be a power of 2, ensuring that the frame length is even.
    # If the frame length is not even, then adjust it by adding 1.
    if np.remainder(length_of_frame, 2) == 1:
        length_of_frame = length_of_frame + 1
 
    # This is window overlap in percent of frame size.
    # Sets the percentage of overlap between audio frames. 
    # Setting it here to 50% means that each audio frame will overlap the previous frame by half its length.
    overlap_per = 50
    len1 = calculate_overlap_sample_num(length_of_frame, overlap_per)
    len2 = int(length_of_frame - len1)    
 
    # VAD(Voice Activity Detection) sets the VAD's signal-to-noise ratio (SNR) threshold in decibels (dB). 
    # If the SNR of a frame falls below this threshold, it may be seen as containing more noise and less speech content.
    # It is used to recognise speech segments and non-speech segments (e.g. mute or background noise) in an audio signal.
    VAD = 6
    
    # power exponent
    # gamma is a parameter used in the power spectrum subtraction method to adjust the noise power.
    # In the denoising process, both the power spectrum of the signal and the power spectrum of the noise are 
    # raised to the power specified by gamma, and then the subtraction operation is performed.
    gamma = 2.0
    # A noise beta is set, which is a threshold used in power spectrum subtraction to prevent the power spectrum of 
    # a signal from being reduced below 0 to avoid completely eliminating certain parts of the signal, which may cause distortion of the speech signal.
    # We set the initial value of beta
    beta = 0.004
    # weight_coefficient is a smoothing parameter for updating the noise power spectrum when speech activity detection determines that the current frame is noisy.
    weight_coefficient = 0.9
 
    # define window
    # The win is a Hanning window defined by the length_of_frame, 
    # which is used to reduce the discontinuities at the junctions between frames and to reduce the spectral leakage.
    win = np.hanning(length_of_frame+1)[0:length_of_frame]
    # Normalisation here is similar to normalising audio after it has been reverberated or noised, so that the processed loudness is the same as the previous loudness, 
    # except that in this case the energy is kept constant.
    # G = len2 / np.sum(win)  # normalization gain for overlap+add with 50% overlap
    
    # The NFFT is the number of points of the Fourier transform, usually taken to be the smallest power of 2 greater than or equal to the frame length. 
    # This optimises the efficiency of the FFT algorithm
    NFFT = length_of_frame
    
    # noise_mean is an array that stores the mean value of the noise power spectrum.
    noise_mean = np.zeros(NFFT)
    # We need to calculate the estimated noise power first, and then dynamically update the estimated noise power frame by frame
    noise_estimated = calculate_average_noise_power(win, length_of_frame, noise_mean, NFFT, x)
 
    # k is used to track the location of the currently processed audio
    k = 0
    # np.sqrt(-1
    imaginary = 1j  
    # x_old is an array that stores the second half of the data from the previous frame, 
    # which will overlap with the first half of the next frame
    x_old = np.zeros(len1)
    # Nframes calculates how many frames the audio file will be divided into
    num_of_frames = int(np.floor(len(x)/len2)) - 1

    # xfinal is an array that stores the final processed audio signal.
    xfinal = np.zeros(num_of_frames * len2)
 
    # process frame by frame
    for n in range(0, num_of_frames):
        insign = np.multiply(win, x[k:k+length_of_frame])
        # spec performs a Fast Fourier Transform (FFT) for the current frame to obtain the spectrum.
        spec = np.fft.fft(insign, NFFT)
        # sig is the magnitude (amplitude) of the acquired spectrum.
        sig = np.abs(spec)  # compute the magnitude
 
        # save the noisy phase information
        theta = np.angle(spec)

        # SNR Calculates the signal-to-noise ratio of the current frame. 
        # The SNR is obtained by dividing the signal energy by the noise energy, reflecting the signal strength relative to the noise.
        # The higher the SNR, the weaker the estimated noise, and vice versa.
        SNR = 10*np.log10(np.linalg.norm(sig, 2)**2 / np.linalg.norm(noise_estimated, 2)**2)
        # From papaer suggestion, if SNR is in range of -5 to 0, beta should in the range 0.02 to 0.06
        if -5 <= SNR < 0:
            beta = 0.03
        # From papaer suggestion, if SNR is in range of 0 to 5, beta should in the range 0.005 to 0.02
        if 0 <= SNR <= 5:
            beta = 0.01
        # Select the best alpha value according to the SNR of each frame to reduce speech distortion.
        # And the alpha is calculated dynamically on a frame-by-frame basis.
        alpha = helper_fun(SNR)

        # D performs power spectrum subtraction based on signal amplitude, noise amplitude, 
        # denoising factor alpha and exponential gamma(according to the paper).  
        # sig ** gamma is Ps(w)**gamma in the paper, noise_estimated ** gamma is Pn(w)**gamma in the paper
        Ps = sig ** gamma
        Pn = noise_estimated ** gamma
        D = Ps - alpha * Pn
        
        # find indices of all non-negative values of D
        non_negative_indices = D >= 0

        # Perform the 1/gamma power operation on non-negative values
        D[non_negative_indices] = D[non_negative_indices] ** (1/gamma)

        # calculate the threshold value
        threshold = beta * Pn

        # calculate P's(w) based on β*Pn(w)
        D = np.maximum(D, threshold)

        # dynamically update the estimate noise
        noise_estimated = dynamically_calculate_noise_estimated(SNR, VAD, Ps, Pn, gamma, weight_coefficient, noise_estimated)
        
        # e^(i*theta) = cos(theta) + i*sin(theta) is Euler's formula, express e^(j(theta)) in the paper
        # Perform inverse Fourier transform on both P's(w) and original phase to obtain enhanced speech signal
        x_phase = (np.sqrt(D)) * (np.cos(theta) + imaginary * (np.sin(theta)))
        xi = np.real(np.fft.ifft(x_phase))
    
        # Overlap and add
        xfinal[k:k+len2] = x_old + xi[0:len1]
        x_old = xi[len1:length_of_frame]
        k = k + len2
 
    # write output
    result = xfinal
    return result

In [171]:
def dynamically_calculate_noise_estimated(SNR, VAD, Ps, Pn, gamma, weight_coefficient, noise_estimated):
    # If the signal-to-noise ratio of the current frame is less than the VAD threshold VAD, 
    # the noise power spectrum is updated using the coefficient weight_coefficient.
    # if SNR is less than VAD, this indicates that the current signal segment consists mainly of noise
    # When the SNR is below the VAD threshold, this indicates that the signal consists mainly of noise rather than speech. 
    # We need to adjust its noise estimate to ensure that the noise reduction algorithm is adjusted to the latest noise characteristics. 
    # In this case, re-estimating the noise level can more accurately distinguish between noise and speech components in the subsequent signal. 
    # By updating the noise model, the noise can be reduced more effectively, thus improving the audibility and quality of speech,
    # and thus better adapting to the actual noise level in the signal.
    if SNR < VAD:   
        # Update noise spectrum
        # We use the Power Transformation method to re-estimate the noise, since Pn = noise_estimated**gamma, Ps = sig**gamma, gamma = 2, 
        # so this is squaring the signal and amplifying the difference between the signal and the noise, 
        # making the noise component of the signal more prominent. 
        # We weighted the Ps and Pn using the weighting factor, which was 0.9. 
        # This means we put most of the weight on the old noise estimate, which allows for a smoother transition. 
        # It also means we retain the old noise estimate while slowly introducing the new signal information.
        noise_temp = weight_coefficient * Pn + (1 - weight_coefficient) * Ps

           
        # After weighted averaging, noise_temp needs to be inversely power transformed (because gamma = 2, so here we need to do the square root for noise_temp), 
        # because the previous signal and noise estimates were at the original energy level, 
        # in order to make the updated noise estimates correctly represent the characteristics of the original signal, 
        # we need to convert it back to the original energy level, so we need to square root of it.
        noise_estimated = noise_temp ** (1 / gamma)  # new noise spectrum
    return noise_estimated

## These are audio examples that showing differences after applying noise reduction algorithm:

**For noise Boeing**

*---------Before noise reduction*

In [212]:
filename1 = 'R_01_COMPSPKR_FA_Noised.wav'
x1, sr1 = librosa.load(filename1, sr=8000, mono=True)
ipd.display(ipd.Audio(x1, rate=sr1))


*---------After noise reduction*

In [213]:
filename1 = 'R_01_COMPSPKR_FA_Noised.wav'
# outputfile = 'output.wav'
x1, sr1 = librosa.load(filename1, sr=8000, mono=True)
result1 = power_spectral_subtraction(x1, sr1)
ipd.display(ipd.Audio(result1, rate=sr1))

# process_rev_audio(filename, outputfile)
# rh_spec_sub1(filename, outputfile)

**For noise Machines**

*---------Before noise reduction*

In [214]:
filename1 = 'R_01_NOISE_FA_Noised.wav'
x1, sr1 = librosa.load(filename1, sr=8000, mono=True)
ipd.display(ipd.Audio(x1, rate=sr1))


*---------After noise reduction*

In [220]:
filename1 = 'R_01_NOISE_FA_Noised.wav'
# outputfile = 'output.wav'
x1, sr1 = librosa.load(filename1, sr=8000, mono=True)
result1 = power_spectral_subtraction(x1, sr1)
ipd.display(ipd.Audio(result1, rate=sr1))

# process_rev_audio(filename, outputfile)
# rh_spec_sub1(filename, outputfile)

**For noise Crowded**

*---------Before noise reduction*

In [216]:
filename1 = 'R_01_CLIP_FA_Noised.wav'
x1, sr1 = librosa.load(filename1, sr=8000, mono=True)
ipd.display(ipd.Audio(x1, rate=sr1))


*---------After noise reduction*

In [217]:
filename1 = 'R_01_CLIP_FA_Noised.wav'
# outputfile = 'output.wav'
x1, sr1 = librosa.load(filename1, sr=8000, mono=True)
result1 = power_spectral_subtraction(x1, sr1)
ipd.display(ipd.Audio(
    result1, rate=sr1))
# process_rev_audio(filename, outputfile)
# rh_spec_sub1(filename, outputfile)

**For noise Sports**

*---------Before noise reduction*

In [218]:
filename1 = 'R_01_ECHO_FA_Noised.wav'
x1, sr1 = librosa.load(filename1, sr=8000, mono=True)
ipd.display(ipd.Audio(x1, rate=sr1))


*---------After noise reduction*

In [219]:
filename1 = 'R_01_ECHO_FA_Noised.wav'
# outputfile = 'output.wav'
x1, sr1 = librosa.load(filename1, sr=8000, mono=True)
result1 = power_spectral_subtraction(x1, sr1)
ipd.display(ipd.Audio(result1, rate=sr1))

# process_rev_audio(filename, outputfile)
# rh_spec_sub1(filename, outputfile)

In [172]:
def process_rev_audio(rev_file, output):
    x, sr = librosa.load(rev_file, sr=8000, mono=True)
    result = power_spectral_subtraction(x, sr)
    # print('sr = ', sr)
    # ipd.display(ipd.Audio(result, rate=sr))

    base, ext = os.path.splitext(os.path.basename(rev_file))
    output_filename = f"{base}_enhanced{ext}"
    full_output_path = os.path.join(output, output_filename)


    
    sf.write(full_output_path, result, samplerate=sr)
    # sf.write(output, result, sr, 'PCM_16')

In [173]:
def process_folder(folder_path, output_dir):
    # Search for all .wav files in the folder
    wav_files = glob.glob(os.path.join(folder_path, '*.wav'))
    for wav_file in wav_files:
        # print(f"Processing {wav_file}...")?
        process_rev_audio(wav_file, output_dir)

In [174]:
# ==================================== For echo ====================================
input_folder = './Noised_Audio/echo_sports'  # Folder with original .wav files
output_folder = './denoised_Audio/echo_sports'  # Folder to save processed .wav files
process_folder(input_folder, output_folder)

In [175]:
input_folder = './Noised_Audio/echo_boeing'  # Folder with original .wav files
output_folder = './denoised_Audio/echo_boeing'  # Folder to save processed .wav files
process_folder(input_folder, output_folder)

In [176]:
input_folder = './Noised_Audio/echo_crowded'  # Folder with original .wav files
output_folder = './denoised_Audio/echo_crowded'  # Folder to save processed .wav files
process_folder(input_folder, output_folder)

In [177]:
input_folder = './Noised_Audio/echo_machines'  # Folder with original .wav files
output_folder = './denoised_Audio/echo_machines'  # Folder to save processed .wav files
process_folder(input_folder, output_folder)

In [178]:
# ==================================== For compspkr ====================================
input_folder = './Noised_Audio/compspkr_sports'  # Folder with original .wav files
output_folder = './denoised_Audio/compspkr_sports'  # Folder to save processed .wav files
process_folder(input_folder, output_folder)

In [179]:
input_folder = './Noised_Audio/compspkr_boeing'  # Folder with original .wav files
output_folder = './denoised_Audio/compspkr_boeing'  # Folder to save processed .wav files
process_folder(input_folder, output_folder)

In [180]:
input_folder = './Noised_Audio/compspkr_crowded'  # Folder with original .wav files
output_folder = './denoised_Audio/compspkr_crowded'  # Folder to save processed .wav files
process_folder(input_folder, output_folder)

In [181]:
input_folder = './Noised_Audio/compspkr_machines'  # Folder with original .wav files
output_folder = './denoised_Audio/compspkr_machines'  # Folder to save processed .wav files
process_folder(input_folder, output_folder)

In [182]:
# ==================================== For clip ====================================
input_folder = './Noised_Audio/clip_sports'  # Folder with original .wav files
output_folder = './denoised_Audio/clip_sports'  # Folder to save processed .wav files
process_folder(input_folder, output_folder)

In [183]:
input_folder = './Noised_Audio/clip_boeing'  # Folder with original .wav files
output_folder = './denoised_Audio/clip_boeing'  # Folder to save processed .wav files
process_folder(input_folder, output_folder)

In [184]:
input_folder = './Noised_Audio/clip_crowded'  # Folder with original .wav files
output_folder = './denoised_Audio/clip_crowded'  # Folder to save processed .wav files
process_folder(input_folder, output_folder)

In [185]:
input_folder = './Noised_Audio/clip_machines'  # Folder with original .wav files
output_folder = './denoised_Audio/clip_machines'  # Folder to save processed .wav files
process_folder(input_folder, output_folder)

In [186]:
# ==================================== For noise ====================================
input_folder = './Noised_Audio/noise_sports'  # Folder with original .wav files
output_folder = './denoised_Audio/noise_sports'  # Folder to save processed .wav files
process_folder(input_folder, output_folder)

In [187]:
input_folder = './Noised_Audio/noise_boeing'  # Folder with original .wav files
output_folder = './denoised_Audio/noise_boeing'  # Folder to save processed .wav files
process_folder(input_folder, output_folder)

In [188]:
input_folder = './Noised_Audio/noise_crowded'  # Folder with original .wav files
output_folder = './denoised_Audio/noise_crowded'  # Folder to save processed .wav files
process_folder(input_folder, output_folder)

In [189]:
input_folder = './Noised_Audio/noise_machines'  # Folder with original .wav files
output_folder = './denoised_Audio/noise_machines'  # Folder to save processed .wav files
process_folder(input_folder, output_folder)