In [1]:
import tensorflow as tf
import numpy as np
import librosa
import pickle

In [39]:
# generating 4 digit number strings from 0000 to 1119 for the filenames
ids = ["%04d"%i for i in range(1200)]

In [2]:
"""
loading the trianing set pickled files X_tr, S_tr, N_tr in stft domain
X_tr: clean + noise signals 
S_tr: clean signals
N_tr: noise signals 
"""

with open('data/signals.pkl', 'rb') as f:
    X_tr, S_tr, N_tr = pickle.load(f)

In [3]:
# taking magnitude and transpose

X_tr_mod = [np.abs(signal).T for signal in X_tr]
S_tr_mod = [np.abs(signal).T for signal in S_tr]
N_tr_mod = [np.abs(signal).T for signal in N_tr]

The target of the training procedure is Ideal Binary Masks (IBM) and it is constructed in the following way:

\begin{equation*}
M_{f,t}^{(l)} =
\begin{cases}
    1 & \text{if} & |S^{(l)}_{tr}|_{f,t} > |N^{(l)}_{tr}|_{f,t} \\
    0 & \text{if} & |S^{(l)}_{tr}|_{f,t} \leq |N^{(l)}_{tr}|_{f,t}
\end{cases}
\end{equation*}
where $l$ denotes a trainng sample.


IBM assumes that each of the time-frequency bin at (f,t), an element of $X_{tr}^{(l)}$, is from either speech or noise. Clean speech signal can be recovered using:

\begin{equation*}
\hat{S} = M \odot X
\end{equation*}


In [4]:
# constructing IBM matrix

M = []

for i in range(1200):
    M.append(np.greater(S_tr_mod[i], N_tr_mod[i]).astype(int))

In [5]:
# path to save/restore model
model_path = "models/rnn-denoise/model.ckpt"

x = tf.placeholder(tf.float32, [10, None, 513])
y_ = tf.placeholder(tf.float32, [10, None, 513])

hidden_units = 256
out_weights = tf.Variable(tf.random_normal([hidden_units, 513], stddev=2/(hidden_units+513), mean=0)) # xavier init
out_bias = tf.Variable(tf.zeros([513]))

In [6]:
# defining the LSTM cell & n/w

lstm_cell = tf.nn.rnn_cell.LSTMCell(hidden_units, initializer=tf.contrib.layers.xavier_initializer())
outputs, _ = tf.nn.dynamic_rnn(lstm_cell, x, dtype=tf.float32)

In [7]:
"""
add dimension to out_weights so that it can be multiplied with RNN outputs 

outputs.shape = [10, ?, hidden_units]
out_weights.shape = [hidden_units, 513]

To multiply them, out_weights needs to be expanded to [10, hidden_units, 513]
"""
weights = tf.expand_dims(tf.ones([10, 1]), 1) * out_weights

y = tf.nn.sigmoid(tf.matmul(outputs, weights) + out_bias)

In [8]:
mse = tf.reduce_mean(tf.losses.mean_squared_error(y_, y))
train_step = tf.train.AdamOptimizer().minimize(mse) # adam optimizer with default learning rate

In [9]:
init = tf.global_variables_initializer()
saver = tf.train.Saver() 
sess = tf.Session(config=config)
sess.run(init)

In [10]:
#train step

epochs = 100

for epoch in range(epochs):
    avg_cost = 0
    for i in range(0, 1200, 10):
        batch_x = X_tr_mod[i:i+10] 
        batch_y = M[i:i+10]
        _, cost = sess.run([train_step, mse], feed_dict={x: batch_x, y_: batch_y})
        avg_cost += cost/120
    print("Epoch:", '%02d'%(epoch+1), "\tcost={:.9f}".format(avg_cost))

Epoch: 01 	cost=0.226507796
Epoch: 02 	cost=0.198697235
Epoch: 03 	cost=0.183353049
Epoch: 04 	cost=0.169997571
Epoch: 05 	cost=0.162413992
Epoch: 06 	cost=0.155697847
Epoch: 07 	cost=0.151109692
Epoch: 08 	cost=0.147298979
Epoch: 09 	cost=0.144272449
Epoch: 10 	cost=0.141634188
Epoch: 11 	cost=0.139778016
Epoch: 12 	cost=0.137745353
Epoch: 13 	cost=0.135811115
Epoch: 14 	cost=0.134418409
Epoch: 15 	cost=0.133526837
Epoch: 16 	cost=0.131870356
Epoch: 17 	cost=0.131329252
Epoch: 18 	cost=0.129605003
Epoch: 19 	cost=0.128048953
Epoch: 20 	cost=0.126626614
Epoch: 21 	cost=0.125324596
Epoch: 22 	cost=0.124352452
Epoch: 23 	cost=0.123274874
Epoch: 24 	cost=0.122364058
Epoch: 25 	cost=0.121724157
Epoch: 26 	cost=0.120848986
Epoch: 27 	cost=0.120137190
Epoch: 28 	cost=0.119741255
Epoch: 29 	cost=0.118962771
Epoch: 30 	cost=0.118396570
Epoch: 31 	cost=0.118777787
Epoch: 32 	cost=0.117374832
Epoch: 33 	cost=0.116481439
Epoch: 34 	cost=0.116489816
Epoch: 35 	cost=0.115418935
Epoch: 36 	cost=0.11

In [None]:
# saving the model
save_path = saver.save(sess, model_path)

In [17]:
# restore trained model to use for validation
saver.restore(sess, model_path)

INFO:tensorflow:Restoring parameters from models/rnn-denoise/model.ckpt


In [12]:
# load pickled time domain S_v: clean signals from validation set

with open('data/validation_clean.pkl', 'rb') as f:
    s = pickle.load(f)

In [13]:
# loading picked validation set files X_v, S_v, N_v in stft domain

with open('data/validation.pkl', 'rb') as f:
    X_v, S_v, N_v = pickle.load(f)

In [14]:
# taking magnitude and transpose

X_v_mod = [np.abs(signal).T for signal in X_v]
S_v_mod = [np.abs(signal).T for signal in S_v]
N_v_mod = [np.abs(signal).T for signal in N_v]
X_v_T = [signal.T for signal in X_v]

In [15]:
M_v = []
for i in range(1200):
    M_v.append(np.greater(S_v_mod[i], N_v_mod[i]).astype(int))

Calculating SNR (Signal-to-Noise ratio) to check performance of model on validation set. 

\begin{equation*}
SNR = 10\log_{10}\frac{\sum_t s^2(t)}{\sum_t (s(t) - \hat{s}(t))^2}
\end{equation*}

If the recovered signal is same as the original clean signal, the denominator will be zero and the SNR becomes infinitely large. Therfore, higher the SNR, the better the model. 

In [18]:
# checking validation loss and calculating snr

avg_cost = 0
snr = []

for i in range(0, 1200, 10):
    batch_x = X_v_mod[i:i+10] 
    batch_y = M_v[i:i+10]

    cost, M_hat = sess.run([mse, y], feed_dict={x: batch_x, y_: batch_y})
    
    avg_cost += cost/120
    
    batch_x_complex = X_v_T[i:i+10]
    batch_s = s[i:i+10]
    for j in range(10):
        S_hat = np.multiply(M_hat[j], batch_x_complex[j])
        s_hat = librosa.istft(S_hat.T, win_length=1024, hop_length=512)
        
        t = min(len(s_hat), len(batch_s[j]))
        snr.append(10*np.log10((np.sum(np.square(batch_s[j][:t])))/np.sum(np.square(batch_s[j][:t]-s_hat[:t]))))

print("Validation loss = {:.9f}".format(avg_cost))
print("SNR = ", sum(snr)/1200)

Validation loss = 0.130348144
SNR =  11.051182855168978


In [17]:
# filenames of test files (400 test files)
te_filenames = ['tex{}.wav'.format(id) for id in ids[:400]]

In [18]:
# load pickled test files X_te in stft domain

with open('data/test.pkl', 'rb') as f:
    X_te, srs = pickle.load(f)

In [19]:
# taking magnitude and transpose

X_te_mod = [np.abs(signal).T for signal in X_te]
X_te_T = [signal.T for signal in X_te]

In [None]:
# reconstructing test signals

for i in range(0, 400, 10):
    batch_x = X_te_mod[i:i+10] 
    M_hat = sess.run(y, feed_dict={x: batch_x})
    
    batch_x_complex = X_te_T[i:i+10]
    batch_filenames = te_filenames[i:i+10]
    batch_sr = srs[i:i+10]
    for j in range(10):
        S_hat = np.multiply(M_hat[j], batch_x_complex[j])
        s_hat = librosa.istft(S_hat.T, win_length=1024, hop_length=512)
        librosa.output.write_wav('outputs/recons_'+batch_filenames[j][-11:], s_hat, srs[j])

In [21]:
sess.close()