# Basic Instructions

1. Double click on the hidden cells to make them visible, or select "View > Expand Sections" in the menu at the top.
2. Hover over the "`[ ]`" in the top-left corner of each cell and click on the "Play" button to run it, in order.
3. Listen to the generated samples.
4. Make it your own: copy the notebook, modify the code, train your own models, upload your own MIDI, etc.!

# 환경 설정
주피터 노트북 환경에서는 apt-get 커맨드가 동작하지 않아, 코랩(Colab) 환경에서 "Colab Notebook Pre-trained Models"를 참조하여 진행함


In [28]:
# Mount Google Drive at /content/drive; the project paths used later in this
# notebook are written relative to a Drive folder.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [26]:
'''!apt-get update -qq && apt-get install -qq libfluidsynth1 fluid-soundfont-gm build-essential libasound2-dev libjack-dev
!pip install -q pyfluidsynth
!pip install magenta==2.1.0
!pip install -qU magenta'''

from google.colab import files
import ctypes.util

import glob
import os
import numpy as np
import pandas as pd
import tensorflow.compat.v1 as tf
import magenta.music as mm
import collections
import note_seq
import h5py

from magenta.models.music_vae.trained_model import TrainedModel
from magenta.scripts.convert_dir_to_note_sequences import convert_directory
import magenta.models.music_vae as musicvae
from magenta.models.music_vae import configs
from magenta.common import merge_hparams
from magenta.models.music_vae import data
from magenta.models.music_vae import data_hierarchical
from magenta.models.music_vae import lstm_models
from magenta.models.music_vae.base_model import MusicVAE

In [5]:
# Keep a handle on the stock lookup so every non-FluidSynth request can be
# forwarded to it unchanged.
orig_ctypes_util_find_library = ctypes.util.find_library


def proxy_find_library(lib):
  """Resolve `lib` like ctypes.util.find_library, special-casing FluidSynth.

  The name 'fluidsynth' always maps to 'libfluidsynth.so.1'; any other name
  is handed to the original ctypes.util.find_library and its result returned.
  """
  if lib != 'fluidsynth':
    return orig_ctypes_util_find_library(lib)
  return 'libfluidsynth.so.1'


# Install the proxy in place of the stock lookup.
ctypes.util.find_library = proxy_find_library


print('Importing libraries and defining some helper functions...')


# `tf` is tensorflow.compat.v1 (see the import cell); switch off TF2 behaviour
# before any model code runs.
tf.disable_v2_behavior()

# Necessary until pyfluidsynth is updated (>1.2.5).
# Suppresses every DeprecationWarning for the rest of the session.
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

def play(note_sequence):
  """Play `note_sequence` via mm.play_sequence with the FluidSynth synth."""
  synth = mm.fluidsynth
  mm.play_sequence(note_sequence, synth=synth)

def interpolate(model, start_seq, end_seq, num_steps, max_length=32,
                assert_same_length=True, temperature=0.5,
                individual_duration=4.0):
  """Interpolate between two sequences, then play and plot the result.

  Calls `model.interpolate` to obtain `num_steps` sequences, plays the first,
  last and middle one, then concatenates all of them (each allotted
  `individual_duration`), plays the concatenation and plots it.

  Args:
    model: Object exposing `interpolate(start, end, num_steps=..., length=...,
      temperature=..., assert_same_length=...)`.
    start_seq: Sequence at the start of the interpolation.
    end_seq: Sequence at the end of the interpolation.
    num_steps: Number of interpolation steps requested from the model.
    max_length: Forwarded to the model as `length`.
    assert_same_length: Forwarded to the model unchanged.
    temperature: Forwarded to the model unchanged.
    individual_duration: Duration given to every step when concatenating.

  Returns:
    The concatenated sequence when `num_steps > 3`, otherwise the middle step.
  """
  steps = model.interpolate(
      start_seq, end_seq, num_steps=num_steps, length=max_length,
      temperature=temperature, assert_same_length=assert_same_length)

  # (caption, index) pairs; the step is looked up only after its caption has
  # been printed, exactly one at a time.
  previews = (
      ('Start Seq Reconstruction', 0),
      ('End Seq Reconstruction', -1),
      ('Mean Sequence', num_steps // 2),
  )
  for caption, position in previews:
    print(caption)
    play(steps[position])

  print('Start -> End Interpolation')
  durations = [individual_duration] * len(steps)
  joined = mm.sequences_lib.concatenate_sequences(steps, durations)
  play(joined)
  mm.plot_sequence(joined)

  if num_steps > 3:
    return joined
  return steps[num_steps // 2]

Instructions for updating:
non-resource variables are not supported in the long term


Importing libraries and defining some helper functions...


In [35]:
'''Project Attributes'''

# Root folder of the MIDI data; passed to convert_directory as its input.
midi_data = "./drive/MyDrive/Colab Notebooks/musicVAE/groove"
# Despite the name, this is the path of the info.csv metadata FILE that is
# loaded with pd.read_csv.
midi_dir = "./drive/MyDrive/Colab Notebooks/musicVAE/groove/info.csv"
# Second argument given to convert_directory (where the converted records go).
tfrecord_root = "./drive/MyDrive/Colab Notebooks/musicVAE/music_tfrecord"
# Directory in which train_model places its Keras checkpoints.
check_point_dir = "./drive/MyDrive/Colab Notebooks/musicVAE/checkpoints/"
# GCS location used by generate_accompaniment to build the pretrained
# checkpoint path.
BASE_DIR = "gs://download.magenta.tensorflow.org/models/music_vae/colab2"
# Name of the pretrained MusicVAE model; combined with BASE_DIR and '.ckpt'.
musicvae_model_name = 'cat-drums_2bar_small'
# Same value as tfrecord_root.
# NOTE(review): train_model opens its data_path with h5py — confirm this is
# the intended file before passing it there.
data_path = './drive/MyDrive/Colab Notebooks/musicVAE/music_tfrecord'

In [None]:
# `contrib_training` is not among the imports at the top of the notebook, so
# bring it into scope here; without it this cell raises NameError.
from magenta.contrib import training as contrib_training

# Short alias used when building the hyper-parameter sets below.
HParams = contrib_training.HParams


class Config(collections.namedtuple(
    'Config',
    (
        'model',
        'hparams',
        'note_sequence_augmenter',
        'data_converter',
        'train_examples_path',
        'eval_examples_path',
        'tfds_name',
    ))):
  """Named tuple bundling the pieces of one model configuration.

  Every field may be omitted at construction time, in which case it is None.
  """

  def values(self):
    """Return the fields as a mapping from field name to value."""
    return self._asdict()


# Make all fields optional: one None default per declared field.
Config.__new__.__defaults__ = tuple(None for _ in Config._fields)


def update_config(config, update_dict):
  """Return a new Config equal to `config` with `update_dict` applied.

  `config` itself is left untouched; the overrides are applied to the dict
  obtained from `config.values()` and a fresh Config is built from it.
  """
  merged_fields = config.values()
  merged_fields.update(update_dict)
  return Config(**merged_fields)


# Registry of model configurations defined in this notebook, keyed by name.
CONFIG_MAP = {}


# Melody
# 2-bar melody configuration: bidirectional LSTM encoder with a categorical
# LSTM decoder, default LSTM hyper-parameters overridden by the values below.
CONFIG_MAP['cat-mel_2bar_small'] = Config(
    model=MusicVAE(lstm_models.BidirectionalLstmEncoder(),
                   lstm_models.CategoricalLstmDecoder()),
    hparams=merge_hparams(
        lstm_models.get_default_hparams(),
        HParams(
            batch_size=512,
            max_seq_len=32,  # 2 bars w/ 16 steps per bar
            z_size=256,
            enc_rnn_size=[512],
            dec_rnn_size=[256, 256],
            free_bits=0,
            max_beta=0.2,
            beta_rate=0.99999,
            sampling_schedule='inverse_sigmoid',
            sampling_rate=1000,
        )),
    note_sequence_augmenter=data.NoteSequenceAugmenter(transpose_range=(-5, 5)),
    data_converter=data.OneHotMelodyConverter(
        valid_programs=data.MEL_PROGRAMS,
        skip_polyphony=False,
        max_bars=100,  # Truncate long melodies before slicing.
        slice_bars=2,
        steps_per_quarter=4),
    # Use the notebook-wide TFRecord location (same value as the literal that
    # used to be repeated here) so the path is configured in one place only.
    train_examples_path=tfrecord_root,
    eval_examples_path=None,
)

In [11]:
# data load
df = pd.read_csv(midi_dir)
df = pd.DataFrame(df)
df.head(5)

Unnamed: 0,drummer,session,id,style,bpm,beat_type,time_signature,midi_filename,audio_filename,duration,split
0,drummer1,drummer1/eval_session,drummer1/eval_session/1,funk/groove1,138,beat,4-4,drummer1/eval_session/1_funk-groove1_138_beat_...,drummer1/eval_session/1_funk-groove1_138_beat_...,27.872308,test
1,drummer1,drummer1/eval_session,drummer1/eval_session/10,soul/groove10,102,beat,4-4,drummer1/eval_session/10_soul-groove10_102_bea...,drummer1/eval_session/10_soul-groove10_102_bea...,37.691158,test
2,drummer1,drummer1/eval_session,drummer1/eval_session/2,funk/groove2,105,beat,4-4,drummer1/eval_session/2_funk-groove2_105_beat_...,drummer1/eval_session/2_funk-groove2_105_beat_...,36.351218,test
3,drummer1,drummer1/eval_session,drummer1/eval_session/3,soul/groove3,86,beat,4-4,drummer1/eval_session/3_soul-groove3_86_beat_4...,drummer1/eval_session/3_soul-groove3_86_beat_4...,44.716543,test
4,drummer1,drummer1/eval_session,drummer1/eval_session/4,soul/groove4,80,beat,4-4,drummer1/eval_session/4_soul-groove4_80_beat_4...,drummer1/eval_session/4_soul-groove4_80_beat_4...,47.9875,test


In [57]:
# Run Magenta's convert_dir_to_note_sequences.convert_directory on the MIDI
# folder `midi_data`, with `tfrecord_root` as the second argument.
# NOTE(review): the third positional argument (True) is presumably the
# `recursive` flag — confirm against convert_directory's signature.
convert_directory(midi_data,tfrecord_root, True)

In [None]:
'''
@https://github.com/maxwells-daemons/accompany-music-vae/blob/master/data_utils/tfrecord_to_hdf5.py
Turn a tfrecord of NoteSequences into an HDF5 dataset with instruments split
and the pretrained trio model's latent vectors.
'''

from copy import deepcopy
from time import time
import itertools as it

import click
import logging
from pprint import pformat

import numpy as np
import h5py

import tensorflow as tf
tf.logging.set_verbosity(tf.logging.ERROR)  # noqa

import magenta.music as mm
from magenta.models.music_vae import configs
from magenta.models.music_vae.trained_model import TrainedModel

from constants import (TIMESTEPS, DIM_MELODY, DIM_BASS, DIM_DRUMS, DIM_TRIO)

# Constants
# Key looked up in configs.CONFIG_MAP by main(); also interpolated into the
# default value of main's --checkpoint option.
MODEL_NAME = 'cat-drums_2bar_small'


@click.command()
@click.argument('input_file', type=click.Path(exists=True))
@click.argument('output_file', type=click.Path(exists=False))
@click.option('--include_all_instruments', type=bool, default=False)
@click.option('--chunk_size', type=click.IntRange(min=1), default=128,
              help='Number of MIDI files to read at once.')
@click.option('--buffer_size', type=click.IntRange(min=1), default=50000,
              help='Number of examples to make room for at a time.')
@click.option('--batch_size', type=click.IntRange(min=1), default=256,
              help='Batch size for the pretrained model.')
@click.option('--checkpoint', type=click.Path(),
              default='./models/pretrained/{}.ckpt'.format(MODEL_NAME),
              help='Checkpoint to use for the pretrained model.')
@click.option('--log_period', type=click.IntRange(min=0), default=1,
              help='How many chunks pass between logging lines.')
@click.option('--log_file', type=click.Path(),
              default='logs/split_dataset.log')
def main(input_file, output_file,
         include_all_instruments, chunk_size, buffer_size, batch_size,
         checkpoint, log_period, log_file):
    """Encode a TFRecord of NoteSequences and store the result as HDF5.

    NoteSequences are read from INPUT_FILE in chunks, converted to tensors
    with the data converter of the MODEL_NAME config, encoded with the
    pretrained model, and written to OUTPUT_FILE as the datasets 'melody' and
    'code' (plus 'trio', 'bass' and 'drums' with --include_all_instruments).
    Progress is logged to --log_file; --log_period 0 mutes per-chunk logging.
    """
    # Must stay the first statement: captures exactly the call arguments.
    args = locals()
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    log = logging.getLogger(__name__)
    handler = logging.FileHandler(log_file)
    handler.setFormatter(formatter)
    handler.setLevel(logging.DEBUG)
    log.addHandler(handler)
    log.setLevel(logging.DEBUG)

    log.info('Generating melody dataset with args:\n' + pformat(args))
    total_start_time = time()
    ns_gen = mm.note_sequence_io.note_sequence_record_iterator(input_file)
    ns_iter = iter(ns_gen)
    config = configs.CONFIG_MAP[MODEL_NAME]
    trio_converter = config.data_converter

    log.debug('Creating HDF5 store...')
    start_time = time()
    with h5py.File(output_file, 'w') as data_file:
        # Datasets start with room for `buffer_size` examples and grow by the
        # same amount whenever a chunk would not fit.
        # np.bool_ replaces the `np.bool` alias, which newer NumPy no longer
        # provides.
        dataset_size = buffer_size
        ds_melody = data_file.create_dataset(
            'melody',
            (dataset_size, TIMESTEPS, DIM_MELODY),
            maxshape=(None, TIMESTEPS, DIM_MELODY),
            dtype=np.bool_
        )
        ds_code = data_file.create_dataset(
            'code',
            (dataset_size, config.hparams.z_size),
            maxshape=(None, config.hparams.z_size),
            dtype=np.float32
        )

        if include_all_instruments:
            ds_trio = data_file.create_dataset(
                'trio',
                (dataset_size, TIMESTEPS, DIM_TRIO),
                maxshape=(None, TIMESTEPS, DIM_TRIO),
                dtype=np.bool_
            )
            ds_bass = data_file.create_dataset(
                'bass',
                (dataset_size, TIMESTEPS, DIM_BASS),
                maxshape=(None, TIMESTEPS, DIM_BASS),
                dtype=np.bool_
            )
            ds_drums = data_file.create_dataset(
                'drums',
                (dataset_size, TIMESTEPS, DIM_DRUMS),
                maxshape=(None, TIMESTEPS, DIM_DRUMS),
                dtype=np.bool_
            )

        log.debug('Done creating HDF5 store (time: {0:.1f}s)'
                  .format(time() - start_time))

        log.debug('Loading model...')
        start_time = time()
        model = TrainedModel(config, batch_size=batch_size,
                             checkpoint_dir_or_path=checkpoint)
        log.debug('Done loading model (time: {0:.1f}s)'
                  .format(time() - start_time))

        log.info('Beginning dataset creation...')
        i_chunk = 0
        i_example = 0
        try:
            while True:
                i_chunk += 1
                # Test `not log_period` first: the option allows 0, and the
                # modulo would otherwise raise ZeroDivisionError.
                log.disabled = not log_period or i_chunk % log_period != 0
                chunk_time = time()

                log.debug('Processing a chunk of NoteSequences...')
                start_time = time()

                note_sequences = list(it.islice(ns_iter, chunk_size))
                if not note_sequences:
                    # The empty read that ends the loop is not a chunk.
                    i_chunk -= 1
                    break

                trio_tensors = map(
                    lambda seq: trio_converter.to_tensors(seq).outputs,
                    note_sequences
                )
                trio_tensors = it.chain.from_iterable(trio_tensors)
                # Keep only tensors with the full expected shape.
                trio_tensors = list(
                    filter(lambda t: t.shape == (TIMESTEPS, DIM_TRIO),
                           trio_tensors)
                )

                # Ensure an example doesn't overflow the allocated space
                # NOTE(review): tensors beyond `buffer_size` in one chunk are
                # dropped here without a log message.
                trio_tensors = trio_tensors[:buffer_size]
                n_tensors = len(trio_tensors)
                i_last = n_tensors + i_example

                # The trio feature axis is sliced as [melody | bass | ... drums].
                melody_tensors = list(map(lambda t: t[:, :DIM_MELODY],
                                          trio_tensors))

                if include_all_instruments:
                    bass_tensors = list(map(
                        lambda t: t[:, DIM_MELODY:DIM_MELODY + DIM_BASS],
                        trio_tensors
                    ))
                    drums_tensors = list(map(lambda t: t[:, -DIM_DRUMS:],
                                             trio_tensors))

                log.debug('Done processing NoteSequences (time: {0:.1f}s)'
                          .format(time() - start_time))

                log.debug('Running encoder...')
                start_time = time()
                # The encoder receives a deep copy, so `trio_tensors` itself
                # is written to the file unchanged further down.
                _, codes, _ = model.encode_tensors(deepcopy(trio_tensors),
                                                   [TIMESTEPS] * n_tensors)
                log.debug('Done running encoder (time: {0:.1f}s)'
                          .format(time() - start_time))

                if i_last >= dataset_size:
                    dataset_size += buffer_size
                    # Format the message up front; passing the size as an
                    # extra argument without a placeholder makes logging
                    # report a formatting error instead of the message.
                    log.info('Resizing datasets to size: {}'
                             .format(dataset_size))
                    ds_melody.resize((dataset_size, TIMESTEPS, DIM_MELODY))
                    ds_code.resize((dataset_size, config.hparams.z_size))

                    if include_all_instruments:
                        ds_trio.resize((dataset_size, TIMESTEPS, DIM_TRIO))
                        ds_bass.resize((dataset_size, TIMESTEPS, DIM_BASS))
                        ds_drums.resize((dataset_size, TIMESTEPS, DIM_DRUMS))

                log.debug('Writing examples to HDF5...')
                start_time = time()
                ds_melody[i_example:i_last, :, :] = np.array(melody_tensors)
                ds_code[i_example:i_last, :] = np.array(codes)

                if include_all_instruments:
                    ds_trio[i_example:i_last, :, :] = np.array(trio_tensors)
                    ds_bass[i_example:i_last, :, :] = np.array(bass_tensors)
                    ds_drums[i_example:i_last, :, :] = np.array(drums_tensors)

                log.debug('Done writing examples to HDF5 (time: {0:.1f}s)'
                          .format(time() - start_time))

                i_example += n_tensors

                log.info(('Chunk {0} wrote {1} examples ' +
                         '(total: {2}; time: {3:.1f}s)')
                         .format(i_chunk, n_tensors, i_example,
                                 time() - chunk_time))
        except StopIteration:
            # Defensive only: exhaustion normally shows up as an empty chunk.
            pass

        # The loop may end while per-chunk logging is muted; re-enable the
        # logger so the closing summary is always written.
        log.disabled = False

        # Trim the datasets to the number of examples actually written. This
        # has to happen while the HDF5 file is still open, i.e. inside the
        # `with` block.
        log.debug('Finished writing data')
        log.debug('Resizing datasets...')
        dataset_size = i_example
        ds_melody.resize((dataset_size, TIMESTEPS, DIM_MELODY))
        ds_code.resize((dataset_size, config.hparams.z_size))
        if include_all_instruments:
            ds_trio.resize((dataset_size, TIMESTEPS, DIM_TRIO))
            ds_bass.resize((dataset_size, TIMESTEPS, DIM_BASS))
            ds_drums.resize((dataset_size, TIMESTEPS, DIM_DRUMS))
        log.debug('Done resizing datasets...')

    total_time = time() - total_start_time
    log.info('Finished creating HDF5 dataset')
    log.info('Total examples: {}'.format(i_example))
    log.info('Total chunks: {}'.format(i_chunk))
    log.info('Total time: {0:.1f}s'.format(total_time))
    log.info('Done!')


# Invoke the click command when this code runs as the main module; click then
# takes input_file/output_file and the options from the command line.
if __name__ == '__main__':
    main()

In [34]:
import keras.layers
import keras.models

def get_model(name='encoder', optimizer=None):
    '''
    Get the compiled surrogate encoder model.

    Source: https://github.com/maxwells-daemons/accompany-music-vae/blob/master/model.py

    Parameters
    ----------
    name : str (optional)
        The model name.
    optimizer : keras optimizer or None (optional)
        Optimizer to compile with. If None, RMSprop with learning rate 0.0005
        and clipnorm 1.0 is used.

    Returns
    -------
    keras Model
        Two stacked bidirectional LSTM layers followed by a linear Dense
        layer of width 512, compiled with mean-squared-error loss.
    '''
    input_layer = keras.layers.Input(shape=(256, 90), name='input')

    # Start the stack from the input; each bidirectional LSTM then consumes
    # the output of the previous layer.
    layer = input_layer
    for i in range(2):
        layer = keras.layers.Bidirectional(
            keras.layers.LSTM(128,
                              return_sequences=True,
                              name='bi_lstm_{}'.format(i)))(layer)

    # The head sits on top of the LSTM stack, not on the raw input.
    # NOTE(review): both LSTM layers return full sequences, so the output has
    # one 512-vector per time step; confirm this matches what the training
    # targets and the decoder expect.
    output_layer = keras.layers.Dense(
        512, activation='linear', name='output')(layer)

    # The name is passed to the constructor because assigning `model.name`
    # afterwards is rejected by current Keras versions.
    model = keras.models.Model(inputs=input_layer,
                               outputs=output_layer,
                               name=name)

    # Honour a caller-supplied optimizer; build the default only when needed.
    if optimizer is None:
        optimizer = keras.optimizers.RMSprop(learning_rate=0.0005,
                                             clipnorm=1.)
    model.compile(optimizer=optimizer, loss='mean_squared_error')

    return model

In [None]:
def train_model(model, batch_size, epochs, data_path):
    '''
    Train a surrogate encoder model.

    A checkpoint is written to `check_point_dir` after every epoch. Training
    and validation batches are read from the HDF5 file at `data_path` through
    HDF5Sequence, using 'train_indices.csv' and 'val_indices.csv' located in
    the same directory as that file.

    Parameters
    ----------
    model : None, str or keras Model
        The model to train.
        If None, initializes a new default model via get_model().
        If a str, it is treated as the path of a saved Keras model to load.
    batch_size : int
        Batch size for training.
    epochs : int
        Number of epochs to train for.
    data_path : str path to hdf5
        Path to the file of training data.

    Raises
    ------
    AssertionError
        If model is a str but not a valid path.
    '''

    if not model:
        model = get_model()
    elif isinstance(model, str):
        assert os.path.exists(model), \
            'model path does not exist: {}'.format(model)
        # load_model lives in keras.models, not on the top-level package.
        model = keras.models.load_model(model)
    # Otherwise, assume the model is a Keras model

    # TODO: LambdaCallback to produce and save samples at each epoch
    checkpointer = keras.callbacks.ModelCheckpoint(
        os.path.join(
            check_point_dir,
            model.name + '_train_{epoch:02d}-{val_loss:.4f}.hdf5'
        ),
        save_best_only=False, verbose=1
    )
    # Keras expects a list of callbacks, not a bare callback object.
    callbacks = [checkpointer]

    data_dir = os.path.dirname(data_path)
    # Keep the HDF5 file open only for the duration of training so the handle
    # is released even when fitting fails.
    with h5py.File(data_path, 'r') as data_file:
        # NOTE(review): HDF5Sequence is not defined in the visible part of
        # the notebook; it must be in scope before this function is called.
        train_seq = HDF5Sequence(
            data_file, batch_size,
            index_path=os.path.join(data_dir, 'train_indices.csv'))
        val_seq = HDF5Sequence(
            data_file, batch_size,
            index_path=os.path.join(data_dir, 'val_indices.csv'))

        # `epochs` now comes from the argument instead of a hard-coded 20.
        model.fit_generator(train_seq, steps_per_epoch=len(train_seq),
                            validation_data=val_seq,
                            validation_steps=len(val_seq),
                            max_queue_size=128, workers=32, epochs=epochs,
                            callbacks=callbacks)


# NOTE(review): train_model(model, batch_size, epochs, data_path) has four
# required parameters, so this argument-less call raises TypeError as written.
# Supply the model (or None), batch size, epoch count and HDF5 path here.
if __name__ == '__main__':
    train_model()

In [33]:
def generate_accompaniment(seq, surrogate_encoder, musicvae=None,
                           stitch=True, extract_melody=True,
                           remove_controls=True, temperature=0.1):
    '''
    Generate accompaniment for an input sequence.

    Relies on `strip_to_melody`, `remove_melody` and `TIMESTEPS` being
    defined elsewhere in the notebook.

    Parameters
    ----------
    seq : str path to midi or NoteSequence
        The input sequence.
    surrogate_encoder : keras Model
        The model to map melodies to latent vectors.
    musicvae : None or Magenta MusicVAE (optional)
        The MusicVAE to use for decoding.
        If None, loads the default MusicVAE.
        NOTE: For quickly performing inference on multiple input batches,
        preload the default MusicVAE outside this function and pass it in.
    stitch : bool (optional)
        Whether to stitch in the original melody or leave the decoded sequence.
    extract_melody : bool (optional)
        Whether to treat the input as a trio and extract the melody.
    remove_controls : bool (optional)
        Whether to delete tempo changes, time changes, etc from the base midi.
    temperature : float (optional)
        Temperature to use in the trio decoder.

    Returns
    -------
    NoteSequence
        The input sequence along with generated accompaniment.
    '''

    config = configs.CONFIG_MAP['cat-mel_2bar_small']
    # Use the converter's nested `_melody_converter` when it has one,
    # otherwise the converter itself.
    # NOTE(review): confirm which converter type the chosen config provides.
    melody_converter = getattr(config.data_converter, '_melody_converter',
                               config.data_converter)

    # Load the pretrained model only when the caller did not pass one in.
    if musicvae is None:
        # NOTE(review): the checkpoint is assumed to sit directly under
        # BASE_DIR as '<musicvae_model_name>.ckpt', and the config above is a
        # melody config while the model name refers to drums — verify both.
        musicvae = TrainedModel(
            config, batch_size=4,
            checkpoint_dir_or_path=os.path.join(
                BASE_DIR, musicvae_model_name + '.ckpt'))

    # If the sequence is provided as a MIDI path, load it
    if isinstance(seq, str):
        with open(seq, 'rb') as midi_file:
            midi = midi_file.read()
        seq = mm.midi_to_sequence_proto(midi)

    if remove_controls:
        # Keep only the first entry of each control list.
        del seq.tempos[1:]
        del seq.time_signatures[1:]
        del seq.control_changes[1:]

    if extract_melody:
        seq = strip_to_melody(seq)

    # Convert the input NoteSequence to a single-instrument tensor: of all
    # tensors the converter yields, keep the one with the largest sum over
    # every column except the first.
    melody_tracks = melody_converter.to_tensors(seq).outputs
    instrument_counts = [np.sum(track[:, 1:]) for track in melody_tracks]
    instrument_idx = np.argmax(instrument_counts)
    melody_tensor = melody_tracks[instrument_idx]

    # Slice the melody into non-overlapping windows
    # NOTE(review): when the length is an exact multiple of TIMESTEPS the last
    # window is empty and gets padded to a full window of zeros.
    windows = [melody_tensor[i * TIMESTEPS:(i+1) * TIMESTEPS, :]
               for i in range(melody_tensor.shape[0] // TIMESTEPS + 1)]
    windows[-1] = np.pad(windows[-1],
                         [(0, max(0, TIMESTEPS - windows[-1].shape[0])),
                          (0, 0)],
                         mode='constant')
    windows_stacked = np.stack(windows)

    # Perform inference
    latent_codes = surrogate_encoder.predict(windows_stacked)
    decoded_sequences = musicvae.decode(latent_codes, temperature=temperature)
    # Same concatenation helper that interpolate() uses.
    decoded = mm.sequences_lib.concatenate_sequences(decoded_sequences)

    # Stitch the original melody and the new accompaniment together.
    if stitch:
        melody_tensor_padded = np.stack(windows_stacked, axis=0)
        melody_padded = mm.sequences_lib.concatenate_sequences(
            melody_converter.to_notesequences(melody_tensor_padded))
        out = remove_melody(decoded)
        out.MergeFrom(melody_padded)
        return out

    return decoded