## Using TensorFlow to Generate Images with PixelCNNs
import all the necessary dependencies.


In [0]:
import os

import numpy as np
from tqdm import tqdm
import tensorflow as tf

from utils import *
from ops import *
# from statistic import Statistic

### Set up the parameters
Here we set all the paramaters for the PixelRNN model. These include the model hyperparameters, some dataset properties, and debugging information. We also set up the random seeds for Tensorflow and Numpy.

In [0]:
hyperparams = {# network
    "model" : "pixel_cnn", # name of model [pixel_rnn, pixel_cnn]
    "batch_size" : 32, # size of a batch
    "hidden_dims" : 16, # dimesion of hidden states of LSTM or Conv layers
    "recurrent_length" : 8, # the length of LSTM or Conv layers
    "out_hidden_dims" : 32, # dimesion of hidden states of output Conv layers
    "out_recurrent_length" : 4, # the length of output Conv layers
    "use_residual" : False, # whether to use residual connections or not
    "use_dynamic_rnn" : False, # whether to use dynamic_rnn or not

    # training
    "max_epoch" : 200, # # of step in an epoch
    "test_step" : 10, # # of step to test a model
    "save_step" : 5, # # of step to save a model
    "learning_rate" : 1e-3, # learning rate
    "grad_clip" : 1, # value of gradient to be used for clipping
    "use_gpu" : True, # whether to use gpu for training

    #data
    "x_path": "music_x",
    "y_path": "music_y",
    "test_path":"test",
    
    # Debug
    "is_train" : True, # training or testing
    "display" : False, # whether to display the training results or not
    "random_seed" :  123 # random seed for python
}
p = dotdict(hyperparams)

In [0]:
if "random_seed" in p:
    tf.set_random_seed(p.random_seed)
    np.random.seed(p.random_seed)

### Prepare the dataset

The music dataset for this project is 10 songs from one of Jay chou's albums. Use librosa to load the music data.

In [0]:
xfile = os.listdir(p.x_path)
yfile = os.listdir(p.y_path)
testfile = os.listdir(p.test_path)
time_steps = 512
height = 1
width = time_steps
X_train=np.zeros((1,time_steps))
Y_train=np.zeros((1,time_steps))
X_test=np.zeros((1,time_steps))

In [0]:
import librosa


for file in xfile:
  try:
    y, sr = librosa.load(p.y_path+'/'+file, mono=True)
    yc, sr = librosa.load(p.x_path+'/'+file, mono=True)
    
    music_length = len(y)
    sequence_size = int(music_length/time_steps)
    
    x_train = np.zeros((sequence_size,time_steps))
    y_train = np.zeros((sequence_size,time_steps))
    
    for i in range(sequence_size):
      x_train[i] = yc[time_steps*i:time_steps*(i+1)]
      y_train[i] = y[time_steps*i:time_steps*(i+1)]
  
    x_train = x_train.reshape(sequence_size,time_steps)[100:-100]
    y_train = y_train.reshape(sequence_size,time_steps)[100:-100]
    
    X_train = np.concatenate((X_train,x_train))
    Y_train = np.concatenate((Y_train,y_train))
    
  except IsADirectoryError:
    pass
  except FileNotFoundError:
    pass
  
  
for file in testfile:
  try:
    y, sr = librosa.load(p.test_path+'/'+file, mono=True)
    music_length = len(y)
    sequence_size = int(music_length/time_steps)
    x_test = np.zeros((sequence_size,time_steps))
    
    for i in range(sequence_size):
      x_test[i] = yc[time_steps*i:time_steps*(i+1)]
      
    x_test = x_test.reshape(sequence_size,time_steps)[100:-100]
    X_test = np.concatenate((X_test,x_test))
    
  except IsADirectoryError:
    pass
  except FileNotFoundError:
    pass
    
# l = int(len(X_train)*0.2)

# X_test = X_train[:l]
# Y_test = Y_train[:l]

# X_train = X_train[l:]
# Y_train = Y_train[l:]

### Setting up network

Let's construct the PixelCNN model. First, we set up input placeholder. We'll be feeding batches of training images into the model through this.

Next, we construct the masked convolutional layers. You can find the implementation of this masking procedure in ```ops.py```. These layers apply a series of convolutions to the image, where each filter is masked to only account for pixels in the region of interest. These are the pixels above and to the left of the pixel in the center of the mask, which follows the PixelRNN generative model assumptions. Also of note is that the receptive field of the PixelCNN model grows linearly with the depth of these convolutional stacks.

In [34]:
def pixelRNN(height, width, channel, params):
    """
    Args
    height, width, channel - the dimensions of the input
    params -- the hyperparameters of the network
    """
    input_shape = [None, height, width, channel] if params.use_gpu else [None, channel, height, width]
    inputs = tf.placeholder(tf.float32, input_shape, name='inputs')
    truth = tf.placeholder(tf.float32, input_shape, name='truth')
    # input of main convolutional layers
    scope = "conv_inputs"

    conv_inputs = conv2d(inputs, params.hidden_dims, [1,63], "A", scope=scope)
    # main convolutions layers    
    last_hid = conv_inputs
    for idx in range(params.recurrent_length):
        scope = 'CONV%d' % idx
        last_hid = conv2d(last_hid, 16, [1, 1], "B", scope=scope)
        print("Building %s" % scope)

    # output convolutional layers
    for idx in range(params.out_recurrent_length):
        scope = 'CONV_OUT%d' % idx
        last_hid = tf.nn.relu(conv2d(last_hid, params.out_hidden_dims, [1, 1], "B", scope=scope))
        print("Building %s" % scope)

    conv2d_out_logits = conv2d(last_hid, 1, [1, 1], "B", scope='conv2d_out_logits')
#     output = tf.nn.tanh(conv2d_out_logits)
    output = conv2d_out_logits
#     output = tf.layers.dense(conv2d_out_logits,units=1)
    return inputs, truth, output, conv2d_out_logits

tf.reset_default_graph()
inputs, truth, output, conv2d_out_logits = pixelRNN(height, width, channel, p)

Building CONV0
Building CONV1
Building CONV2
Building CONV3
Building CONV4
Building CONV5
Building CONV6
Building CONV7
Building CONV_OUT0
Building CONV_OUT1
Building CONV_OUT2
Building CONV_OUT3


### Optimization

Now, let's train the model. To do so, we will minimize the cross entropy loss using an RMSPropOptimizer. We also clip the gradients to help deal with potential exploding gradient problems.

In [35]:
loss = tf.reduce_mean(tf.losses.mean_squared_error(truth,output))


# loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=conv2d_out_logits, labels=truth, name='loss'))

optimizer = tf.train.RMSPropOptimizer(p.learning_rate)
grads_and_vars = optimizer.compute_gradients(loss)

new_grads_and_vars = \
    [(tf.clip_by_value(gv[0], -p.grad_clip, p.grad_clip), gv[1]) for gv in grads_and_vars]
optim = optimizer.apply_gradients(new_grads_and_vars)
# optim = optimizer.apply_gradients(grads_and_vars)

# show_all_variables()
print("Building %s finished!" % p.model)

Building pixel_cnn finished!


### Image generation
To generate an image, we predict a single pixel at a time. Once we generate a pixel, the next prediction will use the previous pixels to generate the next pixel intensity using the masked convolutions. 

In [0]:
def predict(sess, music, inputs, output):
    return sess.run(output, {inputs: music})
  

def generate(sess, inputs, output, sample):
    a = sess.run(output, {inputs:sample.reshape((4518,1,time_steps,1))})
    result = a.flatten()
    librosa.output.write_wav('result.wav',result,sr)
    ipd.Audio('result.wav')

    return a

### Training


In [37]:
# with tf.Session() as sess:
sess = tf.Session()
init = tf.global_variables_initializer()

sess.run(init)
print("Start training")

# initial_step = stat.get_t() if stat else 0
# iterator = range(p.max_epoch, ncols=70, initial=initial_step)
iterator = tqdm(range(p.max_epoch))

i=0
for epoch in iterator:
    # print('Start epoch')
    # 1. train
    total_train_costs = []
    for idx in range(p.max_epoch):
        if (i+1)*p.batch_size > len(x_train):
            i=0
        x = X_train[i*p.batch_size:(i+1)*p.batch_size].reshape([p.batch_size, height, width, channel])
        y = Y_train[i*p.batch_size:(i+1)*p.batch_size].reshape([p.batch_size, height, width, channel])
        i += 1
        _, cost = sess.run([optim, loss], feed_dict={inputs: x, truth: y})
        total_train_costs.append(cost)
        
#     print('Start testing')
    # 2. test
#     total_test_costs = []
#     for idx in range(p.test_step):
#         if (i+1)*p.batch_size > len(x_train):
#             i=0
#         x = X_test[i*p.batch_size:(i+1)*p.batch_size].reshape([p.batch_size, height, width, channel])
#         y = Y_test[i*p.batch_size:(i+1)*p.batch_size].reshape([p.batch_size, height, width, channel])
#         i += 1
#         cost = sess.run(loss, feed_dict={inputs: x, truth: y})
#         total_test_costs.append(cost)

#     avg_train_cost, avg_test_cost = np.mean(total_train_costs), np.mean(total_test_costs)
    avg_train_cost = np.mean(total_train_costs)
    # print('Start generation')
    # 3. generate samples
#     samples = generate_occlusions(sess, height, width, inputs, output)
#     iterator.set_description("train loss: %.3f, test loss: %.3f" % (avg_train_cost, avg_test_cost))
    iterator.set_description("train loss: %.3f" % avg_train_cost)


  0%|          | 0/200 [00:00<?, ?it/s][A

Start training



train loss: 0.053:   0%|          | 0/200 [00:17<?, ?it/s][A
train loss: 0.053:   0%|          | 1/200 [00:17<58:43, 17.70s/it][A
train loss: 0.054:   0%|          | 1/200 [00:34<58:43, 17.70s/it][A
train loss: 0.054:   1%|          | 2/200 [00:34<57:41, 17.48s/it][A
train loss: 0.053:   1%|          | 2/200 [00:51<57:41, 17.48s/it][A
train loss: 0.053:   2%|▏         | 3/200 [00:51<56:50, 17.31s/it][A
train loss: 0.055:   2%|▏         | 3/200 [01:08<56:50, 17.31s/it][A
train loss: 0.055:   2%|▏         | 4/200 [01:08<56:13, 17.21s/it][A
train loss: 0.049:   2%|▏         | 4/200 [01:25<56:13, 17.21s/it][A
train loss: 0.049:   2%|▎         | 5/200 [01:25<55:43, 17.15s/it][A
train loss: 0.058:   2%|▎         | 5/200 [01:42<55:43, 17.15s/it][A
train loss: 0.058:   3%|▎         | 6/200 [01:42<55:11, 17.07s/it][A
train loss: 0.053:   3%|▎         | 6/200 [01:59<55:11, 17.07s/it][A
train loss: 0.053:   4%|▎         | 7/200 [01:59<54:49, 17.04s/it][A
train loss: 0.055:   4%|▎  

In [0]:
import IPython.display as ipd

a = sess.run(output, {inputs:X_test.reshape((len(X_test),1,time_steps,1))})
result = a.flatten()
librosa.output.write_wav('result.wav',result,sr)
# ipd.Audio('result.wav')

Once we are done, we close our Tensorflow session.

In [0]:
sess.close()