In [1]:
import os
import json
import numpy as np
import music21 as m21
import tensorflow.keras as keras

# Constantes

In [13]:
# Length of one encoding step, in quarter lengths (0.25 = one 16th note).
STEP_DURATION = 0.25

# Note/rest lengths (in quarter lengths) that the pipeline accepts.
SUPPORTED_DURATIONS = [
    0.25,  # 16th note / one step (dotted 16ths are left out: they need 32nd notes)
    0.5,   # 8th note
    0.75,  # Dotted 8th note
    1,     # Quarter note
    1.25,  # Quarter note tied to a 16th note
    1.5,   # Dotted quarter note
    1.75,  # Dotted quarter note tied to a 16th note
    2,     # Half note
    2.25,  # Half note tied to a 16th note
    2.5,   # Half note tied to an 8th note
    2.75,  # Half note tied to a dotted 8th note
    3,     # Dotted half note
    3.25,  # Dotted half note tied to a 16th note
    3.5,   # Dotted half note tied to an 8th note
    3.75,  # Dotted half note tied to a dotted 8th note
    4,     # Whole note
]

# Pre-processing
DELIMITER_SYMBOL = "/"                            # separates songs in the merged dataset
NON_MIDI_SYMBOLS = ["r", "_", DELIMITER_SYMBOL]   # rest, prolongation, song delimiter
NUMBER_OF_MIDI_VALUES = 128
SEQUENCE_LENGTH = 64                              # symbols per training window

# Paths
RAW_DATASET_PATH = "data/raw_dataset"
PREPROCESSED_DATASET_DESTINATION = "data/preprocessed_dataset/individual_scores"
MERGED_DATASET_DESTINATION = "data/preprocessed_dataset/merged_preprocessed_dataset.txt"
LOOKUP_TABLE_DESTINATION = "data/lookup_table.json"
DEFAULT_MODEL_PATH = "model.h5"


## Preprocesamiento de datos

In [9]:
def preprocess_data():
    """Run the whole preprocessing pipeline.

    Loads the raw scores, writes one encoded time-series file per usable
    score, and finally merges those files into a single dataset file.
    """
    # Stage 1: encode every score and store it in its own file
    preprocess_individual_scores(load_training_data())

    # Stage 2: concatenate the individual files into the merged dataset
    create_merged_dataset()

def load_training_data():
    """Parse every ``.krn`` file found under RAW_DATASET_PATH.

    Returns:
        list: the parsed music21 scores, in the order os.walk yields the files.
    """
    kern_paths = (
        os.path.join(directory, file_name)
        for directory, _, file_names in os.walk(RAW_DATASET_PATH)
        for file_name in file_names
        if file_name.endswith(".krn")
    )
    return [m21.converter.parse(kern_path) for kern_path in kern_paths]

def preprocess_individual_scores(scores):
    """Encode each usable score and write it to its own text file.

    Scores containing a duration outside SUPPORTED_DURATIONS are skipped.
    The remaining ones are transposed to C major / A minor, encoded as a
    time-series string and written to
    ``<PREPROCESSED_DATASET_DESTINATION>/<index>-preprocessed_score.txt``,
    where ``index`` is the position of the score in ``scores``.

    Args:
        scores: iterable of music21 scores.
    """
    # open(..., 'w') does not create missing parent folders, so make sure the
    # destination exists before the first file is written.
    os.makedirs(PREPROCESSED_DATASET_DESTINATION, exist_ok=True)

    for i, score in enumerate(scores):

        # Filter out scores with unsupported durations
        duration_compliant = check_durations(score)
        if not duration_compliant:
            continue

        # Transpose score to Cmaj/Amin keys
        transposed_score = transpose_music_to_CA(score)

        # Encode score in time-series representation
        encoded_score = encode_music(transposed_score)

        preprocessed_score_path = f'{PREPROCESSED_DATASET_DESTINATION}/{i}-preprocessed_score.txt'
        with open(preprocessed_score_path, 'w') as fp:
            fp.write(encoded_score)

def check_durations(score):
    """Tell whether every note and rest of ``score`` has a supported duration.

    Returns:
        bool: True when the quarter length of each event yielded by
        ``score.flat.notesAndRests`` is listed in SUPPORTED_DURATIONS,
        False as soon as one is not.
    """
    return all(
        event.duration.quarterLength in SUPPORTED_DURATIONS
        for event in score.flat.notesAndRests
    )

def transpose_music_to_CA(score, key='n/a', mode='n/a'):
    """Transpose ``score`` to C (major mode) or A (minor mode).

    Args:
        score: music21 stream to transpose.
        key: tonic of the score (anything get_pitch accepts). When left at
            'n/a' the key is looked up in the score and ``mode`` is taken
            from the key that was found.
        mode: 'major' or 'minor'. Any other value leaves the music
            untransposed (a unison interval is applied).

    Returns:
        The stream returned by ``score.transpose``.
    """
    # No user input (training): work the key out from the score itself
    if key == 'n/a':
        key = None

        # The key is expected at index 4 of the first measure of the first
        # part. Guard every step so that scores without parts, without
        # measures or with a short first measure fall back to key analysis
        # instead of raising IndexError.
        parts = list(score.getElementsByClass(m21.stream.Part))
        if parts:
            measures_part0 = list(parts[0].getElementsByClass(m21.stream.Measure))
            if measures_part0 and len(measures_part0[0]) > 4:
                key = measures_part0[0][4]

        # Whatever sits at that position may not be a key: estimate it then
        if not isinstance(key, m21.key.Key):
            key = score.analyze("key")

        mode = key.mode

    # Interval from the current tonic to the target tonic
    if mode == 'major':
        interval = m21.interval.Interval(get_pitch(key), m21.pitch.Pitch('C'))
    elif mode == 'minor':
        interval = m21.interval.Interval(get_pitch(key), m21.pitch.Pitch('A'))
    else:  # Neither major nor minor: keep the music where it is
        interval = m21.interval.Interval("P1")

    return score.transpose(interval)

def get_pitch(key):
    """Return the tonic pitch for ``key``.

    A music21 ``Key`` contributes its ``tonic``; any other value is handed to
    ``m21.pitch.Pitch`` to build the pitch.
    """
    return key.tonic if isinstance(key, m21.key.Key) else m21.pitch.Pitch(key)

def encode_music(transposed_score):
    """Encode a score as a space-separated time-series string.

    Every event yielded by ``transposed_score.flat.notesAndRests`` becomes one
    head symbol followed by one "_" per additional step it lasts, a step being
    STEP_DURATION quarter lengths. The head symbol is the MIDI pitch number
    for ``m21.note.Note`` events and "r" for every other event.

    Returns:
        str: all symbols joined by single spaces.
    """
    symbols = []

    for event in transposed_score.flat.notesAndRests:
        head = event.pitch.midi if isinstance(event, m21.note.Note) else "r"

        # Number of steps covered by the event
        step_count = int(event.duration.quarterLength / STEP_DURATION)

        # Head symbol first, then the prolongation marks
        symbols.append(head)
        symbols.extend("_" for _ in range(step_count - 1))

    return " ".join(str(symbol) for symbol in symbols)

def create_merged_dataset():
    """Merge all individual encoded scores into one dataset file.

    Every file found under PREPROCESSED_DATASET_DESTINATION is read and
    followed by a space plus SEQUENCE_LENGTH delimiter symbols (each one
    followed by a space). The concatenation is written to
    MERGED_DATASET_DESTINATION.
    """
    song_separator = (DELIMITER_SYMBOL + ' ') * SEQUENCE_LENGTH

    delimited_songs = []
    for directory, _, file_names in os.walk(PREPROCESSED_DATASET_DESTINATION):
        for file_name in file_names:
            with open(os.path.join(directory, file_name), 'r') as fp:
                delimited_songs.append(fp.read() + " " + song_separator)

    with open(MERGED_DATASET_DESTINATION, 'w') as fp:
        fp.write("".join(delimited_songs))

# Build the per-score encodings and the merged dataset file on disk.
preprocess_data()

## Entrenamiento

In [10]:
# Settings used when compiling and fitting the network
LEARNING_RATE = 0.001                     # learning rate of the Adam optimizer
BATCH_SIZE = 64
EPOCHS = 1
LOSS = "sparse_categorical_crossentropy"

# Number of units of the first and of the second LSTM layer
NUM_UNITS = [256, 256]

def train():
    """Train the network on the merged dataset and save it.

    The training windows are generated, a fresh model is built and fitted for
    EPOCHS epochs with batches of BATCH_SIZE, and the result is saved to
    DEFAULT_MODEL_PATH.
    """
    network_inputs, network_targets = generate_training_sequences()

    network = build_model()
    network.fit(network_inputs, network_targets, epochs=EPOCHS, batch_size=BATCH_SIZE)
    network.save(DEFAULT_MODEL_PATH)


def build_model():
    """Build and compile the two-layer LSTM network.

    Input and output width both equal the vocabulary size read from the
    lookup table. Each LSTM layer is followed by a dropout of 0.2 and the
    output layer is a softmax over the vocabulary. The model summary is
    printed before the compiled model is returned.
    """
    vocabulary_size = get_vocabulary_size()

    # Sequences of any length, one one-hot vector per time step
    one_hot_input = keras.layers.Input(shape=(None, vocabulary_size))

    hidden = keras.layers.LSTM(NUM_UNITS[0], return_sequences=True)(one_hot_input)
    hidden = keras.layers.Dropout(0.2)(hidden)
    hidden = keras.layers.LSTM(NUM_UNITS[1])(hidden)
    hidden = keras.layers.Dropout(0.2)(hidden)

    symbol_probabilities = keras.layers.Dense(vocabulary_size, activation="softmax")(hidden)

    network = keras.Model(one_hot_input, symbol_probabilities)

    adam = keras.optimizers.Adam(learning_rate=LEARNING_RATE)
    network.compile(loss=LOSS, optimizer=adam, metrics=["accuracy"])

    network.summary()

    return network

def generate_training_sequences():
    """Cut the merged dataset into (input window, next symbol) training pairs.

    Returns:
        tuple: ``(inputs, targets)`` where ``inputs`` is the one-hot encoding
        of every window of SEQUENCE_LENGTH consecutive symbols and ``targets``
        is a NumPy array holding the integer symbol that follows each window.
    """
    window = SEQUENCE_LENGTH

    with open(MERGED_DATASET_DESTINATION) as dataset_file:
        merged_songs = dataset_file.read()
    int_songs = convert_songs_to_int(merged_songs)

    # One window per start position that still has a following symbol
    window_starts = range(len(int_songs) - window)
    windows = [int_songs[start:start + window] for start in window_starts]
    next_symbols = [int_songs[start + window] for start in window_starts]

    one_hot_windows = keras.utils.to_categorical(windows, num_classes=get_vocabulary_size())

    return one_hot_windows, np.array(next_symbols)

def convert_songs_to_int(songs):
    """Map a whitespace-separated symbol string to a list of integers.

    The symbol-to-integer table is read from LOOKUP_TABLE_DESTINATION.

    Raises:
        KeyError: if ``songs`` contains a symbol missing from the table.
    """
    with open(LOOKUP_TABLE_DESTINATION, "r") as fp:
        symbol_to_int = json.load(fp)

    return list(map(symbol_to_int.__getitem__, songs.split()))

def get_vocabulary_size():
    """Return the number of entries of the lookup table stored on disk."""
    with open(LOOKUP_TABLE_DESTINATION, "r") as lookup_file:
        lookup_table = json.load(lookup_file)
    vocabulary = lookup_table.keys()
    return len(vocabulary)

# Fit the network on the merged dataset and save it to DEFAULT_MODEL_PATH.
train()

Model: "model_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_3 (InputLayer)        [(None, None, 131)]       0         
                                                                 
 lstm_3 (LSTM)               (None, None, 256)         397312    
                                                                 
 dropout_3 (Dropout)         (None, None, 256)         0         
                                                                 
 lstm_4 (LSTM)               (None, 256)               525312    
                                                                 
 dropout_4 (Dropout)         (None, 256)               0         
                                                                 
 dense_2 (Dense)             (None, 131)               33667     
                                                                 
Total params: 956,291
Trainable params: 956,291
Non-trainable params: 0

## Predicción de melodías

In [15]:
def load_model():
    """Load everything needed to generate melodies.

    Returns:
        tuple: ``(model, mappings, start_symbols)`` where ``model`` is the
        Keras model stored at DEFAULT_MODEL_PATH, ``mappings`` is the lookup
        table read from LOOKUP_TABLE_DESTINATION and ``start_symbols`` is a
        list of SEQUENCE_LENGTH delimiter symbols.
    """
    model = keras.models.load_model(DEFAULT_MODEL_PATH)

    # Context manager so the file handle is closed right after reading
    with open(LOOKUP_TABLE_DESTINATION) as fp:
        mappings = json.load(fp)

    # Same symbol that separates songs in the merged dataset
    start_symbols = [DELIMITER_SYMBOL] * SEQUENCE_LENGTH
    return model, mappings, start_symbols

# Load the trained network and the lookup table once; generate_melody() below
# binds these names as default argument values at definition time.
model, mappings, start_symbols = load_model()

def generate_melody(
      seed,
      num_steps=500,
      max_sequence_length=SEQUENCE_LENGTH,
      temperature=1.0,
      model=model,
      mappings=mappings,
      start_symbols=start_symbols
    ):
    """Continue a seed melody by sampling symbols from the model.

    Args:
        seed: whitespace-separated string of encoded symbols.
        num_steps: maximum number of symbols to generate.
        max_sequence_length: number of most recent symbols fed to the model
            at each step.
        temperature: sampling temperature passed to sample_with_temperature.
        model: object whose ``predict`` returns, for a batch of one one-hot
            sequence, the probabilities of the next symbol.
        mappings: dict mapping each symbol to its integer id.
        start_symbols: symbols placed in front of the seed before prediction.

    Returns:
        list: the seed symbols followed by the generated symbols. Generation
        stops early when the delimiter symbol is sampled; the delimiter itself
        is not included.

    Raises:
        KeyError: if the seed contains a symbol missing from ``mappings``.
    """
    melody = seed.split()

    # Inverse lookup built once instead of scanning the table at every step.
    # setdefault keeps the first symbol should two symbols share an id.
    int_to_symbol = {}
    for symbol, symbol_id in mappings.items():
        int_to_symbol.setdefault(symbol_id, symbol)

    vocabulary_size = len(mappings)
    encoded_seed = [mappings[symbol] for symbol in start_symbols + melody]

    for _ in range(num_steps):

        # Limit the context to the most recent symbols
        encoded_seed = encoded_seed[-max_sequence_length:]

        # One-hot encode and add the batch dimension expected by predict()
        onehot = keras.utils.to_categorical(encoded_seed, num_classes=vocabulary_size)
        onehot = onehot[np.newaxis, ...]

        probabilities = model.predict(onehot)[0]

        output_int = sample_with_temperature(probabilities, temperature)
        encoded_seed.append(output_int)

        output_symbol = int_to_symbol[output_int]

        # The delimiter marks the end of a melody
        if output_symbol == DELIMITER_SYMBOL:
            break

        melody.append(output_symbol)

    return melody

def sample_with_temperature(probabilites, temperature):
    """Sample an index from a probability distribution reshaped by temperature.

    The distribution is recomputed as ``softmax(log(p) / temperature)``:
    values below 1 sharpen it towards the most likely index, values above 1
    flatten it, and 1 leaves it unchanged.

    Args:
        probabilites: 1-D array-like of probabilities (the parameter name is
            kept as spelled for backward compatibility).
        temperature: strictly positive number.

    Returns:
        The sampled index, as returned by ``np.random.choice``.

    Raises:
        ValueError: if ``temperature`` is not strictly positive.
    """
    if temperature <= 0:
        raise ValueError("temperature must be a positive number.")

    probabilities = np.asarray(probabilites, dtype=np.float64)

    # log(0) is -inf on purpose: such entries keep a probability of exactly 0,
    # so the divide-by-zero warning is silenced.
    with np.errstate(divide="ignore"):
        scaled_logits = np.log(probabilities) / temperature

    # Shift by the maximum before exponentiating. Without it, a small
    # temperature makes every exponential underflow to 0 and the
    # normalisation becomes 0 / 0 = NaN.
    scaled_logits = scaled_logits - np.max(scaled_logits)
    exponentials = np.exp(scaled_logits)
    rescaled = exponentials / np.sum(exponentials)

    return np.random.choice(len(rescaled), p=rescaled)


def _group_melody_events(melody):
    """Collapse melody symbols into ``[symbol, number_of_steps]`` pairs."""
    events = []
    for symbol in melody:
        if symbol == "_":
            # A prolongation extends the event that is currently open. With no
            # event open yet there is nothing to prolong, so it is ignored.
            if events:
                events[-1][1] += 1
        else:
            events.append([symbol, 1])
    return events


def save_melody(
      melody,
      step_duration=0.25,
      format="midi",
      file_name="mel.mid",
      key="C",
      mode="major",
      tempo=120
    ):
    """Write an encoded melody to a file.

    Every note/rest symbol becomes one music21 event whose length is
    ``step_duration`` times the number of steps it covers (itself plus the
    "_" symbols that follow it). The last event of the melody is written with
    its full length as well.

    Args:
        melody: sequence of symbols: MIDI pitch numbers as strings, "r" for
            rests and "_" for prolongations.
        step_duration: length of one step in quarter lengths.
        format: output format handed to ``stream.write``.
        file_name: destination path handed to ``stream.write``.
        key: target tonic passed to transpose_music_from_CA.
        mode: target mode passed to transpose_music_from_CA.
        tempo: offsets and durations are scaled by ``120 / tempo``.
    """
    stream = m21.stream.Stream()

    for symbol, step_count in _group_melody_events(melody):
        quarter_length_duration = step_duration * step_count

        if symbol == "r":
            m21_event = m21.note.Rest(quarterLength=quarter_length_duration)
        else:
            m21_event = m21.note.Note(int(symbol), quarterLength=quarter_length_duration)

        stream.append(m21_event)

    stream = transpose_music_from_CA(stream, key, mode)

    # The tempo is applied by stretching/compressing the events themselves
    tempo_factor = 120 / tempo
    stream = stream.scaleOffsets(tempo_factor).scaleDurations(tempo_factor)

    stream.write(format, file_name)


def transpose_music_from_CA(score, key, mode):
    """Transpose music written in C major / A minor to the requested key.

    Args:
        score: music21 stream assumed to be in C (major) or A (minor).
        key: target tonic.
        mode: 'major' or 'minor'. Any other value applies a unison interval.

    Returns:
        ``score`` itself when the target is already C major or A minor,
        otherwise the result of ``score.transpose``.
    """
    # Tonic the music is currently written in, depending on the mode
    if mode == 'major':
        home_tonic = 'C'
    elif mode == 'minor':
        home_tonic = 'A'
    else:
        home_tonic = None

    # Already in the requested key: nothing to do
    if home_tonic is not None and key == home_tonic:
        return score

    if home_tonic is None:
        interval = m21.interval.Interval("P1")
    else:
        interval = m21.interval.Interval(m21.pitch.Pitch(home_tonic), m21.pitch.Pitch(key))

    return score.transpose(interval)


def encode_midi_input(key, mode, midi_file):
    """Turn a MIDI file into an encoded seed string.

    Args:
        key: tonic of the seed, forwarded to transpose_music_to_CA.
        mode: mode of the seed, forwarded to transpose_music_to_CA.
        midi_file: path of the file to parse.

    Returns:
        str: the time-series encoding of the transposed seed.

    Raises:
        ValueError: if the file contains an unsupported note/rest duration.
    """
    parsed_seed = m21.converter.parse(midi_file)

    if not check_durations(parsed_seed):
        raise ValueError("Input MIDI file contains unsupported note/rest durations.")

    return encode_music(transpose_music_to_CA(parsed_seed, key=key, mode=mode))




## Ejemplo predicción

In [None]:
# Input variables
key = "C"
mode = "minor"  # "major" or "minor"; any other value disables the transposition
temperature = 0.1
tempo = 80
midi_seed = 'test_melody_Cmin.mid'

# Encode the seed, let the model continue it and write the result. The
# generated melody (not the seed alone) is saved, and key/mode are passed so
# the output is transposed back from C major / A minor to the seed's key.
seed = encode_midi_input(key, mode, midi_seed)
melody = generate_melody(seed, temperature=temperature)
save_melody(melody, file_name="prediction.mid", key=key, mode=mode, tempo=tempo)