In [6]:
import numpy as np
import matplotlib.pyplot as plt
import librosa
import tensorflow as tf
import os
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import numpy as np
import numpy as np
import scipy.signal as sps
import soundfile as sf
import os
import math
import wave
from scipy.io import wavfile
from keras.models import load_model
from keras.models import Sequential

%matplotlib inline
physical_devices = tf.config.experimental.list_physical_devices('GPU')
print(physical_devices)
if physical_devices:
  tf.config.experimental.set_memory_growth(physical_devices[0], True)

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [7]:
def create_spectrogram(audio_file, image_file):
    fig = plt.figure()
    ax = fig.add_subplot(1, 1, 1)
    fig.subplots_adjust(left=0, right=1, bottom=0, top=1)

    y, sr = librosa.load(audio_file)
    ms = librosa.feature.melspectrogram(y=y, sr=sr, S=None, n_fft=1024, hop_length=80, win_length=320, window='hann', center=True, pad_mode='constant', power=2.0)
    #ms = librosa.feature.melspectrogram(y=y, sr=sr, S=None)
    log_ms = librosa.power_to_db(ms, ref=np.max)
    librosa.display.specshow(log_ms, sr=sr)

    fig.savefig(image_file)
    plt.close(fig)
    
def create_pngs_from_wavs(input_path, output_path):
    if not os.path.exists(output_path):
        os.makedirs(output_path)

    dir = os.listdir(input_path)

    for i, file in enumerate(dir):
        input_file = os.path.join(input_path, file)
        output_file = os.path.join(output_path, file.replace('.wav', '.png'))
        create_spectrogram(input_file, output_file)

In [9]:
def load_images_from_path(path, label, limit=1500):
    images = []
    labels = []

    file_list = os.listdir(path)
    if limit is not None:
        file_list = file_list[:limit]

    for file in file_list:
        images.append(img_to_array(load_img(os.path.join(path, file), target_size=(224, 224, 3))))
        labels.append(label)
        
    return images, labels

def show_images(images):
    fig, axes = plt.subplots(1, 8, figsize=(20, 20), subplot_kw={'xticks': [], 'yticks': []})

    for i, ax in enumerate(axes.flat):
        ax.imshow(images[i] / 255)

In [10]:
# Recreate the exact same model, including its weights and the optimizer
new_model = tf.keras.models.load_model('models/my_model.h5')

# Show the model architecture
new_model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 222, 222, 32)      896       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 111, 111, 32)     0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 109, 109, 128)     36992     
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 54, 54, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 52, 52, 128)       147584    
                                                                 
 max_pooling2d_2 (MaxPooling  (None, 26, 26, 128)      0

In [14]:
# All locations
folder_name = "data_test/"
entries = os.listdir(folder_name)
print(len(entries))

10


In [8]:
print(entries)

['speech_1.wav', 'speech_10.wav']


In [24]:
path = "Spectrograms_wide_VA"

path_spec_test = path + "/Spectral_subtract_test2/"

isExist = os.path.exists(path_spec_test)
if not isExist:
    # Create a new directory because it does not exist
    os.makedirs(path_spec_test)
    print("the %s directory is created!", path_spec_test)

noisy = ["-3dB", "-6dB", "-9dB"]
clean = ["0dB", "3dB", "6dB", "9dB", "clean"]
target_names = ["clean", "0dB", "-3dB", "-6dB", "-9dB", "3dB", "6dB", "9dB"]
user_target = ["0dB", "-3dB", "-6dB", "-9dB", "3dB", "6dB", "9dB"]

the %s directory is created! Spectrograms_wide_VA/Spectral_subtract_test2/


In [19]:
def hann_window_a_signal(Windowed_data):
    Hann_window = sps.windows.hann(len(Windowed_data))
    Hann_Windowed_data = Hann_window * Windowed_data
    padded_signal = np.pad(Hann_Windowed_data, (0, 512), 'constant')
    Windowed_data_fft = np.fft.fft(padded_signal, 1024)
    return Windowed_data_fft

def perform_gan_noise_subtraction(input_audio_file, output_audio_file, alpha, Noise_gan):
    N_fft = 1024

    samplerate, data = wavfile.read(input_audio_file)
    Bit_Check = wave.open(input_audio_file, 'rb')
    bit_depth = Bit_Check.getsampwidth() * 8
    data = data / (2 ** (bit_depth - 1))
    Overlaps = math.floor(len(data) / 128)
    audio_ss = np.zeros(len(data))

    for No_of_overlaps in range(Overlaps - 5):
        Rectangular_windowed_signal = data[0 + 128 * No_of_overlaps:512 + 128 * No_of_overlaps]
        Estimated_noise_PSD = np.zeros(N_fft)
        GAN_noise_estimate = np.zeros(N_fft)
        FFT_of_windowed_signal = hann_window_a_signal(Rectangular_windowed_signal)

        Hann_window = sps.windows.hann(len(Rectangular_windowed_signal))
        PSD_window_scaling = np.sum(Hann_window ** 2)
        PSD_of_windowed_signal = (np.abs(FFT_of_windowed_signal) ** 2) / (samplerate * PSD_window_scaling)

        Tensor_PSD = tf.convert_to_tensor(PSD_of_windowed_signal.reshape(1, 1024), tf.float32)
        Generated_codebook = Noise_gan(Tensor_PSD)
        Generated_codebook = Generated_codebook.numpy()
        Generated_codebook_reshaped = np.abs((Generated_codebook.reshape(1024, 9)))

        Generated_codebook_inverse = np.linalg.pinv(Generated_codebook_reshaped, rcond=1e-15)
        Generated_coeffs = Generated_codebook_inverse * PSD_of_windowed_signal
        Generated_coeffs = np.transpose(Generated_coeffs)
        GAN_noise_codebook = (Generated_coeffs * Generated_codebook_reshaped)
        GAN_noise_codebook = GAN_noise_codebook.clip(min=0)

        for Freq_bin in range(0, N_fft):
            GAN_noise_estimate[Freq_bin] = np.sum(GAN_noise_codebook[Freq_bin, :])

        GAN_noise_estimate[512:1024] = np.flip(GAN_noise_estimate[0:512])
        scalar_factor_noise = sum([psd_value for psd_value in GAN_noise_estimate])
        GAN_noise_estimate_normalised = GAN_noise_estimate / scalar_factor_noise

        snr = PSD_of_windowed_signal / (100 * GAN_noise_estimate)
        Spectral_mask = 1 - np.minimum(1, np.maximum(0, alpha * snr))
        Clean_signal = FFT_of_windowed_signal * Spectral_mask
        Clean_frames = np.fft.ifft(Clean_signal)

        audio_ss[0 + 128 * No_of_overlaps:512 + 128 * No_of_overlaps] = audio_ss[
                                                                           0 + 128 * No_of_overlaps:512 + 128 * No_of_overlaps] + Clean_frames[0:512]

    # Save the processed audio
    sf.write(output_audio_file, audio_ss, samplerate, 'PCM_16')

In [25]:
# Load the noise GAN model
Noise_gan = tf.saved_model.load('../Spectogram/Noise_PSD_Generator_epoch_73')

for ent in entries:
    edit_ent = ent.replace(".wav", "")
    os.makedirs(path_spec_test + "{}/".format(edit_ent) + "spectrogram/", exist_ok=True)
    os.makedirs(path_spec_test + "{}/".format(edit_ent) + "wav/", exist_ok=True)
    new_ent_img = path_spec_test + "{}/".format(edit_ent) + "spectrogram/"
    new_ent_wav = path_spec_test + "{}/".format(edit_ent) + "wav/"
    create_spectrogram(folder_name + ent, new_ent_img + 'Default.png')
    current_image = new_ent_img + 'Default.png'
    current_wav = folder_name + ent
    audio_clean = False
    stop_value = 2
    start_value = 0


    while(audio_clean == False and start_value < stop_value):
        # Preprocess the image
        image = img_to_array(load_img(current_image, target_size=(224, 224, 3)))
        image = image / 255.0
        image = np.expand_dims(image, axis=0)

        # Make predictions
        predictions = new_model.predict(image)
        # Find the index of the maximum prediction
        predicted_index = np.argmax(predictions)
        # Check if the predicted class matches the target
        predicted_class = target_names[predicted_index]

        if predicted_class in clean:
            audio_clean = True
        else:
            start_value += 0.05
            start_value = round(start_value, 2)
            print("{}: Alpha value: {} Class: {}".format(edit_ent, "{:.2f}".format(start_value), predicted_class), end="\r")
            output_audio = new_ent_wav + 'Alpha{}.wav'.format("{:.2f}".format(start_value))
            perform_gan_noise_subtraction(current_wav, output_audio, start_value, Noise_gan)
            create_spectrogram(output_audio, new_ent_img + 'Alpha{}.png'.format("{:.2f}".format(start_value)))
            current_image = new_ent_img + 'Alpha{}.png'.format("{:.2f}".format(start_value))

    print("{}: Alpha value: {} Class: {}".format(edit_ent, "{:.2f}".format(start_value), predicted_class))

speech_1: Alpha value: 0.05 Class: -9dB

UFuncTypeError: ufunc 'multiply' did not contain a loop with signature matching types (dtype('<U4'), dtype('float64')) -> None

Extra

In [None]:
Noise_gan = tf.saved_model.load('../Spectogram/Noise_PSD_Generator_epoch_73')

N_fft = 1024

def Hann_window_a_signal(Windowed_data):
 Hann_window = sps.windows.hann(len(Windowed_data))
 Hann_Windowed_data = Hann_window*Windowed_data
 padded_signal = np.pad(Hann_Windowed_data,(0,512), 'constant')

 Windowed_data_fft = np.fft.fft(padded_signal,1024)
 return Windowed_data_fft

# Define parameters and variables
No_of_data_to_filter = 1
num_samples = 1024
fstep = 16000/1024
f = np.linspace(0, (num_samples-1)*fstep, num_samples)

targets = [3, 6, 9, 0, -3, -6, -9]

for folder in range (0,len(targets)):
    directory = str(targets[folder])+'dB'
    path = os.path.join('../Spectogram/Filter_outputs/Spectral_subtract',directory) 
    os.makedirs(path, exist_ok=True)


for target_db in range(0, len(targets)):
    Path_of_noisy_mixture = '../Spectogram/SNR_seg/' + str(targets[target_db]) + 'dB' + '/'
    directories = os.listdir('../Spectogram/SNR_seg/-6dB')

    for No_of_data in range(0, No_of_data_to_filter):
        samplerate, data = wavfile.read(Path_of_noisy_mixture + directories[No_of_data])
        Bit_Check = wave.open(Path_of_noisy_mixture + directories[No_of_data], 'rb')
        bit_depth = Bit_Check.getsampwidth() * 8
        data = data / (2**(bit_depth-1))
        Overlaps = math.floor(len(data) / 128)
        audio_ss = np.zeros(len(data))

        for No_of_overlaps in range(Overlaps - 5):
            Rectangular_windowed_signal = data[0 + 128*No_of_overlaps:512 + 128*No_of_overlaps]
            Estimated_noise_PSD = np.zeros(N_fft)
            GAN_noise_estimate = np.zeros(N_fft)
            FFT_of_windowed_signal = Hann_window_a_signal(Rectangular_windowed_signal)

            Hann_window = sps.windows.hann(len(Rectangular_windowed_signal))
            PSD_window_scaling = np.sum(Hann_window**2)
            PSD_of_windowed_signal = (np.abs(FFT_of_windowed_signal)**2)/(samplerate*PSD_window_scaling)

            Tensor_PSD = tf.convert_to_tensor(PSD_of_windowed_signal.reshape(1,1024), tf.float32)
            Generated_codebook = Noise_gan(Tensor_PSD)
            Generated_codebook = Generated_codebook.numpy()
            Generated_codebook_reshaped = np.abs((Generated_codebook.reshape(1024,9)))

            Generated_codebook_inverse = np.linalg.pinv(Generated_codebook_reshaped, rcond=1e-15)
            Generated_coeffs = Generated_codebook_inverse*PSD_of_windowed_signal
            Generated_coeffs = np.transpose(Generated_coeffs)
            GAN_noise_codebook = (Generated_coeffs*Generated_codebook_reshaped)
            GAN_noise_codebook = GAN_noise_codebook.clip(min=0)

            
            for Freq_bin in range (0,N_fft):
                GAN_noise_estimate[Freq_bin]=np.sum(GAN_noise_codebook[Freq_bin,:])
                # Spectral Subtraction (You can modify alpha as needed)

            GAN_noise_estimate[512:1024]=np.flip(GAN_noise_estimate[0:512])
            scalar_factor_noise = sum([psd_value for psd_value in GAN_noise_estimate])
            GAN_noise_estimate_normalised = GAN_noise_estimate/scalar_factor_noise

            alpha = 1
            snr = PSD_of_windowed_signal / (100 * GAN_noise_estimate)
            Spectral_mask = 1 - np.minimum(1, np.maximum(0, alpha * snr))
            Clean_signal = FFT_of_windowed_signal * Spectral_mask
            Clean_frames = np.fft.ifft(Clean_signal)
            
            audio_ss[0 + 128*No_of_overlaps:512 + 128*No_of_overlaps] = audio_ss[0+128*No_of_overlaps:512+128*No_of_overlaps]+ Clean_frames[0:512]
        
        # Save the processed audio
        sf.write('../Spectogram/Filter_outputs/Spectral_subtract/' + str(targets[target_db]) + 'dB/' + directories[No_of_data], audio_ss, 16000, 'PCM_16')