<a href="https://colab.research.google.com/github/Noppawat-Tantisiriwat/Thai-Music-Generation/blob/main/AIB_Master_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preparation

In [1]:
from typing import List

from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Conv1D, LayerNormalization, \
    Flatten, Dense, Reshape, Conv1DTranspose, Layer
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
from tqdm.auto import tqdm

import datetime, os

import numpy as np

In [2]:
# Load the TensorBoard notebook extension so TensorBoard can be used from this notebook.
%load_ext tensorboard

In [None]:
# Shell out to nvidia-smi to show which GPU(s), if any, this runtime can see.
!nvidia-smi

## Connect to your working directory path

### GoogleDrive

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
# Run this only if the project folder does not exist yet
# %mkdir /content/drive/MyDrive/AIB_project

In [None]:
# %cd /content/drive/MyDrive/AIB_project

### Local

In [None]:
# Placeholder: replace Your_working_directory_path with the directory you want to work in.
%cd Your_working_directory_path

# Model

In [3]:
class Encoder(Model):
  """Convolutional encoder half of the VAE.

  The input runs through a stack of ``Conv1D`` (``padding="same"``, ReLU)
  layers, each followed by a ``LayerNormalization``. The result is flattened
  and projected by two ``Dense`` layers to the latent mean and log-variance,
  from which a latent point is sampled.

  Args:
    inp_shape: Shape of ONE input example (no batch axis). Used only to
      push a dummy tensor through the conv stack at construction time.
    conv_filters: Number of filters for each conv layer.
    conv_kernels: Kernel size for each conv layer.
    conv_strides: Stride for each conv layer.
    latent_space_dim: Size of the latent vector.
    **kwargs: Forwarded to ``tf.keras.Model``.

  Raises:
    AssertionError: If the three per-layer lists differ in length.
  """

  def __init__(self,
               inp_shape: List[int],
               conv_filters: List[int],
               conv_kernels: List[int],
               conv_strides: List[int],
               latent_space_dim: int,
               **kwargs):
    super().__init__(**kwargs)
    self.conv_filters = conv_filters
    self.conv_kernels = conv_kernels
    self.conv_strides = conv_strides
    self.latent_space_dim = latent_space_dim
    # Filled in at the end of __init__ by _compute_shape_before_bottleneck.
    self._shape_before_bottleneck = None

    # The three lists describe the same layers, so they must line up.
    assert len(self.conv_strides) == len(self.conv_kernels) == len(self.conv_filters)

    conv_stack = []
    layer_specs = zip(self.conv_filters, self.conv_kernels, self.conv_strides)
    for idx, (n_filters, kernel, stride) in enumerate(layer_specs):
      conv_stack.append(
          Conv1D(
              filters=n_filters,
              kernel_size=kernel,
              strides=stride,
              padding="same",
              name=f"encoder_conv_layer_{idx}",
              activation="relu",
          )
      )
    self.convs = conv_stack

    norm_stack = []
    for idx in range(len(self.conv_filters)):
      norm_stack.append(LayerNormalization(name=f"encoder_ln_{idx}"))
    self.layernorms = norm_stack

    self.flatten = Flatten()

    # One normalisation layer per convolution.
    assert len(self.convs) == len(self.layernorms)

    self.dense_mu = Dense(self.latent_space_dim, name="mu")
    self.dense_logvar = Dense(self.latent_space_dim, name="log_variance")

    self._compute_shape_before_bottleneck(inp_shape)

  def _apply_conv_stack(self, x):
    """Run ``x`` through every (conv, layer-norm) pair in order."""
    for conv_layer, norm_layer in zip(self.convs, self.layernorms):
      x = norm_layer(conv_layer(x))
    return x

  def _compute_shape_before_bottleneck(self, inp_shape: List[int]):
    """Record the per-example feature-map shape produced by the conv stack.

    A zero tensor of shape ``inp_shape`` is given a leading batch axis of 1
    and passed through the conv stack; the resulting shape without the
    batch axis is stored in ``self._shape_before_bottleneck``.
    """
    dummy_batch = tf.expand_dims(tf.zeros(shape=inp_shape), axis=0)
    features = self._apply_conv_stack(dummy_batch)
    # Drop axis 0 (the batch axis) and keep the remaining dimensions.
    self._shape_before_bottleneck = tf.shape(features)[1:]

  def _reparameterized(self, mu, log_var):
    """Sample ``mu + exp(log_var / 2) * eps`` with ``eps`` drawn from N(0, 1)."""
    noise = K.random_normal(shape=K.shape(mu), mean=0., stddev=1.)
    sigma = K.exp(log_var / 2)
    return mu + sigma * noise

  def call(self, x):
    """Encode ``x``.

    Returns:
      A tuple ``(z, (mu, log_var))`` where ``z`` is the sampled latent point
      and ``mu`` / ``log_var`` are the outputs of the two dense heads.
    """
    flat_features = self.flatten(self._apply_conv_stack(x))
    mu = self.dense_mu(flat_features)
    log_var = self.dense_logvar(flat_features)
    z = self._reparameterized(mu, log_var)
    return z, (mu, log_var)

In [4]:
class Decoder(Model):
  """Convolutional decoder half of the VAE.

  A latent vector is projected by a ``Dense`` layer, reshaped to
  ``shape_before_bottleneck``, passed through a stack of
  ``Conv1DTranspose`` (``padding="same"``, ReLU) + ``LayerNormalization``
  pairs, and finally through one ``Conv1DTranspose`` without activation
  that produces ``out_channel`` channels.

  Args:
    shape_before_bottleneck: Per-example shape (no batch axis) to reshape
      the dense output to. May be an eager ``tf.Tensor`` (as produced by
      ``Encoder``) or any sequence of integer dimensions.
    conv_filters: Filter counts. Only ``conv_filters[1:]`` are used for the
      hidden transpose-conv layers; ``conv_filters[0]`` builds no layer
      (the output layer uses ``out_channel`` filters instead).
    conv_kernels: Kernel sizes. ``conv_kernels[1:]`` go to the hidden layers
      and ``conv_kernels[0]`` to the output layer.
    conv_strides: Strides. ``conv_strides[1:]`` go to the hidden layers and
      ``conv_strides[0]`` to the output layer.
    out_channel: Number of channels produced by the output layer.
    **kwargs: Forwarded to ``tf.keras.Model``.

  Raises:
    AssertionError: If the three per-layer lists differ in length.
  """

  def __init__(self,
               shape_before_bottleneck: tf.Tensor,
               conv_filters: List[int],  # conv_filters[0] is not used to build a layer
               conv_kernels: List[int],
               conv_strides: List[int],
               out_channel: int,
               **kwargs):
    super().__init__(**kwargs)
    self.conv_filters = conv_filters
    self.conv_kernels = conv_kernels
    self.conv_strides = conv_strides

    # Normalise the shape to a tuple of plain Python ints so it does not
    # matter whether the caller passed an eager tensor, an array or a list.
    if hasattr(shape_before_bottleneck, "numpy"):
      shape_before_bottleneck = shape_before_bottleneck.numpy()
    target_shape = tuple(int(dim) for dim in shape_before_bottleneck)

    # The dense layer must emit exactly as many values as the reshape consumes.
    flat_units = 1
    for dim in target_shape:
      flat_units *= dim

    self.dense = Dense(flat_units, name="decoder_dense")
    self.reshape = Reshape(target_shape)
    self.out_channel = out_channel

    # The three lists describe the same layers, so they must line up.
    assert len(self.conv_strides) == len(self.conv_kernels) == len(self.conv_filters)

    self.convs = [Conv1DTranspose(
        filters=f,
        kernel_size=k,
        strides=s,
        padding="same",
        name=f"decoder_conv_transpose_layer_{i}",
        activation="relu"
    ) for i, (f, k, s) in enumerate(zip(self.conv_filters[1:], self.conv_kernels[1:], self.conv_strides[1:]))]
    self.layernorms = [LayerNormalization(name=f"decoder_ln_{i}") for i in range(len(self.conv_filters[1:]))]

    # One normalisation layer per hidden transpose convolution.
    assert len(self.convs) == len(self.layernorms)

    # Final layer: no activation and no layer norm, maps to the output channels.
    self.output_conv = Conv1DTranspose(
        filters=self.out_channel,
        kernel_size=self.conv_kernels[0],
        strides=self.conv_strides[0],
        padding="same",
        name=f"decoder_conv_transpose_layer_{len(self.conv_strides)}"
    )

  def call(self, x):
    """Decode a batch of latent vectors ``x`` and return the output tensor."""
    x = self.dense(x)
    x = self.reshape(x)
    for conv, layernorm in zip(self.convs, self.layernorms):
      x = conv(x)
      x = layernorm(x)
    x = self.output_conv(x)
    return x

In [68]:
class VAE(Model):
  """Variational autoencoder built from an ``Encoder`` and a ``Decoder``.

  The decoder receives the encoder's filter/kernel/stride lists in reverse
  order and ``inp_shape[-1]`` as its number of output channels. The
  training loss (weighted reconstruction error plus KL divergence) is
  attached inside ``call`` via ``add_loss``, so the model can be compiled
  without a loss argument.

  Args:
    inp_shape: Shape of ONE input example (no batch axis).
    conv_filters: Filter counts for the encoder conv layers.
    conv_kernels: Kernel sizes for the encoder conv layers.
    conv_strides: Strides for the encoder conv layers.
    latent_space_dim: Size of the latent vector.
    recon_loss_weight: Multiplier applied to the reconstruction error
      before it is added to the KL term.
    **kwargs: Forwarded to ``tf.keras.Model``.
  """

  def __init__(self,
               inp_shape: List[int],
               conv_filters: List[int],
               conv_kernels: List[int],
               conv_strides: List[int],
               latent_space_dim: int,
               recon_loss_weight: int,
               **kwargs):
    super(VAE, self).__init__(**kwargs)
    self.inp_shape = inp_shape
    self.recon_loss_weight = recon_loss_weight
    self._shape_before_bottleneck = None
    self.latent_space_dim = latent_space_dim
    # Axes 1..len(inp_shape): every axis of a batched input except the batch axis.
    self._reduce_axis = list(range(1, len(inp_shape)+1))

    self.encoder = Encoder(
        inp_shape=inp_shape,
        conv_filters=conv_filters,
        conv_kernels=conv_kernels,
        conv_strides=conv_strides,
        latent_space_dim=latent_space_dim
    )

    self.decoder = Decoder(
        shape_before_bottleneck=self.encoder._shape_before_bottleneck,
        conv_filters=conv_filters[::-1],
        conv_kernels=conv_kernels[::-1],
        conv_strides=conv_strides[::-1],
        out_channel=inp_shape[-1]
    )

  def set_recon_loss_weight(self, recon_loss_weight):
    """Replace the reconstruction-loss multiplier.

    NOTE(review): the weight is a plain Python attribute read inside
    ``call``. If Keras has already traced its training function, the old
    value may stay baked in until the model is recompiled -- confirm when
    changing the weight between ``fit`` calls.
    """
    self.recon_loss_weight = recon_loss_weight

  def _calculate_kl_loss(self, mu, log_var):
    """Per-example KL term: ``-0.5 * sum(1 + log_var - mu^2 - exp(log_var))`` over axis 1."""
    kl_loss = -0.5 * tf.reduce_sum(1 + log_var - tf.square(mu) - tf.exp(log_var), axis=1)
    return kl_loss

  def _unweighted_recon_loss(self, x, x_prime):
    """Per-example mean squared error over all non-batch axes."""
    return tf.reduce_mean(tf.square(x - x_prime), axis=self._reduce_axis)

  def _calculate_recon_loss(self, x, x_prime):
    """Per-example mean squared error multiplied by ``recon_loss_weight``."""
    return self.recon_loss_weight * self._unweighted_recon_loss(x, x_prime)

  def _compute_loss(self, x, x_prime, mu, log_var):
    """Attach the training loss and the two monitoring metrics to the model."""
    raw_recon_loss = self._unweighted_recon_loss(x, x_prime)
    recon_loss = self.recon_loss_weight * raw_recon_loss
    kl_loss = self._calculate_kl_loss(mu, log_var)
    loss = recon_loss + kl_loss
    self.add_loss(tf.add_n([loss]))
    # Report the unweighted error directly rather than dividing the weighted
    # one by the weight, so a weight of 0 does not turn the metric into NaN/inf.
    self.add_metric(tf.add_n([raw_recon_loss]), name="recon_loss")
    self.add_metric(tf.add_n([kl_loss]), name="kl_loss")

  def call(self, x):
    """Encode and decode ``x``, registering the loss as a side effect.

    Returns:
      A tuple ``(z, x_prime)``: the sampled latent point and the
      reconstruction.
    """
    z, (mu, log_var) = self.encoder(x)
    x_prime = self.decoder(z)
    self._compute_loss(x, x_prime, mu, log_var)
    return z, x_prime

  def full_summary(self):
    """Print the Keras summaries of the encoder, the decoder and the whole model."""
    self.encoder.summary()
    self.decoder.summary()
    self.summary()

  def sample(self, eps=None):
    """Decode a latent point.

    Args:
      eps: Latent input for the decoder. When ``None``, a single point is
        drawn from a standard normal of size ``latent_space_dim``;
        otherwise the given value is printed and decoded.

    Returns:
      The decoder output for ``eps``.
    """
    if eps is None:
      eps = tf.random.normal([1, self.latent_space_dim])
    else:
      print(f"sample epsilon: {eps}")
    return self.decoder(eps)

  def reconstruct(self, images):
    """Encode ``images`` and decode the sampled latent points.

    Returns:
      A tuple ``(reconstructed_images, latent_representations)``.
    """
    # The encoder returns (z, (mu, log_var)); only z is valid decoder input.
    latent_representations, _ = self.encoder.predict(images)
    reconstructed_images = self.decoder.predict(latent_representations)
    return reconstructed_images, latent_representations

# Training Preparation

In [5]:
def parse_function(example_proto):
    """Decode one serialized example into a 2-D spectrogram tensor.

    The record is expected to carry two int64 scalars, ``frequency`` and
    ``time``, and a flat float32 sequence ``spectrograms``. The sequence is
    reshaped to ``[frequency, time]`` and then transposed, so the returned
    tensor is laid out as ``[time, frequency]``.
    """
    schema = {
        'frequency': tf.io.FixedLenFeature([], tf.int64),
        'time': tf.io.FixedLenFeature([], tf.int64),
        'spectrograms': tf.io.FixedLenSequenceFeature([], tf.float32, allow_missing=True)
    }
    record = tf.io.parse_single_example(example_proto, schema)

    n_freq = record["frequency"]
    n_time = record['time']
    freq_major = tf.reshape(record["spectrograms"], [n_freq, n_time])

    # Swap the two axes: [frequency, time] -> [time, frequency].
    return tf.transpose(freq_major)

In [6]:
def get_train_data(tfrec_path, batch_size=32, shuffle_buffer=300):
    """Build the training input pipeline from a TFRecord file.

    Records are shuffled while still serialized, decoded with
    ``parse_function``, batched and prefetched.

    Args:
        tfrec_path: Path (or list of paths) accepted by
            ``tf.data.TFRecordDataset``.
        batch_size: Number of examples per batch. Defaults to 32.
        shuffle_buffer: Size of the shuffle buffer. Defaults to 300.

    Returns:
        A ``tf.data.Dataset`` yielding batches of parsed spectrograms.
    """
    autotune = tf.data.experimental.AUTOTUNE

    train_data = tf.data.TFRecordDataset(tfrec_path)
    train_data = train_data.shuffle(shuffle_buffer)
    train_data = train_data.map(parse_function, num_parallel_calls=autotune)
    train_data = train_data.batch(batch_size)
    train_data = train_data.prefetch(buffer_size=autotune)

    return train_data

In [7]:
# Placeholder: replace with the path of the TFRecord file that holds the training spectrograms.
tfrec_path = r"where-you-save-tfrec"
# x_train is the dataset pipeline returned by get_train_data; it is what vae.fit consumes later.
x_train = get_train_data(tfrec_path)

In [8]:
# Reset the Keras backend session state before the model is constructed.
K.clear_session()

In [69]:
# Build the VAE: four conv stages (kernel 5, stride 3 each) and a 1024-dimensional latent space.
# parse_function returns spectrograms transposed to (time, frequency), so the input is
# presumably 1296 time frames x 256 frequency bins -- confirm against the dataset.
# recon_loss_weight=1000 is only the initial value; it is overridden to 300 before training.
vae = VAE(inp_shape=[1296, 256], 
          conv_filters=[256, 512, 512, 1024],
          conv_kernels=[5, 5, 5, 5],
          conv_strides=[3, 3, 3, 3],
          latent_space_dim=1024,
          recon_loss_weight=1000)

In [70]:
# Call the model once on a symbolic Keras Input with the training input shape
# (presumably to build the model before compile/fit); the result is discarded.
_ = vae(Input(shape=[1296, 256]))

In [71]:
# Only an optimizer is given: the training loss is registered inside VAE via add_loss,
# so no loss argument is passed to compile.
vae.compile(Adam(learning_rate=1e-4))

In [None]:
# Per-run TensorBoard log directory; the timestamp keeps separate runs apart.
# (The directory name was previously misspelled as "tensorboaed_logs".)
logdir = os.path.join(
    "tensorboard_logs",
    "VAE_layernorms_new_arch",
    datetime.datetime.now().strftime("%Y%m%d-%H%M%S"),
)
# NOTE(review): this callback only takes effect if it is passed to vae.fit(callbacks=[...]).
tensorboard_callback = tf.keras.callbacks.TensorBoard(logdir, histogram_freq=1)
print(logdir)

In [14]:
# Checkpoint filename template; {epoch} and {loss} are filled in by Keras when saving.
# NOTE(review): hard-coded absolute Windows path -- adjust for your machine.
checkpoint_file_path = "T:\\AI_project\\Compress_and_model\\v.final\\model\\checkpoint_VAE_256_512_512_1024_rc1000000_layernorm\\weight_improvement_{epoch:02d}-{loss:.4f}"
# Save a checkpoint only when the monitored training loss reaches a new minimum.
# NOTE(review): this callback is not passed to vae.fit in the training cell,
# so as written it has no effect.
model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    checkpoint_file_path,
    monitor="loss",
    verbose=1,
    save_best_only=True,
    mode="min",
)

# Training

### Changing weight of L2 loss

In [73]:
# Override the reconstruction-loss weight chosen at construction time (1000) with 300.
vae.set_recon_loss_weight(300)

In [None]:
# Display the current reconstruction-loss weight to confirm the change.
vae.recon_loss_weight

### Start training

In [None]:
EPOCHS = 100 # number of epochs passed to vae.fit; adjust as needed

In [None]:
# Train on the dataset pipeline; no targets are given because the loss is added inside the model.
# NOTE(review): no callbacks= argument is passed, so tensorboard_callback and
# model_checkpoint defined earlier are not used by this run.
vae.fit(x_train, epochs=EPOCHS)
# tf.keras.models.save_model(vae, "T:\\DATASET\\v1.1\\model\\CNN1000", overwrite=True)

# Save Model

In [None]:
# Export the trained model; replace the placeholder with the directory to save into.
tf.keras.models.save_model(vae, "where-you-want-to-save", overwrite=True)
# Alternative: vae.save(filepath, save_format="tf")

# Generate Audio (experimental)

In [1]:
import tensorflow as tf
import librosa
import soundfile as sf
import pickle as p
import numpy as np
import os
from tqdm.auto import tqdm

In [None]:
# Load the pickled object holding the "mean" and "std" values read in the next cell.
# NOTE(review): pickle.load can execute arbitrary code -- only open files you trust.
# NOTE(review): hard-coded absolute Windows path -- adjust for your machine.
with open("E:\\datasethere\\MeanStdValue\\mean_std_values.pkl", "rb") as file:
  mean_std = p.load(file)

In [None]:
# Module-level statistics; generate() reads these globals when it calls denormalize().
mean, std = mean_std["mean"], mean_std["std"]

def denormalize(array, mean, std):
    """Undo standardisation: scale ``array`` by ``std``, then shift it by ``mean``."""
    rescaled = array * std
    return rescaled + mean

In [49]:
def generate(model, latent_dim=1024, hop_length=256, win_length=510):
    """Decode one random latent vector into an audio waveform.

    A standard-normal latent point is passed through ``model.decoder``; the
    output is transposed, de-normalised with the module-level ``mean`` and
    ``std``, converted from dB to amplitude and inverted with Griffin-Lim.

    Args:
        model: Object exposing a callable ``decoder`` attribute.
        latent_dim: Size of the latent vector to sample. Defaults to 1024.
        hop_length: Hop length passed to ``librosa.griffinlim``. Defaults to 256.
        win_length: Window length passed to ``librosa.griffinlim``. Defaults to 510.

    Returns:
        The waveform returned by ``librosa.griffinlim``.
    """
    eps = tf.random.normal([1, latent_dim])
    log_spectrogram = model.decoder(eps)
    # Drop only the batch axis; an axis-less squeeze would also remove a
    # time or frequency axis that happens to have length 1.
    log_spectrogram = tf.squeeze(log_spectrogram, axis=0).numpy().T
    log_denorm = denormalize(log_spectrogram, mean, std)
    spectrogram = librosa.db_to_amplitude(log_denorm)
    wave = librosa.griffinlim(spectrogram, hop_length=hop_length, win_length=win_length)
    return wave

def main(num_generate, model_path, output, samplerate=22050):
    """Load a saved model and write ``num_generate`` generated WAV files.

    Args:
        num_generate: Number of audio files to produce.
        model_path: Path given to ``tf.keras.models.load_model``.
        output: Directory that receives the files; created if missing.
        samplerate: Sample rate written to each WAV file. Defaults to 22050.
    """
    # exist_ok=True lets the cell be re-run without failing on an existing directory.
    os.makedirs(output, exist_ok=True)
    vae = tf.keras.models.load_model(model_path)
    for i in tqdm(range(num_generate)):
        wave = generate(vae)
        sf.write(os.path.join(output, f"generation_no.{i+1:02d}.wav"), wave, samplerate=samplerate)

In [60]:
# Arguments for main(); the two strings are placeholders to replace with real paths.
num_generate = 10 # number of audio files to generate; adjust as needed
model_path = "where-you-save-model"
output = "where-you-want-your-files-to-be"

In [None]:
# Load the saved model and write num_generate WAV files into the output directory.
main(num_generate, model_path, output)