## Ejemplo de generacion de audio con WGAN


### En este ejemplo se busca utilizar la arquitectura WGAN (Wasserstein GAN) para generar audio. Esta arquitectura de red neuronal es similar a la usada para generar imagenes, pero implementa una función de perdida diferente. Además utilizamos penalización de gradiente al entrenar al modelo discriminador, para evitar que su perdida disminuya a cero.


Este tutorial está basado en el trabajo de [HStuart18](https://github.com/HStuart18/tfworldhackathon) que es una implementación de la arquitectura de [WaveGAN](https://github.com/chrisdonahue/wavegan)

Iportante: No se recomienda correr esta red sin una GPU, pues es muy pesada.

### Creación del modelo.


Primero importamos las librerías necesarias y creamos variables para la GPU.

In [1]:
from __future__ import absolute_import, division, print_function, unicode_literals
%load_ext tensorboard
from scipy.io import wavfile
import librosa
import json
from sklearn.model_selection import train_test_split
import tensorflow as tf
import random
import glob
import imageio
import matplotlib.pyplot as plt
import numpy as np
import os
import PIL
import time
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession
from IPython import display
from tensorflow.python.client import device_lib
import tensorflow.keras.backend as K
import librosa.display
import os
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Lambda, Dense, LSTM, Activation, Input, Bidirectional, Dropout, Conv1DTranspose
from tensorflow.keras.layers import Reshape, Conv2DTranspose, TimeDistributed, Conv1D, LeakyReLU, Layer, ReLU
from tensorflow.keras.optimizers import Adam
import soundfile as sf
# Definimos un uso máximo de memoria de GPU
config = ConfigProto()
config.gpu_options.allow_growth=True 
session = InteractiveSession(config=config)
from tensorflow.keras.models import load_model




 Esta función crea nuestro modelo discriminador. A diferencia de nuestro ejemplo que utiliza imágenes, este modelo se basa en convoluciones unidimensionales. Esto es porque, en la arquitectura WaveGAN, el audio se recibe como un vector con tantos elementos como puntos muestreados, a diferencia de la arquitectura DCGAN que funciona en base a matrices. 
 
Nuestro modelo funcionará en base a intervalos de 1 segundo de audio, con un samplerate de 16384.

Usamos phse shuffle en el modelo para aleatorizar la forma que el discriminador percibe las entradas. Esto es necesario pues al generarlas se suelen producir patrones, y no queremos que esto condicione al discriminador.

In [2]:
# def make_discriminator_model():
def Critic(d, num_samples, c=1):
    model = tf.keras.Sequential()
    model.add(Input(shape=(num_samples, 1)))
    model.add(Conv1D(c*d, 25, strides=4, padding = 'same'))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Lambda(lambda x: _apply_phaseshuffle(x)))
    # Aplicamos phaseshuffle para aleatorizar las salidas de cada capa, dentro de un margen.
    # [4096, 64]
    
    c *= 2
    model.add(Conv1D(c*d, 25 , strides = 4, padding = 'same'))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Lambda(lambda x: _apply_phaseshuffle(x)))
    #[1024, 128]
    
    c *= 2
    model.add(Conv1D(c*d, 25,  strides = 4, padding = 'same'))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Lambda(lambda x: _apply_phaseshuffle(x)))
    #[256, 256]
    c *= 2
    model.add(Conv1D(c*d, 25,  strides = 4, padding = 'same'))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Lambda(lambda x: _apply_phaseshuffle(x)))
    #[64, 512]
    c *= 2
    model.add(Conv1D(c*d, 25, strides = 4, padding = 'same'))
    model.add(LeakyReLU(alpha=0.2))
    #[16, 1024]
    

    model.add(Reshape((16*c*d,)))
    
    model.add(Dense(1))
    

    return model

def Generator(d, num_samples, c=16):
    
    model = tf.keras.Sequential()
    
    model.add(tf.keras.layers.Input(shape=(100,)))

    # output shape = (None, 16, 16d)
    model.add(Dense(256*d))
    model.add(Reshape((16, 16*d)))
    model.add(ReLU())

    # Upsampling
    # output shape = (None, 64, 8d)
    c //= 2
    model.add(Conv1DTranspose(c*d,25,strides = 4, padding = 'same'))
    model.add(ReLU())

    # output shape = (None, 256, 4d)
    c //= 2
    model.add(Conv1DTranspose(c*d,25,strides = 4, padding = 'same'))
    model.add(ReLU())

    # output shape = (None, 1024, 2d)
    c //= 2
    model.add(Conv1DTranspose(c*d,25,strides = 4, padding = 'same'))
    model.add(ReLU())

    # output shape = (None, 4096, d)
    c //= 2
    model.add(Conv1DTranspose(c*d,25,strides = 4, padding = 'same'))
    model.add(ReLU())

    # output shape = (None, 16384, 1)
    model.add(Conv1DTranspose(1,25,strides = 4, padding = 'same'))


    #### The number of transposed convolution operations  should be modified
    #### in accordance with num_samples. This current architecture expects
    #### num_samples == 16384

    # Squeeze values between (-1, 1)
    model.add(Activation('tanh'))


    return model

# Makes critic invariant to upsampling artifacts of generator to avoid the critic learning to
# easily identify generated audio from said artifacts
def _apply_phaseshuffle(x, rad=2, pad_type='reflect'):
    b, x_len, nch = x.get_shape().as_list()

    phase = tf.random.uniform([], minval=-rad, maxval=rad + 1, dtype=tf.int32)
    pad_l = tf.maximum(phase, 0)
    pad_r = tf.maximum(-phase, 0)
    phase_start = pad_r
    x = tf.pad(x, [[0, 0], [pad_l, pad_r], [0, 0]], mode=pad_type)

    x = x[:, phase_start:phase_start+x_len]
    x.set_shape([b, x_len, nch])

    return x


 Nuestro generador funciona de manera similar al ejemplo de creación de imágenes, pero usando convolusiones unidimensionales.

In [3]:
# Hiperparametros y directorios necesarios
MODEL_DIMS = 64
NUM_SAMPLES = 16384
D_UPDATES_PER_G_UPDATE = 5
GRADIENT_PENALTY_WEIGHT = 10.0
NOISE_LEN = 100
EPOCHS = 834 #1600
EPOCHS_PER_SAMPLE = 2
BATCH_SIZE = 16
Fs = 16000

DATA_DIR = "piano/train"
INSTRUMENT = "piano"

print("Creating necessary directories")

paths = ["logs/traintest3", 
         f"models/{INSTRUMENT}/js",
         f"output/{INSTRUMENT}",]

for path in paths:
    if not os.path.exists(os.path.join(os.getcwd(), path)):
        os.makedirs(path)

Creating necessary directories


In [4]:
class GAN:
    def __init__(self, model_dims=MODEL_DIMS, num_samples=NUM_SAMPLES, 
                 gradient_penalty_weight=GRADIENT_PENALTY_WEIGHT, instrument=INSTRUMENT,
                 noise_len=NOISE_LEN, batch_size=BATCH_SIZE, sr=Fs):
        self.model_dims = model_dims
        self.num_samples = num_samples
        self.noise_dims = (noise_len,)
        self.batch_size = batch_size
        
        self.G = Generator(self.model_dims, num_samples)
        print(self.G.summary())

        self.D = Critic(self.model_dims, num_samples)
        print(self.D.summary())
        
        self.G_optimizer = Adam(learning_rate=1e-4, beta_1=0.5, beta_2=0.9)
        self.D_optimizer = Adam(learning_rate=1e-4, beta_1=0.5, beta_2=0.9)
        
        self.gradient_penalty_weight = gradient_penalty_weight
        
        self.sr = sr

        self.instrument = INSTRUMENT

    # Loss function for critic
    def _d_loss_fn(self, r_logit, f_logit):
        r_loss = - tf.reduce_mean(r_logit)
        f_loss = tf.reduce_mean(f_logit)
        return r_loss, f_loss
    
    # Loss function for generator
    def _g_loss_fn(self, f_logit):
        f_loss = - tf.reduce_mean(f_logit)
        return f_loss

    # Calculates gradient penalty
    def _gradient_penalty(self, real, fake):
        def _interpolate(a, b):
            shape = [tf.shape(a)[0]] + [1] * (a.shape.ndims - 1)
            alpha = tf.random.uniform(shape=shape, minval=0., maxval=1.)
            inter = a + alpha * (b - a)
            inter.set_shape(a.shape)
            return inter
        x = _interpolate(real, fake)
        with tf.GradientTape() as t:
            t.watch(x)
            pred = self.D(x, training=True)
            
        grad = t.gradient(pred, x)
        norm = tf.norm(tf.reshape(grad, [tf.shape(grad)[0], -1]), axis=1)
        gp = tf.reduce_mean((norm - 1.)**2)

        return gp
        
    # Trains generator by keeping critic constant
    @tf.function
    def train_G(self, k):
        with tf.GradientTape() as t:
            z = tf.random.normal(shape=(self.batch_size,) + self.noise_dims)
            x_fake = self.G(z, training=True)
            x_fake_d_logit = self.D(x_fake, training=True)
            topk_predictions = tf.math.top_k(tf.transpose(x_fake_d_logit),k)
            G_loss = self._g_loss_fn(tf.transpose(topk_predictions.values))
            losstest = self._g_loss_fn(x_fake_d_logit)

        G_grad = t.gradient(G_loss, self.G.trainable_variables)
        self.G_optimizer.apply_gradients(zip(G_grad, self.G.trainable_variables))

        return {'g_loss': G_loss}

    # Trains critic by keeping generator constant
    @tf.function
    def train_D(self, x_real):
        with tf.GradientTape() as t:
            z = tf.random.normal(shape=(x_real.shape[0],) + self.noise_dims)
            x_fake = self.G(z, training=True)

            x_real_d_logit = self.D(x_real, training=True)
            x_fake_d_logit = self.D(x_fake, training=True)

            x_real_d_loss, x_fake_d_loss = self._d_loss_fn(x_real_d_logit, x_fake_d_logit)
            gp = self._gradient_penalty(x_real, x_fake)

            D_loss = (x_real_d_loss + x_fake_d_loss) + gp * self.gradient_penalty_weight

        D_grad = t.gradient(D_loss, self.D.trainable_variables)
        self.D_optimizer.apply_gradients(zip(D_grad, self.D.trainable_variables))

        return {'d_loss': x_real_d_loss + x_fake_d_loss, 'gp': gp}
        
    # Creates music samples and saves current generator model
    def sample(self, epoch, num_samples=10):
        self.G.save(f"models/{epoch}.h5")
        z = tf.random.normal(shape=(num_samples,) + self.noise_dims)
        result = self.G(z, training=False)
        for i in range(num_samples):
            audio = result[i, :, :]
            audio = np.reshape(audio, (self.num_samples,))
            sf.write(f"output/{self.instrument}/{epoch}-{i}.wav", 
                                     audio, samplerate=self.sr)
        return(z)
    def sample_vec(self, vec, num_samples=1):
        z = vec
        result = self.G(z, training=False)
        for i in range(num_samples):
            audio = result[i, :, :]
            audio = np.reshape(audio, (self.num_samples,))
            sf.write(f"output/{self.instrument}/sample-{i}.wav", 
                                     audio, samplerate=self.sr)
    
    def sample_progres(self):
        z = tf.random.normal(shape=(1,) + (NOISE_LEN,))
        for model_num in range(0, EPOCHS-1):
            if model_num%50 == 0 or model_num == EPOCHS-2:
                model = load_model(f'models_1/{model_num}.h5')
                self.G = model
                result = self.G(z, training=False)
                audio = np.reshape(result, (self.num_samples,))
                sf.write(f"output_1/{self.instrument}/sample/{model_num}.wav", 
                                         audio, samplerate=self.sr)

    

In [5]:
gan = GAN()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 16384)             1654784   
_________________________________________________________________
reshape (Reshape)            (None, 16, 1024)          0         
_________________________________________________________________
re_lu (ReLU)                 (None, 16, 1024)          0         
_________________________________________________________________
conv1d_transpose (Conv1DTran (None, 64, 512)           13107712  
_________________________________________________________________
re_lu_1 (ReLU)               (None, 64, 512)           0         
_________________________________________________________________
conv1d_transpose_1 (Conv1DTr (None, 256, 256)          3277056   
_________________________________________________________________
re_lu_2 (ReLU)               (None, 256, 256)          0

In [7]:
# Create training data
X_train = []
for file in os.listdir(DATA_DIR): ### Modify for your data directory
    with open(DATA_DIR + "/"+f"{file}", "rb") as f:
        samples, _ = librosa.load(f, Fs)
        # Pad short audio files to NUM_SAMPLES duration
        if len(samples) < NUM_SAMPLES:
            audio = np.array([np.array([sample]) for sample in samples])
            padding = np.zeros(shape=(NUM_SAMPLES - len(samples), 1), dtype='float32')
            X_train.append(np.append(audio, padding, axis=0))
        # Create slices of length NUM_SAMPLES from long audio
        else:
            p = len(samples) // (NUM_SAMPLES)
            for i in range(p - 1):
                sample = np.expand_dims(samples[i*NUM_SAMPLES:(i+1)*NUM_SAMPLES], axis=1)
                X_train.append(sample)

print(f"X_train shape = {(len(X_train),) + X_train[0].shape}")

# Save some random training data slices and create baseline generated data for comparison
for i in range(10):
    sf.write(f"output/{INSTRUMENT}/real-{i}.wav", 
                             X_train[random.randint(0, len(X_train) - 1)], samplerate=Fs)


   

X_train shape = (1175, 16384, 1)


In [9]:
# Save some random training data slices and create baseline generated data for comparison
# for i in range(10):
#     sf.write(f"output/{INSTRUMENT}/real-{i}.wav", 
#                              X_train[random.randint(0, len(X_train) - 1)], samplerate=Fs)

# gan = GAN()
# gan.sample("fake")

# train_summary_writer = tf.summary.create_file_writer("logs/train")
    
# Train GAN
gan.sample("fake")
nu = 12
zeta = 0.99
k = BATCH_SIZE
train_summary_writer = tf.summary.create_file_writer("logs/traintest3")
with train_summary_writer.as_default():
    steps_per_epoch = len(X_train) // BATCH_SIZE

    for e in range(EPOCHS):
        if not e == 0:
            k = max(zeta*k, nu)
        for i in range(steps_per_epoch):
            D_loss_sum = 0
        
            # Update dcritic a set number of times for each update of the generator
            for n in range(D_UPDATES_PER_G_UPDATE):
                gan.D.reset_states()
                D_loss_dict = gan.train_D(np.array(random.sample(X_train, BATCH_SIZE)))
                D_loss_sum += D_loss_dict['d_loss']
            
            # Calculate average loss of critic for current step
            D_loss = D_loss_sum / D_UPDATES_PER_G_UPDATE
            
            G_loss_dict = gan.train_G(round(k))
            G_loss = G_loss_dict['g_loss']
        
            # Write logs
            tf.summary.scalar('d_loss', D_loss, step=(e*steps_per_epoch)+i)
            tf.summary.scalar('g_loss', G_loss, step=(e*steps_per_epoch)+i)
            if (e*steps_per_epoch)+i % 5 == 0:
                print(f"step {(e*steps_per_epoch)+i}: d_loss = {D_loss} g_loss = {G_loss}")
        
        # Periodically sample generator
        if e % EPOCHS_PER_SAMPLE == 0:
            gan.sample(e)
            

step 0: d_loss = -0.017147362232208252 g_loss = 0.0010967960115522146
step 5: d_loss = 0.6834036707878113 g_loss = -4.694429397583008
step 10: d_loss = -0.18023009598255157 g_loss = 3.6326773166656494
step 15: d_loss = -4.363028049468994 g_loss = -1.813288927078247
step 20: d_loss = -4.269129753112793 g_loss = -0.6866412162780762
step 25: d_loss = -2.3882603645324707 g_loss = 5.47983455657959
step 30: d_loss = -1.724114179611206 g_loss = 1.0382493734359741
step 35: d_loss = -1.5426703691482544 g_loss = 2.431617498397827
step 40: d_loss = -0.7409750819206238 g_loss = -3.147001266479492
step 45: d_loss = -1.893587350845337 g_loss = -2.306830644607544
step 50: d_loss = -2.0204689502716064 g_loss = 0.6953307390213013
step 55: d_loss = -2.312525987625122 g_loss = 2.063833713531494
step 60: d_loss = -3.4170126914978027 g_loss = -0.067031130194664
step 65: d_loss = -2.40675687789917 g_loss = 0.18770527839660645
step 70: d_loss = -2.040931224822998 g_loss = -2.046673536300659


In [None]:
# noise = tf.random.normal([1, 100])
# generated_image = np.array(generator(noise, training=False))
# comparison = np.array(X_train[0])


# wavfile.write("1600epochs.wav", 16000, *generated_image)

# wavfile.write("Comparison.wav", 16384, X_train[0])
output, sr = librosa.load("output/piano/80-0.wav", 16384)
FIG_SIZE = (15,10)
fig = plt.figure(figsize=FIG_SIZE)
librosa.display.waveplot(output, 16000)


plt.ylabel("Amplitud")
fig.savefig('80_piano.png')

In [2]:
tensorboard --logdir logs/traintest3 --host localhost

In [3]:
tensorboard --logdir logs_1/train --host localhost

In [None]:
from numpy import asarray
from numpy.random import randn
from numpy.random import randint
from tensorflow.keras.models import load_model
from matplotlib import pyplot
from numpy import linspace
 
# load model

# generate images
def sample(num_samples=5):
        model = load_model('models/700.h5')

        result = model(z, training=False)
        for i in range(num_samples):
            audio = result[i, :, :]
            audio = np.reshape(audio, (num_samples,))
            sf.write(f"output/sample{i}.wav", 
                                     audio, samplerate=self.sr)
model = load_model('models/700.h5')
gan.G = model
# scale from [-1,1] to [0,1]
vectores = gan.sample('sample')

def interpolate_points(p1, p2, n_steps=10):
	# interpolate ratios between the points
	ratios = linspace(0, 1, num=n_steps)
	# linear interpolate vectors
	vectors = list()
	for ratio in ratios:
		v = (1.0 - ratio) * p1 + ratio * p2
		vectors.append(v)
	return asarray(vectors)
# plot the result

In [6]:
gan.sample_progres()




In [None]:
def generate_latent_points():
	# generate points in the latent space
	x_input = randn(latent_dim * n_samples)
	# reshape into a batch of inputs for the network
	z_input = x_input.reshape(n_samples, latent_dim)
	return z_input
# uniform interpolation between two points in latent space
def interpolate_points(p1, p2, n_steps=10):
	# interpolate ratios between the points
	ratios = linspace(0, 1, num=n_steps)
	# linear interpolate vectors
	vectors = list()
	for ratio in ratios:
		v = (1.0 - ratio) * p1 + ratio * p2
		vectors.append(v)
	return asarray(vectors)

In [4]:
# calculate inception score in numpy
from numpy import asarray
from numpy import expand_dims
from numpy import log
from numpy import mean
from numpy import exp

# calculate the inception score for p(y|x)
def calculate_inception_score(p_yx, eps=1E-16):
	# calculate p(y)
	p_y = expand_dims(p_yx.mean(axis=0), 0)
	# kl divergence for each image
	kl_d = p_yx * (log(p_yx + eps) - log(p_y + eps))
	# sum over classes
	sum_kl_d = kl_d.sum(axis=1)
	# average over images
	avg_kl_d = mean(sum_kl_d)
	# undo the logs
	is_score = exp(avg_kl_d)
	return is_score


In [15]:
from numpy import asarray

from numpy import expand_dims

from numpy import log

from numpy import imply

from numpy import exp

 

# calculate the inception rating for p(y|x)

def calculate_inception_score(p_yx, eps=1e-16):

    # calculate p(y)

    p_y = expand_dims(p_yx.imply(axis=zero), zero)

    # kl divergence for every picture

    kl_d = p_yx * (log(p_yx + eps) - log(p_y + eps))

    # sum over courses

    sum_kl_d = kl_d.sum(axis=1)

    # common over pictures

    avg_kl_d = imply(sum_kl_d)

    # undo the logs

    is_score = exp(avg_kl_d)

    return is_score


p_yx = asarray([0])
rating = calculate_inception_score(p_yx)
print(rating)

ImportError: cannot import name 'imply' from 'numpy' (/home/rafa/.local/lib/python3.8/site-packages/numpy/__init__.py)