In [None]:
# Connect Google Drive so that files can be reused.
from google.colab import drive
drive.mount('/content/gdrive/')

In [None]:
# https://colab.research.google.com/github/magenta/magenta-demos/blob/master/colab-notebooks/MusicVAE.ipynb#scrollTo=0x8YTRDwv8Gk 
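# Because of assorted issues, the installation section is used almost as-is from the notebook above. Using tf1 is expected to cause even more problems. Only the magenta==2.1.0 pin was changed.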
import glob

BASE_DIR = "gs://download.magenta.tensorflow.org/models/music_vae/colab2"

print('Installing dependencies...')
!apt-get update -qq && apt-get install -qq libfluidsynth1 fluid-soundfont-gm build-essential libasound2-dev libjack-dev
!pip install -q pyfluidsynth
!pip install -qU magenta==2.1.0

# Workaround so Python can locate the freshly installed fluidsynth library.
# Only required in the hosted Colab environment.
import ctypes.util

orig_ctypes_util_find_library = ctypes.util.find_library


def proxy_find_library(lib):
    """Resolve `lib`, short-circuiting 'fluidsynth' to its known .so name.

    Any other library name is delegated to the original
    `ctypes.util.find_library` captured above.
    """
    if lib != 'fluidsynth':
        return orig_ctypes_util_find_library(lib)
    return 'libfluidsynth.so.1'


# Install the proxy in place of the stock lookup.
ctypes.util.find_library = proxy_find_library


print('Importing libraries and defining some helper functions...')
from google.colab import files
import magenta.music as mm
from magenta.models.music_vae import configs
from magenta.models.music_vae.trained_model import TrainedModel
import numpy as np
import os
import tensorflow.compat.v1 as tf

# Switch off TF2 behavior: the code in this file uses the TF1-style
# compat API (tf.Session, tf.Graph, tf.train hooks).
tf.disable_v2_behavior()

# Necessary until pyfluidsynth is updated (>1.2.5).
import warnings
# Silence DeprecationWarning globally for the rest of the session.
warnings.filterwarnings("ignore", category=DeprecationWarning)

def play(note_sequence):
    """Play `note_sequence` via `mm.play_sequence` using the fluidsynth synth."""
    synth = mm.fluidsynth
    mm.play_sequence(note_sequence, synth=synth)

def interpolate(model, start_seq, end_seq, num_steps, max_length=32,
                assert_same_length=True, temperature=0.5,
                individual_duration=4.0):
    """Interpolate between two sequences, audition the results, return one.

    Calls `model.interpolate` to obtain `num_steps` sequences, plays the first,
    last and middle one, then concatenates all of them (each given
    `individual_duration` seconds), plays and plots the concatenation.

    Returns:
        The concatenated interpolation when `num_steps > 3`, otherwise the
        middle sequence (`index num_steps // 2`).
    """
    steps = model.interpolate(
        start_seq,
        end_seq,
        num_steps=num_steps,
        length=max_length,
        temperature=temperature,
        assert_same_length=assert_same_length,
    )

    # (label, index) pairs; indexing stays lazy so output order is unchanged.
    previews = (
        ('Start Seq Reconstruction', 0),
        ('End Seq Reconstruction', -1),
        ('Mean Sequence', num_steps // 2),
    )
    for label, index in previews:
        print(label)
        play(steps[index])

    print('Start -> End Interpolation')
    durations = [individual_duration] * len(steps)
    interp_seq = mm.sequences_lib.concatenate_sequences(steps, durations)
    play(interp_seq)
    mm.plot_sequence(interp_seq)

    if num_steps > 3:
        return interp_seq
    return steps[num_steps // 2]

def download(note_sequence, filename):
    """Write `note_sequence` to `filename` as MIDI, then trigger a Colab download."""
    midi_path = filename
    mm.sequence_proto_to_midi_file(note_sequence, midi_path)
    files.download(midi_path)

print('Done')

# Additional imports
from copy import deepcopy
import pathlib
import zipfile

from magenta.scripts.convert_dir_to_note_sequences import convert_directory

import note_seq

from magenta.models.music_vae import configs
from magenta.models.music_vae import data
import tf_slim

import collections

from magenta.common import merge_hparams
from magenta.contrib import training as contrib_training
from magenta.models.music_vae import data_hierarchical
from magenta.models.music_vae import lstm_models
from magenta.models.music_vae.base_model import MusicVAE

In [None]:
def make_directory(directory):
    """Create `directory` (including missing parents) if it does not exist.

    Uses `exist_ok=True` instead of a separate existence check, so there is
    no window between "check" and "create" in which another process could
    create the directory and make the call fail.

    Args:
        directory: Path of the directory to create.

    Raises:
        FileExistsError: If `directory` exists but is not a directory.
    """
    os.makedirs(directory, exist_ok=True)

# Set the base path and create the folders the rest of the notebook needs.
path = "/content/gdrive/MyDrive/과제/포자랩스/"

for _subdir in ("data_dir", "train", "gen_midi"):
    make_directory(path + _subdir)

In [None]:
def make_tfrecord(path):
    """Download the Groove MIDI-only archive and convert it to a TFRecord.

    Steps:
      1. Download the archive to ``<path>/data_dir/data.zip``.
      2. Unpack it under ``path``.
      3. Convert everything under ``<path>/groove`` (recursively) into
         NoteSequence protos written to ``<path>/data_dir/music.tfrecord``.

    The conversion is done by magenta's `convert_directory`, see
    https://github.com/magenta/magenta/blob/77ed668af96edea7c993d38973b9da342bd31e82/magenta/scripts/convert_dir_to_note_sequences.py
    With `recursive=True` it also converts files in subdirectories of the
    given root directory.

    Args:
        path: Base working directory. A trailing slash is optional.
    """
    root_dir = os.path.join(path, 'groove')
    data_dir = os.path.join(path, 'data_dir')
    output_file = os.path.join(data_dir, 'music.tfrecord')
    save_groove_dataset_path = os.path.join(data_dir, 'data.zip')
    groove_midionly_url = "https://storage.googleapis.com/magentadata/datasets/groove/groove-v1.0.0-midionly.zip"

    # The download target and the TFRecord both live in data_dir.
    os.makedirs(data_dir, exist_ok=True)

    # Download only: the archive is unpacked explicitly below, so asking
    # get_file to extract as well would just unpack it a second time.
    archive_path = tf.keras.utils.get_file(
        fname=save_groove_dataset_path, origin=groove_midionly_url)

    # Context manager guarantees the zip file handle is closed.
    # Assumes the archive unpacks into a top-level `groove` directory,
    # which is what root_dir points at -- TODO confirm.
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(path)

    convert_directory(root_dir, output_file, recursive=True)
    
#   def convert_files(root_dir, sub_dir, writer, recursive=False):
#       """Converts files.
#       Args:
#         root_dir: A string specifying a root directory.
#         sub_dir: A string specifying a path to a directory under `root_dir` in which
#             to convert contents.
#         writer: A TFRecord writer
#         recursive: A boolean specifying whether or not recursively convert files
#             contained in subdirectories of the specified directory.
#       Returns:
#         A map from the resulting Futures to the file paths being converted.
#       """
#       dir_to_convert = os.path.join(root_dir, sub_dir)
#       tf.logging.info("Converting files in '%s'.", dir_to_convert)
#       files_in_dir = tf.gfile.ListDirectory(os.path.join(dir_to_convert))
#       recurse_sub_dirs = []
#       written_count = 0
#       for file_in_dir in files_in_dir:
#         tf.logging.log_every_n(tf.logging.INFO, '%d files converted.',
#                                1000, written_count)
#         full_file_path = os.path.join(dir_to_convert, file_in_dir)
#         if (full_file_path.lower().endswith('.mid') or
#             full_file_path.lower().endswith('.midi')):
#           try:
#             sequence = convert_midi(root_dir, sub_dir, full_file_path)
#           except Exception as exc:  # pylint: disable=broad-except
#             tf.logging.fatal('%r generated an exception: %s', full_file_path, exc)
#             continue
#           if sequence:
#             writer.write(sequence.SerializeToString())
#         elif (full_file_path.lower().endswith('.xml') or
#               full_file_path.lower().endswith('.mxl')):
#           try:
#             sequence = convert_musicxml(root_dir, sub_dir, full_file_path)
#           except Exception as exc:  # pylint: disable=broad-except
#             tf.logging.fatal('%r generated an exception: %s', full_file_path, exc)
#             continue
#           if sequence:
#             writer.write(sequence.SerializeToString())
#         elif full_file_path.lower().endswith('.abc'):
#           try:
#             sequences = convert_abc(root_dir, sub_dir, full_file_path)
#           except Exception as exc:  # pylint: disable=broad-except
#             tf.logging.fatal('%r generated an exception: %s', full_file_path, exc)
#             continue
#           if sequences:
#             for sequence in sequences:
#               writer.write(sequence.SerializeToString())
#         else:
#           if recursive and tf.gfile.IsDirectory(full_file_path):
#             recurse_sub_dirs.append(os.path.join(sub_dir, file_in_dir))
#           else:
#             tf.logging.warning(
#                 'Unable to find a converter for file %s', full_file_path)

#       for recurse_sub_dir in recurse_sub_dirs:
#         convert_files(root_dir, recurse_sub_dir, writer, recursive)

In [None]:
def get_config(batch_size=128, num_bar=4):
    """Build a drums-only MusicVAE configuration.

    Adapted from
    https://github.com/magenta/magenta/blob/main/magenta/models/music_vae/configs.py
    with the parts needed for a drums-only config similar to the paper's
    implementation.

    Args:
        batch_size: Training batch size stored in the hparams.
        num_bar: Number of bars per example. Sets `max_seq_len`
            (16 steps per bar), the converter's `max_bars`/`slice_bars`
            and one of the decoder's `level_lengths`.

    Returns:
        A `Config` namedtuple with fields model, hparams,
        note_sequence_augmenter, data_converter, train_examples_path,
        eval_examples_path and tfds_name.

    Note:
        `train_examples_path` is built from the module-level `path`
        variable, which must be defined before this function is called.
    """
    HParams = contrib_training.HParams

    class Config(collections.namedtuple(
        'Config',
        ['model', 'hparams', 'note_sequence_augmenter', 'data_converter',
            'train_examples_path', 'eval_examples_path', 'tfds_name'])):

        def values(self):
            return self._asdict()

    # Every field defaults to None.
    Config.__new__.__defaults__ = (None,) * len(Config._fields)

    # Quantisation resolution is fixed and must NOT depend on num_bar:
    # max_seq_len below assumes 16 steps per bar.
    # Assumes DrumsConverter's default of 4 quarters per bar, so that
    # 4 steps per quarter gives those 16 steps -- TODO confirm.
    steps_per_quarter = 4
    steps_per_bar = 16

    # See https://github.com/magenta/magenta/issues/1549
    # NOTE(review): level_lengths is kept as [steps_per_bar, num_bar] exactly
    # as in the original code; confirm this is the intended order of
    # hierarchy levels (their product must equal max_seq_len either way).
    CONFIG_MAP = Config(
        model=MusicVAE(
            lstm_models.BidirectionalLstmEncoder(),
            lstm_models.HierarchicalLstmDecoder(  # Hierarchical decoder
                lstm_models.SplitMultiOutLstmDecoder(
                    core_decoders=[
                        lstm_models.CategoricalLstmDecoder()],
                    output_depths=[
                        512,  # drums
                    ]),
                level_lengths=[steps_per_bar, num_bar],
                disable_autoregression=True)),
        hparams=merge_hparams(
            lstm_models.get_default_hparams(),
            HParams(
                batch_size=batch_size,
                max_seq_len=steps_per_bar * num_bar,
                z_size=512,
                enc_rnn_size=[2048, 2048],
                dec_rnn_size=[1024, 1024],
                free_bits=256,
                max_beta=0.2,
            )),
        note_sequence_augmenter=None,
        data_converter=data.DrumsConverter(
            max_bars=num_bar,  # Truncate long drum sequences before slicing.
            slice_bars=num_bar,
            steps_per_quarter=steps_per_quarter,
            roll_input=True),
        train_examples_path=path + 'data_dir/music.tfrecord',
        eval_examples_path=None,
        tfds_name='groove-4bar-midionly',
    )

    return CONFIG_MAP

In [None]:
class Trainer():
    """Training driver for a MusicVAE config.

    Stores a config object and a base path; `run` trains the config's model
    and writes checkpoints/summaries to `<path>/train`.
    """

    def __init__(self, config_map, path):
        # config_map: a single Config object (used directly as the config in `run`).
        self.config_map = config_map
        # path: base directory; the training directory is `<path>/train`.
        self.path = path

    # https://github.com/magenta/magenta/blob/77ed668af96edea7c993d38973b9da342bd31e82/magenta/models/music_vae/music_vae_train.py
    # Training code written by extracting the needed parts from the file above.

    # Should not be called from within the graph to avoid redundant summaries.
    @staticmethod
    def _trial_summary(hparams, examples_path, output_dir):
        """Writes a tensorboard text summary of the trial.

        Two text summaries are written to `output_dir`: the examples path and
        a markdown table of all hparams (sorted by key).
        """

        examples_path_summary = tf.summary.text(
            'examples_path', tf.constant(examples_path, name='examples_path'),
            collections=[])

        hparams_dict = hparams.values()

        # Create a markdown table from hparams.
        header = '| Key | Value |\n| :--- | :--- |\n'
        keys = sorted(hparams_dict.keys())
        lines = ['| %s | %s |' % (key, str(hparams_dict[key])) for key in keys]
        hparams_table = header + '\n'.join(lines) + '\n'

        hparam_summary = tf.summary.text(
            'hparams', tf.constant(hparams_table, name='hparams'), collections=[])

        # A short-lived session is used only to evaluate and write the summaries.
        with tf.Session() as sess:
            writer = tf.summary.FileWriter(output_dir, graph=sess.graph)
            writer.add_summary(examples_path_summary.eval())
            writer.add_summary(hparam_summary.eval())
            writer.close()

    @staticmethod
    def _get_input_tensors(dataset, config):
        """Get input tensors from dataset.

        Pulls one (input, output, control, length) tuple from a one-shot
        iterator over `dataset` and pins the static batch dimension to
        `config.hparams.batch_size`.

        Returns:
            A dict with keys 'input_sequence', 'output_sequence',
            'control_sequence' (None when the converter has no control depth)
            and 'sequence_length'.
        """
        batch_size = config.hparams.batch_size
        iterator = tf.data.make_one_shot_iterator(dataset)
        (input_sequence, output_sequence, control_sequence,
        sequence_length) = iterator.get_next()
        input_sequence.set_shape(
            [batch_size, None, config.data_converter.input_depth])
        output_sequence.set_shape(
            [batch_size, None, config.data_converter.output_depth])
        # A falsy control_depth means the converter provides no control signal.
        if not config.data_converter.control_depth:
            control_sequence = None
        else:
            control_sequence.set_shape(
                [batch_size, None, config.data_converter.control_depth])
        sequence_length.set_shape([batch_size] + sequence_length.shape[1:].as_list())

        return {
            'input_sequence': input_sequence,
            'output_sequence': output_sequence,
            'control_sequence': control_sequence,
            'sequence_length': sequence_length
        }

    def train(self, train_dir,
            config,
            dataset_fn,
            checkpoints_to_keep=5,
            keep_checkpoint_every_n_hours=1,
            num_steps=None,
            master='',
            num_sync_workers=0,
            num_ps_tasks=0,
            task=0):
        """Train loop.

        Args:
            train_dir: Directory for checkpoints and summaries; created if missing.
            config: Config whose `model`, `hparams` and `data_converter` are used.
                Note that `config.model.build(...)` is called here, so the model
                object held by `config` is modified.
            dataset_fn: Zero-argument callable returning the training dataset.
            checkpoints_to_keep: Passed to the Saver as `max_to_keep`.
            keep_checkpoint_every_n_hours: Passed to the Saver unchanged.
            num_steps: If truthy, training stops at this global step.
            master: Passed through to `tf_slim.training.train`.
            num_sync_workers: If non-zero, the optimizer is wrapped in a
                `SyncReplicasOptimizer` with this many workers.
            num_ps_tasks: Passed to `tf.train.replica_device_setter`.
            task: Task index; task 0 is treated as chief.

        Raises:
            ValueError: If `config.hparams.clip_mode` is neither 'value' nor
                'global_norm'.
        """
        tf.gfile.MakeDirs(train_dir)
        is_chief = (task == 0)
        # Only the chief writes the one-off trial summary.
        if is_chief:
            self._trial_summary(
                config.hparams, config.train_examples_path or config.tfds_name,
                train_dir)
        with tf.Graph().as_default():
            with tf.device(tf.train.replica_device_setter(
                num_ps_tasks, merge_devices=True)):

                model = config.model
                model.build(config.hparams,
                            config.data_converter.output_depth,
                            is_training=True)

                optimizer = model.train(**self._get_input_tensors(dataset_fn(), config))

                hooks = []
                if num_sync_workers:
                    optimizer = tf.train.SyncReplicasOptimizer(
                        optimizer,
                        num_sync_workers)
                    hooks.append(optimizer.make_session_run_hook(is_chief))

                grads, var_list = list(zip(*optimizer.compute_gradients(model.loss)))
                global_norm = tf.global_norm(grads)
                tf.summary.scalar('global_norm', global_norm)

                # Gradient clipping: either element-wise by value, or by global
                # norm with all gradients zeroed once the norm reaches
                # `grad_norm_clip_to_zero`.
                if config.hparams.clip_mode == 'value':
                    g = config.hparams.grad_clip
                    clipped_grads = [tf.clip_by_value(grad, -g, g) for grad in grads]
                elif config.hparams.clip_mode == 'global_norm':
                    clipped_grads = tf.cond(
                        global_norm < config.hparams.grad_norm_clip_to_zero,
                        lambda: tf.clip_by_global_norm(  # pylint:disable=g-long-lambda
                            grads, config.hparams.grad_clip, use_norm=global_norm)[0],
                        lambda: [tf.zeros(tf.shape(g)) for g in grads])
                else:
                    raise ValueError(
                        'Unknown clip_mode: {}'.format(config.hparams.clip_mode))
                train_op = optimizer.apply_gradients(
                    list(zip(clipped_grads, var_list)),
                    global_step=model.global_step,
                    name='train_step')

                logging_dict = {'global_step': model.global_step,
                                'loss': model.loss}

                # Log step and loss every 100 iterations.
                hooks.append(tf.train.LoggingTensorHook(logging_dict, every_n_iter=100))
                if num_steps:
                    hooks.append(tf.train.StopAtStepHook(last_step=num_steps))

                scaffold = tf.train.Scaffold(
                    saver=tf.train.Saver(
                        max_to_keep=checkpoints_to_keep,
                        keep_checkpoint_every_n_hours=keep_checkpoint_every_n_hours))
                # Checkpoints are saved every 60 seconds.
                tf_slim.training.train(
                    train_op=train_op,
                    logdir=train_dir,
                    scaffold=scaffold,
                    hooks=hooks,
                    save_checkpoint_secs=60,
                    master=master,
                    is_chief=is_chief)


    def run(self, num_steps=100,
            tf_file_reader=tf.data.TFRecordDataset,
            file_reader=tf.python_io.tf_record_iterator):
        """Build the training dataset and start the trainer.

        Trains `self.config_map` into `<self.path>/train`.

        Args:
        num_steps: Global step at which training stops.
        tf_file_reader: The tf.data.Dataset class to use for reading files.
        file_reader: The Python reader to use for reading files. Not
            referenced in this method's body.
        Raises:
        ValueError: propagated from `train` when the config's clip_mode is
            not recognised.
        """
        train_dir = os.path.join(self.path, 'train')

        config = self.config_map

        # Deferred so the dataset is created inside the graph that `train` sets up.
        def dataset_fn():
            return data.get_dataset(
                config,
                tf_file_reader=tf_file_reader,
                is_training=True,
                cache_dataset=True)

        self.train(
            train_dir,
            config=config,
            dataset_fn=dataset_fn,
            num_steps=num_steps)

In [None]:
def music_vae_generate(config_map, path, output_dir, num_steps, temperature = 1.5, num_gens = 5, length_gens = 32, batch_size = 1):
    """Sample drum sequences from the trained model and download them as MIDI.

    Written from the needed parts of
    https://github.com/magenta/magenta/blob/main/magenta/models/music_vae/music_vae_generate.py
    The download step follows
    https://colab.research.google.com/github/magenta/magenta-demos/blob/master/colab-notebooks/MusicVAE.ipynb#scrollTo=0x8YTRDwv8Gk

    Args:
        config_map: Config object passed to `TrainedModel`.
        path: Base directory; the checkpoint is loaded from `<path>/train`.
        output_dir: Directory the generated MIDI files are written to.
        num_steps: Training step count, used only in the output filenames.
        temperature: Sampling temperature passed to `model.sample`.
        num_gens: Number of sequences to sample.
        length_gens: `length` argument passed to `model.sample`.
        batch_size: Batch size passed to `TrainedModel`.
    """
    # Same join as Trainer.run, avoiding a doubled slash when `path` ends in '/'.
    model = TrainedModel(
        config=config_map, batch_size=batch_size,
        checkpoint_dir_or_path=os.path.join(path, 'train'))

    results = model.sample(
        n=num_gens,
        length=length_gens,
        temperature=temperature)

    os.makedirs(output_dir, exist_ok=True)

    for i, ns in enumerate(results):
        # The temperature is part of the filename so that repeated calls with
        # different temperatures do not overwrite each other's files.
        filename = '%s_sample_step_%d_temp_%g_%d.mid' % (
            "groovae_4bar", num_steps, temperature, i)
        download(ns, os.path.join(output_dir, filename))

In [None]:
# Build the config and keep a deep copy of it for generation.
# NOTE(review): Trainer.train calls model.build() on the model stored in the
# config, so the copy is presumably taken before training to hand generation
# a model that has not been built yet -- confirm.
CONFIG_MAP = get_config(batch_size=512)
model_configs = deepcopy(CONFIG_MAP)

# Preprocess the training data
make_tfrecord(path)

# Train the model
num_steps = 5555
trainer = Trainer(CONFIG_MAP, path)
trainer.run(num_steps=num_steps)

# Generate music, sweeping the sampling temperature from 0.5 to 15.5 in steps of 1.0
# NOTE(review): length_gens=72 differs from the config's max_seq_len
# (16 * num_bar = 64 with the default num_bar=4) -- confirm this is intended.
for temp in np.arange(0.5, 16.0, 1.0):
    music_vae_generate(model_configs, path, path + 'gen_midi', num_steps, temperature = temp, length_gens = 72)

# 가벼운 모델 이야기

https://github.com/magenta/magenta/blob/77ed668af96edea7c993d38973b9da342bd31e82/magenta/models/music_vae/base_model.py
https://github.com/magenta/magenta/blob/77ed668af96edea7c993d38973b9da342bd31e82/magenta/models/music_vae/lstm_models.py
https://github.com/magenta/magenta/blob/77ed668af96edea7c993d38973b9da342bd31e82/magenta/contrib/rnn.py
https://github.com/magenta/magenta/blob/77ed668af96edea7c993d38973b9da342bd31e82/magenta/models/music_vae/lstm_utils.py
에서 모델을 자세히 볼 수 있습니다. 아래 내용 중, 혹시 틀린 부분이 있을 경우, 알려주시면 감사하겠습니다! 핵심만 쓰기 위해, 많은 중간과정을 생략하고 있습니다.

## ENCODER
ENCODER는 논문과 같이 latent distribution parameters(μ and σ)를 만드는 데 필요한, BiLSTM의 final state vectors를 얻습니다.

## HIERARCHICAL DECODER
DECODER는 논문과 같이 HIERARCHICAL DECODER를 사용합니다. 입력 sequence(drum midi)를 4마디마다 subsequences로 분할합니다. subsequences를 unidirectional LSTM에 넣어, 각각의 embedding vectors c(Conductor)를 만듭니다. 이를 이용하여, 생성을 진행합니다. 이러한 방법으로 긴 sequence도 학습 및 생성을 진행할 수 있습니다.

## Multi-Stream Modeling
우리의 목표는, 4마디에 해당하는 drum 샘플을 뽑아내는 것입니다. 따라서 Multi-Stream Modeling은 사실상 진행하지 않습니다. 다만 구현된 모델에 맞게 학습하기 위해, drum distribution만 설정하여 사용했습니다. Multi-Stream Modeling은 다양한 signal로 연주하기 위해서, independent한 3 separate distributions(drum, bass, and melody)을 만듭니다. 이는 별도의 DECODER를 사용하여, 생성을 진행합니다.

## ETC
이외의 사항들은 VAE 생성 방식과 유사합니다.

## Some code

```
    Encodes input sequences into a precursors for latent code `z`.
    Args:
       sequence: Batch of sequences to encode.
       sequence_length: Length of sequences in input batch.
    Returns:
       outputs: Raw outputs to parameterize the prior distribution in
          MusicVae.encode, sized `[batch_size, N]`.
    ....
    
    last_h_fw = states_fw[-1][-1].h
    last_h_bw = states_bw[-1][-1].h

    return tf.concat([last_h_fw, last_h_bw], 1)
```

```
    """Initializer for HierarchicalLstmDecoder.
    Hierarchicaly decodes a sequence across time.
    Each sequence is padded per-segment. For example, a sequence with
    three segments [1, 2, 3], [4, 5], [6, 7, 8 ,9] and a `max_seq_len` of 12
    is represented as `sequence = [1, 2, 3, 0, 4, 5, 0, 0, 6, 7, 8, 9]` with
    `sequence_length = [3, 2, 4]`.
    `z` initializes the first level LSTM to produce embeddings used to
    initialize the states of LSTMs at subsequent levels. The lowest-level
    embeddings are then passed to the given `core_decoder` to generate the
    final outputs.
    This decoder has 3 modes for what is used as the inputs to the LSTMs
    (excluding those in the core decoder):
      Autoregressive: (default) The inputs to the level `l` decoder are the
        final states of the level `l+1` decoder.
      Non-autoregressive: (`disable_autoregression=True`) The inputs to the
        hierarchical decoders are 0's.
      Re-encoder: (`hierarchical_encoder` provided) The inputs to the level `l`
        decoder are re-encoded outputs of level `l+1`, using the given encoder's
        matching level.
    Args:
      core_decoder: The BaseDecoder implementation to use at the output level.
      level_lengths: A list of the number of outputs of each level of the
        hierarchy. The final level is the (padded) maximum length. The product
        of the lengths must equal `hparams.max_seq_len`.
      disable_autoregression: Whether to disable the autoregression within the
        hierarchy. May also be a collection of levels on which to disable.
      hierarchical_encoder: (Optional) A HierarchicalLstmEncoder instance to use
        for re-encoding the decoder outputs at each level for use as inputs to
        the next level up in the hierarchy, instead of the final decoder state.
        The encoder level output lengths (except for the final single-output
        level) should be the reverse of `level_output_lengths`.
    Raises:
      ValueError: If `hierarchical_encoder` is given but has incompatible level
        lengths.
    """
```

## Reference
- A Hierarchical Latent Vector Model for Learning Long-Term Structure in Music https://arxiv.org/pdf/1803.05428.pdf
- magenta/magenta https://github.com/magenta/magenta
- Groove MIDI Dataset https://magenta.tensorflow.org/datasets/groove
- magenta-demos Colab https://colab.research.google.com/github/magenta/magenta-demos/blob/master/colab-notebooks/MusicVAE.ipynb
- MusicVAE - Training stops at epoch 0 with no output or explanation. #1549  https://github.com/magenta/magenta/issues/1549
- tf.keras.utils.get_file https://www.tensorflow.org/api_docs/python/tf/keras/utils/get_file
- zipfile — ZIP 아카이브 작업 https://docs.python.org/ko/3/library/zipfile.html