## Music VAE Implementation

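1. VAE - a model that learns a semantically meaningful latent representation of the data
    - A conventional (flat) LSTM decoder struggles to decode long sequences such as music data because of posterior collapse
    - Posterior collapse: the influence of the latent state vanishes as the sequence is generated, so the decoder ends up ignoring the latent code
    - With a hierarchical decoder, the sampled latent vector passes through several levels of decoders instead of a single flat decoder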
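
The idea behind hierarchical decoding can be sketched with a toy, dependency-free example. The functions `conductor`, `decode_segment`, and `hierarchical_decode` are hypothetical stand-ins, not magenta APIs, and the arithmetic inside them is arbitrary. The sketch only shows the structure: every segment is re-seeded from an embedding derived from the latent vector z, so z keeps influencing the output late in a long sequence.

```python
import math

def conductor(z, num_segments):
    """Toy 'conductor': derive one embedding per segment from the latent vector z."""
    embeddings = []
    for seg in range(num_segments):
        # Each embedding depends on all of z and on the segment index.
        embeddings.append([math.tanh(v + 0.1 * seg) for v in z])
    return embeddings

def decode_segment(embedding, steps):
    """Toy low-level decoder whose state is initialised only from the segment embedding."""
    state = sum(embedding) / len(embedding)
    outputs = []
    for _ in range(steps):
        state = math.tanh(0.5 * state + 0.1)  # autoregressive-style state update
        outputs.append(state)
    return outputs

def hierarchical_decode(z, num_segments=4, steps_per_segment=16):
    """Decode z segment by segment; every segment restarts from a z-derived embedding."""
    sequence = []
    for embedding in conductor(z, num_segments):
        sequence.extend(decode_segment(embedding, steps_per_segment))
    return sequence

seq = hierarchical_decode([0.3, -1.2, 0.8])
print(len(seq))  # 4 segments x 16 steps = 64
```

Changing z changes the first output of every segment, including the last one, which is the property a flat decoder tends to lose on long sequences.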

2. Dataset & preprocessing
    - The Music VAE model uses data in MIDI (.mid) format
    - It uses a bidirectional LSTM encoder and a hierarchical unidirectional LSTM decoder
    - To train Music VAE, the MIDI files must be converted to TFRecord format
        - TFRecord: TensorFlow's binary data format for storing training data
        - The magenta library can convert a whole directory of MIDI files to TFRecord (convert_dir_to_note_sequences)
        - The converted TFRecord file is saved to disk

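3. Training
    - Train the VAE model with the TFRecord data as the input sequences
    - Bidirectional Encoder: a two-layer bidirectional LSTM that encodes the sequence information (the drum_4bar config in this notebook sets enc_rnn_size=[2048], i.e. a single layer)
    - Hierarchical Decoder: a hierarchical unidirectional LSTM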
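
Between the encoder and the decoder, a VAE samples the latent vector with the reparameterization trick: z = mu + sigma * eps, with eps drawn from a standard normal. A minimal sketch in plain Python follows. Applying softplus to keep sigma positive is an assumption made for this illustration, and `sample_latent` is a hypothetical helper, not a magenta function.

```python
import math
import random

def softplus(x):
    """Numerically stable softplus, log(1 + exp(x)), which is always positive."""
    return max(x, 0.0) + math.log1p(math.exp(-abs(x)))

def sample_latent(mu, raw_sigma, rng):
    """Draw z = mu + sigma * eps with eps ~ N(0, 1), one value per latent dimension."""
    z = []
    for m, r in zip(mu, raw_sigma):
        sigma = softplus(r)          # keep the standard deviation positive
        eps = rng.gauss(0.0, 1.0)    # the noise is sampled independently of mu and sigma
        z.append(m + sigma * eps)
    return z

rng = random.Random(0)
mu = [0.0, 1.0, -0.5]            # encoder mean (illustrative values)
raw_sigma = [-2.0, 0.0, 1.0]     # pre-activation scale (illustrative values)
print(sample_latent(mu, raw_sigma, rng))
```

Because the randomness sits in eps, gradients can flow through mu and sigma back into the encoder during training.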

### Music VAE

1. Extract a drum sample from a single MIDI file

In [1]:
# Import the required modules

import tensorflow.compat.v1 as tf  # TF1-compatible API used throughout this notebook
import numpy as np
import pathlib
import zipfile
import os
import pandas as pd
import IPython
import collections
import note_seq

from magenta.common import merge_hparams
from magenta.contrib import training as contrib_training
from magenta.models.music_vae.base_model import MusicVAE
from magenta.models.music_vae import data_hierarchical
from magenta.models.music_vae import lstm_models
from magenta.models.music_vae import data
from magenta.scripts.convert_dir_to_note_sequences import convert_directory  # MIDI -> TFRecord conversion
from magenta.models.music_vae import configs
from magenta.models.music_vae.trained_model import TrainedModel  # wrapper around a trained model
import tf_slim

Import requested from: 'numba.decorators', please update to use 'numba.core.decorators' or pin to Numba version 0.48.0. This alias will not be present in Numba version 0.50.0.
  from numba.decorators import jit as optional_jit
Import of 'jit' requested from: 'numba.decorators', please update to use 'numba.core.decorators' or pin to Numba version 0.48.0. This alias will not be present in Numba version 0.50.0.
  from numba.decorators import jit as optional_jit


In [2]:
# Set the paths

data_root = 'D:/PozaLabs_test/VAE/midi_data/groove'  # directory containing the MIDI data
rec_root = 'D:/PozaLabs_test/VAE/midi_data/m.tfrecord'  # output TFRecord file

### PreProcessing

In [3]:
# Convert the whole directory of MIDI files into a single TFRecord file
# using the magenta library

convert_directory(data_root, rec_root, recursive=True)

INFO:tensorflow:Converting files in 'D:/PozaLabs_test/VAE/midi_data/groove\'.
INFO:tensorflow:0 files converted.
INFO:tensorflow:Converted MIDI file D:/PozaLabs_test/VAE/midi_data/groove\1_funk_80_beat_4-4.mid.


### Train

In [4]:
# Based on configs.py in the magenta GitHub repository

class Config(collections.namedtuple('Config', ['model', 'hparams', 'note_sequence_augmenter', 'data_converter', 'train_examples_path', 'eval_examples_path', 'tfds_name'])):
    def values(self):
        return self._asdict()

Config.__new__.__defaults__= (None,) * len(Config._fields)

CONFIG_MAP = {}

HParams = contrib_training.HParams

# Based on the GrooVAE config
# drums

CONFIG_MAP['drum_4bar'] = Config(
    model=MusicVAE(lstm_models.BidirectionalLstmEncoder(),  # bidirectional LSTM encoder
                   lstm_models.GrooveLstmDecoder()),  # stacked unidirectional LSTM decoder (flat, not HierarchicalLstmDecoder)

    hparams=merge_hparams(
        lstm_models.get_default_hparams(),
        HParams(
            batch_size=512,  # batch size
            max_seq_len=64,  # 4 bars x 16 steps per bar (16th-note resolution)
            z_size=512,  # dimensionality of the latent vector
            enc_rnn_size=[2048],  # encoder: one bidirectional LSTM layer with 2048 hidden units
            dec_rnn_size=[1024, 1024],  # decoder: two stacked unidirectional LSTM layers with 1024 hidden units each
            max_beta=0.2,  # maximum weight of the KL term
            free_bits=48  # KL budget (in bits) that is not penalised
        )),
    note_sequence_augmenter=None,

    # Split the sequences into 4-bar segments
    data_converter=data.GrooveConverter(
        split_bars=4, steps_per_quarter=4, quarters_per_bar=4, max_tensors_per_notesequence=20,
        pitch_classes=data.ROLAND_DRUM_PITCH_CLASSES,
        inference_pitch_classes=data.REDUCED_DRUM_PITCH_CLASSES),
    # tfds_name='C:/Users/SESE/iCloudDrive/POZA/VAE/groove/4bar-midionly'
    train_examples_path='D:/PozaLabs_test/VAE/midi_data/m.tfrecord'  # path to the training data
)
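
The `free_bits` and `max_beta` hyperparameters shape the KL term of the VAE loss. The sketch below shows one common formulation in plain Python: only the part of the KL divergence above the free-bits budget is penalised, and the penalty is weighted by beta. The annealing formula in `beta_at_step` is an assumption about how `beta_rate` and `max_beta` combine, and all three function names are hypothetical.

```python
import math

def kl_diag_gaussian(mu, sigma):
    """KL( N(mu, sigma^2) || N(0, 1) ) in nats, summed over the latent dimensions."""
    return sum(0.5 * (s * s + m * m - 1.0) - math.log(s) for m, s in zip(mu, sigma))

def kl_cost_with_free_bits(kl_nats, free_bits):
    """Only the part of the KL above the free-bits budget is penalised."""
    free_nats = free_bits * math.log(2.0)   # convert bits to nats
    return max(kl_nats - free_nats, 0.0)

def beta_at_step(step, max_beta, beta_rate):
    """Assumed annealing schedule: beta = (1 - beta_rate**step) * max_beta."""
    return (1.0 - beta_rate ** step) * max_beta

# Illustrative posterior for a 512-dimensional latent vector.
kl = kl_diag_gaussian([0.5] * 512, [0.8] * 512)
cost = kl_cost_with_free_bits(kl, free_bits=48)
beta = beta_at_step(step=80, max_beta=0.2, beta_rate=0.0)
print(kl, cost, beta * cost)
```

Under this assumed schedule, `beta_rate=0.0` means beta jumps to `max_beta` after the first step, so the KL weight is effectively constant during training.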

In [5]:
# Based on train.py in the magenta GitHub repository
# Training code

def _trial_summary(hparams, examples_path, output_dir):
    """Writes TensorBoard text summaries of the examples path and hyperparameters."""

    examples_path_summary = tf.summary.text('examples_path', tf.constant(examples_path, name='examples_path'), collections=[])

    hparams_dict = hparams.values()

    # Create a Markdown table from the hyperparameters.
    header= '| Key | Value |\n| :--- | :--- |\n'
    keys= sorted(hparams_dict.keys())
    lines= ['| %s | %s |' % (key, str(hparams_dict[key])) for key in keys]
    hparams_table = header + '\n'.join(lines) + '\n'

    hparam_summary = tf.summary.text('hparams', tf.constant(hparams_table, name='hparams'), collections=[])

    with tf.Session() as sess:
        writer = tf.summary.FileWriter(output_dir, graph=sess.graph)
        writer.add_summary(examples_path_summary.eval())
        writer.add_summary(hparam_summary.eval())
        writer.close()


def _get_input_tensors(dataset, config):
    """Gets the input tensors from the dataset and sets their static shapes."""
    batch_size = config.hparams.batch_size
    iterator = tf.data.make_one_shot_iterator(dataset)
    (input_sequence, output_sequence, control_sequence, sequence_length)= iterator.get_next()
    input_sequence.set_shape([batch_size, None, config.data_converter.input_depth])
    output_sequence.set_shape([batch_size, None, config.data_converter.output_depth])
    
    if not config.data_converter.control_depth:
        control_sequence = None
    
    else:
        control_sequence.set_shape([batch_size, None, config.data_converter.control_depth])

    sequence_length.set_shape([batch_size] + sequence_length.shape[1:].as_list())
        
    return {
        'input_sequence': input_sequence,
        'output_sequence': output_sequence,
        'control_sequence': control_sequence,
        'sequence_length': sequence_length
    }

# Training parameters and checkpoint settings
def train(train_dir,
          config,
          dataset_fn,
          checkpoints_to_keep=5,
          keep_checkpoint_every_n_hours=1,
          num_steps=None,
          master='',
          num_sync_workers=0,
          num_ps_tasks=0,
          task=0):
    """Train loop."""
    tf.gfile.MakeDirs(train_dir)
    is_chief = (task == 0)

    with tf.Graph().as_default():
        with tf.device(tf.train.replica_device_setter(
                num_ps_tasks, merge_devices=True)):

            model = config.model
            model.build(config.hparams,
                        config.data_converter.output_depth,
                        is_training=True)

            # Optimizer
            optimizer = model.train(**_get_input_tensors(dataset_fn(), config))

            hooks = []
            if num_sync_workers:
                optimizer = tf.train.SyncReplicasOptimizer(optimizer, num_sync_workers)
                hooks.append(optimizer.make_session_run_hook(is_chief))

            grads, var_list = list(zip(*optimizer.compute_gradients(model.loss)))
            global_norm = tf.global_norm(grads)
            tf.summary.scalar('global_norm', global_norm)

            if config.hparams.clip_mode == 'value':
                g = config.hparams.grad_clip
                clipped_grads = [tf.clip_by_value(grad, -g, g) for grad in grads]
            elif config.hparams.clip_mode == 'global_norm':
                # Clip by global norm, or zero the gradients when the norm reaches the threshold.
                clipped_grads = tf.cond(
                    global_norm < config.hparams.grad_norm_clip_to_zero,
                    lambda: tf.clip_by_global_norm(  # pylint:disable=g-long-lambda
                        grads, config.hparams.grad_clip, use_norm=global_norm)[0],
                    lambda: [tf.zeros(tf.shape(g)) for g in grads])
            else:
                raise ValueError(
                    'Unknown clip_mode: {}'.format(config.hparams.clip_mode))
            train_op = optimizer.apply_gradients(
                list(zip(clipped_grads, var_list)),
                global_step=model.global_step,
                name='train_step')

            logging_dict = {'global_step': model.global_step,
                            'loss': model.loss}

            hooks.append(tf.train.LoggingTensorHook(logging_dict, every_n_iter=100))
            if num_steps:
                hooks.append(tf.train.StopAtStepHook(last_step=num_steps))

            scaffold = tf.train.Scaffold(
                saver=tf.train.Saver(
                    max_to_keep=checkpoints_to_keep,
                    keep_checkpoint_every_n_hours=keep_checkpoint_every_n_hours))

            tf_slim.training.train(
                train_op=train_op,
                logdir=train_dir,
                scaffold=scaffold,
                hooks=hooks,
                save_checkpoint_secs=60,  # save a checkpoint every 60 seconds
                master=master,
                is_chief=is_chief)

def evaluate(train_dir,
             eval_dir,
             config,
             dataset_fn,
             num_batches,
             master=''):
    """Evaluate the model repeatedly."""
    tf.gfile.MakeDirs(eval_dir)

    _trial_summary(config.hparams, config.eval_examples_path or config.tfds_name, eval_dir)
    with tf.Graph().as_default():
        model = config.model
        model.build(config.hparams,
                    config.data_converter.output_depth,
                    is_training=False)

        eval_op = model.eval(**_get_input_tensors(dataset_fn().take(num_batches), config))

        hooks = [
            tf_slim.evaluation.StopAfterNEvalsHook(num_batches),
            tf_slim.evaluation.SummaryAtEndHook(eval_dir)
        ]

        tf_slim.evaluation.evaluate_repeatedly(
            train_dir,
            eval_ops=eval_op,
            hooks=hooks,
            eval_interval_secs=60,
            master=master)

# Define the function that runs training
def run(config_map,
        tf_file_reader=tf.data.TFRecordDataset,
        file_reader=tf.python_io.tf_record_iterator,
        is_training=True):
    config = config_map['drum_4bar']
    train_dir = 'D:/PozaLabs_test/VAE/one_midi_train'
    num_steps = 5000  # number of training steps (not epochs)

    def dataset_fn():
        return data.get_dataset(
            config,
            tf_file_reader=tf_file_reader,
            is_training=True,
            cache_dataset=True)

    if is_training:
        train(train_dir, config=config, dataset_fn=dataset_fn, num_steps=num_steps)
    else:
        print("EVAL")  # evaluation is not wired up in this notebook
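
The `global_norm` branch of the gradient clipping in `train` can be illustrated without TensorFlow. The sketch below applies the same rule to a flat list of numbers: rescale the gradients so their global norm does not exceed `grad_clip`, and replace them with zeros when the norm reaches the clip-to-zero threshold. `clip_gradients` is a hypothetical helper written for this illustration.

```python
import math

def clip_gradients(grads, grad_clip, clip_to_zero_threshold):
    """Clip a flat list of gradient values by global norm, or zero them if the norm explodes."""
    global_norm = math.sqrt(sum(g * g for g in grads))
    if global_norm >= clip_to_zero_threshold:
        # Mirrors the second tf.cond branch: discard the update entirely.
        return [0.0 for _ in grads]
    # Same scaling rule as clip-by-global-norm: no change when the norm is already small.
    scale = grad_clip / max(global_norm, grad_clip)
    return [g * scale for g in grads]

print(clip_gradients([3.0, 4.0], grad_clip=1.0, clip_to_zero_threshold=10000))   # norm 5 -> rescaled to norm 1
print(clip_gradients([0.3, 0.4], grad_clip=1.0, clip_to_zero_threshold=10000))   # norm 0.5 -> unchanged
print(clip_gradients([3e4, 4e4], grad_clip=1.0, clip_to_zero_threshold=10000))   # norm 50000 -> zeros
```

The thresholds used here match the `grad_clip` (1.0) and `grad_norm_clip_to_zero` (10000) values that appear in the hyperparameter log of the training run.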

In [6]:
# Run the model training function

run(CONFIG_MAP)

INFO:tensorflow:Building MusicVAE model with BidirectionalLstmEncoder, GrooveLstmDecoder, and hparams:
{'max_seq_len': 64, 'z_size': 512, 'free_bits': 48, 'max_beta': 0.2, 'beta_rate': 0.0, 'batch_size': 512, 'grad_clip': 1.0, 'clip_mode': 'global_norm', 'grad_norm_clip_to_zero': 10000, 'learning_rate': 0.001, 'decay_rate': 0.9999, 'min_learning_rate': 1e-05, 'conditional': True, 'dec_rnn_size': [1024, 1024], 'enc_rnn_size': [2048], 'dropout_keep_prob': 1.0, 'sampling_schedule': 'constant', 'sampling_rate': 0.0, 'use_cudnn': False, 'residual_encoder': False, 'residual_decoder': False, 'control_preprocessing_rnn_size': [256]}
INFO:tensorflow:
Encoder Cells (bidirectional):
  units: [2048]

INFO:tensorflow:
Decoder Cells:
  units: [1024, 1024]

INFO:tensorflow:Reading examples from file: D:/PozaLabs/VAE/midi_data/m.tfrecord
Instructions for updating:
Use `tf.cast` instead.
Instructions for updating:
Please use `keras.layers.Bidirectional(keras.layers.RNN(cell))`, which is equivalent to t

  self._kernel = self.add_variable(
  self._bias = self.add_variable(


Instructions for updating:
`scale_identity_multiplier` is deprecated; please combine it into `scale_diag` directly instead.


  mu = tf.layers.dense(
  sigma = tf.layers.dense(
  tf.layers.dense(


INFO:tensorflow:Create CheckpointSaverHook.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from D:/PozaLabs/VAE/one_midi_train\model.ckpt-80
Instructions for updating:
Use standard file utilities to get mtimes.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 80...
INFO:tensorflow:Saving checkpoints for 80 into D:/PozaLabs/VAE/one_midi_train\model.ckpt.
INFO:tensorflow:D:/PozaLabs/VAE/one_midi_train\model.ckpt-80.data-00000-of-00001
INFO:tensorflow:661600
INFO:tensorflow:D:/PozaLabs/VAE/one_midi_train\model.ckpt-80.index
INFO:tensorflow:661600
INFO:tensorflow:D:/PozaLabs/VAE/one_midi_train\model.ckpt-80.meta
INFO:tensorflow:664300
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 80...
INFO
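
The hyperparameter log above lists `learning_rate`, `decay_rate`, and `min_learning_rate`. A plausible reading, assumed here rather than taken from the notebook, is an exponential decay from the initial rate towards the minimum rate. The helper `learning_rate_at` is hypothetical and only illustrates that assumed schedule.

```python
def learning_rate_at(step, learning_rate=0.001, decay_rate=0.9999, min_learning_rate=1e-05):
    """Assumed schedule: decay exponentially from learning_rate towards min_learning_rate."""
    return (learning_rate - min_learning_rate) * decay_rate ** step + min_learning_rate

# Step 80 is the checkpoint seen in the log; 5000 is the configured num_steps.
for step in (0, 80, 5000):
    print(step, learning_rate_at(step))
```

With these values the rate at step 5000 is still well above the minimum, since 0.9999 raised to the 5000th power is roughly 0.6.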

### Generate

In [6]:
# Use the trained model to sample a 4-bar drum pattern and save the sample as a MIDI file

model = TrainedModel(config=CONFIG_MAP['drum_4bar'], batch_size=1, checkpoint_dir_or_path='D:/PozaLabs_test/VAE/one_midi_train')

generated_sequence = model.sample(n=1, length=64, temperature=0.5)
note_seq.sequence_proto_to_midi_file(generated_sequence[0], 'D:/PozaLabs_test/VAE/gen_midi_one/one_midi_drum_4bar.mid')

INFO:tensorflow:Building MusicVAE model with BidirectionalLstmEncoder, GrooveLstmDecoder, and hparams:
{'max_seq_len': 64, 'z_size': 512, 'free_bits': 48, 'max_beta': 0.2, 'beta_rate': 0.0, 'batch_size': 1, 'grad_clip': 1.0, 'clip_mode': 'global_norm', 'grad_norm_clip_to_zero': 10000, 'learning_rate': 0.001, 'decay_rate': 0.9999, 'min_learning_rate': 1e-05, 'conditional': True, 'dec_rnn_size': [1024, 1024], 'enc_rnn_size': [2048], 'dropout_keep_prob': 1.0, 'sampling_schedule': 'constant', 'sampling_rate': 0.0, 'use_cudnn': False, 'residual_encoder': False, 'residual_decoder': False, 'control_preprocessing_rnn_size': [256]}
INFO:tensorflow:
Encoder Cells (bidirectional):
  units: [2048]

INFO:tensorflow:
Decoder Cells:
  units: [1024, 1024]



  tf.layers.dense(
  self._kernel = self.add_variable(
  self._bias = self.add_variable(


Instructions for updating:
Use `tf.cast` instead.
Instructions for updating:
Please use `keras.layers.Bidirectional(keras.layers.RNN(cell))`, which is equivalent to this API
Instructions for updating:
Please use `keras.layers.RNN(cell)`, which is equivalent to this API
Instructions for updating:
`scale_identity_multiplier` is deprecated; please combine it into `scale_diag` directly instead.
INFO:tensorflow:Restoring parameters from D:/PozaLabs/VAE/one_midi_train\model.ckpt-80


  mu = tf.layers.dense(
  sigma = tf.layers.dense(
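
The `temperature=0.5` argument passed to `model.sample` controls how random the sampled output is. The following is a generic illustration of temperature scaling for a binary "drum hit" decision, not the actual sampling code of magenta's decoder. `hit_probability` and `sample_hits` are hypothetical names, and the logits are made-up values.

```python
import math
import random

def hit_probability(logit, temperature):
    """Sigmoid of the temperature-scaled logit."""
    return 1.0 / (1.0 + math.exp(-logit / temperature))

def sample_hits(logits, temperature, rng):
    """Sample one 0/1 hit per logit from the temperature-scaled probabilities."""
    return [1 if rng.random() < hit_probability(l, temperature) else 0 for l in logits]

logits = [2.0, -2.0, 0.5, -0.5]   # made-up decoder outputs for four drum steps
for temperature in (1.0, 0.5):
    probs = [round(hit_probability(l, temperature), 3) for l in logits]
    print(temperature, probs, sample_hits(logits, temperature, random.Random(0)))
```

Lowering the temperature pushes the probabilities towards 0 or 1, so samples become more deterministic; raising it pushes them towards 0.5 and makes the output more varied.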
