In [1]:
import numpy as np
from scipy import signal
from scipy.io import wavfile
import tensorflow as tf

from os import listdir
from os.path import isfile, join

import matplotlib.pyplot as plt
%matplotlib inline

from keras.utils import to_categorical

Using TensorFlow backend.


In [2]:
# import IPython
# IPython.display.Audio("./dataset/m3/02.wav")

In [3]:
def get_files(path):
    """Return the names of the regular files directly inside `path`.

    Sub-directories are ignored, and so is any entry whose name ends in
    '.ini'. Names are returned in the order `listdir` yields them and do not
    include the directory prefix.
    """
    names = []
    for entry in listdir(path):
        if entry.endswith('.ini'):
            continue
        if isfile(join(path, entry)):
            names.append(entry)
    return names

def read_audio(path, directory, index, Tx):
    """Load the spectrograms of one class and build matching one-hot labels.

    Parameters
    ----------
    path : sequence of str
        File names to load (not full paths).
    directory : str
        Prefix concatenated in front of every file name; it must already end
        with a path separator.
    index : int
        Class index encoded into every label row.
    Tx : int
        Number of spectrogram time steps kept per file.

    Returns
    -------
    x : ndarray, shape (k, n_freq, Tx)
    y : ndarray, shape (k, num_classes)
        `k` is the number of files that were loaded successfully. Files that
        cannot be read, or whose spectrogram does not fit (n_freq, Tx), are
        reported with a message and left out, so every returned row holds
        real data and `x` and `y` stay aligned.

    Relies on the module-level `n_freq` and `num_classes`.
    """
    x = np.empty((len(path), n_freq, Tx))
    y = np.empty((len(path), num_classes))
    label = to_categorical(index, num_classes)

    kept = 0  # next free row; also the number of valid examples so far
    for fl in path:
        full_path = directory + fl
        try:
            sample_rate, audio = wavfile.read(full_path)
        except ValueError:
            print("{} not an wav file".format(full_path))
            # Skip: otherwise `audio` would be undefined or left over from
            # the previous file.
            continue

        _, _, spectrogram = signal.spectrogram(audio, sample_rate)
        try:
            x[kept, :, :] = spectrogram[:, :Tx]
        except ValueError:
            print("Error: Time steps too short {}".format(full_path))
            # Skip: the row was not written, so it must not receive a label.
            continue

        y[kept, :] = label
        kept += 1

    # Drop the unused tail rows left by skipped files.
    return x[:kept], y[:kept]

def mean(X, n):
    """Return the sum of all elements of `X` divided by `n`.

    `n` is taken from the caller rather than derived from `X`, so the result
    is the arithmetic mean only when `n` equals the number of elements.
    """
    total = X.sum()
    return total / n

def variance(X, n):
    """Return the sum of the squared elements of `X` divided by `n`.

    No mean is subtracted, so this is the mean square of `X`; it coincides
    with the variance only for zero-mean data.
    """
    squared_total = np.square(X).sum()
    return squared_total / n

In [4]:
# Spectrogram geometry fed to the model
n_freq = 129  # number of frequencies per time step of the spectrogram
Tx = 400  # number of spectrogram time steps given to the model
mat = n_freq * Tx  # number of values in each spectrogram

# Label space
num_classes = 8  # number of output classes

In [None]:
local_dir ='./dataset/'
mypath = ['1/','2/','3/','4/','5/','6/','7/','8/']

# Gather one (spectrograms, labels) pair per class folder and concatenate once
# at the end; growing an array inside the loop would re-copy everything on
# each iteration. The folder position in `mypath` is used as the class index.
X_parts, Y_parts = [], []
for i, folder in enumerate(mypath):
    files = get_files(local_dir+folder)
    X_cls, Y_cls = read_audio(files, local_dir+folder, i, Tx)
    X_parts.append(X_cls)
    Y_parts.append(Y_cls)

X_train = np.concatenate(X_parts, axis=0).astype('float32')
Y_train = np.concatenate(Y_parts, axis=0).astype('int32')

# Add a trailing channel axis: (examples, n_freq, Tx) -> (examples, n_freq, Tx, 1)
X_train = X_train.reshape(X_train.shape + (1,))

In [6]:
# Normalize inputs
# X_mean = (X_train.sum((1,2)) / mat).reshape(-1, 1, 1, 1)
# X_variance = (np.square(X_train).sum((1,2)) / mat).reshape(-1, 1, 1, 1)
# X_train = (X_train - X_mean) / X_variance

In [None]:
local_test_dir ='./dataset/test/'
mypath = ['1/','2/','3/','4/','5/','6/','7/','8/']

# Gather one (spectrograms, labels) pair per class folder and concatenate once
# at the end; growing an array inside the loop would re-copy everything on
# each iteration. The folder position in `mypath` is used as the class index.
X_parts, Y_parts = [], []
for i, folder in enumerate(mypath):
    files = get_files(local_test_dir+folder)
    X_cls, Y_cls = read_audio(files, local_test_dir+folder, i, Tx)
    X_parts.append(X_cls)
    Y_parts.append(Y_cls)

X_test = np.concatenate(X_parts, axis=0).astype('float32')
Y_test = np.concatenate(Y_parts, axis=0).astype('int32')

# Add a trailing channel axis: (examples, n_freq, Tx) -> (examples, n_freq, Tx, 1)
X_test = X_test.reshape(X_test.shape + (1,))

In [8]:
# Normalize inputs
# X_mean = (X_test.sum((1,2)) / mat).reshape(-1, 1, 1, 1)
# X_variance = (np.square(X_test).sum((1,2)) / mat).reshape(-1, 1, 1, 1)
# X_test = (X_test - X_mean) / X_variance

In [9]:
# Training-loop settings
num_steps = 250  # iterations of the training loop
display_step = 10  # accuracy is reported every this many steps (and on step 1)
dropout = 0.75  # value supplied to the keep_prob placeholder by the training step
num_examples = len(X_train)  # size of the first axis of X_train

In [10]:
# Clear the default graph before (re)building the model in the cells below.
tf.reset_default_graph()

# Network inputs; the leading None leaves the batch dimension open.
X = tf.placeholder(dtype=tf.float32, shape=[None, n_freq, Tx, 1])
Y = tf.placeholder(dtype=tf.float32, shape=[None, num_classes])

# Keep probability handed to conv_net as its `dropout` argument.
keep_prob = tf.placeholder(dtype=tf.float32)

In [11]:
# Create some wrappers for simplicity
def conv2d(x, W, b, strides=1):
    """Apply a 'SAME'-padded 2-D convolution, add the bias and pass through ReLU.

    `strides` is used for both inner dimensions of the stride vector
    [1, strides, strides, 1].
    """
    stride_spec = [1, strides, strides, 1]
    convolved = tf.nn.conv2d(x, W, strides=stride_spec, padding='SAME')
    biased = tf.nn.bias_add(convolved, b)
    return tf.nn.relu(biased)


def maxpool2d(x, k=2):
    """Max-pool `x` with a k-by-k window and a stride of k, using 'SAME' padding."""
    window = [1, k, k, 1]
    return tf.nn.max_pool(x, ksize=window, strides=window, padding='SAME')


# Create model
def conv_net(x, weights, biases, dropout):
    """Build the network: three conv + max-pool stages, one hidden dense layer, logits.

    `weights` and `biases` are dicts holding the variables under the keys
    'wc1'..'wc3' / 'bc1'..'bc3' (convolutions), 'wd1' / 'bd1' (dense layer)
    and 'out' (output layer). `dropout` is forwarded as the second argument
    of tf.nn.dropout. Every intermediate tensor is printed while the graph
    is being built. Returns the un-normalised class scores.
    """
    layer = x
    # Convolution stages: conv -> ReLU, then 2x2 max-pool
    for w_key, b_key in (('wc1', 'bc1'), ('wc2', 'bc2'), ('wc3', 'bc3')):
        layer = conv2d(layer, weights[w_key], biases[b_key])
        print(layer)
        layer = maxpool2d(layer, k=2)
        print(layer)

    # Flatten to the input width expected by the dense weight matrix
    flat_width = weights['wd1'].get_shape().as_list()[0]
    hidden = tf.reshape(layer, [-1, flat_width])
    hidden = tf.add(tf.matmul(hidden, weights['wd1']), biases['bd1'])
    hidden = tf.nn.relu(hidden)
    # Apply Dropout
    hidden = tf.nn.dropout(hidden, dropout)

    # Output layer
    out = tf.add(tf.matmul(hidden, weights['out']), biases['out'])
    print(out)
    return out



In [12]:
# Size of the conv3 output that feeds the dense layer. conv_net applies three
# 2x2 'SAME' max-pools, each of which halves a spatial size rounding up
# (129 -> 65 -> 33 -> 17 and 400 -> 200 -> 100 -> 50 for the current config).
flat_freq, flat_time = n_freq, Tx
for _ in range(3):
    flat_freq = (flat_freq + 1) // 2
    flat_time = (flat_time + 1) // 2

weights = {
    # 5x5 conv, 1 input channel, 4 output channels
    'wc1': tf.Variable(tf.random_normal([5, 5, 1, 4])),
    # 5x5 conv, 4 input channels, 8 output channels
    'wc2': tf.Variable(tf.random_normal([5, 5, 4, 8])),
    # 5x5 conv, 8 input channels, 16 output channels
    'wc3': tf.Variable(tf.random_normal([5, 5, 8, 16])),
    # fully connected: flattened conv3 output (16 channels) -> 256 units
    'wd1': tf.Variable(tf.random_normal([flat_freq*flat_time*16, 256])),
    # output layer: 256 units -> one score per class
    'out': tf.Variable(tf.random_normal([256, num_classes]))
}

In [13]:
# One randomly initialised bias vector per layer; each length matches the
# output width of the corresponding entry in `weights`.
bias_sizes = (
    ('bc1', 4),
    ('bc2', 8),
    ('bc3', 16),
    ('bd1', 256),
    ('out', num_classes),
)
biases = {
    layer_name: tf.Variable(tf.random_normal([size]))
    for layer_name, size in bias_sizes
}

In [14]:
# Forward pass and class probabilities
logits = conv_net(X, weights, biases, keep_prob)
prediction = tf.nn.softmax(logits)

# Loss: softmax cross-entropy averaged over the batch
per_example_loss = tf.nn.softmax_cross_entropy_with_logits_v2(labels=Y, logits=logits)
loss_op = tf.reduce_mean(per_example_loss)

# Optimiser with its default settings
optimizer = tf.train.AdamOptimizer()
train_ops = optimizer.minimize(loss_op)

# Evaluate model: fraction of examples whose most probable class matches the label
pred_idx = tf.argmax(prediction, 1)
label_idx = tf.argmax(Y, 1)
correct_pred = tf.equal(pred_idx, label_idx)
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

Tensor("Relu:0", shape=(?, 129, 400, 4), dtype=float32)
Tensor("MaxPool:0", shape=(?, 65, 200, 4), dtype=float32)
Tensor("Relu_1:0", shape=(?, 65, 200, 8), dtype=float32)
Tensor("MaxPool_1:0", shape=(?, 33, 100, 8), dtype=float32)
Tensor("Relu_2:0", shape=(?, 33, 100, 16), dtype=float32)
Tensor("MaxPool_2:0", shape=(?, 17, 50, 16), dtype=float32)
Tensor("Add_1:0", shape=(?, 8), dtype=float32)


In [15]:
# Op that initialises all global variables of the graph; run once per session.
init = tf.global_variables_initializer()
# Saver used by the training loop to write checkpoints.
saver = tf.train.Saver()

In [16]:
# Training / evaluation loop.
# Set `train_model` to True to actually update the weights. While it is False
# the loop only evaluates the freshly initialised network, and no checkpoint is
# written, so existing "models/pre_trained_cnn.ckpt-*" files are never
# overwritten with untrained weights.
train_model = False

with tf.Session() as sess:

    sess.run(init)

    for step in range(1, num_steps+1):

        if train_model:
            # One optimisation step on the whole training set; `dropout` is
            # fed as the keep probability.
            sess.run(train_ops, feed_dict={X: X_train, Y: Y_train, keep_prob: dropout})

        if step % display_step == 0 or step == 1:
            # Accuracy on the full train and test sets, with dropout disabled
            acc = sess.run(accuracy, feed_dict={X: X_train, Y: Y_train, keep_prob: 1.0})
            acct = sess.run(accuracy, feed_dict={X: X_test, Y: Y_test, keep_prob: 1.0})
            print("epoch " + str(step) + ", train acc= " + \
                  "{:.3f}".format(acc) + ", test acc= " + \
                  "{:.3f}".format(acct))
            print("-"*50)
        if train_model and step % 50 == 0:
            # Checkpoint every 50 steps; the step number is appended to the file name.
            saver.save(sess,"models/pre_trained_cnn.ckpt", global_step=step)

epoch 1, train acc= 0.124, test acc= 0.153
--------------------------------------------------
epoch 10, train acc= 0.246, test acc= 0.250
--------------------------------------------------
epoch 20, train acc= 0.370, test acc= 0.298
--------------------------------------------------
epoch 30, train acc= 0.457, test acc= 0.379
--------------------------------------------------
epoch 40, train acc= 0.529, test acc= 0.460
--------------------------------------------------
epoch 50, train acc= 0.627, test acc= 0.484
--------------------------------------------------
epoch 60, train acc= 0.671, test acc= 0.524
--------------------------------------------------
epoch 70, train acc= 0.731, test acc= 0.524
--------------------------------------------------
epoch 80, train acc= 0.775, test acc= 0.524
--------------------------------------------------
epoch 90, train acc= 0.814, test acc= 0.524
--------------------------------------------------
epoch 100, train acc= 0.837, test acc= 0.524
------

In [111]:
test_file = "voicefile.wav" # eg:-  "voicefile.wav" 
# test_file = "test_music.wav" # eg:-  "voicefile.wav"    
# test_file = "054.wav" # eg:-  "voicefile.wav"  

# upload 8bit, 16KHz mono audio
try:
    sample_rate, audio = wavfile.read(test_file)
except ValueError:
    # Stop here: carrying on would use an undefined (or stale) `audio`.
    raise ValueError("{} not a wav file".format(test_file))

_, _, X_val = signal.spectrogram(audio, sample_rate)
print(X_val.shape)
freq_bins, time_steps = X_val.shape
if time_steps < Tx or freq_bins < n_freq:
    # Fail with an explicit message instead of an opaque reshape error below.
    raise ValueError(
        "Error {}: spectrogram has shape {} but at least ({}, {}) "
        "(frequency bins, time steps) is required; upload a longer file".format(
            test_file, X_val.shape, n_freq, Tx))

# Crop to the model's input size and add batch and channel axes.
X_val = X_val[:n_freq,:Tx].reshape(1, n_freq, Tx, 1)

(129, 794)
(1, 129, 400, 1)


In [112]:
# Restore the step-100 checkpoint into a fresh session and classify X_val.
checkpoint_path = "models/pre_trained_cnn.ckpt-100"
predic = 0  # placeholder value; replaced by the softmax output below
with tf.Session() as sess:
    saver = tf.train.Saver()
    saver.restore(sess, checkpoint_path)

    # keep_prob of 1.0 keeps every unit, i.e. no dropout while predicting
    feed = {X: X_val, keep_prob: 1.0}
    predic = sess.run(prediction, feed_dict=feed)

In [113]:
# argmax is 0-based while the class folders are named '1/'..'8/', hence the +1.
class_number = predic.argmax() + 1
print("Given audio file resembles class {} from training set.".format(class_number))

Given audio file resembles class 1 from training set.
