In [1]:
import tensorflow as tf
import numpy as np
import scipy.io.wavfile
import os
# Start from an empty default graph so re-running this cell does not stack ops on an old one.
tf.reset_default_graph()
# Session used by every sess.run(...) call in the cells below.
sess = tf.InteractiveSession()

In [2]:
# data preprocessing
# Cuts each direction's recording into fixed-length windows, alternates them
# between the training and test sets, and standardises both sets with the
# training-set mean and standard deviation.
seq_length = 800   # time steps per example
merge_factor = 4   # consecutive audio frames merged into one time step
#temporal resolution = merge_factor/16000 #discrimination_window = merge_factor*seq_length/16000
n_train = 200; n_test = 200   # windows per direction in each split
num_channels = 2
input_size = num_channels*merge_factor #n_channels*merge_factor
num_classes = 5 #n_directions
offset = 64        # frames skipped at the start of every recording
window = seq_length*merge_factor                 # audio frames per example
frames_needed = offset+((n_train+n_test)*window) # frames each recording must provide
train_x = []; train_t = []
test_x = []; test_t = []
for i in range(num_classes):
    angle = 0+(45*i)
    path = 'tao3/'+'d'+str(angle)+'.wav'
    temp_data = scipy.io.wavfile.read(path)[1]
    # Fail early with a readable message: a short or mono recording would
    # otherwise produce ragged windows and an obscure reshape error below.
    if temp_data.ndim != 2 or temp_data.shape[1] != num_channels:
        raise ValueError(path+': expected '+str(num_channels)+'-channel audio, got shape '+str(temp_data.shape))
    if temp_data.shape[0] < frames_needed:
        raise ValueError(path+': needs at least '+str(frames_needed)+' frames, got '+str(temp_data.shape[0]))
    for j in range(n_train+n_test):
        chunk = temp_data[offset+(j*window):offset+((j+1)*window)]
        # even-numbered windows go to training, odd-numbered ones to test
        if j%2 != 1:
            train_x.append(chunk)
            train_t.append(i)
        else:
            test_x.append(chunk)
            test_t.append(i)

train_x = np.array(train_x); train_t = np.array(train_t)
train_x = train_x.reshape(n_train*num_classes,seq_length,input_size)
# statistics come from the training set only and are reused for the test set
mean = np.mean(train_x); std = np.std(train_x)
train_x = (train_x - mean)/std
test_x = np.array(test_x); test_t = np.array(test_t)
test_x = test_x.reshape(n_test*num_classes,seq_length,input_size)
test_x = (test_x - mean)/std

In [3]:
# hyperparameters
# Widths of the three hidden layers (see the weight shapes in `params`).
i_size = h_size = g_size = 40

# Graph inputs: a batch of sequences, one integer class label per sequence,
# and a scalar learning rate supplied at every training step.
x = tf.placeholder(tf.float32, shape=[None, seq_length, input_size])
labels = tf.placeholder(tf.int32, shape=[None])
learning_rate = tf.placeholder(tf.float32, shape=[])

In [4]:
sdev = 0.1  # standard deviation of the random-normal initialisers


def _normal_var(*shape):
    """Return a tf.Variable of the given shape initialised from a normal with stddev `sdev`."""
    return tf.Variable(tf.random_normal(list(shape), stddev=sdev))


# Trainable parameters of the network, created in the same order as before.
params = {
    # weight matrices
    'Wxi': _normal_var(input_size, i_size),
    'Wih': _normal_var(i_size, h_size),
    'Whg': _normal_var(h_size, g_size),
    'Wgg': _normal_var(g_size, g_size),
    'Wgy': _normal_var(g_size, num_classes),
    # biases: random for the two feed-forward layers, constant 2.0 for the
    # recurrent layer, zero for the output layer
    'bi': _normal_var(i_size),
    'bh': _normal_var(h_size),
    'bg': tf.Variable(2.0*tf.ones([g_size])),
    'by': tf.Variable(tf.zeros([num_classes])),
}

In [5]:
# architecture
def RNN(x,params):
    """Build the unrolled recurrent classifier and return its output logits.

    For each of the `seq_length` time steps the slice x[:, t, :] goes through
    two tanh layers ('Wxi'/'bi', then 'Wih'/'bh') and then updates the
    recurrent state through 'Wgg', 'Whg' and 'bg'. Every tanh output is
    normalised with the mean and variance taken over axis 0 of the current
    batch. The state after the last step is mapped through 'Wgy'/'by'.

    Args:
        x: tensor indexed as x[:, t, :] for t in range(seq_length).
        params: dict holding the variables 'Wxi', 'Wih', 'Whg', 'Wgg', 'Wgy',
            'bi', 'bh', 'bg' and 'by'.

    Returns:
        Tensor of un-normalised class scores, one row per batch element.
    """
    def _tanh_then_normalise(pre_activation):
        # tanh followed by batch normalisation without learned offset/scale
        squashed = tf.tanh(pre_activation)
        batch_mean, batch_var = tf.nn.moments(squashed, [0])
        return tf.nn.batch_normalization(squashed, batch_mean, batch_var,
                                         offset=None, scale=None,
                                         variance_epsilon=1e-8)

    n_examples = tf.shape(x)[0]
    state = tf.zeros([n_examples, g_size])  # recurrent state starts at zero
    for step in range(seq_length):
        frame = x[:, step, :]
        first = _tanh_then_normalise(
            tf.add(tf.matmul(frame, params['Wxi']), params['bi']))
        second = _tanh_then_normalise(
            tf.add(tf.matmul(first, params['Wih']), params['bh']))
        recurrent_in = tf.add(tf.matmul(state, params['Wgg']),
                              tf.matmul(second, params['Whg']))
        state = _tanh_then_normalise(tf.add(recurrent_in, params['bg']))
    return tf.add(tf.matmul(state, params['Wgy']), params['by'])

In [6]:
# build the graph: y holds the class logits produced for the input placeholder x
y = RNN(x,params)

In [7]:
# Loss: mean softmax cross-entropy between the logits and the integer labels.
# The arguments are passed by keyword because TensorFlow 1.x rejects
# positional arguments for this function.
cost = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=labels))
# NOTE: despite its name, `optimizer` is the training op returned by
# minimize(), not the optimizer object.
optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost)
# Gradient-clipping variant, kept for reference. It needs the optimizer object
# itself, so it will not work with `optimizer` as defined above.
# grads_and_vars = optimizer.compute_gradients(cost)
# capped_grads_and_vars = [(tf.clip_by_value(gv[0],-5.,5.), gv[1]) for gv in grads_and_vars]
# capped_optimizer = optimizer.apply_gradients(capped_grads_and_vars)
# saver = tf.train.Saver()
sess.run(tf.initialize_all_variables())

In [None]:
# Training loop: two gradient steps per iteration (one per half of the training
# set), with train/test accuracy reported every 20 iterations.
n_iter = 0
max_iter = None  # set to an int to stop automatically; None trains until interrupted
# train_x is ordered by class, so slicing at n_train/2 put only class-0
# examples in the first batch and everything else in the second. Taking the
# even and odd positions gives two equal halves that each contain all classes.
train_x1 = train_x[0::2]; train_t1 = train_t[0::2]
train_x2 = train_x[1::2]; train_t2 = train_t[1::2]
while max_iter is None or n_iter < max_iter:
    _,cost1 = sess.run([optimizer,cost],feed_dict={x:train_x1,labels:train_t1,learning_rate:0.05})
    _,cost2 = sess.run([optimizer,cost],feed_dict={x:train_x2,labels:train_t2,learning_rate:0.05})
    print('iter'+str(n_iter)+' cost:'+str((cost1+cost2)/2.))
    if n_iter % 20 == 0:
        # accuracy = fraction of examples whose arg-max logit equals the label
        y_train = sess.run(y,feed_dict={x:train_x})
        print('train pf = ' + str((sum(np.argmax(y_train,axis=1) == train_t) + 0.)/(n_train*num_classes)))
        y_test = sess.run(y,feed_dict={x:test_x})
        print('test pf = ' + str((sum(np.argmax(y_test,axis=1) == test_t) + 0.)/(n_test*num_classes))+'\n')
#         mypath = 'vars/iter'+str(n_iter)
#         if not os.path.isdir(mypath):
#            os.makedirs(mypath)
#         myfile = mypath+'/'+str(n_iter)
#         saver.save(sess,myfile,write_meta_graph=False)
    n_iter += 1

In [None]:
# Release the session.
# NOTE(review): the cells further down still call sess.run(...) and
# params['Wxi'].eval(); they presumably fail on a closed session, so run this cell last.
sess.close()

In [17]:
# sanity check: number of training examples (train_x was reshaped to n_train*num_classes rows)
len(train_x)

1000

In [None]:
# new test data
# Evaluates the trained network on a second set of recordings ('speech2/').
# The window settings below must match the values the graph was built with.
seq_length = 800
merge_factor = 4#temporal resolution = merge_factor/16000 #discrimination_window = merge_factor*seq_length/16000
n_test = 360;
num_channels = 2
input_size = num_channels*merge_factor #n_channels*merge_factor
num_classes = 5 #n_directions
offset = 64
window = seq_length*merge_factor      # audio frames per example
frames_needed = offset+(n_test*window) # frames each recording must provide
test_x = []; test_t = []
for i in range(num_classes):
    angle = 0+(45*i)
    path = 'speech2/'+'d'+str(angle)+'.wav'
    temp_data = scipy.io.wavfile.read(path)[1]
    #temp_data = scipy.io.wavfile.read('test.wav')[1]
    # A short recording would give ragged windows and an obscure reshape error.
    if len(temp_data) < frames_needed:
        raise ValueError(path+': needs at least '+str(frames_needed)+' frames, got '+str(len(temp_data)))
    for j in range(n_test):
        test_x.append(temp_data[offset+(j*window):offset+((j+1)*window)])
        test_t.append(i)

test_x = np.array(test_x); test_t = np.array(test_t)
test_x = test_x.reshape(n_test*num_classes,seq_length,input_size)
# The network was trained on standardised inputs, so the new data must be
# scaled with the same training-set mean/std (computed in the preprocessing cell).
test_x = (test_x - mean + 0.0)/std
y_test = sess.run(y,feed_dict={x:test_x})
a = np.argmax(y_test,axis=1)  # predicted class index per example
np.set_printoptions(threshold=np.inf)  # print the full prediction vector
print(a)
print('test pf = ' + str((sum(a == test_t) + 0.0)/(n_test*num_classes))+'\n')

In [None]:
# Fetch the current value of the input weight matrix 'Wxi' for the plot below.
# NOTE(review): .eval() is called without an explicit session, so it presumably
# relies on the InteractiveSession from the first cell still being open — confirm.
var = params['Wxi'].eval()

In [None]:
import matplotlib.pyplot as plt
# Draw the fetched weight matrix as a greyscale image with a colour scale.
fig, ax = plt.subplots()
heatmap = ax.imshow(var, cmap='Greys')
fig.colorbar(heatmap, ax=ax)
plt.show()