In [0]:
# Execution-environment switch: when True the notebook mounts Google Drive and
# (in the directory-configuration cell) points its data/model/output paths at the shared drive.
RUNNING_IN_COLAB = True

if RUNNING_IN_COLAB:
  # Imported only on this branch; presumably google.colab is unavailable outside Colab -- confirm.
  from google.colab import drive
  # Mount Drive at /content/drive, the prefix of the shared-drive paths used later.
  drive.mount('/content/drive')

In [0]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import Sequential
from tensorflow.keras.layers import LSTM, Dense, TimeDistributed, Bidirectional, Lambda
import numpy as np
from music21 import *
from copy import deepcopy
import random
import pickle
import os
import time

In [0]:
# Locations for trained models, pickled data sets and generated compositions.
if RUNNING_IN_COLAB:
    # access the shared drive instead of the local checkout
    MODELS_DIRECTORY = '/content/drive/Shared drives/melodyAI/outputs/models'
    PICKLES_DIRECTORY = '/content/drive/Shared drives/melodyAI/pickles'
    COMPOSITIONS_DIRECTORY = '/content/drive/Shared drives/melodyAI/outputs/compositions'
else:
    MODELS_DIRECTORY = '../models'
    PICKLES_DIRECTORY = 'pickles'
    COMPOSITIONS_DIRECTORY = '../outputs/compositions'

# Create whichever of the three directories does not exist yet.
for _directory in (MODELS_DIRECTORY, PICKLES_DIRECTORY, COMPOSITIONS_DIRECTORY):
    if not os.path.exists(_directory):
        os.makedirs(_directory)

In [0]:
# Fail fast, naming the offending path, if any working directory is missing
# (e.g. the drive mount failed or the shared drive is not visible).
assert os.path.exists(MODELS_DIRECTORY), f"models directory not found: {MODELS_DIRECTORY}"
assert os.path.exists(PICKLES_DIRECTORY), f"pickles directory not found: {PICKLES_DIRECTORY}"
assert os.path.exists(COMPOSITIONS_DIRECTORY), f"compositions directory not found: {COMPOSITIONS_DIRECTORY}"

# Retrieving data

In [0]:
# load the datasets
# Each sequence length (short / medium / long) has two pickles in PICKLES_DIRECTORY:
# one holding duration sequences and one holding pitch sequences.
# NOTE(review): pickle.load can execute arbitrary code from a crafted file, so these
# pickles must come from a trusted source -- confirm the shared drive is access-controlled.
with open(f"{PICKLES_DIRECTORY}/short_seqs_duration.pickle", 'rb') as short_duration,\
     open(f"{PICKLES_DIRECTORY}/short_seqs_pitch.pickle", 'rb') as short_pitch:
    short_seqs_duration = pickle.load(short_duration)
    short_seqs_pitch = pickle.load(short_pitch)

with open(f"{PICKLES_DIRECTORY}/medium_seqs_duration.pickle", 'rb') as medium_duration,\
     open(f"{PICKLES_DIRECTORY}/medium_seqs_pitch.pickle", 'rb') as medium_pitch:
    medium_seqs_duration = pickle.load(medium_duration)
    medium_seqs_pitch = pickle.load(medium_pitch)
    
with open(f"{PICKLES_DIRECTORY}/long_seqs_duration.pickle", 'rb') as long_duration,\
     open(f"{PICKLES_DIRECTORY}/long_seqs_pitch.pickle", 'rb') as long_pitch:
    long_seqs_duration = pickle.load(long_duration)
    long_seqs_pitch = pickle.load(long_pitch)

In [0]:
# retrieve the pitches and durations that were used to build the data set
# (their lengths define the one-hot vector sizes computed at the bottom of this cell)
with open(f'{PICKLES_DIRECTORY}/durations.pickle', 'rb') as d, open(f'{PICKLES_DIRECTORY}/pitches.pickle', 'rb') as p:
    durations = pickle.load(d)
    pitches = pickle.load(p)
    
# retrieve the mapping from pitch/duration values to one-hot vector indices
with open(f'{PICKLES_DIRECTORY}/duration_indices.pickle', 'rb') as d, open(f'{PICKLES_DIRECTORY}/pitch_indices.pickle', 'rb') as p:
    duration_indices = pickle.load(d)
    pitch_indices = pickle.load(p)
    
# retrieve the likelihood of starting with a given pitch/duration
# these are used in sampling to find the starting pitch/duration of each composition
# (note the handle order here is pitch first, then duration, unlike the two blocks above)
with open(f'{PICKLES_DIRECTORY}/starting_pitch_likelihood.pickle', 'rb') as p, open(f'{PICKLES_DIRECTORY}/starting_duration_likelihood.pickle', 'rb') as d:
    starting_pitch_likelihood = pickle.load(p)
    starting_duration_likelihood = pickle.load(d)
    
# Sizes of the one-hot vectors fed to the duration / pitch models.
num_durations = len(durations)
num_pitches = len(pitches)
# assumes the sequence arrays are laid out (samples, timesteps, values), so axis 1 is
# the sequence length -- TODO confirm against the code that built the pickles
short_seq_len = short_seqs_duration.shape[1]
medium_seq_len = medium_seqs_duration.shape[1]
long_seq_len = long_seqs_duration.shape[1]

# Experimenting with how temperature affects a softmax distribution

In [0]:
def softmax(x):
    """
    Computes the softmax of the input logits along axis 0.

    The maximum is subtracted before exponentiating. This leaves the result
    mathematically unchanged but keeps np.exp from overflowing to inf (and the
    division from yielding NaN) when the logits are large, which is exactly what
    happens when logits are divided by a small temperature.

    Parameters:
        x: a vector (or array, normalised along axis 0) of logits.

    Returns:
        np.array: array representing the softmax distribution of the input logits.
    """
    x = np.asarray(x)
    if x.size == 0:
        # nothing to normalise; mirror the shape with a float result
        return np.exp(x)
    shifted = x - np.max(x, axis=0, keepdims=True)
    exponentials = np.exp(shifted)
    return exponentials / np.sum(exponentials, axis=0, keepdims=True)

# Demonstration: dividing the logits by a temperature before the softmax sharpens the
# distribution for temperatures below 1 and flattens it for temperatures above 1.
logits = np.array([1,2,3,4])
logits_temp_p0 = logits / 0.01  # stand-in for "temperature -> 0" (0 itself would divide by zero)
logits_temp_p2 = logits / 0.2
logits_temp_p4 = logits / 0.4
logits_temp_p6 = logits / 0.6
logits_temp_p8 = logits / 0.8
logits_temp_p10 = logits / 1.0
logits_temp_p15 = logits / 1.5
logits_temp_p150 = logits / 150

print(f"RAW: {softmax(logits)}")
# label reports the divisor actually used (0.01), not 0.0
print(f"TEMP 0.01: {softmax(logits_temp_p0)}")
print(f"TEMP 0.2: {softmax(logits_temp_p2)}")
print(f"TEMP 0.4: {softmax(logits_temp_p4)}")
print(f"TEMP 0.6: {softmax(logits_temp_p6)}")
print(f"TEMP 0.8: {softmax(logits_temp_p8)}")
print(f"TEMP 1.0: {softmax(logits_temp_p10)}")
print(f"TEMP 1.5: {softmax(logits_temp_p15)}")
print(f"TEMP 150: {softmax(logits_temp_p150)}")

# Model

### Helper functions

In [0]:
def timestep_to3d(x):
    """
    Wraps a single time step in the two leading axes an LSTM network expects.

    Parameters:
        x: a 1D vector holding the one-hot encoding of one time step.

    Returns:
        np.array: the same values reshaped to (1, 1, len(x)), i.e. one sample
                  containing one time step.
    """
    num_values = x.shape[0]
    target_shape = (1, 1, num_values)
    return np.reshape(x, target_shape)


def vectorize(index, vec_size):
    """
    Builds the one-hot encoding of a single time step.

    Parameters:
        index: position that should hold the 1 (converted with int()).
        vec_size: length of the vector, i.e. the number of possible values for the feature.

    Returns:
        np.array: a 1D float32 array of zeros with a single 1.0 at `index`.
    """
    one_hot = np.zeros(vec_size, dtype=np.float32)
    hot_position = int(index)
    one_hot[hot_position] = 1.0
    return one_hot


def unvectorize(x):
    """
    Recovers the index encoded by a one-hot vector.

    Because a one-hot vector has exactly one nonzero entry, the position of its
    maximum is the encoded index.

    Parameters:
        x: a one-hot encoded vector.

    Returns:
        integer: the index of the value encoded by the vector.
    """
    hot_position = np.argmax(x)
    return hot_position

### Model building and training

In [0]:
# single layer Unidirectional or Bidirectional LSTM; will easily allow us to test various configurations
def get_model(num_values, lstm_cells=500, bidirectional=True, temperature=1.0, optimizer="adam"):
    """
    Creates and compiles the LSTM model.

    Architecture: (Bidirectional) LSTM -> per-time-step linear Dense producing logits
    -> division by `temperature` -> softmax.

    Parameters:
        num_values (int): The size of the one-hot vector at a time step.
        lstm_cells (int): The number of LSTM cells in the model.
        bidirectional (boolean): Whether to construct a bidirectional LSTM (as opposed to unidirectional).
        temperature (float): Value the output logits are divided by before the softmax. Must be positive.
        optimizer (string | tf.keras.optimizers.Optimizer): Which optimization algorithm to use.

    Returns:
        tf.keras.Model: A compiled LSTM model.

    Raises:
        ValueError: If `temperature` is not strictly positive.
    """
    if temperature <= 0:
        raise ValueError(f"temperature must be positive, got {temperature}")

    model = Sequential()
    # only dif. betwn. bi. LSTM and uni. LSTM is the presence/absence of Bidirectional wrapper
    # hidden layer: `lstm_cells` units; input (# timesteps, # features); returns each time step's output
    # input_shape first value None makes it variable (we don't have fixed length sequences)
    # output of LSTM cell uses tanh activation, recurrent connections use sigmoid
    if bidirectional:
        model.add(Bidirectional(LSTM(lstm_cells, input_shape=(None, num_values), return_sequences=True)))
    else:
        model.add(LSTM(lstm_cells, input_shape=(None, num_values), return_sequences=True))

    # TimeDistributed is a wrapper allowing one output per time step;
    # ...requires hidden layer to have return_sequences == True
    # No activation here: this layer emits the raw logits.
    model.add(TimeDistributed(Dense(num_values)))

    # Temperature has to scale the logits themselves. Dividing the LSTM output
    # (i.e. before the Dense layer) is not temperature scaling, because the Dense
    # weights simply absorb the constant factor.
    model.add(Lambda(lambda logits: logits / temperature))

    # softmax over the last axis (the candidate values) of the temperature-scaled logits
    model.add(tf.keras.layers.Activation('softmax'))

    model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy', 'categorical_crossentropy'])
    return model

In [0]:
def train_model(model, X, bidirectional=True, epochs=15, batch_size=32, verbose=1):
    """
    Trains an LSTM model.

    For a bidirectional model the targets are the inputs themselves. For a
    unidirectional model the targets are the inputs shifted one step along the
    time axis, so the network learns to predict time step t+1 from steps up to t.

    Parameters:
        model (tf.keras.Model): The model which is to be trained.
        X (np.array): A 3D array (samples, timesteps, values) to train the network on.
        bidirectional (boolean): Indicates whether the model is bidirectional.
        epochs (int): The number of epochs over which to train the model.
        batch_size (int): The number of samples to show the model before updating weights.
        verbose (int): The amount of information to print while training.

    Returns: None
    """
    X = np.asarray(X)
    if bidirectional:
        inputs, targets = X, X
    else:
        # Slice axis 1 (timesteps), NOT axis 0 (samples): dropping a whole sample
        # would pair every sequence with a different sequence as its label.
        inputs = X[:, :-1]   # every time step but the last
        targets = X[:, 1:]   # every time step but the first
    model.fit(inputs, targets, epochs=epochs, batch_size=batch_size, verbose=verbose)

# Composing with the model

In [0]:
def sample_distribution(dist_vec, num_categories):
    """
    Draws one category index from a probability (softmax output) vector.

    The input is a probability distribution, not logits, so it is sampled from
    directly. Feeding probabilities to a logit-based sampler would push them
    through a second softmax and flatten the distribution towards uniform.

    Parameters:
        dist_vec (np.array): Probabilities for each category; any shape holding exactly
                             `num_categories` elements (e.g. the (1, 1, n) output of predict).
        num_categories (int): Number of categories to sample from (used for reshaping).

    Returns:
        int: The index of the output value which was sampled.

    Raises:
        ValueError: If the probabilities do not sum to a positive, finite number.
    """
    probabilities = np.asarray(dist_vec, dtype=np.float64).reshape(num_categories)
    total = probabilities.sum()
    if not np.isfinite(total) or total <= 0:
        raise ValueError("dist_vec must contain finite probabilities with a positive sum")
    # float32 network outputs rarely sum to exactly 1; renormalise in float64
    probabilities = probabilities / total
    return int(np.random.choice(num_categories, p=probabilities))

def compose(pitch_model, duration_model, pitch_prompt, duration_prompt, length=50):
    """
    Composes a piece of music (in the format of a music21.stream.Stream object).

    Starting from the prompt, each model is repeatedly shown only the most recently
    chosen value and its predicted distribution is sampled for the next value.

    Arguments:
        pitch_model (tf.keras.Model): The trained model for pitch predictions.
        duration_model (tf.keras.Model): The trained model for duration predictions.
        pitch_prompt (int): The first pitch of the piece (index of the one-hot encoded pitch vector).
        duration_prompt (int): The first duration of the piece (index of the one-hot encoded duration vector).
        length (int): How many time steps to generate after the prompt.

    Returns:
        music21.stream.Stream(): The composed piece (the prompt note followed by `length` generated notes).
    """
    # indices into the pitches / durations lists, seeded with the prompt
    chosen_pitches = [pitch_prompt]
    chosen_durations = [duration_prompt]

    for _ in range(length):
        # the models only accept 3D inputs, so wrap the latest choice as (1, 1, n)
        pitch_input = timestep_to3d(vectorize(chosen_pitches[-1], num_pitches))
        duration_input = timestep_to3d(vectorize(chosen_durations[-1], num_durations))

        # predicted output distributions for the next step
        pitch_distribution = pitch_model.predict(pitch_input)
        duration_distribution = duration_model.predict(duration_input)

        # sampling yields the index of the next one-hot vector
        chosen_pitches.append(sample_distribution(pitch_distribution, num_pitches))
        chosen_durations.append(sample_distribution(duration_distribution, num_durations))

    # turn the index pairs into music21 notes
    composed_stream = stream.Stream()
    for pitch_index, duration_index in zip(chosen_pitches, chosen_durations):
        note_pitch = pitch.Pitch(midi=pitches[pitch_index])
        note_duration = duration.Duration(durations[duration_index])
        new_note = note.Note()
        new_note.pitch = note_pitch
        new_note.duration = note_duration
        composed_stream.append(new_note)

    return composed_stream

# Generation infrastructure

In [0]:
# Hyper-parameter grids; every combination of these values becomes one parameter set.
bidirectional_vals = [True, False]
lstm_cell_vals = [500, 1000, 2000]
temperature_vals = [0.2, 0.5, 1.0, 10.0]
epoch_batch_vals = [(1, 1), (5, 8), (10, 16), (30, 32)] # [(epochs, batch_size), ...]

def generate_param_sets(start=None, end=None):
    """
    Builds every parameter combination to test and returns all or a slice of them.

    Combinations are ordered with `bidirectional` varying slowest, then `lstm_cells`,
    then `temperature`, and the (epochs, batch_size) pairs varying fastest. A sub-range
    can be requested so that training can be split across separate runs.

    Parameters:
        start (int | None): Index of the first set to return; None means 0.
        end (int | None): Index of the last set to return (inclusive); None, or a value
                          larger than the number of sets, means the final set.

    Returns:
        list(dict): Dictionaries with the keys "bidirectional", "lstm_cells",
                    "temperature", "epochs" and "batch_size".
    """
    all_sets = [
        {
            "bidirectional": bidirectional,
            "lstm_cells": lstm_cells,
            "temperature": temperature,
            "epochs": epochs,
            "batch_size": batch_size,
        }
        for bidirectional in bidirectional_vals
        for lstm_cells in lstm_cell_vals
        for temperature in temperature_vals
        for epochs, batch_size in epoch_batch_vals
    ]

    first = 0 if start is None else start
    if end is None or end > len(all_sets):
        last = len(all_sets) - 1  # the slice below adds 1 back
    else:
        last = end

    return all_sets[first:last + 1]  # +1 makes the passed-in end index inclusive

In [0]:
class MetadataModel:
    """
    A wrapper class for LSTM models that is used to gather metadata from training.

    It builds the underlying model through get_model, remembers the hyper-parameters
    it was built with, derives a name from them, and accumulates the wall-clock time
    spent in train().
    """

    def __init__(self, num_features, lstm_cells, bidirectional, temperature, epochs,
                 batch_size, verbose=0, id=None):
        """
        Parameters:
            num_features (int): Size of the one-hot vector at a time step.
            lstm_cells (int): Number of LSTM cells in the model.
            bidirectional (boolean): Whether the LSTM is bidirectional.
            temperature (float): Temperature passed to get_model.
            epochs (int): Epochs used by each call to train().
            batch_size (int): Batch size used by each call to train().
            verbose (int): Verbosity forwarded to training; may be changed later.
            id: Optional suffix appended to the generated name.
        """
        self._model = get_model(num_features, lstm_cells, bidirectional, temperature)
        self._num_features = num_features
        self._lstm_cells = lstm_cells
        self._bidirectional = bidirectional
        self._temperature = temperature
        self._epochs = epochs
        self._batch_size = batch_size
        self.verbose = verbose # no underscore because this should be mutable
        self._name = '_'.join([str(bidirectional), str(lstm_cells), str(temperature), str(epochs), str(batch_size)])
        if id is not None:
            # write the backing field: `name` is a read-only property, so
            # `self.name += ...` raises AttributeError
            self._name += f"_{id}"
        self._total_training_time = 0.0 # in seconds

    # ---- setting properties so that these attributes are immutable ----
    @property
    def name(self):
        """Name derived from the hyper-parameters (plus the optional id suffix)."""
        return self._name

    @property
    def model(self):
        """The wrapped model returned by get_model."""
        return self._model

    @property
    def num_features(self):
        return self._num_features

    @property
    def lstm_cells(self):
        return self._lstm_cells

    @property
    def bidirectional(self):
        return self._bidirectional

    @property
    def temperature(self):
        return self._temperature

    @property
    def epochs(self):
        return self._epochs

    @property
    def batch_size(self):
        return self._batch_size

    # returns a string version of training time
    @property
    def training_duration(self):
        """Accumulated training time formatted as '<hours>h <minutes>m <seconds>s'."""
        total_time = self._total_training_time
        hours = int(total_time // 3600)  # 3600 seconds/hour
        minutes = int((total_time - (3600 * hours)) // 60) # subtract the hours from remaining time; 60 sec/min
        seconds = (total_time - (3600 * hours)) - (60 * minutes)
        return "{}h {}m {:.2f}s".format(hours, minutes, seconds)

    def train(self, X):
        """
        Trains the wrapped model on X via train_model and adds the elapsed
        wall-clock time to the running total.

        Parameters:
            X (np.array): Training data forwarded to train_model.
        """
        start_time = time.time()
        train_model(self.model, X, self.bidirectional, self.epochs, self.batch_size, self.verbose)
        end_time = time.time()
        self._total_training_time += end_time - start_time

In [0]:
def sample_start_pitch():
    """
    Samples a starting pitch from the module-level `starting_pitch_likelihood` dictionary.

    The dictionary's values are used, in iteration order, as the probabilities of a
    single multinomial draw. The function takes no arguments.

    Returns:
        int: The position of the drawn entry within the dictionary's values.
             Presumably this matches the index in the pitches list when the
             dictionary is sorted by key -- TODO confirm.
    """
    probabilities = np.array(list(starting_pitch_likelihood.values()))
    single_draw = np.random.multinomial(1, probabilities)  # one-hot result of one trial
    return np.argmax(single_draw)
    
def sample_start_duration():
    """
    Samples a starting duration from the module-level `starting_duration_likelihood` dictionary.

    The dictionary's values are used, in iteration order, as the probabilities of a
    single multinomial draw. The function takes no arguments.

    Returns:
        int: The position of the drawn entry within the dictionary's values.
             Presumably this matches the index in the durations list when the
             dictionary is sorted by key -- TODO confirm.
    """
    probabilities = np.array(list(starting_duration_likelihood.values()))
    single_draw = np.random.multinomial(1, probabilities)  # one-hot result of one trial
    return np.argmax(single_draw)

In [0]:
def _format_elapsed(total_seconds):
    """Formats a duration given in seconds as '<hours>h <minutes>m <seconds>s'."""
    hours = int(total_seconds // 3600)  # 3600 seconds/hour
    minutes = int((total_seconds - 3600 * hours) // 60)  # 60 sec/min
    seconds = total_seconds - 3600 * hours - 60 * minutes
    return f"{hours}h {minutes}m {seconds:.2f}s"


def _train_and_save(metadata_model, kind, heading, model_no, short_seqs, medium_seqs, long_seqs, verbose):
    """Trains one MetadataModel on the short, medium and long sets, saves it, and returns the seconds spent training."""
    if verbose:
        print(f'\n\tTraining {heading} generation network...')

    started = time.time()
    for label, sequences in (("short", short_seqs), ("medium", medium_seqs), ("long", long_seqs)):
        if verbose:
            print(f'\t\tOn {label} data set:')
        metadata_model.train(sequences)
    elapsed = time.time() - started

    save_path = f"{MODELS_DIRECTORY}/{model_no}_{kind}_{metadata_model.name}.h5"
    metadata_model.model.save(save_path)
    if verbose:
        print(f"\t\t{kind.capitalize()} model training complete: saved at {save_path}")
        print(f"\tTotal training time: {metadata_model.training_duration}")
    return elapsed


def run_tests(pitch_short, pitch_medium, pitch_long, duration_short, duration_medium,
              duration_long, param_set_start=None, param_set_end=None,
              num_compositions=5, verbose=0):
    """
    Creates, trains, and composes with networks of a multitude of parameter combinations.

    For every parameter set a pitch model and a duration model are built, trained on
    the short, medium and long data sets in turn, and saved under MODELS_DIRECTORY.
    `num_compositions` pieces are then composed and written as MusicXML and MIDI under
    COMPOSITIONS_DIRECTORY/model_<n>.

    Parameters:
        pitch_short (np.array): The short pitch sequences.
        pitch_medium (np.array): The medium pitch sequences.
        pitch_long (np.array): The long pitch sequences.
        duration_short (np.array): The short duration sequences.
        duration_medium (np.array): The medium duration sequences.
        duration_long (np.array): The long duration sequences.
        param_set_start (int): The first of the generated param. sets to be tested on.
        param_set_end (int): The last of the generated param. sets to be tested on.
        num_compositions (int): How many pieces to compose for each model being tested.
        verbose (int): The amount of information to print throughout the test.

    Returns: None
    """
    param_sets = generate_param_sets(param_set_start, param_set_end)
    # number models by their position in the full parameter grid
    first_model_no = 0 if param_set_start is None else param_set_start
    os.makedirs(MODELS_DIRECTORY, exist_ok=True)

    test_start_time = time.time()
    for model_no, params in enumerate(param_sets, start=first_model_no):
        bidirectional = params["bidirectional"]
        lstm_cells = params["lstm_cells"]
        temperature = params["temperature"]
        epochs = params["epochs"]
        batch_size = params["batch_size"]

        if verbose:
            print(f"Model {model_no}:"
                  f"\n\tbidirectional – {bidirectional}"
                  f"\n\tlstm cells – {lstm_cells}"
                  f"\n\ttemperature – {temperature}"
                  f"\n\tepochs – {epochs}"
                  f"\n\tbatch size – {batch_size}")

        pitch_model = MetadataModel(num_pitches, lstm_cells, bidirectional, temperature, epochs, batch_size, verbose)
        duration_model = MetadataModel(num_durations, lstm_cells, bidirectional, temperature, epochs, batch_size, verbose)

        pitch_seconds = _train_and_save(pitch_model, "pitch", "pitch", model_no,
                                        pitch_short, pitch_medium, pitch_long, verbose)
        duration_seconds = _train_and_save(duration_model, "duration", "rhythm (duration)", model_no,
                                           duration_short, duration_medium, duration_long, verbose)

        if verbose:
            print(f'\tModel {model_no} training complete')
            # sum the seconds; adding the two formatted strings would just concatenate them
            print(f'\t\tTotal time to train pitch and duration models: {_format_elapsed(pitch_seconds + duration_seconds)}')

        # compose outputs and save them
        composition_dir = f"{COMPOSITIONS_DIRECTORY}/model_{model_no}"
        # exist_ok lets a rerun reuse the directory instead of raising FileExistsError
        os.makedirs(composition_dir, exist_ok=True)
        for comp_count in range(1, num_compositions + 1):
            pitch_prompt = sample_start_pitch()
            duration_prompt = sample_start_duration()
            composition = compose(pitch_model.model, duration_model.model, pitch_prompt, duration_prompt, length=100)
            composition.write('musicxml', f'{composition_dir}/composition_{comp_count}.mxl')
            composition.write('midi', f'{composition_dir}/composition_{comp_count}.mid')
        print(f"\t\tCompositions successfully written to {composition_dir}")

        print('–' * 50 + "\n\n")

    total_time = time.time() - test_start_time
    print(f"TRAINING COMPLETE – elapsed time: {_format_elapsed(total_time)}")
    

In [0]:
# Run the full experiment over every parameter combination, printing progress.
run_tests(
    pitch_short=short_seqs_pitch,
    pitch_medium=medium_seqs_pitch,
    pitch_long=long_seqs_pitch,
    duration_short=short_seqs_duration,
    duration_medium=medium_seqs_duration,
    duration_long=long_seqs_duration,
    verbose=1,
)