<a href="https://colab.research.google.com/github/CadeHarger/portfolio/blob/main/Personal_Projects/Conditional_Audio_Synthesis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title Setup Dependencies
!pip install tensorflow-io

%tensorflow_version 2.x
from __future__ import absolute_import, division, print_function, unicode_literals

import os
import pathlib
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf
import pandas as pd
import glob
import imageio
import PIL
import tensorflow_probability as tfp
import time

from tensorflow.keras.layers.experimental import preprocessing
from tensorflow.keras import layers
from tensorflow.keras import models
from IPython import display
from tqdm.autonotebook import tqdm

In [None]:
#@title Load the Dataset
# Fetch a small portion of the Speech Commands dataset (the full set holds over
# 105,000 WAV files covering thirty words; collected by Google, CC BY license).
# The archive is only downloaded and extracted when the data directory is absent.
data_dir = pathlib.Path('data/mini_speech_commands')
dataset_missing = not data_dir.exists()
if dataset_missing:
  tf.keras.utils.get_file(
      'mini_speech_commands.zip',
      origin="http://storage.googleapis.com/download.tensorflow.org/data/mini_speech_commands.zip",
      extract=True,
      cache_dir='.',
      cache_subdir='data')

# Every entry of the data directory except the README is treated as a command
# (label) name.
dir_entries = np.array(tf.io.gfile.listdir(str(data_dir)))
commands = dir_entries[dir_entries != 'README.md']
print('Commands:', commands)

In [None]:
#@title Extract and shuffle audio files
# Collect every file that sits one directory below data_dir and shuffle the list.
filenames = tf.io.gfile.glob(str(data_dir) + '/*/*')
filenames = tf.random.shuffle(filenames)
num_samples = len(filenames)
print('Number of total examples:', num_samples)
print('Number of examples per label:',
      len(tf.io.gfile.listdir(str(data_dir/commands[0]))))
print('Example file tensor:', filenames[0])

# 80:10:10 split, derived from the actual file count so the three partitions
# never overlap or drop files when the dataset is not exactly 8000 clips.
# Integer arithmetic keeps the sizes at 6400 / 800 / 800 for 8000 files.
num_train = num_samples * 8 // 10
num_val = num_samples // 10
train_files = filenames[:num_train]
val_files = filenames[num_train: num_train + num_val]
test_files = filenames[num_train + num_val:]

print('Training set size', len(train_files))
print('Validation set size', len(val_files))
print('Test set size', len(test_files))

In [None]:
#@title Converting Audio to usable format
# A WAV file is a time series with a fixed number of samples per second; each
# sample is the signal amplitude at that instant. The files in
# mini_speech_commands are 16-bit (raw range -32768..32767) at 16kHz.
# tf.audio.decode_wav returns the audio tensor together with the sample rate
# and normalizes the amplitudes to [-1.0, 1.0].
def decode_audio(audio_binary):
  """Decode WAV bytes and drop the trailing axis of the decoded audio.

  Only the audio tensor is kept; the sample rate returned by the decoder is
  discarded.
  """
  decoded_audio = tf.audio.decode_wav(audio_binary)[0]
  return tf.squeeze(decoded_audio, axis=-1)

# A clip's label is the name of the directory that contains it.
def get_label(file_path):
  """Return the second-to-last path component of `file_path`."""
  path_components = tf.strings.split(file_path, os.path.sep)
  # Plain indexing (rather than tuple unpacking) keeps this usable inside a
  # TensorFlow graph.
  parent_dir = path_components[-2]
  return parent_dir

def get_waveform_and_label(file_path):
  """Read the file at `file_path` and return `(waveform, label)`.

  The waveform comes from `decode_audio`, the label from `get_label`.
  """
  waveform = decode_audio(tf.io.read_file(file_path))
  return waveform, get_label(file_path)

# Preview pipeline over the training files: slice the filename tensor into a
# dataset and map each path to (waveform, label). The train/val/test datasets
# are rebuilt separately later in the notebook, so this one is only used for
# the example cells.
AUTOTUNE = tf.data.AUTOTUNE
files_ds = tf.data.Dataset.from_tensor_slices(train_files)
waveform_ds = files_ds.map(
    map_func=get_waveform_and_label,
    num_parallel_calls=AUTOTUNE,
)

In [None]:
#@title Display Examples
# Plot the first rows*cols waveforms in a grid, one subplot per clip, titled
# with the clip's label.
rows = 3
cols = 3
n = rows*cols
fig, axes = plt.subplots(rows, cols, figsize=(10, 12))
y_ticks = np.arange(-1.2, 1.2, 0.2)
# axes.flat walks the grid row by row, matching the order of the clips.
for ax, (audio, label) in zip(axes.flat, waveform_ds.take(n)):
  ax.plot(audio.numpy())
  ax.set_yticks(y_ticks)
  label = label.numpy().decode('utf-8')
  ax.set_title(label)

plt.show()

In [None]:
#@title Audio Example
# Pull a single (waveform, label) pair from the preview dataset, report it and
# render an inline audio player for it.
waveform, label = next(iter(waveform_ds.take(1)))
label = label.numpy().decode('utf-8')
print('Label:', label)
print('Waveform shape:', waveform.shape)
print('Audio playback')
audio_widget = display.Audio(waveform, rate=16000)
display.display(audio_widget)

In [None]:
#@title Prepare Data for Training
def preprocess_dataset(files):
  """Build a dataset of `(waveform, label)` pairs from a tensor of file paths."""
  return tf.data.Dataset.from_tensor_slices(files).map(
      get_waveform_and_label, num_parallel_calls=AUTOTUNE)

train_ds = preprocess_dataset(train_files)
val_ds = preprocess_dataset(val_files)
test_ds = preprocess_dataset(test_files)

batchSize = 2

# The model built in this notebook takes inputs of shape (16000, 1).
# padded_batch without padded_shapes only pads to the longest clip *in each
# batch*, so a batch of two short clips would not match that input shape.
# Force every waveform to exactly CLIP_SAMPLES samples instead.
CLIP_SAMPLES = 16000


def _batch_fixed_length(ds):
  """Batch `(waveform, label)` pairs with waveforms forced to CLIP_SAMPLES."""
  # Truncate first: padded_batch raises if an element is longer than the
  # requested padded shape. (Clips are presumably <= 1 s already - this is
  # only a guard.)
  ds = ds.map(lambda wav, lab: (wav[:CLIP_SAMPLES], lab),
              num_parallel_calls=AUTOTUNE)
  # Waveforms are zero-padded to CLIP_SAMPLES; labels are scalars.
  return ds.padded_batch(batchSize, padded_shapes=([CLIP_SAMPLES], []))


train_ds = _batch_fixed_length(train_ds)
val_ds = _batch_fixed_length(val_ds)

train_ds = train_ds.cache().prefetch(AUTOTUNE)
val_ds = val_ds.cache().prefetch(AUTOTUNE)

In [None]:
#@title Construct the Model

class CIDVAE(tf.keras.Model):
  """Encoder / decoder / discriminator trio, each trained by its own optimizer.

  The encoder and the discriminator are two heads on one shared trunk
  (LSTM(600) -> Dense(400)) over inputs of shape (16000, 1). The decoder maps a
  (latentDimensions, 1) latent sequence to 16000 output values.

  Args:
    latentDimensions: size of the latent vector. The encoder emits twice this
      many values, which `encode` splits into two halves.
    reconRatio: divisor applied to the reconstruction term of the loss.
    latentRatio: divisor applied to the KL term of the loss.
    encOpt: optimizer applied to the encoder's trainable variables.
    decOpt: optimizer applied to the decoder's trainable variables.
    discOpt: optimizer applied to the discriminator's trainable variables.
    sigmaMultiplier: scale applied inside the sigmoid that produces `dProp`.
  """

  def __init__(self, latentDimensions, reconRatio, latentRatio, encOpt, decOpt, discOpt, sigmaMultiplier=20):
    super(CIDVAE, self).__init__()
    # `sample` needs the latent size; it was previously read but never stored.
    self.latent_dim = latentDimensions
    self.encOpt = encOpt
    self.decOpt = decOpt
    self.discOpt = discOpt
    self.reconRatio = reconRatio
    self.latentRatio = latentRatio
    self.sigmaMultiplier = sigmaMultiplier

    # Trunk shared by the encoder and the discriminator.
    inputs = tf.keras.layers.Input(shape=(16000, 1))
    shared = tf.keras.layers.LSTM(600, return_sequences=False,
                dropout=0.1, recurrent_dropout = 0.0)(inputs)
    shared = tf.keras.layers.Dense(400, activation='relu')(shared)

    # Discriminator head. The final layer is linear: `discriminatorLoss` feeds
    # its input to a sigmoid cross-entropy that expects raw logits, so a
    # sigmoid activation here would be applied twice.
    discHidden = tf.keras.layers.Dense(200, activation='relu')(shared)
    discLogit = tf.keras.layers.Dense(1, activation=None)(discHidden)
    self.discriminator = tf.keras.Model(inputs = inputs, outputs = discLogit, name = 'Discriminator')

    # Encoder head: 2 * latentDimensions raw values (split in `encode`).
    encHidden = tf.keras.layers.Dense(200, activation='relu')(shared)
    encOut = tf.keras.layers.Dense(latentDimensions + latentDimensions, activation = None)(encHidden)
    self.encoder = tf.keras.Model(inputs = inputs, outputs = encOut, name = 'Encoder')

    # Decoder: latent sequence -> 16000 values.
    # NOTE(review): the output activation is ReLU, so the decoder cannot emit
    # negative values, while decoded WAV samples lie in [-1, 1] - confirm
    # whether a tanh/linear output was intended.
    inputsDec = tf.keras.layers.Input(shape=(latentDimensions, 1), name='Latent Input')
    decHidden = tf.keras.layers.LSTM(50, return_sequences=True, dropout=0.1, recurrent_dropout = 0.0)(inputsDec)
    decHidden = tf.keras.layers.Flatten()(decHidden)
    decOut = tf.keras.layers.Dense(16000, activation='relu')(decHidden)
    self.decoder = tf.keras.Model(inputs = inputsDec, outputs = decOut, name = 'Decoder')

  @tf.function
  def sample(self, eps=None):
    """Decode latent samples; draws 100 standard-normal latents if `eps` is None."""
    if eps is None:
      # Shape matches the decoder's (latent_dim, 1) input.
      eps = tf.random.normal(shape=(100, self.latent_dim, 1))
    return self.decode(eps, apply_sigmoid=True)

  def decode(self, z, apply_sigmoid=False):
    """Run the decoder on `z`; optionally pass its output through a sigmoid."""
    logits = self.decoder(z)
    if apply_sigmoid:
      probs = tf.sigmoid(logits)
      return probs
    return logits

  def distEncode(self, x):
    """Encode `x` and wrap the two encoder halves in a diagonal Gaussian.

    NOTE(review): the second half is used directly as `scale_diag` here, while
    `encode` / `reparameterize` treat the same values as a log-variance.
    Confirm which parameterization is intended.
    """
    mu, sigma = self.encode(x)
    return tfp.distributions.MultivariateNormalDiag(loc=mu, scale_diag=sigma)

  def encode(self, x):
    """Run the encoder and split its output in two equal halves along axis 1."""
    mean, logvar = tf.split(self.encoder(x), num_or_size_splits=2, axis=1)
    return mean, logvar

  def reparameterize(self, mean, logvar):
    """Return `mean + eps * exp(logvar / 2)` with `eps` standard normal."""
    # tf.shape (dynamic) instead of mean.shape (static) so this also works
    # when the batch dimension is unknown at trace time.
    eps = tf.random.normal(shape = tf.shape(mean))
    return eps * tf.exp(logvar * .5) + mean

  # Loss terms as combined in `compute_gradients`:
  #   encoder (incl. shared trunk):       llatent + lrecon
  #   decoder:                            lrandom + lrecon
  #   discriminator (incl. shared trunk): lgenerated + lreal
  # NOTE(review): an earlier design comment listed "Decoder: Lrecon +
  # Lgenerated" and "Discriminator: Lreal - Lrandom - Lgenerated", which is not
  # what the code does. Also, `lrandom` is computed with a "fake" label, so
  # minimizing it in the decoder pushes its samples toward being classified as
  # fake - confirm the intended adversarial objective.

  def compute_loss(self, x):
    """Compute all loss terms for a batch `x`.

    Returns:
      A tuple `(dProp, llatent, lrecon, lrandom, lgenerated, lreal)`.
    """
    zDist = self.distEncode(x)
    z = zDist.sample()
    # Standard-normal prior for the KL term.
    zpDist = tfp.distributions.MultivariateNormalDiag(loc=[0.0] * z.shape[-1], scale_diag=[1.0] * z.shape[-1])
    # Random latents, one per input example, shaped like the decoder input.
    zp = tf.random.normal([tf.shape(x)[0], z.shape[-1], 1])
    xhat = self.decode(z)
    xp = self.decode(zp)
    realOutput = self.discriminator(x)
    genOutput = self.discriminator(xhat)
    randOutput = self.discriminator(xp)
    lreal = self.discriminatorLoss(realOutput, True)
    lgenerated = self.discriminatorLoss(genOutput, False)
    lrandom = self.discriminatorLoss(randOutput, False)
    # NOTE(review): this "reconstruction" term compares discriminator outputs
    # for x and xhat, not the waveforms themselves - confirm this is intended.
    lrecon = tf.reduce_mean(tf.reduce_mean(tf.math.square(realOutput - genOutput), axis=0)) / self.reconRatio
    # Logistic squashing of the scaled fake-side losses.
    self.dProp = tf.sigmoid((lgenerated + lrandom) * self.sigmaMultiplier)

    klDiv = tfp.distributions.kl_divergence(zDist, zpDist)
    llatent = tf.reduce_mean(tf.maximum(klDiv, 0)) / self.latentRatio

    return (
      self.dProp,
      llatent,
      lrecon,
      lrandom,
      lgenerated,
      lreal,
    )

  def compute_gradients(self, x):
    """Return `(enc_gradients, dec_gradients, disc_gradients)` for batch `x`."""
    # One tape per sub-model: a non-persistent tape can only be queried once.
    with tf.GradientTape() as encTape, tf.GradientTape() as decTape, tf.GradientTape() as discTape:
      (_, llatent, lrecon, lrandom, lgenerated, lreal, ) = self.compute_loss(x)
      lEnc = llatent + lrecon
      lDec = lrandom + lrecon
      lDisc = lgenerated + lreal
    enc_gradients = encTape.gradient(lEnc, self.encoder.trainable_variables)
    dec_gradients = decTape.gradient(lDec, self.decoder.trainable_variables)
    disc_gradients = discTape.gradient(lDisc, self.discriminator.trainable_variables)

    return enc_gradients, dec_gradients, disc_gradients

  @tf.function
  def apply_gradients(self, enc_gradients, dec_gradients, disc_gradients):
    """Apply each gradient list to its sub-model with the matching optimizer."""
    self.encOpt.apply_gradients(
        zip(enc_gradients, self.encoder.trainable_variables)
    )
    self.decOpt.apply_gradients(
        zip(dec_gradients, self.decoder.trainable_variables)
    )
    self.discOpt.apply_gradients(
        zip(disc_gradients, self.discriminator.trainable_variables)
    )

  def train(self, x):
    """Run one optimization step on batch `x`."""
    enc_gradients, dec_gradients, disc_gradients = self.compute_gradients(x)
    self.apply_gradients(enc_gradients, dec_gradients, disc_gradients)

  def discriminatorLoss(self, logits, real=True):
    """Sigmoid cross-entropy of `logits` against all-ones (real) or all-zeros labels.

    `real` is only tested for truthiness, so 1/0 work as well as True/False.
    """
    if real:
      labels = tf.ones_like(logits)
    else:
      labels = tf.zeros_like(logits)
    return tf.compat.v1.losses.sigmoid_cross_entropy(multi_class_labels=labels, logits=logits)



In [None]:
def _make_adam():
  """Return a fresh Adam optimizer (learning rate 1e-4, beta_1 0.5)."""
  return tf.keras.optimizers.Adam(1e-4, beta_1=0.5)


# Each sub-model gets its own optimizer instance.
model = CIDVAE(
    latentDimensions=20,
    reconRatio=1,
    latentRatio=0.5,
    encOpt=_make_adam(),
    decOpt=_make_adam(),
    discOpt=_make_adam(),
)

In [None]:
# Take the waveforms of the first training batch, append a trailing axis of
# size 1, and run a single training step on them.
first_batch = next(iter(train_ds))
example_data = first_batch[0][..., tf.newaxis]
model.train(example_data)

In [None]:
#@title Train the Model

TRAIN_BUF=6400
TEST_BUF=800
N_TRAIN_BATCHES =int(TRAIN_BUF/batchSize)
N_TEST_BATCHES = int(TEST_BUF/batchSize)
n_epochs = 1

# test_ds holds single, variable-length clips. Batch it here, with every
# waveform truncated/zero-padded to the model's 16000-sample input length, so
# it can be fed to the model like the training batches.
test_batches = test_ds.map(lambda wav, lab: (wav[:16000], lab)).padded_batch(
    batchSize, padded_shapes=([16000], []))

# One row per epoch; columns follow the order of CIDVAE.compute_loss's tuple.
losses = pd.DataFrame(
    columns=['dProp', 'llatent', 'lrecon', 'lrandom', 'lgenerated', 'lreal'])

for epoch in range(n_epochs):
    # train
    for _, train_x in tqdm(
        zip(range(N_TRAIN_BATCHES), train_ds), total=N_TRAIN_BATCHES
    ):
        # train_x is (waveforms, labels); the model wants a trailing axis.
        model.train(tf.expand_dims(train_x[0], -1))
    # test on holdout
    loss = []
    for _, test_x in tqdm(
        zip(range(N_TEST_BATCHES), test_batches), total=N_TEST_BATCHES
    ):
        # Evaluate on the holdout batch (not the last training batch) and
        # store plain floats so the per-epoch mean is a simple numeric array.
        batch_losses = model.compute_loss(tf.expand_dims(test_x[0], -1))
        loss.append([float(term) for term in batch_losses])
    losses.loc[len(losses)] = np.mean(loss, axis=0)
    # plot results
    display.clear_output()
    print(
        "Epoch: {}".format(epoch)
    )
    # NOTE(review): plot_reconstruction and plot_losses are not defined in the
    # visible notebook cells - confirm they exist before running this cell.
    plot_reconstruction(model, example_data)
    plot_losses(losses)

In [None]:
# Draw 20 latent values uniformly from [-1, 1), shape them as one
# (20, 1) latent sequence, decode it and render an inline audio player.
random_latent = (np.random.rand(20) * 2) - 1
random_latent = random_latent.reshape(1, 20, 1)
wav = model.decoder(random_latent)
display.display(display.Audio(wav, rate=16000))