In [1]:
import tensorflow as tf
from music21 import converter, instrument, stream, roman, midi, key, interval
import music21
import music21.chord as chord_module
import music21.note as note_module
import glob
import numpy as np
import json
import pickle
from keras.models import Sequential, load_model
from keras.layers import Dense, Dropout, LSTM, Activation, Embedding, Concatenate, Input, Bidirectional, Attention
from keras.layers import BatchNormalization as BatchNorm
from tensorflow.keras.models import Model
from keras.callbacks import ModelCheckpoint
from keras.optimizers import Adam, RMSprop
from tensorflow.keras.callbacks import EarlyStopping
import re

2024-05-01 12:49:03.109629: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX_VNNI FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-05-01 12:49:03.174917: I tensorflow/core/util/port.cc:104] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-05-01 12:49:03.461548: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: :/home/jupyter-franky/.conda/envs/tf/lib/
2024-05-01 12:49:03.461577: W tensorflow/compiler/xla/st

In [2]:
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))


Num GPUs Available:  1


2024-05-01 12:49:04.453881: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:981] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2024-05-01 12:49:04.456569: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:981] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2024-05-01 12:49:04.456630: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:981] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero


In [3]:
file_path = 'dataset/pop_909/*.mid'

In [4]:
duration_mapping = {
    0.25: 0,  # Sixteenth note
    0.5: 1,   # Eighth note
    0.75: 2,
    1.0: 3,   # Quarter note
    1.25: 4,
    1.5: 5,   # Dotted quarter note
    1.75: 6,
    2.0: 7,   # Half note
    2.25: 8,
    2.5: 9,   
    2.75: 10,
    3.0: 11,   
    3.25: 12,  
    3.5: 13,   
    3.75: 14, 
    4.0: 15    # Whole note
}

In [5]:
def get_duration(duration, duration_mapping=duration_mapping):
    min_diff = float('inf')
    nearest_duration = None
    for key in duration_mapping:
        diff = abs(duration - key)
        if diff < min_diff:
            min_diff = diff
            nearest_duration = key
    return duration_mapping[nearest_duration]


def simplify_roman_name(roman_numeral):
    """
    Simplify roman numeral chord names.
    """
    chord_name = roman_numeral.figure
    # Match everything up to the first digit
    match = re.match(r"^[^\d]*\d", chord_name)
    if match:
        simplified_name = match.group(0) 
    else:
        simplified_name = chord_name
    return simplified_name
 
def extract_chords(stream, transposed_key):
    all_chords = []
    for measure in stream.measures(numberStart=0, numberEnd=None):
        note_durations = {}
        for element in measure.notesAndRests:
            if isinstance(element, chord_module.Chord):
                for single_note in element.pitches:
                    midi_value = single_note.midi
                    duration = get_duration(float(element.duration.quarterLength), duration_mapping)
                    note_durations[midi_value] = duration
            elif isinstance(element, note_module.Note):
                midi_value = element.pitch.midi
                duration = get_duration(float(element.duration.quarterLength), duration_mapping)
                note_durations[midi_value] = duration
            elif isinstance(element, note_module.Rest):
                continue
        
        simplified_name = 'rest_or_no_chord'
        if note_durations:
            sorted_notes = sorted(note_durations, key=note_durations.get, reverse=True)
            if len(sorted_notes) >= 2:
                freq = chord_module.Chord(sorted_notes[:4])
                rn = roman.romanNumeralFromChord(freq, transposed_key)
                simplified_name = simplify_roman_name(rn)
        for _ in range(4):
            all_chords.append([simplified_name])
    return all_chords


def extract_notes(stream, duration_mapping=duration_mapping):
    all_notes = []
    all_durations = []
    for measure in stream.measures(numberStart=0, numberEnd=None):
        measure_notes = {}
        
        for element in measure.notesAndRests:
            beat = element.beat
            if beat not in measure_notes:
                measure_notes[beat] = []
            measure_notes[beat].append(element)
        for beat, notes in measure_notes.items():
            beat_notes = []
            beat_durations = []
            for note in notes:
                if isinstance(note, note_module.Note):
                    midi_value = note.pitch.midi
                    duration = get_duration(float(note.duration.quarterLength), duration_mapping)
                    beat_notes.append(midi_value)
                    beat_durations.append(duration)
                elif isinstance(note, note_module.Rest):
                    duration = get_duration(float(note.duration.quarterLength), duration_mapping)
                    beat_notes.append('R')
                    beat_durations.append(duration)
                else:
                    beat_notes.append('R')
                    beat_durations.append(0)
            all_notes.append(beat_notes)
            all_durations.append(beat_durations)
    return all_notes, all_durations

def add_padding(notes_list, target_length):
    if len(notes_list) < target_length:
        pad_length = target_length - len(notes_list)
        notes_list.extend(['<padding>' for _ in range(pad_length)])
    return notes_list

def extract_parts(file_path):
    try:
        midi = converter.parse(file_path)
        original_key = midi.analyze('key')
        transposed_key = original_key
        # if str(original_key) != 'C major' or str(original_key) != 'a minor':
        #     transposed_key = key.Key('C')
        #     note_interval = interval.Interval(original_key.tonic, transposed_key.tonic)
        #     midi = midi.transpose(note_interval)
        # else:
        #     transposed_key = original_key
            
        melody_stream = stream.Stream()
        bridge_stream = stream.Stream()
        piano_stream = stream.Stream()

        for part in midi.parts:
            if part.partName == 'MELODY':
                melody_stream = part
            elif part.partName == 'BRIDGE':
                bridge_stream = part
            elif part.partName == 'PIANO':
                piano_stream = part

        all_notes, all_durations = extract_notes(melody_stream)
        # bridge_notes = extract_notes(bridge_stream)
        all_chords = extract_chords(piano_stream, transposed_key)

        max_length = max(len(all_notes), len(all_chords), len(all_durations))
        all_notes = add_padding(all_notes, max_length)
        all_durations = add_padding(all_durations, max_length)
        # bridge_notes = pad_or_truncate(bridge_notes, max_length)
        all_chords = add_padding(all_chords, max_length)
        
        all_notes.append('<end>')
        all_durations.append('<end>')
        all_chords.append('<end>')
        return all_notes, all_durations, all_chords

    except Exception as e:
        print(f"Error processing file: {file_path}")
        print(f"Error message: {str(e)}")
        return None, None, None

In [6]:
def preprocess_sequences(notes, durations, chords, sequence_length, notes_mapping, chords_mapping, duration_mapping):
    encoded_notes = []
    encoded_durations = []
    for i, note_list in enumerate(notes):
        if isinstance(note_list, list) and note_list:
            note_str = str(note_list)
            encoded_note = notes_mapping.get(str(note_list), -1)
            note = note_list[0]
            encoded_notes.append(encoded_note)
            encoded_durations.append(durations[i][0])
        elif isinstance(note_list, str) and note_list in ['<padding>', '<end>']:
            encoded_note = notes_mapping.get(note_list, -1)
            encoded_notes.append(encoded_note)
            encoded_durations.append(0)
        else:
            encoded_notes.append(notes_mapping.get('<padding>', -1))
            encoded_durations.append(0)
            
    encoded_chords = convert_to_int(chords, chords_mapping)
    print(len(encoded_notes), len(encoded_chords))
    
    network_input_notes = []
    network_input_durations = []
    network_input_chords = []
    network_output_notes = []
    network_output_durations = []
    network_output_chords = []

    # Duplicate each chord 4 times
    # encoded_chords_duplicated = [chord for chord in encoded_chords for _ in range(4)]
    print(encoded_notes[:20])
    print(encoded_durations[:20])
    print(encoded_chords[:20])
    for i in range(len(encoded_chords) - sequence_length):
        sequence_in_notes = encoded_notes[i:i + sequence_length]
        sequence_out_note = encoded_notes[i + sequence_length]
        sequence_in_durations = encoded_durations[i:i + sequence_length]
        sequence_out_duration = encoded_durations[i + sequence_length]
        sequence_in_chords = encoded_chords[i:i + sequence_length]
        sequence_out_chord = encoded_chords[i + sequence_length]

        network_input_notes.append(sequence_in_notes)
        network_input_durations.append(sequence_in_durations)
        network_input_chords.append(sequence_in_chords)
        network_output_notes.append(sequence_out_note)
        network_output_durations.append(sequence_out_duration)
        network_output_chords.append(sequence_out_chord)

    n_vocab_notes = len(set(encoded_notes))
    n_vocab_chords = len(set(encoded_chords))
    n_vocab_durations = len(duration_mapping)
    
    return np.array(network_input_notes), np.array(network_input_durations), np.array(network_input_chords), np.array(network_output_notes), np.array(network_output_durations), np.array(network_output_chords), n_vocab_notes, n_vocab_chords, n_vocab_durations
    
def create_mappings(items_list, file_path):
    unique_items = []
    for item in items_list:
        if isinstance(item, tuple):
            note, duration = item
        else:
            note = item
        unique_items.append(str(note))
        
    unique_items = list(set(unique_items))  
    if '<padding>' not in unique_items:
        unique_items.append('<padding>')
    
    mappings = {item: number for number, item in enumerate(unique_items)}
    
    with open(file_path, "w") as file:
        json.dump(mappings, file)
    return mappings

def convert_to_int(items, mapping_file):
    int_notes = []
    with open(mapping_file, "r") as fp:
        mappings = json.load(fp)
    for item in items:
        # if isinstance(item, list):
        #     item_str = (str(item)
        # else:
        #     item_str = item
        item_str = str(item)
        int_notes.append(mappings.get(item_str, -1))
    return int_notes

def convert_notes_durations_to_int(items, note_mapping_path, duration_mapping):
    encoded_notes = []
    encoded_durations = []
    with open(note_mapping_path, "r") as fp:
        notes_mapping = json.load(fp)
        
    for item in items:
        note, duration = item
        encoded_durations.append(duration)
    
        encoded_note = notes_mapping.get(str(note), -1)
        encoded_notes.append(encoded_note)
    return encoded_notes, encoded_durations
# def convert_to_int(items, mapping_file):
#     int_items = []
#     with open(mapping_file, "r") as fp:
#         mappings = json.load(fp)
#     for item in items:
#         if isinstance(item, tuple):
#             note, duration = item
#             if isinstance(note, list):
#                 note_str = '_'.join(str(i) for i in note)
#             else:
#                 note_str = str(note)
#             int_note = mappings.get(note_str, -1)
#             int_items.append((int_note, duration))
#         else:
#             int_items.append(mappings.get(str(item), -1))
#     return int_items



In [31]:
midi_files = glob.glob(file_path)
print(len(midi_files))
# midi_files = midi_files[:10]
all_files_chords = []
all_files_notes = []
all_files_durations = []
count = 0
for file in midi_files:
    all_notes, all_durations, all_chords = extract_parts(file)
    if all_notes is not None and all_chords is not None and all_durations is not None:
        all_files_chords.extend(all_chords)
        all_files_notes.extend(all_notes)
        all_files_durations.extend(all_durations)
        count += 1
        print(count)
    else:
        print(f"Skipping file: {file_path} due to extraction error.")


908
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
Error processing file: dataset/pop_909/513.mid
Error message: 131778635447408
Skipping file: dataset/pop_909/*.mid due to extraction error.
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
Error processing file: dataset/pop_909/320.mid
Error message: 131778649523104
Skipping file: dataset/pop_909/*.mid due to extraction error.
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
Error processing file: dataset/pop_909/1

In [7]:
with open('mappings_model/all_chords.pkl', 'rb') as file:
    all_files_chords = pickle.load(file)
with open('mappings_model/all_durations.pkl', 'rb') as file:
    all_files_durations = pickle.load(file)
with open('mappings_model/all_notes.pkl', 'rb') as file:
    all_files_notes = pickle.load(file)
    
print(len(all_files_chords), len(all_files_notes), len(all_files_durations))
print(all_files_chords[:10])

992890 992890 992890
[['rest_or_no_chord'], ['rest_or_no_chord'], ['rest_or_no_chord'], ['rest_or_no_chord'], ['rest_or_no_chord'], ['rest_or_no_chord'], ['rest_or_no_chord'], ['rest_or_no_chord'], ['vi6'], ['vi6']]


In [8]:
note_mapping_path = "mappings_model/note_mappings.txt"
chord_mapping_path = "mappings_model/chord_mappings.txt"

note_mapping = create_mappings(all_files_notes, note_mapping_path)
chord_mapping = create_mappings(all_files_chords, chord_mapping_path)
print(" ")


 


In [9]:
print(all_files_notes[:10])
print(all_files_chords[:10])
print(all_files_durations[:10])

[['R'], ['R'], ['R'], ['R'], ['R'], ['R'], ['R'], ['R'], ['R'], ['R']]
[['rest_or_no_chord'], ['rest_or_no_chord'], ['rest_or_no_chord'], ['rest_or_no_chord'], ['rest_or_no_chord'], ['rest_or_no_chord'], ['rest_or_no_chord'], ['rest_or_no_chord'], ['vi6'], ['vi6']]
[[3], [3], [3], [3], [3], [3], [3], [3], [3], [3]]


In [12]:
all_files_notes = [tuple_element for sublist in all_files_notes for tuple_element in sublist]


In [10]:
sequence_length = 64
network_input_notes, network_input_durations, network_input_chords, network_output_notes, network_output_durations, network_output_chords, n_vocab_notes, n_vocab_chords, n_vocab_durations = preprocess_sequences(all_files_notes, all_files_durations, all_files_chords, sequence_length, note_mapping, chord_mapping_path, duration_mapping)

992890 992890
[403, 403, 403, 403, 403, 403, 403, 403, 403, 403, 403, 403, 403, 403, 403, 403, 403, 403, 403, 68]
[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2, 0]
[447, 447, 447, 447, 447, 447, 447, 447, 613, 613, 613, 613, 410, 410, 410, 410, 410, 410, 410, 410]


In [11]:
print(f'Network input: {network_input_durations[:10]}, Network_output: {network_output_durations[:10]},  Length: {n_vocab_chords}')


Network input: [[3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 2 0 0 0 0 0 1 0 0 0 0 1 0 2 0 0 0 0
  0 0 0 0 0 0 1 3 2 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0]
 [3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 2 0 0 0 0 0 1 0 0 0 0 1 0 2 0 0 0 0 0
  0 0 0 0 0 1 3 2 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0]
 [3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 2 0 0 0 0 0 1 0 0 0 0 1 0 2 0 0 0 0 0 0
  0 0 0 0 1 3 2 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0]
 [3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 2 0 0 0 0 0 1 0 0 0 0 1 0 2 0 0 0 0 0 0 0
  0 0 0 1 3 2 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0]
 [3 3 3 3 3 3 3 3 3 3 3 3 3 3 2 0 0 0 0 0 1 0 0 0 0 1 0 2 0 0 0 0 0 0 0 0
  0 0 1 3 2 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0]
 [3 3 3 3 3 3 3 3 3 3 3 3 3 2 0 0 0 0 0 1 0 0 0 0 1 0 2 0 0 0 0 0 0 0 0 0
  0 1 3 2 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 3]
 [3 3 3 3 3 3 3 3 3 3 3 3 2 0 0 0 0 0 1 0 0 0 0 1 0 2 0 0 0 0 0 0 0 0 0 0
  1 3 2 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 3 2]
 [3 3 3 3 3 3 3 3 3 3 3 2 0 0 0 0 0 1 0 0 0 0 1 0 2 0 

In [12]:
print(len(network_input_notes), len(network_input_durations), len(network_input_chords))

992826 992826 992826


In [34]:
with open('mappings_model/all_chords.pkl', 'wb') as file:
    pickle.dump(all_files_chords, file)
with open('mappings_model/all_notes.pkl', 'wb') as file:
    pickle.dump(all_files_notes, file)
with open('mappings_model/all_durations.pkl', 'wb') as file:
    pickle.dump(all_files_durations, file)

In [13]:

def hierarchical_lstm_model(sequence_length, n_vocab_notes, n_vocab_chords, n_vocab_durations, note_embedding_dim, chord_embedding_dim, duration_embedding_dim, lstm_units, dropout_rate, learning_rate):
    note_input = Input(shape=(sequence_length,), name='note_input')
    note_embedding = Embedding(input_dim=n_vocab_notes + 1, output_dim=note_embedding_dim, input_length=sequence_length, name='note_embedding')(note_input)

    duration_input = Input(shape=(sequence_length,), name='duration_input')
    duration_embedding = Embedding(input_dim=n_vocab_durations + 1, output_dim=duration_embedding_dim, input_length=sequence_length, name='duration_embedding')(duration_input)
    note_duration_embedding = Concatenate(name='note_duration_concat')([note_embedding, duration_embedding])
    note_lstm = LSTM(lstm_units, return_sequences=True, name='note_lstm')(note_duration_embedding)
    note_lstm_dropout = Dropout(dropout_rate, name='note_dropout')(note_lstm)

    chord_input = Input(shape=(sequence_length,), name='chord_input')
    chord_embedding = Embedding(input_dim=n_vocab_chords + 1, output_dim=chord_embedding_dim, input_length=sequence_length, name='chord_embedding')(chord_input)
    chord_lstm = LSTM(lstm_units, return_sequences=True, name='chord_lstm')(chord_embedding)
    chord_lstm_dropout = Dropout(dropout_rate, name='chord_dropout')(chord_lstm)

    combined = Concatenate(name='concatenate')([note_lstm_dropout, chord_lstm_dropout])
    combined_lstm = Bidirectional(LSTM(lstm_units, name='combined_lstm'))(combined)
    combined_dropout = Dropout(dropout_rate, name='combined_dropout')(combined_lstm)

    chord_output = Dense(n_vocab_chords + 1, activation='softmax', name='chord_output')(combined_dropout)
    note_output = Dense(n_vocab_notes + 1, activation='softmax', name='note_output')(combined_dropout)
    duration_output = Dense(n_vocab_durations + 1, activation='softmax', name='duration_output')(combined_dropout)

    # Model compilation
    model = Model(inputs=[note_input, duration_input, chord_input], outputs=[note_output, duration_output, chord_output])
    model.compile(optimizer=Adam(learning_rate=learning_rate, clipnorm=1.0),
                  loss={'note_output': 'sparse_categorical_crossentropy',
                        'duration_output': 'sparse_categorical_crossentropy',
                        'chord_output': 'sparse_categorical_crossentropy'
                        },
                  loss_weights={'note_output': 1.0, 'duration_output': 1.0, 'chord_output': 1.0, })

    return model
# def hierarchical_lstm_model_with_attention(sequence_length, n_vocab_notes, n_vocab_chords, note_embedding_dim, chord_embedding_dim, lstm_units, dropout_rate, learning_rate):
#     note_input = Input(shape=(sequence_length,), name='note_input')
#     note_embedding = Embedding(input_dim=n_vocab_notes + 1, output_dim=note_embedding_dim, input_length=sequence_length, name='note_embedding')(note_input)
#     note_lstm = LSTM(lstm_units, return_sequences=True, name='note_lstm')(note_embedding)
#     note_lstm_dropout = Dropout(dropout_rate, name='note_dropout')(note_lstm)

#     chord_input = Input(shape=(sequence_length,), name='chord_input')
#     chord_embedding = Embedding(input_dim=n_vocab_chords + 1, output_dim=chord_embedding_dim, input_length=sequence_length, name='chord_embedding')(chord_input)
#     chord_lstm = LSTM(lstm_units, return_sequences=True, name='chord_lstm')(chord_embedding)
#     chord_lstm_dropout = Dropout(dropout_rate, name='chord_dropout')(chord_lstm)

#     combined = Concatenate(name='concatenate')([note_lstm_dropout, chord_lstm_dropout])
#     combined_lstm = Bidirectional(LSTM(lstm_units // 2, return_sequences=True, name='combined_lstm'))(combined)
#     # attention = Attention(name='attention')([combined_lstm, combined_lstm])
    
#     combined_dropout = Dropout(dropout_rate, name='combined_dropout')(combined_lstm)

#     chord_output = Dense(n_vocab_chords + 1, activation='softmax', name='chord_output')(combined_dropout)
#     note_output = Dense(n_vocab_notes + 1, activation='softmax', name='note_output')(combined_dropout)

#     model = Model(inputs=[note_input, chord_input], outputs=[note_output, chord_output])
#     model.compile(optimizer=Adam(learning_rate=learning_rate), loss={'note_output': 'sparse_categorical_crossentropy', 'chord_output': 'sparse_categorical_crossentropy'}, loss_weights={'note_output': 1.0, 'chord_output': 1.0})
#     return model

In [14]:
EPOCHS = 30
BATCH_SIZE = 128
learning_rate = 0.0005
sequence_length = 64
note_embedding_dim = 300
chord_embedding_dim = 100
duration_embedding_dim = 50
lstm_units = 512
dropout_rate = 0.4
MODEL_PATH = "mappings_model/model1.h5"
output_path = "mappings_model/output1.mid"

In [15]:
# Check for NaN values in network_input_notes
if np.isnan(network_output_notes).any():
    print("NaN values found in network_input_notes!")
else:
    print("No NaN values found in network_input_notes.")

# Check for NaN values in network_input_chords
if np.isnan(network_output_chords).any():
    print("NaN values found in network_input_chords!")
else:
    print("No NaN values found in network_input_chords.")

# Check for NaN values in network_input_durations
if np.isnan(network_output_durations).any():
    print("NaN values found in network_input_durations!")
else:
    print("No NaN values found in network_input_durations.")

No NaN values found in network_input_notes.
No NaN values found in network_input_chords.
No NaN values found in network_input_durations.


In [16]:
def train(continue_training=False):
    network_input = [network_input_notes, network_input_durations, network_input_chords]
    network_output = [network_output_notes, network_output_durations, network_output_chords]
    
    if continue_training:
        # Load the previously trained model
        model = load_model(MODEL_PATH)
    else:
        # Create a new model
        model = hierarchical_lstm_model(sequence_length, n_vocab_notes, n_vocab_chords, n_vocab_durations, note_embedding_dim, chord_embedding_dim, duration_embedding_dim, lstm_units, dropout_rate, learning_rate)
    
    early_stopping = EarlyStopping(
        monitor='loss',  # Monitor the validation loss
        patience=10,  # Number of epochs to wait for improvement
        verbose=1,  # Print messages when early stopping is triggered
        mode='min'  # Look for a minimum value of the monitored metric
    )
    
    
    model.compile(optimizer=Adam(learning_rate=learning_rate, clipnorm=1.0),
                  loss={'note_output': 'sparse_categorical_crossentropy',
                        'duration_output': 'sparse_categorical_crossentropy',
                        'chord_output': 'sparse_categorical_crossentropy'},
                  loss_weights={'note_output': 1.0, 'chord_output': 1.0, 'duration_output': 1.0})


    model.fit(network_input, network_output,
              epochs=EPOCHS,
              batch_size=BATCH_SIZE,
              callbacks=[early_stopping])
    
    model.save(MODEL_PATH)


train(True)

2024-05-01 12:49:45.659433: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX_VNNI FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-05-01 12:49:45.660152: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:981] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2024-05-01 12:49:45.660240: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:981] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2024-05-01 12:49:45.660283: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:981] successful NUMA node read from SysFS had negative value (-1), but there must be at

Epoch 1/30


2024-05-01 12:49:49.330525: I tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:428] Loaded cuDNN version 8100


   3/7757 [..............................] - ETA: 4:23 - loss: 33.8467 - note_output_loss: 24.1851 - duration_output_loss: 1.2486 - chord_output_loss: 8.4130    

2024-05-01 12:49:49.670975: I tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:630] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.


Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


In [17]:
def softmax(x):
    return np.exp(x) / np.sum(np.exp(x), axis=0)

def temperature_sampling(predictions, temperature=1.0):
    scaled = np.log(predictions) / temperature
    probabilities = softmax(scaled)
    choice = np.random.choice(len(probabilities), p=probabilities) # choose according to probability
    return choice
    
def predict_notes(model_path, starting_notes, starting_durations, starting_chords, sequence_length, n_notes=300, temperature=1.0):
    # Load the model
    model = load_model(model_path)

    prediction_output_notes = []  
    prediction_output_durations = []
    
    starting_chords_int = convert_to_int(starting_chords, chord_mapping_path)
    # starting_notes_int, starting_durations_int = convert_notes_durations_to_int(starting_notes, note_mapping_path, duration_mapping)
    starting_notes_int = convert_to_int(starting_notes, note_mapping_path)
    starting_durations_int = [duration[0] for duration in starting_durations]
    
    with open(note_mapping_path, "r") as fp:
        note_mappings = json.load(fp)
        
    reverse_note_mappings = {value: key for key, value in note_mappings.items()}
    reverse_duration_mappings = {value: key for key, value in duration_mapping.items()}

    prediction_output_notes.extend([reverse_note_mappings[note] for note in starting_notes_int])
    prediction_output_durations.extend(reverse_duration_mappings[duration] for duration in starting_durations_int)
    
    for _ in range(n_notes):
        input_sequence_chords = np.array(starting_chords_int[-sequence_length:]).reshape(1, sequence_length)
        input_sequence_notes = np.array(starting_notes_int[-sequence_length:]).reshape(1, sequence_length)
        input_sequence_durations = np.array(starting_durations_int[-sequence_length:]).reshape(1, sequence_length)

        prediction_notes, prediction_durations, prediction_chords = model.predict([input_sequence_notes, input_sequence_durations, input_sequence_chords], verbose=0)
        next_note = temperature_sampling(prediction_notes[0], temperature)  
        next_duration = temperature_sampling(prediction_durations[0], temperature)  
        next_chord = temperature_sampling(prediction_chords[0], temperature) 
        
        starting_notes_int.append(next_note)
        starting_durations_int.append(next_duration)
        starting_chords_int.append(next_chord)
        
        prediction_output_notes.append(reverse_note_mappings[next_note])
        prediction_output_durations.append(reverse_duration_mappings[next_duration])
    print(starting_chords_int)
    # res.extend(prediction_output_notes)

    return prediction_output_notes, prediction_output_durations, starting_chords_int

In [18]:
starting_notes = all_files_notes[1400:1464]
starting_chords = all_files_chords[1400:1464]
starting_durations = all_files_durations[1400:1464]


In [19]:
print(starting_durations)

[[9], [0], [1], [0], [0], [0], [1], [1], [0], [1], [1], [0], [0], [6], [2], [9], [0], [1], [0], [0], [0], [1], [1], [0], [3], [6], [0], [1], [1], [0], [1], [1], [0], [3], [2], [0], [14], [0], [1], [9], [0], [1], [0], [0], [0], [0], [0], [2], [1], [1], [0], [0], [2], [0], [1], [0], [0], [0], [1], [9], [0], [1], [0], [0]]


In [20]:
sequence_length = 64
n_notes = 400
prediction_output_notes, prediction_output_durations, predicted_chords = predict_notes(MODEL_PATH, starting_notes, starting_durations, starting_chords, sequence_length, n_notes, temperature=1.18)


[39, 39, 574, 574, 574, 574, 609, 609, 609, 609, 429, 429, 429, 429, 39, 39, 39, 39, 574, 574, 574, 574, 377, 377, 377, 377, 58, 58, 58, 58, 270, 270, 270, 270, 325, 325, 325, 325, 23, 23, 23, 23, 285, 285, 285, 285, 148, 148, 148, 148, 447, 447, 447, 447, 377, 377, 377, 377, 58, 58, 58, 58, 447, 447, 447, 447, 368, 368, 368, 368, 285, 285, 285, 285, 429, 429, 429, 429, 148, 148, 148, 148, 429, 429, 429, 429, 574, 574, 574, 574, 447, 447, 447, 447, 447, 447, 447, 447, 285, 285, 285, 285, 57, 57, 57, 57, 54, 54, 54, 54, 148, 148, 148, 148, 418, 418, 418, 418, 410, 410, 410, 410, 643, 643, 643, 643, 57, 57, 57, 57, 447, 447, 447, 447, 285, 285, 285, 285, 447, 447, 447, 447, 23, 23, 23, 23, 447, 447, 447, 447, 447, 447, 447, 447, 447, 447, 447, 447, 58, 58, 58, 58, 447, 447, 447, 447, 147, 147, 147, 147, 447, 447, 447, 447, 39, 39, 39, 39, 368, 368, 368, 368, 325, 325, 325, 325, 447, 447, 447, 447, 377, 377, 377, 377, 447, 447, 447, 447, 175, 175, 175, 175, 447, 447, 447, 447, 175, 175, 1

In [21]:
print(len(prediction_output_notes), len(prediction_output_durations), len(predicted_chords))

464 464 464


In [22]:
print(starting_notes[:20])

[[71], ['R'], [71], [71], [71], ['R'], [76], [76], ['R'], [74], [74], ['R'], [71], [74], [74], [79], ['R'], [79], [79], [79]]


In [23]:
print(prediction_output_notes, prediction_output_durations[:20])

['[71]', "['R']", '[71]', '[71]', '[71]', "['R']", '[76]', '[76]', "['R']", '[74]', '[74]', "['R']", '[71]', '[74]', '[74]', '[79]', "['R']", '[79]', '[79]', '[79]', "['R']", '[79]', '[76]', "['R']", '[74]', '[71]', '[71]', '[71]', '[71]', "['R']", '[69]', '[69]', "['R']", '[69]', '[67]', '[67]', '[69]', '[69]', '[62]', '[71]', "['R']", '[71]', '[72]', '[72]', "['R']", '[74]', '[74]', "['R']", '[72]', '[72]', "['R']", '[71]', '[71]', "['R']", '[69]', '[69]', '[69]', "['R']", '[67]', '[71]', "['R']", '[71]', '[71]', '[71]', "['R']", '[72]', '[74]', "['R']", '[72]', '[74]', "['R']", '[76]', '[76]', '[69]', '[69]', '[67]', "['R']", '[79]', '[79]', "['R']", '[76]', '[74]', "['R']", '[74]', '[74]', "['R']", '[72]', '[69]', '[69]', '[69]', '[69]', '[71]', '[67]', "['R']", '[69]', '[71]', "['R']", '[71]', '[71]', '[71]', "['R']", '[71]', '[69]', "['R']", '[69]', '[67]', "['R']", '[69]', '[69]', '[69]', '[67]', "['R']", '[72]', '[74]', "['R']", '[74]', '[74]', '[74]', "['R']", '[72]', '[74]', 

In [24]:
def create_midi(predicted_notes, predicted_durations, output_path=output_path):
    s = stream.Stream()
    for i, note_str in enumerate(predicted_notes):
        if note_str == '<end>':
            break
        elif note_str == '<padding>':
            continue
        else:
            note = eval(note_str)
            if note == ['R']:
                r = note_module.Rest()
                r.quarterLength = int(predicted_durations[i])
                s.append(r)

            else:
                n = note_module.Note()
                if isinstance(note, list) and len(note) > 0 and isinstance(note[0], int):
                    n.pitch.midi = note[0]
                    n.quarterLength = float(predicted_durations[i])
                    s.append(n)

    s.write('midi', fp=output_path)


In [25]:
create_midi(prediction_output_notes, prediction_output_durations, output_path)