In [1]:
'''
A Reccurent Neural Network (LSTM) implementation example using TensorFlow library.
This example is using the MNIST database of handwritten digits (http://yann.lecun.com/exdb/mnist/)
Long Short Term Memory paper: http://deeplearning.cs.cmu.edu/pdfs/Hochreiter97_lstm.pdf

Author: Aymeric Damien
Project: https://github.com/aymericdamien/TensorFlow-Examples/
'''

import tensorflow as tf
from tensorflow.models.rnn import rnn, rnn_cell
import numpy as np

import os
from os import path
import sys
import random
from random import randint

import numpy, scipy, matplotlib.pyplot as plt, pandas, librosa

In [22]:
slicelength = 50
feature_power = 2 #TODO play with this
data_proportion = .01

classical_names = [f for f in os.listdir("./classical_short_%i/" % slicelength)]
country_names = [f for f in os.listdir("./country_short_%i/" % slicelength)]

num_datapoints = int(min(len(classical_names), len(country_names)) * data_proportion)
print "num datapoints: " + str(num_datapoints)

random.shuffle(classical_names)
random.shuffle(country_names)
classical_names = classical_names[0:num_datapoints]
country_names = country_names[0:num_datapoints]

def stft_array(names, kind):
    update_per = len(names)/10
    print "update per %i" % update_per
    array = []
    for i, x in enumerate(names):
        song_path = '%s_short_%i/%s' % (kind, slicelength, x)
        if i%update_per == 0:
            sys.stdout.write(str(i/update_per) + " ")
            print "adding " + song_path + " to " + kind
        csong_stft = librosa.stft(librosa.load(song_path)[0])
        csong_log_stft = librosa.logamplitude(np.abs(csong_stft)**feature_power,ref_power=np.max)
#         csong_log_stft = [abs(item)/80.0 for sublist in csong_log_stft for item in sublist] #TODO: consider removing abs / 80
        array.append(csong_log_stft.T)
    return array

#shape is 4 by 1025
classical_train = stft_array(classical_names, 'classical')
country_train = stft_array(country_names, 'country')

test_classical = []
test_country = []

print "number of classical slices: %i" % len(classical_train)

while len(test_classical) < len(classical_train) / 10:
    r = randint(0, len(classical_train) - 1)
    test_classical.append(classical_train[r])
    del classical_train[r]
    
print "number of classical training slices: %i" % len(classical_train)
print "number of classical testing slices: %i" % len(test_classical)

print "number of country slices: %i" % len(country_train)
while len(test_country) < len(country_train) / 10:
    r = randint(0, len(country_train) - 1)
    test_country.append(country_train[r])
    del country_train[r]
    
print "number of country training slices: %i" % len(country_train)
print "number of country testing slices: %i" % len(test_country)

num datapoints: 408
update per 40
0 adding classical_short_50/song9_289.mp3 to classical
1 adding classical_short_50/song6_6623.mp3 to classical
2 adding classical_short_50/song3_7138.mp3 to classical
3 adding classical_short_50/song3_3809.mp3 to classical
4 adding classical_short_50/song0_19293.mp3 to classical
5 adding classical_short_50/song9_5476.mp3 to classical
6 adding classical_short_50/song3_7263.mp3 to classical
7 adding classical_short_50/song0_325.mp3 to classical
8 adding classical_short_50/song8_1100.mp3 to classical
9 adding classical_short_50/song6_4151.mp3 to classical
10 adding classical_short_50/song0_5779.mp3 to classical
update per 40
0 adding country_short_50/song9_1482.mp3 to country
1 adding country_short_50/song1_2294.mp3 to country
2 adding country_short_50/song0_1134.mp3 to country
3 adding country_short_50/song1_1365.mp3 to country
4 adding country_short_50/song1_2047.mp3 to country
5 adding country_short_50/song1_1090.mp3 to country
6 adding country_short_5

In [30]:
testing_with_mnist = True

if testing_with_mnist:
    # testing to see if mnist will work here
    from tensorflow.examples.tutorials.mnist import input_data
    mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)

    training = mnist.train.images
    testing = mnist.test.images
    training_validation_one_hot = mnist.train.labels
    testing_validation_one_hot = mnist.test.labels
else:
    training = (numpy.array(classical_train + country_train)).astype(numpy.float32)
    testing = (numpy.array(test_classical + test_country)).astype(numpy.float32)
    classical_one_hot = numpy.array([0,1])
    country_one_hot = numpy.array([1,0])
    training_validation_one_hot = (numpy.array([classical_one_hot for x in classical_train] + [country_one_hot for x in country_train])).astype(numpy.float32)
    testing_validation_one_hot = (numpy.array([classical_one_hot for x in test_classical] + [country_one_hot for x in test_country])).astype(numpy.float32)

def type_check(array_types, num_types):
    error_flag = False
    for x in array_types:
        if type(x) is numpy.ndarray:
            print "ARRAY TYPE ERROR"
            error_flag = True
    for x in num_types:
        if type(x) is numpy.float32:
            print "NUM TYPE ERROR"
            error_flag = True 
    if not error_flag:
        print "All types check out"
        
array_types = [type(training), type(training[0]), type(training_validation_one_hot), type(training_validation_one_hot[0]),
              type(testing), type(testing[0]), type(testing_validation_one_hot), type(testing_validation_one_hot[0])]
num_types = [type(training[0][0]), type(training_validation_one_hot[0][0]), type(testing[0][0]), type(testing_validation_one_hot[0][0])]
type_check(array_types, num_types)      

if len(training)==len(training_validation_one_hot) and len(testing) == len(testing_validation_one_hot) and len(training[0])==len(testing[0]):
    print "training: " + str(len(training))
    print "testing: " + str(len(testing))
    print "num features: " + str(len(training[0]))
else:
    print "ERROR"

Extracting MNIST_data/train-images-idx3-ubyte.gz
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Extracting MNIST_data/t10k-images-idx3-ubyte.gz
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz
All types check out
training: 55000
testing: 10000
num features: 784


In [31]:
def shuffle_array(xs, ys):
    zipped = zip(xs,ys)
    random.shuffle(zipped)
    return zip(*zipped)
# print training_validation_one_hot[0:10]
type_check(array_types, num_types)

training, training_validation_one_hot = shuffle_array(training, training_validation_one_hot)
testing, testing_validation_one_hot = shuffle_array(testing, testing_validation_one_hot)

# print training_validation_one_hot[0:10]
type_check(array_types, num_types)

# print training[5:10]

All types check out
All types check out


In [32]:
'''
To classify images using a reccurent neural network, we consider every image row as a sequence of pixels.
Because MNIST image shape is 28*28px, we will then handle 28 sequences of 28 steps for every sample.
'''

# Parameters
learning_rate = 0.001
training_iters = 100000
batch_size = 128
display_step = 10

if testing_with_mnist:
    # Network Parameters
    n_input = 28 # MNIST data input (img shape: 28*28)
    n_steps = 28 # timesteps
    n_hidden = 128 # hidden layer num of features
    n_classes = 10 # MNIST total classes (0-9 digits)
else:
    # Parameters
    learning_rate = 0.001
    training_iters = 100000
    batch_size = 128 #TODO: play with
    display_step = 10

    # Network Parameters
    n_input = len(training[0][0])
    n_steps = len(training[0])
    n_hidden = 128 #TODO: play with
    n_classes = 2
    
#REMEMBER: mnist is row x column. my data is column x row. shouldn't actually matter

In [33]:
# tf Graph input
x = tf.placeholder("float", [None, n_steps, n_input])
istate = tf.placeholder("float", [None, 2*n_hidden]) #state & cell => 2x n_hidden
y = tf.placeholder("float", [None, n_classes])

# Define weights
weights = {
    'hidden': tf.Variable(tf.random_normal([n_input, n_hidden])), # Hidden layer weights
    'out': tf.Variable(tf.random_normal([n_hidden, n_classes]))
}
biases = {
    'hidden': tf.Variable(tf.random_normal([n_hidden])),
    'out': tf.Variable(tf.random_normal([n_classes]))
}

In [34]:
def RNN(_X, _istate, _weights, _biases):
    # input shape: (batch_size, n_steps, n_input)
    _X = tf.transpose(_X, [1, 0, 2])  # permute n_steps and batch_size
    # Reshape to prepare input to hidden activation
    _X = tf.reshape(_X, [-1, n_input]) # (n_steps*batch_size, n_input)
    # Linear activation
    _X = tf.matmul(_X, _weights['hidden']) + _biases['hidden']

    # Define a lstm cell with tensorflow
    lstm_cell = rnn_cell.BasicLSTMCell(n_hidden, forget_bias=1.0)
    # Split data because rnn cell needs a list of inputs for the RNN inner loop
    _X = tf.split(0, n_steps, _X) # n_steps * (batch_size, n_hidden)

    # Get lstm cell output
    outputs, states = rnn.rnn(lstm_cell, _X, initial_state=_istate)

    # Linear activation
    # Get inner loop last output
    return tf.matmul(outputs[-1], _weights['out']) + _biases['out']

In [35]:
pred = RNN(x, istate, weights, biases)

# Define loss and optimizer
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(pred, y)) # Softmax loss
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost) # Adam Optimizer

# Evaluate model
correct_pred = tf.equal(tf.argmax(pred,1), tf.argmax(y,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

In [36]:
# Initializing the variables
init = tf.initialize_all_variables()

# Launch the graph
with tf.Session() as sess:
    sess.run(init)
    step = 1
    # Keep training until reach max iterations
    while step * batch_size < training_iters:
        batch_xs, batch_ys = mnist.train.next_batch(batch_size)
        if testing_with_mnist:
            # Reshape data to get 28 seq of 28 elements
            batch_xs = batch_xs.reshape((batch_size, n_steps, n_input))
        # Fit training using batch data
        sess.run(optimizer, feed_dict={x: batch_xs, y: batch_ys, istate: np.zeros((batch_size, 2*n_hidden))})
        if step % display_step == 0:
            # Calculate batch accuracy
            acc = sess.run(accuracy, feed_dict={x: batch_xs, y: batch_ys, istate: np.zeros((batch_size, 2*n_hidden))})
            # Calculate batch loss
            loss = sess.run(cost, feed_dict={x: batch_xs, y: batch_ys, istate: np.zeros((batch_size, 2*n_hidden))})
            print "Iter " + str(step*batch_size) + ", Minibatch Loss= " + "{:.6f}".format(loss) + ", Training Accuracy= " + "{:.5f}".format(acc)
        step += 1
    print "Optimization Finished!"
    # Calculate accuracy for 256 mnist test images
    if testing_with_mnist:
        testing = testing.reshape((-1, 28, 28))
    print "Testing Accuracy:", sess.run(accuracy, feed_dict={x: testing, y: testing_validation_one_hot, istate: np.zeros((test_len, 2*n_hidden))})

Iter 1280, Minibatch Loss= 1.465753, Training Accuracy= 0.53125
Iter 2560, Minibatch Loss= 1.352810, Training Accuracy= 0.54688
Iter 3840, Minibatch Loss= 1.229517, Training Accuracy= 0.57812
Iter 5120, Minibatch Loss= 0.949928, Training Accuracy= 0.64062
Iter 6400, Minibatch Loss= 0.815072, Training Accuracy= 0.67188
Iter 7680, Minibatch Loss= 0.987152, Training Accuracy= 0.64844
Iter 8960, Minibatch Loss= 0.678250, Training Accuracy= 0.77344
Iter 10240, Minibatch Loss= 0.624892, Training Accuracy= 0.79688
Iter 11520, Minibatch Loss= 0.445168, Training Accuracy= 0.85156
Iter 12800, Minibatch Loss= 0.735893, Training Accuracy= 0.75000
Iter 14080, Minibatch Loss= 0.499550, Training Accuracy= 0.82812
Iter 15360, Minibatch Loss= 0.436378, Training Accuracy= 0.81250
Iter 16640, Minibatch Loss= 0.452826, Training Accuracy= 0.86719
Iter 17920, Minibatch Loss= 0.474916, Training Accuracy= 0.82031
Iter 19200, Minibatch Loss= 0.326172, Training Accuracy= 0.91406
Iter 20480, Minibatch Loss= 0.26