Copyright 2017 Google LLC.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

# MusicVAE: A Hierarchical Latent Vector Model for Learning Long-Term Structure in Music.
### ___Adam Roberts, Jesse Engel, Colin Raffel, Curtis Hawthorne, and Douglas Eck___

[MusicVAE](https://g.co/magenta/music-vae) learns a latent space of musical scores, providing different modes
of interactive musical creation, including:

* Random sampling from the prior distribution.
* Interpolation between existing sequences.
* Manipulation of existing sequences via attribute vectors.

Examples of these interactions can be generated below, and selections can be heard in our
[YouTube playlist](https://www.youtube.com/playlist?list=PLBUMAYA6kvGU8Cgqh709o5SUvo-zHGTxr).

For short sequences (e.g., 2-bar "loops"), we use a bidirectional LSTM encoder
and LSTM decoder. For longer sequences, we use a novel hierarchical LSTM
decoder, which helps the model learn longer-term structures.

We also model the interdependencies between instruments by training multiple
decoders on the lowest-level embeddings of the hierarchical decoder.

For additional details, check out our [blog post](https://g.co/magenta/music-vae) and [paper](https://goo.gl/magenta/musicvae-paper).
___

This colab notebook is self-contained and should run natively on google cloud. The [code](https://github.com/tensorflow/magenta/tree/master/magenta/models/music_vae) and [checkpoints](http://download.magenta.tensorflow.org/models/music_vae/checkpoints.tar.gz) can be downloaded separately and run locally, which is required if you want to train your own model.

# Basic Instructions

1. Double click on the hidden cells to make them visible, or select "View > Expand Sections" in the menu at the top.
2. Hover over the "`[ ]`" in the top-left corner of each cell and click on the "Play" button to run it, in order.
3. Listen to the generated samples.
4. Make it your own: copy the notebook, modify the code, train your own models, upload your own MIDI, etc.!

# Environment Setup
Includes package installation for sequence synthesis. Will take a few minutes.


In [None]:
#@title Setup Environment
#@test {"output": "ignore"}

import glob

BASE_DIR = "gs://download.magenta.tensorflow.org/models/music_vae/colab2"

print('Installing dependencies...')
!apt-get update -qq && apt-get install -qq libfluidsynth1 fluid-soundfont-gm build-essential libasound2-dev libjack-dev
!pip install -q pyfluidsynth
!pip install -qU magenta

# Hack to allow python to pick up the newly-installed fluidsynth lib.
# This is only needed for the hosted Colab environment.
import ctypes.util
orig_ctypes_util_find_library = ctypes.util.find_library
def proxy_find_library(lib):
  if lib == 'fluidsynth':
    return 'libfluidsynth.so.1'
  else:
    return orig_ctypes_util_find_library(lib)
ctypes.util.find_library = proxy_find_library


print('Importing libraries and defining some helper functions...')
from google.colab import files
import magenta.music as mm
from magenta.models.music_vae import configs
from magenta.models.music_vae.trained_model import TrainedModel
import numpy as np
import os
import tensorflow.compat.v1 as tf

tf.disable_v2_behavior()

# Necessary until pyfluidsynth is updated (>1.2.5).
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

def play(note_sequence):
  mm.play_sequence(note_sequence, synth=mm.fluidsynth)

def interpolate(model, start_seq, end_seq, num_steps, max_length=32,
                assert_same_length=True, temperature=0.5,
                individual_duration=4.0):
  """Interpolates between a start and end sequence."""
  note_sequences = model.interpolate(
      start_seq, end_seq,num_steps=num_steps, length=max_length,
      temperature=temperature,
      assert_same_length=assert_same_length)

  print('Start Seq Reconstruction')
  play(note_sequences[0])
  print('End Seq Reconstruction')
  play(note_sequences[-1])
  print('Mean Sequence')
  play(note_sequences[num_steps // 2])
  print('Start -> End Interpolation')
  interp_seq = mm.sequences_lib.concatenate_sequences(
      note_sequences, [individual_duration] * len(note_sequences))
  play(interp_seq)
  mm.plot_sequence(interp_seq)
  return interp_seq if num_steps > 3 else note_sequences[num_steps // 2]

def download(note_sequence, filename):
  mm.sequence_proto_to_midi_file(note_sequence, filename)
  files.download(filename)

print('Done')

Installing dependencies...
Importing libraries and defining some helper functions...


Import requested from: 'numba.decorators', please update to use 'numba.core.decorators' or pin to Numba version 0.48.0. This alias will not be present in Numba version 0.50.0.
  from numba.decorators import jit as optional_jit
Import of 'jit' requested from: 'numba.decorators', please update to use 'numba.core.decorators' or pin to Numba version 0.48.0. This alias will not be present in Numba version 0.50.0.
  from numba.decorators import jit as optional_jit


Instructions for updating:
non-resource variables are not supported in the long term
Done


In [None]:
import os
import re
import tarfile
import tempfile

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
%cd /content/drive/MyDrive/Magenta/magenta

/content/drive/.shortcut-targets-by-id/1KPdTL1Qxr1axhsrKGYsHmrgaYhBBxUzi/Magenta/magenta


In [None]:
ls data/checkpoints/

cat-mel_2bar_big.tar


# 2-Bar Melody Model

The pre-trained model consists of a single-layer bidirectional LSTM encoder with 2048 nodes in each direction, a 3-layer LSTM decoder with 2048 nodes in each layer, and Z with 512 dimensions. The model was given 0 free bits, and had its beta valued annealed at an exponential rate of 0.99999 from 0 to 0.43 over 200k steps. It was trained with scheduled sampling with an inverse sigmoid schedule and a rate of 1000. The final accuracy is 0.95 and KL divergence is 58 bits.

In [None]:
#@title Load the pre-trained model.
mel_2bar_config = configs.CONFIG_MAP['cat-mel_2bar_big']
mel_2bar = TrainedModel(mel_2bar_config, batch_size=4, checkpoint_dir_or_path=BASE_DIR + '/checkpoints/mel_2bar_big.ckpt')

# Create and load model

In [None]:
# Parameters
config_name = 'cat-mel_2bar_big'
checkpoint_dir_or_path= '/content/drive/MyDrive/Magenta/magenta/data/checkpoints/cat-mel_2bar_big.tar'
batch_size = 512
var_name_substitutions = None

# train params 
train_dir = '/content/drive/MyDrive/Magenta/magenta/data/tmp/train-9-24-01/train/'


In [None]:
config = configs.CONFIG_MAP[config_name]
if tf.gfile.IsDirectory(checkpoint_dir_or_path):
  checkpoint_path = tf.train.latest_checkpoint(checkpoint_dir_or_path)
else:
  checkpoint_path = checkpoint_dir_or_path
config.hparams.batch_size = batch_size

with tf.Graph().as_default():
  model = config.model
  model.build(
      config.hparams,
      config.data_converter.output_depth,
      is_training=True
  )

INFO:tensorflow:Building MusicVAE model with BidirectionalLstmEncoder, CategoricalLstmDecoder, and hparams:
{'max_seq_len': 32, 'z_size': 512, 'free_bits': 0, 'max_beta': 0.5, 'beta_rate': 0.99999, 'batch_size': 512, 'grad_clip': 1.0, 'clip_mode': 'global_norm', 'grad_norm_clip_to_zero': 10000, 'learning_rate': 0.001, 'decay_rate': 0.9999, 'min_learning_rate': 1e-05, 'conditional': True, 'dec_rnn_size': [2048, 2048, 2048], 'enc_rnn_size': [2048], 'dropout_keep_prob': 1.0, 'sampling_schedule': 'inverse_sigmoid', 'sampling_rate': 1000, 'use_cudnn': False, 'residual_encoder': False, 'residual_decoder': False, 'control_preprocessing_rnn_size': [256]}
INFO:tensorflow:
Encoder Cells (bidirectional):
  units: [2048]

INFO:tensorflow:
Decoder Cells:
  units: [2048, 2048, 2048]

Instructions for updating:
Use `tf.cast` instead.


In [None]:
# Input placeholders
temperature = tf.placeholder(tf.float32, shape=())

if config.hparams.z_size:
  z_input = tf.placeholder(
      tf.float32, shape=[batch_size, config.hparams.z_size]
  )
else:
  z_input = None

if config.data_converter.control_depth > 0:
  c_input = tf.placeholder(
      tf.float32, shape=[None, config.data_converter.control_depth]
  )
else:
  c_input = None

inputs = tf.placeholder(
    tf.float32,
    shape=[batch_size, None, config.data_converter.input_depth]
)
controls = tf.placeholder(
    tf.float32,
    shape=[batch_size, None, config.data_converter.control_depth]
)
inputs_length = tf.placeholder(
    tf.int32,
    shape=[batch_size] + list(config.data_converter.length_shape)
)
max_length = tf.placeholder(tf.int32, shape=())

In [None]:
# Output placeholders
outputs, decoder_results = model.sample(
    batch_size,
    max_length,
    z=z_input,
    c_input=c_input,
    temperature=temperature
)

if config.hparams.z_size:
  q_z = model.encode(inputs, inputs_length, controls)
  mu = q_z.loc
  mu = q_z.scale.diag
  z = q_z.sample()



Instructions for updating:
Please use `keras.layers.Bidirectional(keras.layers.RNN(cell))`, which is equivalent to this API
Instructions for updating:
Please use `keras.layers.RNN(cell)`, which is equivalent to this API
Instructions for updating:
Do not call `graph_parents`.


In [None]:
var_map = None
if var_name_substitutions is not None:
  var_map = {}
  for v in tf.global_variables():
    var_namr = v.name[:-2]  # Strip ':0' suffix
    for pattern, substitution in var_name_substitutions:
      var_name = re.sub(pattern, substitution, var_name)
    if var_name != v.name[:-2]:
      tf.logging.info('Renaming `%s` to `%s`.', v.name[:-2], var_name)
    var_map[var_name] = v

In [None]:
 # Restore graph
sess = tf.Session()
saver = tf.train.Saver(var_map)
if (os.path.exists(checkpoint_path) and
    tarfile.is_tarfile(checkpoint_path)):
  tf.logging.info('Unbundling checkpoint.')
  with tempfile.TemporaryDirectory() as temp_dir:
    tar = tarfile.open(checkpoint_path)
    tar.extractall(temp_dir)
    # Assume only a single checkpoint is in the directory.
    for name in tar.getnames():
      if name.endswith('.index'):
        checkpoint_path = os.path.join(temp_dir, name[0:-6])
        break
    saver.restore(sess, checkpoint_path)
else:
  saver.restore(sess, checkpoint_path)

INFO:tensorflow:Unbundling checkpoint.
INFO:tensorflow:Restoring parameters from /tmp/tmpivrs340r/cat-mel_2bar_big.ckpt


# Start Training

In [None]:
tf.gfile.MakeDirs(train_dir)

In [None]:
tf.config.list_logical_devices()

[LogicalDevice(name='/device:CPU:0', device_type='CPU'),
 LogicalDevice(name='/device:GPU:0', device_type='GPU')]

In [None]:
def train(train_dir,
          config,
          dataset_fn,
          checkpoints_to_keep=5,
          keep_checkpoint_every_n_hours=1,
          num_steps=None,
          master='',
          num_sync_workers=0,
          num_ps_tasks=0,
          task=0):
  print("These are the arguments====================================")
  print(train_dir,
          config,
          dataset_fn,
          checkpoints_to_keep,
          keep_checkpoint_every_n_hours,
          num_steps,
          master,
          num_sync_workers,
          num_ps_tasks,
          task)
          #100 1 200000  0 0 0
  print("These are the arguments====================================")
  """Train loop."""
  tf.gfile.MakeDirs(train_dir)
  is_chief = (task == 0)
  if is_chief:
    _trial_summary(
        config.hparams, config.train_examples_path or config.tfds_name,
        train_dir)
  with tf.Graph().as_default():
    with tf.device(tf.train.replica_device_setter(
        num_ps_tasks, merge_devices=True)):

      model = config.model
      model.build(config.hparams,
                  config.data_converter.output_depth,
                  is_training=True)

      optimizer = model.train(**_get_input_tensors(dataset_fn(), config))

      hooks = []
      if num_sync_workers:
        optimizer = tf.train.SyncReplicasOptimizer(
            optimizer,
            num_sync_workers)
        hooks.append(optimizer.make_session_run_hook(is_chief))

      grads, var_list = list(zip(*optimizer.compute_gradients(model.loss)))
      global_norm = tf.global_norm(grads)
      tf.summary.scalar('global_norm', global_norm)

      if config.hparams.clip_mode == 'value':
        g = config.hparams.grad_clip
        clipped_grads = [tf.clip_by_value(grad, -g, g) for grad in grads]
      elif config.hparams.clip_mode == 'global_norm':
        clipped_grads = tf.cond(
            global_norm < config.hparams.grad_norm_clip_to_zero,
            lambda: tf.clip_by_global_norm(  # pylint:disable=g-long-lambda
                grads, config.hparams.grad_clip, use_norm=global_norm)[0],
            lambda: [tf.zeros(tf.shape(g)) for g in grads])
      else:
        raise ValueError(
            'Unknown clip_mode: {}'.format(config.hparams.clip_mode))
      train_op = optimizer.apply_gradients(
          list(zip(clipped_grads, var_list)),
          global_step=model.global_step,
          name='train_step')

      logging_dict = {'global_step': model.global_step,
                      'loss': model.loss}

      hooks.append(tf.train.LoggingTensorHook(logging_dict, every_n_iter=100))
      if num_steps:
        hooks.append(tf.train.StopAtStepHook(last_step=num_steps))

      scaffold = tf.train.Scaffold(
          saver=tf.train.Saver(
              max_to_keep=checkpoints_to_keep,
              keep_checkpoint_every_n_hours=keep_checkpoint_every_n_hours))
      tf_slim.training.train(
          train_op=train_op,
          logdir=train_dir,
          scaffold=scaffold,
          hooks=hooks,
          save_checkpoint_secs=60,
          master=master,
          is_chief=is_chief)


def evaluate(train_dir,
             eval_dir,
             config,
             dataset_fn,
             num_batches,
             master=''):
  """Evaluate the model repeatedly."""
  tf.gfile.MakeDirs(eval_dir)

  _trial_summary(
      config.hparams, config.eval_examples_path or config.tfds_name, eval_dir)
  with tf.Graph().as_default():
    model = config.model
    model.build(config.hparams,
                config.data_converter.output_depth,
                is_training=False)

    eval_op = model.eval(
        **_get_input_tensors(dataset_fn().take(num_batches), config))

    hooks = [
        tf_slim.evaluation.StopAfterNEvalsHook(num_batches),
        tf_slim.evaluation.SummaryAtEndHook(eval_dir)
    ]
    tf_slim.evaluation.evaluate_repeatedly(
        train_dir,
        eval_ops=eval_op,
        hooks=hooks,
        eval_interval_secs=60,
        master=master)