In [1]:
import tensorflow as tf
import numpy as np
import librosa

np.random.seed(666)
np.set_printoptions(precision=5, suppress=True)

In [2]:
fft_size = 512
hop_size = 512
sequence_length = 60

audio_frames_length = fft_size * (sequence_length + 1)
audio_shape = [None, audio_frames_length]


rnn_sizes = [512, 512]
dense_sizes = [512, 512]
dense_activation = tf.nn.relu
weight_decay = 0.001
batch_norm = True
amount_epochs = 10000
batch_size = 64
learning_rate = 0.001

optimisation_method = 'adam' #'l-bfgs-b'

data, sr = librosa.core.load(mono=True, 
                             path="./assets/electronic_piano/HM_120_AF_EPiano5.wav")

In [3]:
def get_cell(size, keep_prob=1.0, depth=None):
    cell = tf.contrib.rnn.BasicLSTMCell(num_units=size)
    cell = tf.contrib.rnn.DropoutWrapper(cell,
                                         input_keep_prob=keep_prob,
                                         output_keep_prob=keep_prob,
                                         state_keep_prob=keep_prob)
    return cell


def dict_callback():
    return np.array([data[:audio_frames_length] for _ in range(batch_size)])


def loss_callback(l2, mag, pha):
    print("l2:", l2, "\nmag:", mag,"\npha:", pha) 

In [4]:
# tf.reset_default_graph()

# with tf.variable_scope("inputs"):
#     audio = tf.placeholder(tf.float32, 
#                            shape=audio_shape)
#     keep_prob = tf.placeholder(tf.float32)

# with tf.variable_scope("stft"):
#     stfts = tf.contrib.signal.stft(audio, 
#                                    frame_length=fft_size, 
#                                    frame_step=hop_size,
#                                    fft_length=fft_size,
#                                    pad_end=False)

# with tf.variable_scope("cart2polar"):    
#     real = tf.real(stfts)
#     imag = tf.imag(stfts)
#     magnitudes = tf.abs(stfts)
#     phases = tf.atan2(imag, real)

#     # Quadrants
#     condition = real < 0.0
#     phases = tf.where(condition, phases, phases + np.pi)

# with tf.variable_scope("input_target_split"):
#     input_magnitudes = magnitudes[:, :sequence_length]
#     input_phases = phases[:, :sequence_length]
#     target_magnitudes = magnitudes[:, -1]
#     target_phases = phases[:, -1]

# with tf.variable_scope("mag_phases_concat"):
#     features = tf.concat([input_magnitudes, input_phases], axis=2)

#     if batch_norm:
#         features = tf.contrib.layers.batch_norm(features)
        
# with tf.variable_scope("rnn"):
#     depth = features.get_shape()[2]
#     cells_fw = []
#     cells_bw = []
#     for size in rnn_sizes:
#         cell_fw = get_cell(size, keep_prob, depth)
#         cell_bw = get_cell(size, keep_prob, depth)
#         cells_fw.append(cell_fw)
#         cells_bw.append(cell_bw)

#     outputs, output_state_fw, output_state_bw = \
#         tf.contrib.rnn.stack_bidirectional_dynamic_rnn(
#             cells_fw,
#             cells_bw,
#             features,
#             dtype=tf.float32,
#             time_major=False)

#     last_rnn_output = outputs[:, -1, :]

# with tf.variable_scope("dense"):
#     dense = last_rnn_output
#     for size in dense_sizes:

#         dense = tf.contrib.layers.fully_connected(inputs=dense, 
#                                                   num_outputs=size, 
#                                                   activation_fn=dense_activation)  
#         dense = tf.contrib.layers.dropout(inputs=dense, keep_prob=keep_prob)

#         if batch_norm:
#             dense = tf.contrib.layers.batch_norm(dense)

#     logits = tf.contrib.layers.fully_connected(inputs=dense, 
#                                                num_outputs=int(depth), 
#                                                activation_fn=None)
# split_size = int(depth) // 2

# with tf.variable_scope("mag_phase_predict_split"):
#     predicted_magnitudes = logits[:, :split_size] 
#     predicted_phases = logits[:, split_size:]    


# with tf.variable_scope("loss"):
#     magnitude_loss = tf.losses.mean_squared_error(target_magnitudes, predicted_magnitudes)
#     phase_loss = tf.losses.mean_squared_error(target_phases, predicted_phases)

#     l2_losses = tf.add_n([tf.nn.l2_loss(v) for v in tf.trainable_variables() 
#                           if 'bias' not in v.name and 'BatchNorm' not in v.name])
#     l2_loss = l2_losses * weight_decay

#     update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)

#     with tf.control_dependencies(update_ops):
#         #loss = l2_loss + magnitude_loss + phase_loss
#         loss = magnitude_loss
    
# loss_fetches = [l2_loss, magnitude_loss, phase_loss]

# feed_dict = {
#     audio: dict_callback(),
#     keep_prob: 0.2
# }

# if optimisation_method == 'l-bfgs-b':

#     optimiser = tf.contrib.opt.ScipyOptimizerInterface(loss,
#                                                        method='L-BFGS-B',
#                                                        options={'maxiter': iterations})

#     with tf.Session() as sess:
#         sess.run(tf.global_variables_initializer())

#         print('Started optimisation.')

#         optimiser.minimize(sess, 
#                            loss_callback=loss_callback, 
#                            fetches=loss_fetches, 
#                            feed_dict=feed_dict)
        
# else:
    
#     if optimisation_method == 'adam':
#         optimiser = tf.train.AdamOptimizer(learning_rate=learning_rate)
        
#     optimise = optimiser.minimize(loss)
    
#     with tf.Session() as sess:
#         sess.run(tf.global_variables_initializer())
        
#         print('Started optimisation.')
        
#         for i in range(iterations):
#             print(sess.run([loss] + loss_fetches, feed_dict=feed_dict))

In [5]:
class AudioGenerator():
    
    def __init__(self, audio_path, sample_rate, window_size, sequence_length):
        data, _ = librosa.core.load(mono=True,
                                    path=audio_path,
                                    sr=sample_rate)
        self.audio_frames = []
        skip = 128
        audio_frames_length = window_size * (sequence_length + 1)
        for start in range(0, len(data) - audio_frames_length, skip):
            end = start + audio_frames_length
            self.audio_frames.append(data[start:end])
        self.audio_frames = np.array(self.audio_frames)
        self.index = 0
        self.epochs = 0
        np.random.shuffle(self.audio_frames)
        
        
    def next_batch(self, batch_size):
        if self.index + batch_size > len(self.audio_frames):
            self.index = 0
            self.epochs += 1
            np.random.shuffle(self.audio_frames)
        
        self.index += batch_size
        return self.audio_frames[self.index:self.index + batch_size], self.epochs

In [8]:
import tflearn
   
                      
def recurrent_net(net, rec_type, rec_size, return_sequence):
    """
    A quick if else block to build a recurrent layer, based on the type specified
    by the user.
    """
    if rec_type == 'lstm':
        net = tflearn.layers.recurrent.lstm(net, rec_size, return_seq=return_sequence)
    elif rec_type == 'gru':
        net = tflearn.layers.recurrent.gru(net, rec_size, return_seq=return_sequence)
    elif rec_type == 'bi_lstm':
        net = bidirectional_rnn(net, 
                                BasicLSTMCell(rec_size), 
                                BasicLSTMCell(rec_size), 
                                return_seq=return_sequence)
    elif rec_type == 'bi_gru':
        net = bidirectional_rnn(net, 
                                GRUCell(rec_size), 
                                GRUCell(rec_size), 
                                return_seq=return_sequence)
    else:
        raise ValueError('Incorrect rnn type passed. Try lstm, gru, bi_lstm or bi_gru.')
    return net


tf.reset_default_graph()

with tf.variable_scope("inputs"):
    audio = tf.placeholder(tf.float32, 
                           shape=audio_shape)
    
    keep_prob = 0.5
with tf.variable_scope("stft"):
    stfts = tf.contrib.signal.stft(audio, 
                                   frame_length=fft_size, 
                                   frame_step=hop_size,
                                   fft_length=fft_size,
                                   pad_end=False)

with tf.variable_scope("cart2polar"):    
    real = tf.real(stfts)
    imag = tf.imag(stfts)
    magnitudes = tf.abs(stfts)
    phases = tf.atan2(imag, real)
#     features = tf.concat([real[:, :sequence_length], imag[:, :sequence_length]], axis=2)
#     target_features = tf.concat([real[:, -1], imag[:, -1]], axis=1)
    # Quadrants
    condition = real < 0.0
    phases = tf.where(condition, phases, phases + np.pi)

with tf.variable_scope("input_target_split"):
    input_magnitudes = magnitudes[:, :sequence_length]
    input_phases = phases[:, :sequence_length]
    target_magnitudes = magnitudes[:, -1]
    target_phases = phases[:, -1]
    target_features = tf.concat([target_magnitudes, target_phases], axis=1)
    

with tf.variable_scope("mag_phases_concat"):
    features = tf.concat([input_magnitudes, input_phases], axis=2)

    if batch_norm:
        features = tf.contrib.layers.batch_norm(features)
        
net = features

# Recurrent
for layer, size in enumerate(rnn_sizes):
    return_sequence = False if layer == (len(rnn_sizes) - 1) else True
    net = recurrent_net(net, 'lstm', size, return_sequence)
    net = tflearn.dropout(net, keep_prob) 
    
    
# Dense + MLP Out
net = tflearn.fully_connected(net, 
                              features.get_shape()[2], 
                              activation=dense_activation,                                            
                              regularizer='L2', 
                              weight_decay=0.001)
                      
logits = tflearn.fully_connected(net, 
                                 features.get_shape()[2], 
                                 activation='linear')

split_size = int(features.get_shape()[2]) // 2

with tf.variable_scope("mag_phase_predict_split"):
    predicted_magnitudes = logits[:, :split_size] 
    predicted_phases = logits[:, split_size:]        

predicted_real = predicted_magnitudes * tf.cos(predicted_phases)
predicted_imag = predicted_magnitudes * tf.sin(predicted_phases)
# predicted_real = logits[:, :split_size] 
# predicted_imag = logits[:, split_size:]        
predicted_stft = tf.complex(predicted_real, predicted_imag)
predicted_audio = tf.contrib.signal.inverse_stft(
    predicted_stft,
    frame_length=fft_size, 
    frame_step=hop_size,
    fft_length=fft_size,
    window_fn=None,
    name=None
)

loss = tf.losses.mean_squared_error(logits, target_features)

black_list = ['BatchNorm', 'batch_norm', 'LSTM', 'lstm', 'bias', '/b:']
regulisable_vars = []
for var in tf.trainable_variables():
    if not any([bad in var.name for bad in black_list]):
        regulisable_vars.append(tf.nn.l2_loss(var))

l2_losses = tf.add_n(regulisable_vars)
l2_loss = l2_losses * weight_decay

update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)

with tf.control_dependencies(update_ops):
    loss += l2_loss


optimiser = tf.train.AdamOptimizer(learning_rate=learning_rate)
        
optimise = optimiser.minimize(loss)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    
    audio_generator = AudioGenerator("./assets/electronic_piano/HM_120_AF_EPiano5.wav",
                                     44100,
                                     fft_size,
                                     sequence_length)
    

    print('Started optimisation.')
    
    amount_iterations = len(data) // fft_size
    
    generation_step = 20
    generation_length = 400
    generated_audio = []

    for epoch in range(amount_epochs):
        
        epoch = 0
        last_epoch = 0
            
        while epoch != amount_epochs:
            
            audio_frames, epoch = audio_generator.next_batch(batch_size)

            sess.run(optimise, feed_dict={
                audio: audio_frames
            })
            print(epoch, sess.run(loss, feed_dict={
                audio: audio_frames
            }))
            
            if epoch != last_epoch:
                
                print('Generating.')
                
                index = np.random.randint(len(audio_generator.audio_frames))
                impulse = audio_generator.audio_frames[index]
                impulse_size = len(impulse)

                for _ in range(generation_length):
                    
                    predicted_audio_frames = sess.run(predicted_audio, feed_dict={
                        audio: impulse[-impulse_size:].reshape((1, -1))
                    })
                    
                    impulse = np.concatenate((impulse, predicted_audio_frames))  
                    
                generated_audio.append(impulse)
            
            last_epoch = epoch
                    
                
            

Started optimisation.
0 4.69977
0 4.56812
0 4.35947
0 3.91337
0 3.33316
0 3.54233
0 3.2507
0 3.38724
0 3.41367
0 3.33194
0 3.43998
0 3.30345
0 3.1323
0 3.29256
0 3.36088
0 3.21344
0 3.26929
0 3.24265
0 3.18753
0 3.09822
0 3.30949
0 3.29899
0 3.15541
0 3.20001
0 3.16746
0 3.15145
0 3.14823
0 3.09728
0 3.19563
0 3.18632
0 3.17994
0 3.41437
0 3.18495
0 3.44275
0 3.30517
0 3.20687
0 3.33659
0 3.20347
0 3.22134
0 3.23398
0 3.10214
0 3.05008
0 3.18602
0 3.10942
0 3.07576
0 3.14645
0 3.05114
0 3.16717
0 3.06776
0 3.11272
0 3.02387
0 3.06713
0 3.04586
0 3.06357
0 3.08185
0 3.08277
0 3.03246
0 3.01378
0 3.10497
0 3.16322
0 3.01712
0 3.04499
0 3.07662
0 3.03647
0 3.02252
0 3.0338
0 3.04171
0 3.0068
0 3.07898
0 3.04204
0 3.05539
0 3.03689
0 3.12651
0 2.9993
0 3.02557
0 3.08083
0 3.19327
0 3.00992
0 2.97772
0 3.08388
0 2.98824
0 3.0012
1 3.0701
Generating.
1 3.17973
1 3.04889
1 3.07819
1 3.00123
1 3.04795
1 3.02929
1 3.01186
1 3.12107
1 2.97975
1 3.04115
1 2.97143
1 3.04005
1 3.01592
1 3.01601
1 3

KeyboardInterrupt: 

In [9]:
generated_audio = np.array(generated_audio).reshape((-1))

import IPython.display as ipd
ipd.Audio(generated_audio, rate=44100)
librosa.output.write_wav('generated_audio.wav', generated_audio, 44100)