In [1]:
#@title Setup
#%%capture

#@test {"output": "ignore"}



import glob

BASE_DIR = "gs://download.magenta.tensorflow.org/models/music_vae/colab2"

print('Installing dependencies...')
!apt-get update -qq && apt-get install -qq libfluidsynth1 fluid-soundfont-gm build-essential libasound2-dev libjack-dev
!pip install -q pyfluidsynth
!pip install -qU magenta



# Hack to allow python to pick up the newly-installed fluidsynth lib.
# This is only needed for the hosted Colab environment.
import ctypes.util
orig_ctypes_util_find_library = ctypes.util.find_library
def proxy_find_library(lib):
  if lib == 'fluidsynth':
    return 'libfluidsynth.so.1'
  else:
    return orig_ctypes_util_find_library(lib)
ctypes.util.find_library = proxy_find_library


print('Importing libraries and defining some helper functions...')
#from google.colab impo#rt files
import magenta.music as mm
from magenta.models.music_vae import configs
from magenta.models.music_vae.trained_model import TrainedModel
import numpy as np
import os
import tensorflow.compat.v1 as tf
import note_seq
import pretty_midi

tf.disable_v2_behavior()

# Necessary until pyfluidsynth is updated (>1.2.5).
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

def play(note_sequence):
  mm.play_sequence(note_sequence, synth=mm.fluidsynth)

def plot(note_sequence):
  note_seq.plot_sequence(note_sequence)

def interpolate(model, start_seq, end_seq, num_steps, max_length=32,
                assert_same_length=True, temperature=0.5,
                individual_duration=4.0):
  """Interpolates between a start and end sequence."""
  note_sequences = model.interpolate(
      start_seq, end_seq,num_steps=num_steps, length=max_length,
      temperature=temperature,
      assert_same_length=assert_same_length)

  print('Start Seq Reconstruction')
  play(note_sequences[0])
  print('End Seq Reconstruction')
  play(note_sequences[-1])
  print('Mean Sequence')
  play(note_sequences[num_steps // 2])
  print('Start -> End Interpolation')
  interp_seq = mm.sequences_lib.concatenate_sequences(
      note_sequences, [individual_duration] * len(note_sequences))
  play(interp_seq)
  mm.plot_sequence(interp_seq)
  return interp_seq if num_steps > 3 else note_sequences[num_steps // 2]

def download(note_sequence, filename):
  mm.sequence_proto_to_midi_file(note_sequence, filename)
  files.download(filename)

print('Done')

Installing dependencies...
Selecting previously unselected package fluid-soundfont-gm.
(Reading database ... 144793 files and directories currently installed.)
Preparing to unpack .../fluid-soundfont-gm_3.1-5.1_all.deb ...
Unpacking fluid-soundfont-gm (3.1-5.1) ...
Selecting previously unselected package libfluidsynth1:amd64.
Preparing to unpack .../libfluidsynth1_1.1.9-1_amd64.deb ...
Unpacking libfluidsynth1:amd64 (1.1.9-1) ...
Setting up fluid-soundfont-gm (3.1-5.1) ...
Setting up libfluidsynth1:amd64 (1.1.9-1) ...
Processing triggers for libc-bin (2.27-3ubuntu1.2) ...
/sbin/ldconfig.real: /usr/local/lib/python3.6/dist-packages/ideep4py/lib/libmkldnn.so.0 is not a symbolic link

[K     |████████████████████████████████| 1.4MB 13.5MB/s 
[K     |████████████████████████████████| 204kB 48.0MB/s 
[K     |████████████████████████████████| 71kB 10.9MB/s 
[K     |████████████████████████████████| 1.5MB 54.5MB/s 
[K     |████████████████████████████████| 2.3MB 52.6MB/s 
[K     |██████

In [8]:
class CustomTrainedModel(TrainedModel):
  

    # we want to override the interpolation method in the future. We can also override any other method like sample/encode/decode if needed
  def interpolate(self, start_sequence, end_sequence, num_steps, length=None, temperature=1.0, assert_same_length=True):

    '''Interpolates between a start and an end NoteSequence.
    Args:
      start_sequence: The NoteSequence to interpolate from.
      end_sequence: The NoteSequence to interpolate to.
      num_steps: Number of NoteSequences to be generated, including the
        reconstructions of the start and end sequences.
      length: The maximum length of a sample in decoder iterations. Required
        if end tokens are not being used.
      temperature: The softmax temperature to use (if applicable).
      assert_same_length: Whether to raise an AssertionError if all of the
        extracted sequences are not the same length.
    Returns:
      A list of interpolated NoteSequences.
    Raises:
      AssertionError: If `assert_same_length` is True and any extracted
        sequences differ in length.
    '''

    #In this case the original interpolation method """Spherical linear interpolation.""". We can easily replace this one.
    def _slerp(p0, p1, t):
      """Spherical linear interpolation."""
      omega = np.arccos(np.dot(np.squeeze(p0/np.linalg.norm(p0)),
                                np.squeeze(p1/np.linalg.norm(p1))))
      so = np.sin(omega)
      return np.sin((1.0-t)*omega) / so * p0 + np.sin(t*omega)/so * p1

    _, mu, _ = self.encode([start_sequence, end_sequence], assert_same_length)
    z = np.array([_slerp(mu[0], mu[1], t) for t in np.linspace(0, 1, num_steps)])
    return self.decode(length=length, z=z, temperature=temperature)

  def distance(self, z_array):
    ''' Computes the distance vectors between points in the latent space.
    Args:
      z_array: The array of points. Must be of shape (n_samples, n_dims).
    Returns:
      displacement: List of vectors, each pointing from one point to the next.
      distance: Distance from each point to the next (norm of its vector).
    '''
    n_samples = np.shape(z_array)[0]
    displacement = [0]*(n_samples-1)
    distance = [0]*(n_samples-1)
    for i in range(n_samples-1):  
      displacement[i] = z_array[i+1] - z_array[i]
      distance[i] = np.linalg.norm(displacement[i])
    return displacement, distance

  def cos_angle(self, displacement):
    ''' Computes the cosine of the angle between pairs of neighboring 
    displacement vectors.
    Args:
      displacement: List of vectors between pairs of neighboring points in 
      the latent space. First output of distance().
    Returns:
      cos_angle: Numpy array of cosines of angles between neighboring 
      displacement vectors.
    '''
    n_displacements = len(displacement)
    cos_angle = np.zeros(n_displacements-1)
    for i in range(n_displacements-1):
      cos_angle[i] = np.dot(displacement[i+1], displacement[i]) / (np.linalg.norm(displacement[i+1]) * np.linalg.norm(displacement[i]))
    return cos_angle


In [15]:
def to_tensors(note_sequence):
    """Converts NoteSequence to unique, one-hot tensor sequences."""
    try:
      if note_sequence._steps_per_quarter:
        quantized_sequence = note_seq.quantize_note_sequence(
            note_sequence, note_sequence._steps_per_quarter)
        if (note_seq.steps_per_bar_in_quantized_sequence(quantized_sequence) !=
            note_sequence._steps_per_bar):
          return ConverterTensors()
      else:
        quantized_sequence = note_seq.quantize_note_sequence_absolute(
            note_sequence, self._steps_per_second)
    except (note_seq.BadTimeSignatureError, note_seq.NonIntegerStepsPerBarError,
            note_seq.NegativeTimeError) as e:
      return ConverterTensors()

    if (self._chord_encoding and not any(
        ta.annotation_type == CHORD_SYMBOL
        for ta in quantized_sequence.text_annotations)) or (
            self._condition_on_key and not quantized_sequence.key_signatures):
      # We are conditioning on chords and/or key but sequence does not have
      # them. Try to infer chords and optionally key.
      try:
        note_seq.infer_chords_for_sequence(
            quantized_sequence, add_key_signatures=self._condition_on_key)
      except note_seq.ChordInferenceError:
        return ConverterTensors()

    event_lists, unused_stats = self._event_extractor_fn(quantized_sequence)
    if self._pad_to_total_time:
      for e in event_lists:
        e.set_length(len(e) + e.start_step, from_left=True)
        e.set_length(quantized_sequence.total_quantized_steps)
    if self._slice_steps:
      sliced_event_lists = []
      for l in event_lists:
        for i in range(self._slice_steps, len(l) + 1, self._steps_per_bar):
          sliced_event_lists.append(l[i - self._slice_steps: i])
    else:
      sliced_event_lists = event_lists

    # We are going to dedupe the event lists. However, when conditioning on
    # chords and/or key, we want to include the same event list multiple times
    # if it appears with different chords or keys.
    all_sliced_lists = [sliced_event_lists]

    if self._chord_encoding:
      # Extract chord lists that correspond to event lists, i.e. for each event
      # we find the chord active at that time step.
      try:
        sliced_chord_lists = chords_lib.event_list_chords(
            quantized_sequence, sliced_event_lists)
      except chords_lib.CoincidentChordsError:
        return ConverterTensors()
      all_sliced_lists.append(sliced_chord_lists)

    if self._condition_on_key:
      # Extract key lists that correspond to event lists, i.e. for each event
      # we find the key active at that time step.
      if self._steps_per_second:
        steps_per_second = self._steps_per_second
      else:
        qpm = quantized_sequence.tempos[0].qpm
        steps_per_second = self._steps_per_quarter * qpm / 60.0
      sliced_key_lists = chords_lib.event_list_keys(
          quantized_sequence, sliced_event_lists, steps_per_second)
      all_sliced_lists.append(sliced_key_lists)

    all_unique_tuples = self._dedupe_and_sample(all_sliced_lists)
    if not all_unique_tuples:
      return ConverterTensors()

    unique_event_tuples = all_unique_tuples[0]
    unique_chord_tuples = all_unique_tuples[1] if self._chord_encoding else []
    unique_key_tuples = all_unique_tuples[-1] if self._condition_on_key else []

    if self._chord_encoding or self._condition_on_key:
      # We need to encode control sequences consisting of chords and/or keys.
      control_seqs = self._chords_and_keys_to_controls(
          unique_chord_tuples, unique_key_tuples)
      if not control_seqs:
        return ConverterTensors()
    else:
      control_seqs = []

    seqs = []
    for t in unique_event_tuples:
      seqs.append(np_onehot(
          [self._legacy_encoder_decoder.encode_event(e) for e in t] +
          ([] if self.end_token is None else [self.end_token]),
          self.output_depth, self.output_dtype))

    return ConverterTensors(inputs=seqs, outputs=seqs, controls=control_seqs)

In [9]:
# Define Model
hierdec_trio_16bar_config = configs.CONFIG_MAP['hierdec-trio_16bar']#['hierdec-mel_16bar']#
model_16bar = CustomTrainedModel(hierdec_trio_16bar_config, batch_size=4, checkpoint_dir_or_path=BASE_DIR + '/checkpoints/trio_16bar_hierdec.ckpt') #'/checkpoints/mel_16bar_hierdec.ckpt')#'/checkpoints/trio_16bar_hierdec.ckpt')

INFO:tensorflow:Building MusicVAE model with BidirectionalLstmEncoder, HierarchicalLstmDecoder, and hparams:
{'max_seq_len': 256, 'z_size': 512, 'free_bits': 256, 'max_beta': 0.2, 'beta_rate': 0.0, 'batch_size': 4, 'grad_clip': 1.0, 'clip_mode': 'global_norm', 'grad_norm_clip_to_zero': 10000, 'learning_rate': 0.001, 'decay_rate': 0.9999, 'min_learning_rate': 1e-05, 'conditional': True, 'dec_rnn_size': [1024, 1024], 'enc_rnn_size': [2048, 2048], 'dropout_keep_prob': 1.0, 'sampling_schedule': 'constant', 'sampling_rate': 0.0, 'use_cudnn': False, 'residual_encoder': False, 'residual_decoder': False, 'control_preprocessing_rnn_size': [256]}
INFO:tensorflow:
Encoder Cells (bidirectional):
  units: [2048, 2048]

Instructions for updating:
This class is equivalent as tf.keras.layers.StackedRNNCells, and will be replaced by that in Tensorflow 2.0.
INFO:tensorflow:
Hierarchical Decoder:
  input length: 256
  level output lengths: [16, 16]

INFO:tensorflow:
Decoder Cells:
  units: [1024, 1024]



In [None]:
l

In [45]:
# input: n (number of sequences), softmax temperature
# returns: a list of note sequences
n = 100
temperature = 1
for i in range (1):
  samples = model_16bar.sample(n=n, length=3,temperature=temperature)
print('Number of samples:',len(samples))

print('Sample object type:',type(samples[0]))

# note sequences can be plotted with plot() and played with play()
print('')
print('----- Actual samples -----')
for ns in samples[0:1]:
  plot(ns)
  play(ns)

Number of samples: 100
Sample object type: <class 'note_seq.protobuf.music_pb2.NoteSequence'>

----- Actual samples -----


In [47]:
for i in range(5):
  a = model_16bar.encode(samples)

In [11]:
download(samples[0], "ciao.midi")

NameError: ignored

In [8]:
print(type(samples))
z, mu, sigma =  model_16bar.encode(samples)

<class 'list'>


In [11]:
eh = note_seq.note_sequence_to_pretty_midi(samples[0])

In [14]:
eh

<pretty_midi.pretty_midi.PrettyMIDI at 0x7fa839527e10>

In [32]:
 # --- TrainedModel.encode ---
# input: a list of NoteSequences
# returns: tuple of shape (3, n_sequences, num_latent_dims), where the first dim is z/mu/sigma
#print(len(samples[0]))
samples = model_16bar.sample(n=1, length=3,temperature=1)
z, mu, sigma =  model_16bar.encode(samples)

#print('Shape of encoded latent vector:',np.shape(z))
'''
# --- TrainedModel.decode ---
# input: a list of latent vectors
# returns: a list of note sequences
reconstructed_samples = model_16bar.decode(z)
print('Number of samples:',len(reconstructed_samples))
print('Sample object type:',type(reconstructed_samples[0]))
print('')
print('----- Reconstructed samples -----')
for ns in reconstructed_samples:
  plot(ns)
  play(ns)
'''

"\n# --- TrainedModel.decode ---\n# input: a list of latent vectors\n# returns: a list of note sequences\nreconstructed_samples = model_16bar.decode(z)\nprint('Number of samples:',len(reconstructed_samples))\nprint('Sample object type:',type(reconstructed_samples[0]))\nprint('')\nprint('----- Reconstructed samples -----')\nfor ns in reconstructed_samples:\n plot(ns)\n play(ns)\n"

In [39]:
decoded_z = model_16bar.decode(z)

In [41]:
type(decoded_z[0])

note_seq.protobuf.music_pb2.NoteSequence

In [42]:
what = model_16bar.encode(decoded_z)

In [44]:
play(what[0])

AttributeError: ignored

In [35]:
type(samples)
type(samples[0])

note_seq.protobuf.music_pb2.NoteSequence

In [34]:
samples[0]

ticks_per_quarter: 220
tempos {
  qpm: 120.0
}
notes {
  pitch: 44
  velocity: 80
  end_time: 0.25
}
notes {
  pitch: 59
  velocity: 80
  start_time: 0.25
  end_time: 0.375
}
notes {
  pitch: 47
  velocity: 80
  start_time: 0.375
  end_time: 0.625
}
notes {
  pitch: 59
  velocity: 80
  start_time: 0.625
  end_time: 0.75
}
notes {
  pitch: 51
  velocity: 80
  start_time: 0.875
  end_time: 1.25
}
notes {
  pitch: 59
  velocity: 80
  start_time: 1.25
  end_time: 1.5
}
notes {
  pitch: 47
  velocity: 80
  start_time: 1.5
  end_time: 1.75
}
notes {
  pitch: 51
  velocity: 80
  start_time: 1.75
  end_time: 1.875
}
notes {
  pitch: 66
  velocity: 80
  start_time: 2.25
  end_time: 2.375
}
notes {
  pitch: 66
  velocity: 80
  start_time: 2.75
  end_time: 2.875
}
notes {
  pitch: 62
  velocity: 80
  start_time: 3.25
  end_time: 3.375
}
notes {
  pitch: 32
  velocity: 80
  start_time: 4.0
  end_time: 4.125
}
notes {
  pitch: 65
  velocity: 80
  start_time: 4.25
  end_time: 4.375
}
notes {
  pitch

In [None]:
from matplotlib import pyplot as plt

v, d = model_16bar.distance(mu)
c = model_16bar.cos_angle(v)

plt.figure(1)
plt.hist(d)
plt.title('Distance between neighboring vectors in z space')
plt.xlabel('Distance')

plt.figure(2)
plt.hist(c, bins=50, range=[-1,1])
plt.title('Angle between neighboring vectors in z space')
plt.xlabel('Cosine of angle')
plt.show()

In [None]:
#so i want to measure the distance between consecutive notesequences, not random samples.

In [None]:
#i want to take a midi file, convert into note sequences, and split them into smaller chuncks and then encode them again and see the result

In [17]:
midi_file = pretty_midi.PrettyMIDI('ciao.midi') #loading a midi file
# note_sequences = note_seq.midi_to_note_sequence(midi_file) #converting a midi file into a note_sequence
note_sequences = mm.midi_file_to_note_sequence('ciao.midi')
note_sequences = note_seq.split_note_sequence(note_sequences, 30)
for ns in note_sequences[0:0]:
  play(ns)
  plot(ns)

# extracted_tensors = model_16bar._config.data_converter.to_tensors(note_sequences[0])

In [18]:
type(note_sequences[0])

note_seq.protobuf.music_pb2.NoteSequence

In [None]:
len(note_sequences)

In [21]:
# z = model_16bar.encode(note_sequences)

In [16]:
z = model_16bar.encode(note_sequences[0:1])
print(z)

In [None]:
midi_file = pretty_midi.PrettyMIDI('ciao.mid') #loading a midi file
note_sequences = note_seq.midi_to_note_sequence(midi_file) #converting a midi file into a note_sequence

note_sequences = note_seq.split_note_sequence(note_sequences, 30)
for ns in note_sequences:
  plot(ns)
  play(ns)

In [None]:

try:
  z, m, signma = model_16bar.encode([note_sequences[1]], assert_same_length=True)
except ValueError:
  
print(note_seq.is_quantized_sequence(note_sequences[0]))

In [None]:
type(note_sequences[0])

In [22]:
z = [[0 for i in range(0,512)]]
z1 = z
z2 = z
z3 = z

In [14]:
len(z)

1

In [31]:
z = [[0 for i in range(0,512)]]
z1 = z
z2 = z
z3 = z
dimension = 9
z[0][dimension] = 1
z1[0][dimension] = 1/10
z2[0][dimension] = 1/100
z3[0][dimension] = 1/1000

z = model_16bar.decode(z)
z1 = model_16bar.decode(z1)
z2 = model_16bar.decode(z2)
z3 = model_16bar.decode(z3)



play(z[0])
play(z1[0])
play(z2[0])
play(z3[0])


plot(z[0])
plot(z1[0])
plot(z2[0])
plot(z3[0])

In [27]:
z = model_16bar.decode(z[0])
z1 = model_16bar.decode(z1)
z2 = model_16bar.decode(z2)
z3 = model_16bar.decode(z3)



ValueError: ignored

In [26]:
# download(random_point[0], "random_sample.midi")
plot(z[0])
plot(z1[0])
plot(z2[0])
plot(z3[0])

In [None]:
mm.