In [1]:
import os
import uuid

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import librosa

In [None]:
# Decode the three source recordings at 44.1 kHz. Each name is bound to the raw
# return value of librosa.load; the preprocessing below indexes it with [0] to
# get the waveform.
piano, harp, violin = [
    librosa.load(mp3_path, sr=44100)
    for mp3_path in (
        '/Users/vapnik/Downloads/piano/piano.mp3',
        '/Users/vapnik/Downloads/harp/harp.mp3',
        '/Users/vapnik/Downloads/violin/violin.mp3',
    )
]

def divide_chunks(l, n):
    """Lazily yield consecutive slices of ``l`` that are ``n`` items long.

    The final slice is shorter when ``len(l)`` is not a multiple of ``n``.
    """
    offsets = range(0, len(l), n)
    for begin in offsets:
        end = begin + n
        yield l[begin:end]

# Split every waveform (element [0] of each loaded recording) into chunks of
# 10000 samples and store the chunks of each instrument in one .npz archive.
#
# The archives are written into ./datasets because that is where the loading
# cell reads them from ('./datasets/<instrument>.npz'); writing them to the
# working directory would leave a fresh run unable to find its data.
# np.savez appends the '.npz' suffix itself.
if not os.path.isdir("./datasets"):
    os.makedirs("./datasets")

piano_splits = divide_chunks(piano[0],10000)
violin_splits = divide_chunks(violin[0],10000)
harp_splits = divide_chunks(harp[0],10000)

# The generators are unpacked into positional arguments, so each chunk is
# stored under an automatically numbered key (arr_0, arr_1, ...).
np.savez("./datasets/piano",*piano_splits)
np.savez("./datasets/harp",*harp_splits)
np.savez("./datasets/violin",*violin_splits)

In [2]:
# Archive order fixes the instrument indices used below: 0 = piano, 1 = violin, 2 = harp.
inst_files = ['./datasets/piano.npz',
              './datasets/violin.npz',
              './datasets/harp.npz']
# Matches the chunk length (10000 samples) used when the archives were written.
T = 10000


def _load_instrument_chunks(npz_path):
    """Load one instrument archive and stack its chunks into a single array.

    The archive is opened with a context manager so its file handle is closed
    as soon as the arrays have been read. The last stored chunk is dropped
    before stacking: it is the remainder of the split and may be shorter than
    the others, which ``np.stack`` would reject.
    """
    with np.load(npz_path) as archive:
        chunks = list(archive.values())
    return np.stack(chunks[:-1])


# One stacked array per instrument, in the same order as inst_files.
inst_waves_list = [_load_instrument_chunks(inst_file) for inst_file in inst_files]


In [9]:
def mulaw(x, MU):
    """Apply mu-law companding: sign(x) * log(1 + MU*|x|) / log(1 + MU).

    Args:
        x: waveform tensor (assumed to lie in [-1, 1] -- TODO confirm with callers).
        MU: companding parameter.

    Returns:
        Tensor with the same shape as ``x``.
    """
    numerator = tf.sign(x) * tf.log(1. + MU * tf.abs(x))
    denominator = tf.log(1. + MU)
    return numerator / denominator

def inv_mulaw(x, MU):
    """Invert mu-law companding: sign(x) * (1/MU) * ((1 + MU)**|x| - 1).

    Args:
        x: companded tensor.
        MU: companding parameter; must match the one used for ``mulaw``.

    Returns:
        Tensor with the same shape as ``x``.
    """
    scaled_sign = tf.sign(x) * (1. / MU)
    expanded = tf.pow(1. + MU, tf.abs(x)) - 1.
    return scaled_sign * expanded
    
def naive_wavenet(inputs, condition, layers, h_filters, out_filters, name='naive_wavenet', reuse=False):
    """Build a stack of gated, dilated 1-D convolutions with skip connections.

    All layers are created inside ``tf.variable_scope(name, reuse=reuse)`` and
    rely on TensorFlow's automatic layer naming, so the order in which they are
    created here must stay stable for ``reuse=True`` to find the same variables.

    Args:
        inputs: rank-3 tensor; axis 1 is the one that is left-padded and
            convolved over (presumably [batch, time, channels] -- TODO confirm).
        condition: optional tensor that is projected to ``h_filters`` channels
            with a dense layer and added to the filter and gate pre-activations
            of every block; ``None`` disables conditioning.
        layers: number of dilated blocks; block ``i`` uses dilation ``2 ** i``.
        h_filters: channel count of the convolutions and of the residual/skip
            projections.
        out_filters: channel count of the two final dense layers.
        name: variable scope name.
        reuse: forwarded to ``tf.variable_scope``.

    Returns:
        ``(dilation_sum, outputs)`` where ``dilation_sum`` is 1 plus the sum of
        all block dilations and ``outputs`` is the result of the final dense
        layer (no activation).
    """
    with tf.variable_scope(name, reuse=reuse):
        # Initial convolution: one step of left padding compensates for the
        # kernel of size 2, so the length of axis 1 is preserved.
        hidden = tf.layers.conv1d(inputs=tf.pad(inputs, [[0, 0], [1, 0], [0, 0]]),
                                  filters=h_filters, kernel_size=2,
                                  dilation_rate=1, use_bias=False)
        receptive = 1
        skip_terms = []

        for depth in range(layers):
            rate = 2 ** depth
            receptive += rate

            # Left-pad by the dilation so the dilated kernel keeps the length.
            padded = tf.pad(hidden, [[0, 0], [rate, 0], [0, 0]])
            conv_kwargs = dict(inputs=padded, filters=h_filters, kernel_size=2,
                               dilation_rate=rate, use_bias=False)
            filter_pre = tf.layers.conv1d(**conv_kwargs)
            gate_pre = tf.layers.conv1d(**conv_kwargs)

            if condition is None:
                filter_shift = gate_shift = 0
            else:
                filter_shift = tf.layers.dense(condition, h_filters)
                gate_shift = tf.layers.dense(condition, h_filters)

            # Gated activation unit: tanh branch modulated by a sigmoid gate.
            gated = tf.nn.tanh(filter_pre + filter_shift) * tf.nn.sigmoid(gate_pre + gate_shift)

            # Residual path feeds the next block; skip path feeds the output head.
            hidden = hidden + tf.layers.dense(gated, h_filters)
            skip_terms.append(tf.layers.dense(gated, h_filters))

        head = tf.nn.relu(sum(skip_terms))
        head = tf.layers.dense(head, out_filters, activation=tf.nn.relu)
        head = tf.layers.dense(head, out_filters, activation=None)

    return receptive, head

def downsample(inputs, pool_size, channel):
    """Average-pool ``inputs`` with non-overlapping windows of ``pool_size``.

    Args:
        inputs: tensor passed to ``tf.layers.average_pooling1d``.
        pool_size: used as both the window length and the stride.
        channel: accepted for interface symmetry with ``upsample``; unused.

    Returns:
        ``(pad_size, outputs)``; ``pad_size`` is always the constant 1 because
        no padding is computed here.
    """
    pooled = tf.layers.average_pooling1d(inputs=inputs, pool_size=pool_size, strides=pool_size)
    return 1, pooled

def upsample(inputs, output_size, channel):
    """Stretch ``inputs`` along axis 1 to ``output_size`` steps by nearest-neighbour repetition.

    Args:
        inputs: rank-3 tensor (presumably [batch, time, channel] -- TODO confirm).
        output_size: target length of axis 1; may be a tensor.
        channel: static size of the last axis, re-asserted via reshape.

    Returns:
        Tensor whose axis 1 has length ``output_size``.
    """
    # Insert a dummy height axis so the image resize op can be applied.
    as_image = tf.expand_dims(inputs, axis=1)
    stretched = tf.image.resize_nearest_neighbor(as_image, [1, output_size])
    sequence = tf.squeeze(stretched, axis=1)

    # Reshape with the dynamic batch/time sizes but a static channel count.
    dynamic_shape = tf.shape(sequence)
    sequence = tf.reshape(sequence, [dynamic_shape[0], dynamic_shape[1], channel])
    return sequence[:, -output_size:]

def domain_confusion(inputs, layers, domain_num, h_filters):
    """Build the domain classifier applied to the latent sequence.

    Args:
        inputs: tensor fed to the first convolution.
        layers: number of stacked ELU convolutions.
        domain_num: number of domains; size of the returned logits' last axis.
        h_filters: channel count of every convolution.

    Returns:
        Logits tensor produced by the final dense layer (no activation).
    """
    outputs = inputs
    for _ in range(layers):
        # Every layer uses dilation_rate=1. The previous code also computed an
        # unused `dilation = 2 ** layers` local; it never reached the
        # convolution and has been removed, so the graph is unchanged.
        outputs = tf.layers.conv1d(inputs=outputs, filters=h_filters, kernel_size=2,
                                   dilation_rate=1, activation=tf.nn.elu)

    outputs = tf.layers.dense(outputs, domain_num, activation=tf.nn.tanh)
    outputs = tf.layers.dense(outputs, domain_num)
    return outputs

In [10]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf
from tensorflow.python.framework import ops


class FlipGradientBuilder(object):
    """Callable that inserts a gradient-reversal op into the default graph.

    The forward pass is ``tf.identity``; the registered gradient returns the
    negated incoming gradient multiplied by ``l``.
    """

    def __init__(self):
        # Number of reversal ops this builder has created so far.
        self.num_calls = 0

    def __call__(self, x, l=1.0):
        """Return ``tf.identity(x)`` whose gradient is ``-l`` times the incoming gradient.

        Args:
            x: tensor to pass through unchanged.
            l: scale applied to the reversed gradient.
        """
        # Gradient functions are registered in a process-wide registry that
        # rejects duplicate names. A counter alone restarts at 0 whenever this
        # cell is re-run (a new builder is created), which would try to
        # register "FlipGradient0" a second time. The random suffix keeps every
        # registration unique.
        grad_name = "FlipGradient%d_%s" % (self.num_calls, uuid.uuid4().hex)

        @ops.RegisterGradient(grad_name)
        def _flip_gradients(op, grad):
            return [tf.negative(grad) * l]

        g = tf.get_default_graph()
        # Only the identity op created inside this block uses the override.
        with g.gradient_override_map({"Identity": grad_name}):
            y = tf.identity(x)

        self.num_calls += 1
        return y


# Shared builder instance used when constructing the graph.
flip_gradient = FlipGradientBuilder()

In [11]:
# Hyper-parameters shared by the graph definition below.

# mu-law parameter; also used as the number of quantisation bins (one-hot depth).
MU = 256
# Channel count of the encoder output (the latent code).
LATENT_DIM = 64
# Window length and stride of the average pooling applied to the latents.
POOL_SIZE = 400
# Number of instrument domains: one decoder and one classifier logit per instrument.
INSTRUMENTS_NUM = 3

In [12]:
# Start from an empty default graph so that re-running this cell does not add
# a second copy of every op to the previous graph.
tf.reset_default_graph()

'''

INPUT LAYER

'''
# Waveform batch; both dimensions are left dynamic.
x_holder = tf.placeholder(shape=[None, None], dtype=tf.float32)

# Compand with mu-law, shift from [-1, 1] to [0, MU], truncate to integers and
# clip into the valid bin range 0 .. MU - 1.
x_mulaw = mulaw(x_holder, MU)
_x_bins = tf.cast((x_mulaw + 1.) * 0.5 * MU, tf.int32)
x_onehot_index = tf.clip_by_value(_x_bins, 0, MU - 1)
x_onehot = tf.one_hot(x_onehot_index, depth=MU)

# Scalar integer instrument label, shared by the whole batch.
label_holder = tf.placeholder(shape=(), dtype=tf.int32)

'''

ENCODER LAYER

'''

# Encode the raw (not companded) waveform. A trailing channel axis of size 1 is
# added so the encoder receives a rank-3 tensor. No conditioning is used, and
# the returned dilation sum is discarded.
_, latents = naive_wavenet(inputs=tf.expand_dims(x_holder, axis=-1), condition=None, 
                           layers=9, h_filters=64, out_filters=LATENT_DIM, name='wavenet_encoder')

# Average-pool the latents with window and stride POOL_SIZE; the first return
# value (a constant pad size) is discarded.
_, down_latents = downsample(latents, POOL_SIZE, LATENT_DIM)

# Stretch the pooled latents back to the length of axis 1 of the input so they
# can be used as a per-sample conditioning signal for the decoders.
up_latents = upsample(down_latents, tf.shape(x_holder)[1], LATENT_DIM)

'''

DOMAIN CONFUSION LAYER

'''

# Gradient reversal: identity in the forward pass, negated gradient in the
# backward pass, so the classifier's loss pushes the encoder the opposite way.
flipped_down_latents = flip_gradient(down_latents)
# Alternative without gradient reversal, kept for experimentation:
#flipped_down_latents = down_latents

# Domain classifier: 3 convolution layers with 128 filters, INSTRUMENTS_NUM logits.
# NOTE: these layers are created outside any variable scope and rely on
# automatic naming, so their creation order affects the variable names.
label_predicts = domain_confusion(flipped_down_latents, 3, INSTRUMENTS_NUM, 128)
# Average the logits over axis 1 to get one prediction per batch row.
label_predicts = tf.reduce_mean(label_predicts, axis=1)
label_predicts_prob = tf.nn.softmax(label_predicts)
# The scalar label is repeated once per batch row to match the logits.
label_tiled = tf.tile(tf.expand_dims(label_holder, axis=0), [tf.shape(label_predicts)[0]])

# Cross-entropy between the repeated instrument label and the averaged logits.
domain_confusion_loss = tf.losses.sparse_softmax_cross_entropy(labels=label_tiled, logits=label_predicts)

'''

DECODER LAYER for traininng

'''
# One entry per instrument decoder, in instrument-index order.
decode_losses = []
samples_list = []
for instrument_index in range(INSTRUMENTS_NUM):
    # Each instrument gets its own decoder in its own variable scope
    # ('wavenet_decoder_<index>'), conditioned on the upsampled latents.
    dilation_sum, outputs = naive_wavenet(inputs=x_onehot, condition=up_latents, 
                                          layers=9, h_filters=64, out_filters=MU, 
                                          name='wavenet_decoder_' + str(instrument_index))
    outputs_probs = tf.nn.softmax(outputs)
    
    # Draw one bin per position, rescale the bin index to [-1, 1) and undo the
    # mu-law companding to obtain a waveform.
    dist = tf.distributions.Categorical(probs=outputs_probs)
    samples = inv_mulaw(tf.cast(dist.sample(), tf.float32) / MU * 2. - 1., MU)
    samples_list.append(samples)

    # Next-step prediction: the logits at position t are scored against the
    # quantised input at position t + 1. The first dilation_sum positions are
    # excluded (presumably because they do not see a full receptive field --
    # TODO confirm).
    decode_loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=x_onehot_index[:, dilation_sum + 1:],
                                                                 logits=outputs[:, dilation_sum:-1])
    decode_loss = tf.reduce_mean(decode_loss)
    decode_losses.append(decode_loss)

# Keep only the loss of the decoder selected by label_holder (the others are
# multiplied by zero). The mean is taken over all INSTRUMENTS_NUM entries, so
# the selected loss ends up divided by INSTRUMENTS_NUM.
decode_losses = tf.stack(decode_losses, axis=0) * tf.one_hot(label_holder, depth=INSTRUMENTS_NUM)
decode_losses = tf.reduce_mean(decode_losses)

# Total objective: reconstruction loss plus the domain loss weighted by 1e-2.
loss = decode_losses + 1e-2 * domain_confusion_loss
train_step = tf.train.AdamOptimizer(1e-3).minimize(loss)

'''

DECODER LAYER for inference

'''

# Latents supplied from outside the graph, replacing the encoder output as the
# decoders' conditioning signal.
latents_holder = tf.placeholder(dtype=tf.float32, shape=[None, None, LATENT_DIM])

# One sampled value per instrument decoder, in instrument-index order.
inference_sample_list = []

for instrument_index in range(INSTRUMENTS_NUM):
    # Same scope names as the training decoders with reuse=True, so these
    # copies share the training decoders' variables. The audio input is still
    # x_onehot, i.e. it is derived from x_holder.
    _, outputs = naive_wavenet(inputs=x_onehot, condition=latents_holder, 
                               layers=9, h_filters=64, out_filters=MU, 
                               name='wavenet_decoder_' + str(instrument_index), reuse=True)
    outputs_probs = tf.nn.softmax(outputs)

    # Sample only from the distribution at the last position of axis 1, then
    # rescale the bin index to [-1, 1) and undo the mu-law companding.
    dist = tf.distributions.Categorical(probs=outputs_probs[:, -1])
    sample = inv_mulaw(tf.cast(dist.sample(), tf.float32) / MU * 2. - 1., MU)
    inference_sample_list.append(sample)

'''

SESSION CREATE

'''

# Hide all GPUs from this session so the graph runs on the CPU.
config = tf.ConfigProto(device_count={'GPU': 0})
sess = tf.Session(config=config)

# Give every variable an initial value, then overwrite the values with the
# ones stored in the checkpoint.
sess.run(tf.global_variables_initializer())

saver = tf.train.Saver()
saver.restore(sess, "./model12.ckpt")
print("Model restored.")

print('Tensorflow graph created.')


INFO:tensorflow:Restoring parameters from ./model12.ckpt
Model restored.
Tensorflow graph created.


In [13]:
# Indices follow the order of inst_files (0 = piano, 1 = violin, 2 = harp).
# Index of the instrument whose recordings are fed into the encoder.
src_instrument_index = 0
# Index of the instrument to convert to (presumably the decoder to listen to
# -- TODO confirm the decoder/label mapping used during training).
dest_instrument_index = 1

In [14]:
# Take chunks 100..149 of the source instrument and stack them row-wise into
# one batch array.
inst_waves_test = np.vstack(inst_waves_list[src_instrument_index][100:150])

In [15]:
# Run the encoder, the domain classifier and all training-graph decoders on the
# test batch. None of these fetches depends on label_holder, so only the
# waveform placeholder is fed.
_fetches = [down_latents, label_predicts_prob, samples_list]
_down_latents, _label_predicts_prob, _samples_list = sess.run(
    _fetches, feed_dict={x_holder: inst_waves_test})

In [16]:
import IPython.display as ipd
# Play the output of the decoder selected by dest_instrument_index (instead of
# a hard-coded decoder index), with the sampled chunks joined into one waveform.
ipd.Audio(np.hstack(_samples_list[dest_instrument_index]), rate=44100)

In [17]:
# Play the unmodified source chunks, joined into one waveform, for comparison.
source_waveform = np.hstack(inst_waves_test)
ipd.Audio(source_waveform, rate=44100)