<a href="https://colab.research.google.com/github/Noppawat-Tantisiriwat/Thai-Music-Generation/blob/main/AIB_LSTM_master_CNN_LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preparation

In [None]:
from typing import List

from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Conv1D, LayerNormalization, \
    Flatten, Dense, Reshape, Conv1DTranspose, Layer, LSTM, RepeatVector, TimeDistributed
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
from tqdm.auto import tqdm

import datetime, os

In [None]:
%load_ext tensorboard

In [None]:
!nvidia-smi

Wed Jun 23 00:43:33 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 465.27       Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla P100-PCIE...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   44C    P0    27W / 250W |      0MiB / 16280MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

# Model

In [None]:
class Encoder(Model):
  """Convolutional encoder with an LSTM bottleneck and a Gaussian latent head.

  The input passes through a stack of (Conv1D -> LayerNormalization) pairs,
  then through an LSTM, and finally through two Dense layers producing the
  mean and log-variance of the latent distribution.

  Args:
    inp_shape: shape of one input example, without the batch axis.
    conv_filters: number of filters for each Conv1D layer.
    conv_kernels: kernel size for each Conv1D layer.
    conv_strides: stride for each Conv1D layer.
    latent_space_dim: size of the latent vector.
    lstm_units: number of units of the LSTM placed after the conv stack.
    **kwargs: forwarded to `tf.keras.Model`.
  """

  def __init__(self,
               inp_shape: List[int],
               conv_filters: List[int],
               conv_kernels: List[int],
               conv_strides: List[int],
               latent_space_dim: int,
               lstm_units: int,
               **kwargs):
    super().__init__(**kwargs)
    self.conv_filters = conv_filters
    self.conv_kernels = conv_kernels
    self.conv_strides = conv_strides
    self.latent_space_dim = latent_space_dim
    # Filled in by _compute_shape_before_bottleneck() further down.
    self._shape_before_bottleneck = None
    self.lstm_units = lstm_units

    # The three per-layer lists must describe the same number of layers.
    n_layers = len(self.conv_filters)
    assert len(self.conv_strides) == len(self.conv_kernels) == n_layers

    conv_stack = []
    per_layer = zip(self.conv_filters, self.conv_kernels, self.conv_strides)
    for idx, (n_filters, kernel, stride) in enumerate(per_layer):
      conv_stack.append(
          Conv1D(
              filters=n_filters,
              kernel_size=kernel,
              strides=stride,
              padding="same",
              name=f"encoder_conv_layer_{idx}",
              activation="relu",
          )
      )
    self.convs = conv_stack

    # One LayerNormalization per convolution.
    self.layernorms = [
        LayerNormalization(name=f"encoder_ln_{idx}") for idx in range(n_layers)
    ]
    assert len(self.convs) == len(self.layernorms)

    self.dense_mu = Dense(self.latent_space_dim, name="mu")

    self.dense_logvar = Dense(self.latent_space_dim, name="log_variance")

    # Runs the conv stack once on dummy data to record its output shape.
    self._compute_shape_before_bottleneck(inp_shape)
    self.lstm = LSTM(lstm_units, name="lstm")

  def _compute_shape_before_bottleneck(self, inp_shape: List[int]):
    """Record the per-example shape produced by the conv/norm stack.

    A zero tensor of shape `inp_shape` with a leading batch axis of size 1 is
    pushed through every (conv, layernorm) pair; the resulting shape without
    its batch axis is stored in `self._shape_before_bottleneck`.
    """
    probe = tf.expand_dims(tf.zeros(shape=inp_shape), axis=0)
    for conv_layer, norm_layer in zip(self.convs, self.layernorms):
      # The LSTM, not a Flatten layer, does the flattening later in call().
      probe = norm_layer(conv_layer(probe))
    # Drop the batch axis: (batch, *shape) -> (*shape)
    self._shape_before_bottleneck = tf.shape(probe)[1:]

  def _reparameterized(self, mu, log_var):
    """Draw `mu + exp(log_var / 2) * eps` with `eps` sampled from N(0, 1)."""
    noise = K.random_normal(shape=K.shape(mu), mean=0., stddev=1.)
    return mu + K.exp(log_var / 2) * noise

  def call(self, x):
    """Encode `x`.

    Returns:
      A pair `(z, (mu, log_var))` where `z` is the sampled latent point.
    """
    features = x
    for conv_layer, norm_layer in zip(self.convs, self.layernorms):
      features = norm_layer(conv_layer(features))
    encoded = self.lstm(features)
    mu = self.dense_mu(encoded)
    log_var = self.dense_logvar(encoded)
    return self._reparameterized(mu, log_var), (mu, log_var)

In [None]:
class Decoder(Model):
  """Transposed-convolution decoder that maps a latent vector back to a sequence.

  The latent vector is projected by a Dense layer, reshaped to
  `shape_before_bottleneck`, passed through (Conv1DTranspose ->
  LayerNormalization) pairs and finally through a sigmoid Conv1DTranspose
  with `out_channel` filters.

  Args:
    shape_before_bottleneck: per-example shape (no batch axis) to reshape the
      Dense output to. May be an eager tensor, NumPy array, list or tuple.
    conv_filters: filter counts. Entry 0 is not used as a filter count: the
      hidden layers use entries 1.. and the output layer uses `out_channel`.
    conv_kernels: kernel sizes. Entries 1.. feed the hidden layers, entry 0
      feeds the output layer.
    conv_strides: strides, indexed the same way as `conv_kernels`.
    out_channel: number of filters (channels) of the output layer.
    **kwargs: forwarded to `tf.keras.Model`.
  """

  def __init__(self,
               shape_before_bottleneck: tf.Tensor,
               conv_filters: List[int], # entry 0 is skipped; see class docstring
               conv_kernels: List[int],
               conv_strides: List[int],
               out_channel: int,
               **kwargs):
    super().__init__(**kwargs)
    self.conv_filters = conv_filters
    self.conv_kernels = conv_kernels
    self.conv_strides = conv_strides

    # Convert to plain Python ints so the layer constructors receive ordinary
    # integers rather than tensor / NumPy scalars, and so that any iterable
    # shape (not only an eager tensor) is accepted.
    target_shape = tuple(int(dim) for dim in shape_before_bottleneck)
    flat_units = 1
    for dim in target_shape:
      flat_units *= dim

    self.reshape = Reshape(target_shape)
    self.out_channel = out_channel
    self.dense = Dense(flat_units)
    # The three per-layer lists must describe the same number of layers.
    assert len(self.conv_strides) == len(self.conv_kernels) == len(self.conv_filters)

    self.convs = [
        Conv1DTranspose(
            filters=f,
            kernel_size=k,
            strides=s,
            padding="same",
            name=f"decoder_conv_transpose_layer_{i}",
            activation="relu"
        )
        for i, (f, k, s) in enumerate(
            zip(self.conv_filters[1:], self.conv_kernels[1:], self.conv_strides[1:]))
    ]
    self.layernorms = [
        LayerNormalization(name=f"decoder_ln_{i}")
        for i in range(len(self.conv_filters[1:]))
    ]
    # One LayerNormalization per hidden transposed convolution.
    assert len(self.convs) == len(self.layernorms)

    # Output layer: sigmoid activation, kernel/stride taken from entry 0.
    self.output_conv = Conv1DTranspose(
        filters=self.out_channel,
        kernel_size=self.conv_kernels[0],
        strides=self.conv_strides[0],
        padding="same",
        activation="sigmoid",
        name=f"decoder_conv_transpose_layer_{len(self.conv_strides)}"
    )

  def call(self, x):
    """Decode latent vectors `x` and return the output of the sigmoid layer."""
    x = self.dense(x)
    x = self.reshape(x)
    for conv, layernorm in zip(self.convs, self.layernorms):
      x = conv(x)
      x = layernorm(x)
    x = self.output_conv(x)
    return x

In [None]:
class VAE(Model):
  """Variational autoencoder built from an `Encoder` and a `Decoder`.

  The training loss is registered inside `call` through `add_loss`, together
  with `recon_loss` and `kl_loss` metrics.

  Args:
    inp_shape: shape of one input example, without the batch axis.
    conv_filters: per-layer filter counts for the encoder; the decoder
      receives this list reversed.
    conv_kernels: per-layer kernel sizes, reversed for the decoder.
    conv_strides: per-layer strides, reversed for the decoder.
    latent_space_dim: size of the latent vector.
    lstm_units: number of units of the encoder LSTM.
    recon_loss_weight: factor applied to the reconstruction error before it
      is added to the KL term.
    **kwargs: forwarded to `tf.keras.Model`.
  """

  def __init__(self,
               inp_shape: List[int],
               conv_filters: List[int],
               conv_kernels: List[int],
               conv_strides: List[int],
               latent_space_dim: int,
               lstm_units : int,
               recon_loss_weight: int,
               **kwargs):
    super().__init__(**kwargs)
    self.inp_shape = inp_shape
    self.recon_loss_weight = recon_loss_weight
    self._shape_before_bottleneck = None
    self.latent_space_dim = latent_space_dim
    # Axes 1..len(inp_shape): every axis of a batched input except the batch axis.
    self._reduce_axis = list(range(1, len(inp_shape)+1))

    self.encoder = Encoder(
        inp_shape=inp_shape,
        conv_filters=conv_filters,
        conv_kernels=conv_kernels,
        conv_strides=conv_strides,
        latent_space_dim=latent_space_dim,
        lstm_units = lstm_units
    )

    # The decoder gets the encoder's layer settings in reverse order and
    # produces as many channels as the last input axis.
    self.decoder = Decoder(
        shape_before_bottleneck=self.encoder._shape_before_bottleneck,
        conv_filters = conv_filters[::-1],
        conv_kernels = conv_kernels[::-1],
        conv_strides=conv_strides[::-1],
        out_channel=inp_shape[-1]
    )

  def _calculate_kl_loss(self, mu, log_var):
    """Return `-0.5 * sum(1 + log_var - mu**2 - exp(log_var))` over axis 1."""
    kl_loss = -0.5 * tf.reduce_sum(1 + log_var -tf.square(mu) - tf.exp(log_var), axis=1)
    return kl_loss

  def _calculate_recon_loss(self, x, x_prime):
    """Return the mean squared error over the non-batch axes, times `recon_loss_weight`."""
    recon_loss = tf.reduce_mean(tf.square(x - x_prime), axis=self._reduce_axis)
    return self.recon_loss_weight * recon_loss

  def _compute_loss(self, x, x_prime, mu, log_var):
    """Register the total loss and the two component metrics on the model."""
    recon_loss =  self._calculate_recon_loss(x, x_prime)
    kl_loss =  self._calculate_kl_loss(mu, log_var)
    loss =  recon_loss  + kl_loss
    self.add_loss(tf.add_n([loss]))
    # The metric reports the reconstruction error with the weight divided back out.
    self.add_metric(tf.add_n([recon_loss / self.recon_loss_weight]), name="recon_loss")
    self.add_metric(tf.add_n([kl_loss]), name="kl_loss")

  def call(self, x):
    """Encode and decode `x`, register the loss, and return `(z, x_prime)`."""
    z, (mu, log_var) = self.encoder(x)
    x_prime = self.decoder(z)
    self._compute_loss(x, x_prime, mu, log_var)
    return z, x_prime

  def full_summary(self):
    """Print the summaries of the encoder, the decoder and the full model."""
    self.encoder.summary()
    self.decoder.summary()
    self.summary()

  def sample(self, eps=None):
    """Decode a latent point.

    Args:
      eps: latent input for the decoder. When None, a single point of shape
        `[1, latent_space_dim]` is drawn from a standard normal. When given,
        it is printed before decoding.

    Returns:
      The decoder output for `eps`.
    """
    if eps is None:
      eps = tf.random.normal([1, self.latent_space_dim])
    else:
      print(f"sample epsilon: {eps}")
    return self.decoder(eps)

  def reconstruct(self, images):
    """Encode `images` and decode the sampled latent points.

    Returns:
      `(reconstructed_images, latent_representations)`, where
      `latent_representations` is the sampled latent output of the encoder.
    """
    # Encoder.call returns (z, (mu, log_var)); only z may be fed to the
    # decoder, so pick the first element instead of passing the whole output.
    encoder_outputs = self.encoder.predict(images)
    latent_representations = encoder_outputs[0]
    reconstructed_images = self.decoder.predict(latent_representations)
    return reconstructed_images, latent_representations


# Training Preparation

In [None]:
# Keyword arguments follow the order of VAE.__init__.
# 1296 == 16 * 3**4, so the four stride-3 layers divide the first axis evenly.
vae = VAE(
    inp_shape=[1296, 256],
    conv_filters=[256, 512, 512, 1024],
    conv_kernels=[5, 5, 5, 5],
    conv_strides=[3, 3, 3, 3],
    latent_space_dim=1024,
    lstm_units=512,
    recon_loss_weight=1_000_000,
)

In [None]:
# Call the model once on a symbolic Input whose shape matches inp_shape;
# the returned tensors are discarded.
_ = vae(Input(shape=[1296, 256]))

In [None]:
# No loss is passed to compile(): VAE.call registers its loss via add_loss.
vae.compile(Adam(learning_rate=1e-4))

In [None]:
# Print the encoder, decoder and full-model summaries.
vae.full_summary()

Model: "encoder"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
encoder_conv_layer_0 (Conv1D multiple                  327936    
_________________________________________________________________
encoder_conv_layer_1 (Conv1D multiple                  655872    
_________________________________________________________________
encoder_conv_layer_2 (Conv1D multiple                  1311232   
_________________________________________________________________
encoder_conv_layer_3 (Conv1D multiple                  2622464   
_________________________________________________________________
encoder_ln_0 (LayerNormaliza multiple                  512       
_________________________________________________________________
encoder_ln_1 (LayerNormaliza multiple                  1024      
_________________________________________________________________
encoder_ln_2 (LayerNormaliza multiple                  1024

# Generate

In [None]:
import librosa 
import soundfile as sf
import IPython.display as ipd
import numpy as np
import matplotlib.pyplot as plt


In [None]:
import pickle as p

# NOTE: unpickling can execute arbitrary code; only load files you created yourself.
with open("/content/drive/MyDrive/AIB_project/Attemps/VAE_code/MinMaxValue_padded/min_max_values.pkl", "rb") as file:
  max_min = p.load(file)

# Every value of the loaded mapping carries a "min" and a "max" entry;
# gather each into its own list and array, in the mapping's iteration order.
min_li = [entry["min"] for entry in max_min.values()]
max_li = [entry["max"] for entry in max_min.values()]
min_array = np.array(min_li)
max_array = np.array(max_li)

In [None]:
# Average the stored minima and maxima into one value each; these two values
# are the range convert_spectrograms_to_audio() passes to denormalise().
original_min = min_array.mean()
original_max = max_array.mean()

In [None]:
def denormalise(norm_array, original_min, original_max, norm_min=0., norm_max=1.):
    """Map values from the normalised range back to the original range.

    Args:
        norm_array: number or array of values lying in [norm_min, norm_max].
        original_min: value that `norm_min` maps back to.
        original_max: value that `norm_max` maps back to.
        norm_min: lower bound of the normalised range (default 0.).
        norm_max: upper bound of the normalised range (default 1.).

    Returns:
        `norm_array` rescaled linearly to [original_min, original_max].

    Raises:
        ValueError: if `norm_min` equals `norm_max` (the range would be empty).
    """
    if norm_max == norm_min:
        raise ValueError("norm_max must differ from norm_min")
    # First bring the values to [0, 1], then stretch to the original range.
    array = (norm_array - norm_min) / (norm_max - norm_min)
    array = array * (original_max - original_min) + original_min
    return array

In [None]:
def convert_spectrograms_to_audio(spectrogram):
  """Turn one normalised log-spectrogram tensor into a waveform.

  Reads the module-level `original_min` and `original_max` for
  denormalisation.

  Args:
    spectrogram: tensor accepted by `tf.squeeze`; size-1 axes are removed and
      the result is transposed before further processing.

  Returns:
    The signal returned by `librosa.griffinlim`.
  """
  # Remove size-1 axes and transpose.
  normalised = tf.squeeze(spectrogram).numpy().T
  # Undo the normalisation.
  log_spec = denormalise(normalised, original_min, original_max)
  # Log spectrogram (dB) -> amplitude spectrogram.
  amplitude = librosa.db_to_amplitude(log_spec)
  # Recover a waveform with Griffin-Lim.
  return librosa.griffinlim(amplitude, hop_length=256, win_length=510)

In [None]:
def generate(esp=None):
  """Decode a latent point with `loaded_vae` and convert the result to audio.

  Args:
    esp: latent input for the decoder. When None, one point of shape
      `[1, vae.encoder.latent_space_dim]` is drawn from a standard normal.
      (The parameter keeps its original spelling so keyword callers still work.)

  Returns:
    The waveform produced by `convert_spectrograms_to_audio`.
  """
  # The original body read an undefined `eps` instead of the `esp` argument
  # and stored the sampled noise under the wrong name; use one local name.
  eps = esp
  if eps is None:  # `is None`: `==` on a tensor compares element-wise
    eps = tf.random.normal([1, vae.encoder.latent_space_dim])
  # NOTE(review): the latent size comes from `vae` while decoding uses
  # `loaded_vae`, which must be defined by an earlier cell - confirm both
  # refer to models with the same latent size.
  spectrogram_random = loaded_vae.decoder(eps)
  return convert_spectrograms_to_audio(spectrogram_random)

In [None]:
def plot_spectrogram(spectrogram_random):
  """Display the first spectrogram of a batch with `librosa.display.specshow`.

  Args:
    spectrogram_random: object with a `.numpy()` method; element 0 of the
      resulting array is transposed and plotted.
  """
  # The notebook only runs `import librosa`; import the submodule explicitly
  # so `librosa.display` is available regardless of the librosa version.
  import librosa.display
  librosa.display.specshow(spectrogram_random.numpy()[0].T)