# SEGAN_OM

a GAN based filter method for speech enhancement

### 1) Data pipeline

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf
import numpy as np
from io import *
import os.path

In [2]:
SAMPLE_SIZE=2*15
kernel_size=31
BATCH_SIZE = 10 # used for loading the data 

In [3]:
def load_file(file_path, window = 2**14, stride=0.5, sampling=16000):
    """
    Loads a wav file and returns it as a Tensor.
    Inputs:
    file_path: the path of the wav file
    window: optional, size of each sample batch
    stride: optional, defult = 0.5
    sampling: optional, sampling rate, default 16000
    Returns:
    slices: a list of TF-Tensors containing the values of the wav after cropping the last part exceding the windows width
    """
    sampling_rate = tf.constant(sampling,dtype=tf.int32,shape=())
    slices = []
    window = int(window*stride)
    file = tf.io.read_file(file_path)
    input_sequence = tf.audio.decode_wav(file)
    if not tf.math.equal(sampling_rate,input_sequence[1]):
        raise ValueError(f'Sampling rate is expected to be {sampling}! Got {input_sequence[1]}')
    input_sequence_length = input_sequence[0].shape[0]
    for win_start in range(0,input_sequence_length,window):
        slice_window = input_sequence[0][win_start:win_start+window]
        if slice_window.shape[0] == window:
            slices.append(slice_window)
    return np.array(slices, dtype=np.float64)

In [4]:
# From TFRecord Tutorial
# The following functions can be used to convert a value to a type compatible

def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy() # BytesList won't unpack a string from an EagerTensor.
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

In [5]:
path = "./Dataset/clean"
out_filepath = './Dataset/save10.tfrecords'
out_file = tf.io.TFRecordWriter(out_filepath)
signals_clean = []
for file in  os.listdir(path):
    sequence=load_file(os.path.join(path,file))
    signals_clean.append(sequence)
    break

In [6]:
for wav in signals_clean:
    print(wav.shape)
    #wav_raw = wav.tostring()
    #example = tf.train.Example(features=tf.train.Features(feature={
     #   'wav_raw': _bytes_feature(wav_raw)}))

(5, 8192, 1)


### 2) Layers & Model

#### 2.1: Build the Generator

In [7]:
def downsample(filter_width, kernel=31, #size, 
              strides = 2, padding = 'same', init= None):
    """
    creates a 1D-Conv-Block for the Generator with given kernel & filters.
    
    Arguments:
    filter_size -- tf.keras.Conv1D.filters
    kernel -- tf.keras.Conv1D.kernel_size, set to 31 for this application
    strides -- optional, default is '2' for this application
    padding -- optional, default is 'same'
    init -- weights initializer, will be set to He is none is given
    
    Returns:
    block -- tf.Tensor block of a 1D-Conv
    """
    # set the initializer if none is given
    if init is None:
        init = tf.keras.initializers.he_normal()
    
    # make the convolutional block
    block = tf.keras.Sequential()
    block.add(tf.keras.layers.Conv1D(filters = filter_width, kernel_size = kernel, strides=strides,
                                     #(kernel, 1), strides=(strides, 1), #for conv2d
                                     padding=padding, kernel_initializer=init, use_bias=False))
    # add the activation function
    block.add(tf.keras.layers.PReLU())
    
    return block

In [8]:
def upsample(filter_width, kernel=31, #size, 
              strides = 2, padding = 'same', init= None):
    """
    creates a 1D-Deconv-Block for the Generator with given kernel & filters.
    
    Arguments:
    filter_size -- tf.keras.Conv1D.filters
    kernel -- tf.keras.Conv1D.kernel_size, set to 31 for this application
    strides -- optional, default is '2' for this application
    padding -- optional, default is 'same'
    init -- weights initializer, will be set to He is none is given
    
    Returns:
    block -- tf.Tensor block of a 1D-Conv
    """
    # set the initializer if none is given
    if init is None:
        init = tf.keras.initializers.he_normal()
    
    # make the convolutional block
    block = tf.keras.Sequential()
    block.add(tf.keras.layers.Conv2DTranspose(filters = filter_width, kernel_size = (kernel, 1), strides=(strides, 1),
                                     padding=padding, kernel_initializer=init, use_bias=False))
    
    
    # add the activation function
    block.add(tf.keras.layers.LeakyReLU())
    
    return block

In [40]:
def Generator():
    inputs = tf.keras.layers.Input(shape=[2**14, 1])
    #inputs = tf.keras.backend.expand_dims(inputs, axis=1)
    
    down_stack = [
        downsample(16, 16384),
        downsample(32, 8192),
        downsample(32, 4096),
        downsample(64, 2048),
        downsample(64, 1024),
        downsample(128, 512),
        downsample(128, 256),
        downsample(256, 128),
        downsample(256,  64),
        downsample(512,  32),
        downsample(1024, 16),
    ]

    up_stack = [
        upsample(512,  32),
        upsample(256,  64),
        upsample(256, 128),
        upsample(128, 256),
        upsample(128, 512),
        upsample(64, 1024),
        upsample(64, 2048),
        upsample(32, 4096),
        upsample(32, 8192),
        upsample(16, 16382),
    ]

    x = inputs

    # Downsampling through the model
    skips = []
    for down in down_stack:
        x = down(x)
        skips.append(x)
        
    skips = reversed(skips[:-1])
    
    # Upsampling and establishing the skip connections
    for up, skip in zip(up_stack, skips):
        x = tf.keras.backend.expand_dims(x, axis=2)
        x = up(x)
        x = tf.keras.backend.squeeze(x, axis=2)
        x = tf.keras.layers.Concatenate()([x, skip])

        
    x = tf.keras.backend.expand_dims(x, axis=2)
    x = upsample(1, 32768)(x)
    x = tf.keras.backend.squeeze(x, axis=2)

    return tf.keras.Model(inputs=inputs, outputs=x)

In [41]:
generator = Generator()
generator.summary()
#tf.keras.utils.plot_model(generator, show_shapes=True, dpi=64)


Model: "model_11"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_16 (InputLayer)           [(None, 16384, 1)]   0                                            
__________________________________________________________________________________________________
sequential_321 (Sequential)     (None, 8192, 16)     393216      input_16[0][0]                   
__________________________________________________________________________________________________
sequential_322 (Sequential)     (None, 4096, 32)     4325376     sequential_321[0][0]             
__________________________________________________________________________________________________
sequential_323 (Sequential)     (None, 2048, 32)     4259840     sequential_322[0][0]             
___________________________________________________________________________________________

### Training

In order to understand the functionning of the generator we will train it over 50 epochs using the l1-loss function.