In [27]:
import keras
import keras.backend as K
from keras import layers, models, optimizers
import tensorflow as tf

In [28]:
def unet(input_size=(128, 128, 1), verbose=False):
    """Build an uncompiled U-Net mapping an image to a same-sized 1-channel image.

    Encoder: four levels of two 3x3 convolutions (16, 32, 64, 128 filters),
    each followed by 2x2 max-pooling. Bottleneck: two 3x3 convolutions with
    256 filters. Decoder: four levels that upsample by 2, apply a 2x2
    convolution, concatenate the matching encoder output on the channel axis
    and apply two 3x3 convolutions. Dropout (rate 0.5) follows the fourth
    encoder level and the bottleneck. Every convolution except the final one
    is created without an activation and followed by a ``LeakyReLU`` layer.

    Args:
        input_size: ``(height, width, channels)`` of one sample. Known height
            and width must be multiples of 16 so that the four poolings and
            four upsamplings produce matching skip-connection shapes.
        verbose: When true, print ``model.summary()`` before returning.

    Returns:
        A Keras ``Model`` whose output is a 1x1 convolution with ``tanh``
        activation (one channel).

    Raises:
        ValueError: If a known spatial dimension is not a multiple of 16.
    """
    # Fail early with a clear message instead of a shape mismatch deep inside
    # a Concatenate layer. ``None`` (variable-size) dimensions are not checked.
    spatial_dims = tuple(input_size)[:2]
    for dim in spatial_dims:
        if dim is not None and dim % 16 != 0:
            raise ValueError(
                "input_size height and width must be multiples of 16 "
                f"(the network pools by 2 four times), got {spatial_dims}"
            )

    # Number of filters at the first level; deeper levels use multiples of it.
    base_filters = 16
    # He-normal initialisation for every convolution except the output layer.
    kernel_init = 'he_normal'

    def conv_lrelu(tensor, filters, kernel_size=3):
        """Apply a linear 'same'-padded convolution followed by LeakyReLU."""
        tensor = layers.Conv2D(filters,
                               kernel_size,
                               activation=None,
                               padding='same',
                               kernel_initializer=kernel_init)(tensor)
        return layers.LeakyReLU()(tensor)

    def double_conv(tensor, filters):
        """Apply two consecutive 3x3 convolution + LeakyReLU stages."""
        return conv_lrelu(conv_lrelu(tensor, filters), filters)

    def up_merge(tensor, skip, filters):
        """Upsample by 2, apply a 2x2 convolution and concatenate with ``skip``."""
        upsampled = layers.UpSampling2D(size=(2, 2))(tensor)
        upsampled = conv_lrelu(upsampled, filters, kernel_size=2)
        return layers.Concatenate(axis=3)([skip, upsampled])

    inputs = layers.Input(input_size)

    # Encoder levels 1-3: remember each level's output for the skip connection.
    skips = []
    features = inputs
    for multiplier in (1, 2, 4):
        features = double_conv(features, base_filters * multiplier)
        skips.append(features)
        features = layers.MaxPooling2D(pool_size=(2, 2))(features)

    # Encoder level 4: the skip connection uses the dropped-out tensor.
    features = double_conv(features, base_filters * 8)
    features = layers.Dropout(0.5)(features)
    skips.append(features)
    features = layers.MaxPooling2D(pool_size=(2, 2))(features)

    # Bottleneck.
    features = double_conv(features, base_filters * 16)
    features = layers.Dropout(0.5)(features)

    # Decoder: walk the skip connections from deepest to shallowest.
    for multiplier, skip in zip((8, 4, 2, 1), reversed(skips)):
        features = up_merge(features, skip, base_filters * multiplier)
        features = double_conv(features, base_filters * multiplier)

    # Reduce to two feature maps, then project to one tanh-bounded channel.
    features = conv_lrelu(features, 2)
    outputs = layers.Conv2D(1, 1, activation='tanh')(features)

    model = models.Model(inputs, outputs)

    if verbose:
        model.summary()
    return model

# Build the network once (without printing the summary) to check that the
# definition above constructs successfully.
model = unet(verbose=False)

In [29]:
import os
import numpy as np
import librosa
import random
from keras.utils.data_utils import Sequence

In [30]:
# Module-level STFT settings; presumably they mirror the values hard-coded in
# data_generator.__init__ (n_fft = 255, hop_length = 64) - TODO confirm usage.
n_fft = 255
hop_length_fft = 64
# n_fft // 2 + 1 == 128 for n_fft = 255.
dim_square_spec = n_fft // 2 + 1

In [55]:
class data_generator(Sequence):
    """Keras ``Sequence`` yielding batches of normalised dB-spectrogram pairs.

    For every file listed from ``clean_audio_path`` the file with the same
    name is loaded from ``noise_audio_path``. A random window of ``wave_size``
    samples is cut at the same position from both recordings. The network
    input is the spectrogram of the ``noise_audio_path`` window; the target
    is the spectrogram of that window minus the clean window (i.e. the noise
    residual, assuming ``noise_audio_path`` holds noisy versions of the clean
    recordings). Both spectrograms are magnitudes in dB scaled to [-1, 1].

    Args:
        clean_audio_path: Directory containing the clean recordings.
        noise_audio_path: Directory containing the same-named recordings that
            are fed to the network.
        batch_size: Number of examples per batch.
        file_size: An ``int`` keeps the first N files; a ``float`` keeps that
            fraction of the clean file list; any other type keeps all files.
        wave_size: Length, in samples, of the window cut from each recording.
    """

    def __init__(self,
                clean_audio_path,
                noise_audio_path,
                batch_size = 5,
                file_size = 100,
                wave_size = 8192):
        self.clean_audio_path = clean_audio_path
        self.noise_audio_path = noise_audio_path
        self.batch_size = batch_size
        self.file_size = file_size
        self.wave_size = wave_size

        # STFT settings passed to librosa.stft for every example.
        self.n_fft = 255
        self.hop_length = 64
        self.dim_square_spec = int(self.n_fft / 2) + 1

        self.clean_file_list = self._load_audio_list(self.clean_audio_path)
        self.noise_file_list = self._load_audio_list(self.noise_audio_path)

        # Optionally restrict the data set to a prefix of the file lists.
        if isinstance(self.file_size, int):
            keep = self.file_size
        elif isinstance(self.file_size, float):
            keep = int(self.file_size * len(self.clean_file_list))
        else:
            keep = None
        if keep is not None:
            self.clean_file_list = self.clean_file_list[:keep]
            self.noise_file_list = self.noise_file_list[:keep]

    def _load_audio_list(self, path):
        """Return the sorted full paths of all entries in ``path``.

        ``.DS_Store`` is skipped. The result is sorted because ``os.listdir``
        returns entries in arbitrary order, which would make the subset
        selected through ``file_size`` differ between runs or machines.
        """
        assert os.path.exists(path), f"{path} not exists."
        return sorted(
            os.path.join(path, file)
            for file in os.listdir(path)
            if file != ".DS_Store"
        )

    def __len__(self):
        """Return the number of complete batches (a trailing partial batch is dropped)."""
        return len(self.clean_file_list) // self.batch_size

    def __getitem__(self, index):
        """Return batch ``index`` as ``(inputs, targets)`` arrays.

        Each array has a leading batch axis of length ``batch_size``; this
        also covers ``batch_size == 1`` without a special case.
        """
        first = index * self.batch_size
        return self._batch_n(first, first + self.batch_size)

    def _batch_1(self, index):
        """Build one ``(input, target)`` example for file ``index``.

        Returns two arrays with a trailing channel axis of length 1.
        """
        noisy_wave, clean_wave = self._get_audio_wave(index)

        # The target is what has to be removed from the input: input - clean.
        residual_wave = noisy_wave - clean_wave
        noisy_db, _ = self._wave_to_magnitude_db_and_phase(
            noisy_wave, n_fft = self.n_fft, hop_length = self.hop_length)
        residual_db, _ = self._wave_to_magnitude_db_and_phase(
            residual_wave, n_fft = self.n_fft, hop_length = self.hop_length)

        noisy_db = self._normalize(noisy_db)
        residual_db = self._normalize(residual_db)
        return np.expand_dims(noisy_db, -1), np.expand_dims(residual_db, -1)

    def _batch_n(self, beg, end):
        """Stack the examples for file indices ``beg`` (inclusive) to ``end`` (exclusive)."""
        examples = [self._batch_1(i) for i in range(beg, end)]
        X = [example[0] for example in examples]
        Y = [example[1] for example in examples]
        return np.array(X), np.array(Y)

    def _load_audio(self, path):
        """Load ``path`` as a mono waveform at its native sampling rate."""
        wave, sr = librosa.load(path, mono = True, sr = None)
        return wave

    def _pad_to_wave_size(self, wave):
        """Right-pad ``wave`` with zeros so that it has ``wave_size`` samples."""
        missing = self.wave_size - len(wave)
        if missing > 0:
            wave = np.pad(wave, (0, missing), mode="constant")
        return wave

    def _get_audio_wave(self, index):
        """Return aligned ``(noisy, clean)`` windows of ``wave_size`` samples.

        The window start is drawn uniformly from every position at which a
        full window fits into both recordings. Recordings shorter than
        ``wave_size`` are used from the start and zero-padded at the end.
        """
        clean_file = self.clean_file_list[index]
        # basename() is separator-agnostic, unlike splitting on "/".
        noise_file = os.path.join(self.noise_audio_path, os.path.basename(clean_file))
        clean_wave = self._load_audio(clean_file)
        noise_wave = self._load_audio(noise_file)

        # Use the shorter recording so both slices cover the same samples.
        usable_length = min(len(clean_wave), len(noise_wave))
        last_start = max(usable_length - self.wave_size, 0)
        # randint's upper bound is exclusive; +1 makes last_start reachable and
        # keeps the call valid when the recording is exactly wave_size long.
        start_location = np.random.randint(last_start + 1)
        stop_location = start_location + self.wave_size
        return self._pad_to_wave_size(noise_wave[start_location:stop_location]), \
                self._pad_to_wave_size(clean_wave[start_location:stop_location])

    def _wave_to_magnitude_db_and_phase(self, wave, n_fft, hop_length):
        """Return the STFT magnitude of ``wave`` in dB (relative to its maximum) and the STFT phase."""
        stftaudio = librosa.stft(wave, n_fft=n_fft, hop_length=hop_length)
        stftaudio_magnitude, stftaudio_phase = librosa.magphase(stftaudio)
        stftaudio_magnitude_db = librosa.amplitude_to_db(stftaudio_magnitude, ref=np.max)
        return stftaudio_magnitude_db, stftaudio_phase

    def _normalize(self, x):
        """Linearly rescale ``x`` so its minimum maps to -1 and its maximum to 1.

        A constant array (for example the spectrogram of a silent window) has
        zero range; it is mapped to -1 everywhere instead of dividing by zero
        and producing NaNs.
        """
        min_val = np.min(x)
        value_range = np.max(x) - min_val
        if value_range == 0:
            value_range = 1.0
        return (x - min_val) / (value_range / 2) - 1

In [58]:
# Dataset directories; data_generator looks up each clean file's name inside
# the noisy directory, so both must contain identically named recordings.
clean_audio_path = "/Volumes/IPEVO_X0244/speech_to_text/speech_to_text_dataset/clean_trainset_56spk_wav/"
noise_audio_path = "/Volumes/IPEVO_X0244/speech_to_text/speech_to_text_dataset/noisy_trainset_56spk_wav/"

# A float file_size keeps that fraction of the file list (here: half).
g = data_generator(
    clean_audio_path=clean_audio_path,
    noise_audio_path=noise_audio_path,
    file_size=0.5,
)

# Sanity check: print the shapes of the first (inputs, targets) batch.
d = g[0]
print(*(batch_part.shape for batch_part in d), sep="\n")

(5, 128, 128, 1)
(5, 128, 128, 1)


In [59]:
# Rebuild the network so that training starts from freshly initialised weights.
model = unet(verbose=False)
# Use the optimizer class `Adam` itself: the lowercase `adam` name is only a
# legacy module-level alias that newer Keras releases no longer provide.
model.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-4),
              loss=keras.losses.huber_loss)
# NOTE(review): fit_generator is deprecated in newer Keras releases in favour
# of fit(), which accepts a Sequence directly - confirm the installed version
# before switching.
model.fit_generator(g, epochs=10)


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10

KeyboardInterrupt: 