In [None]:
# To run the whole notebook with the dataset available, see the GitHub page: https://github.com/carlosds27/CSC413-Emo-Music
# To generate music only, run every cell below except the training block (examples of how to generate music are included further down).

In [1]:
import os
import glob
from pathlib import Path
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers import *
from keras.callbacks import ModelCheckpoint
from keras.utils import to_categorical
from music21 import *
import random

In [2]:
# Query the GPU list once and reuse it for the report below.
gpus = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpus))
# Request the asynchronous CUDA allocator through TensorFlow's environment switch.
os.environ["TF_GPU_ALLOCATOR"] = 'cuda_malloc_async'

Num GPUs Available:  1


In [3]:
# Folder that the preprocessing cell globs for files whose names start with Q1..Q4.
midi_path = "EMOPIA_2.2/midis"
# One (initially empty) token list per quadrant; the preprocessing cell fills them in.
q1_midi, q2_midi, q3_midi, q4_midi = [], [], [], []

In [4]:
# Preprocess Data
# For each quadrant prefix, read every matching MIDI file and flatten it into
# one list of string tokens: a note becomes its pitch name, a chord becomes
# its normal-order integers joined by '.'.
for q in ["Q1","Q2","Q3","Q4"]:
    notes = []
    # sorted() keeps the token order independent of the order in which the
    # filesystem happens to list the files, so reruns see the same sequence.
    for f in sorted(glob.glob(os.path.join(midi_path, q + "*"))):
        # Translate the midi file into a stream of midi objects (notes and chords)
        midi_file = converter.parse(f)
        # Check if there are instruments
        parts = instrument.partitionByInstrument(midi_file)
        if parts:
            # file has instrument parts
            notes_to_parse = parts.parts[0].recurse()
        else:
            # file has notes in a flat structure
            # (must use the parsed score `midi_file`; `midi` is not the score)
            notes_to_parse = midi_file.flat.notes
        for n in notes_to_parse:
            if isinstance(n, note.Note):
                # if note then just append to notes
                notes.append(str(n.pitch))
            elif isinstance(n, chord.Chord):
                # if chord use . to indicate it's a chord
                notes.append('.'.join(str(k) for k in n.normalOrder))
    # Publish the collected tokens under the quadrant's notebook-level name.
    if q == "Q1":
        q1_midi = notes
    elif q == "Q2":
        q2_midi = notes
    elif q == "Q3":
        q3_midi = notes
    elif q == "Q4":
        q4_midi = notes


In [5]:
# Pool the four quadrant token lists so a single vocabulary covers all of them.
overall_notes = q1_midi + q2_midi + q3_midi + q4_midi
# 500 notes is roughly 2.5 minutes?
# Distinct tokens in sorted order; a token's position in this list is its integer id.
pitch_names = sorted(set(overall_notes))
# Token -> integer id lookup (inverse of indexing into pitch_names).
note_vocab = {token: idx for idx, token in enumerate(pitch_names)}
n_vocab = len(pitch_names)
print(n_vocab)

685


In [8]:
def create_train_data(seq_len):
    """Build sliding-window training sets, one per emotion quadrant.

    Within each quadrant's token list, every run of ``seq_len`` consecutive
    tokens becomes one input window and the token immediately after it
    becomes that window's target.

    Args:
        seq_len: Number of tokens in each input window.

    Returns:
        A pair ``(x_train, t_train)`` of four-element lists ordered Q1..Q4.
        ``x_train[i]`` has shape ``(num_windows, seq_len, 1)`` and holds token
        ids divided by ``n_vocab``; ``t_train[i]`` is the one-hot encoding of
        the target ids over ``n_vocab`` classes.
    """
    notes_by_quadrant = {
        "Q1": q1_midi,
        "Q2": q2_midi,
        "Q3": q3_midi,
        "Q4": q4_midi,
    }
    x_train, t_train = [], []
    for quadrant in ("Q1", "Q2", "Q3", "Q4"):
        tokens = notes_by_quadrant[quadrant]
        window_starts = range(len(tokens) - seq_len)
        windows = [
            [note_vocab[k] for k in tokens[begin:begin + seq_len]]
            for begin in window_starts
        ]
        targets = [note_vocab[tokens[begin + seq_len]] for begin in window_starts]
        # Trailing axis of size 1: one feature per timestep, as the LSTM input expects.
        inputs = np.reshape(windows, (len(windows), seq_len, 1)) / float(n_vocab)
        x_train.append(inputs)
        t_train.append(to_categorical(targets, num_classes=n_vocab))
    return x_train, t_train

In [12]:
# Testing print out shape and values
# print(len(x_train))
# print(n_vocab)

In [6]:
def create_LSTM_model(seq_len):
    """Assemble the (uncompiled) next-token network.

    The stack is three 512-unit LSTM layers (the first two return full
    sequences and are each followed by 0.3 dropout), a 256-unit dense layer
    with 0.3 dropout, and a softmax over ``n_vocab`` classes.

    Args:
        seq_len: Number of timesteps in each input window; the input shape is
            ``(seq_len, 1)``.

    Returns:
        The assembled ``Sequential`` model.
    """
    model = Sequential()
    model.add(LSTM(512, input_shape=(seq_len, 1), return_sequences=True))
    model.add(Dropout(0.3))
    model.add(LSTM(512, return_sequences=True))
    model.add(Dropout(0.3))
    model.add(LSTM(512))
    model.add(Dense(256))
    model.add(Dropout(0.3))
    model.add(Dense(n_vocab))
    model.add(Activation('softmax'))
    return model

In [14]:
# Build a throwaway model just to inspect the architecture.
tm1 = create_LSTM_model(100)
# summary() prints the table itself and returns None, so it is called
# directly; wrapping it in print() would emit a stray "None" line.
tm1.summary()
# tm1.compile(loss='categorical_crossentropy', optimizer='rmsprop')
# tm1.fit(x_train[0][:10], t_train[0][:10], epochs=10, batch_size=2, verbose=1)
# embedding_layer = tm1.layers[0]
# embeddings = embedding_layer.get_weights()[0]
# print(embeddings)

2023-12-03 22:05:03.868148: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:901] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2023-12-03 22:05:03.868284: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:901] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2023-12-03 22:05:03.868350: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:901] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, 100, 512)          1052672   
                                                                 
 dropout (Dropout)           (None, 100, 512)          0         
                                                                 
 lstm_1 (LSTM)               (None, 100, 512)          2099200   
                                                                 
 dropout_1 (Dropout)         (None, 100, 512)          0         
                                                                 
 lstm_2 (LSTM)               (None, 512)               2099200   
                                                                 
 dense (Dense)               (None, 256)               131328    
                                                                 
 dropout_2 (Dropout)         (None, 256)               0

In [None]:
# TRAINING BLOCK
# Trains one model per (sequence length, batch size, quadrant) combination,
# checkpointing the best weights by training loss and saving a loss curve.
seq_lens = [200, 100, 50]
batch_sizes = [128, 64]

# Make sure the output folders exist up front so that a long training run
# cannot end in a failure when the weights or the loss plot are written.
os.makedirs("Model", exist_ok=True)
os.makedirs("Graph", exist_ok=True)

for s in seq_lens:
    print("Initializing Training process...")
    # The windows depend only on the sequence length, so they are built once
    # per `s` rather than once per batch size.
    x_train, t_train = create_train_data(s)

    for bs in batch_sizes:
        for i in range(len(x_train)):

            print("Creating our model...")
            model = create_LSTM_model(s)
            model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
            print("Finished compiling our model")

            # Create a checkpoint path
            checkpoint_path = f"Model/lstm{i+1}-{s}s-{bs}bs.h5"
            model_path = Path(checkpoint_path)
            model_exist = model_path.is_file()
            checkpoint = ModelCheckpoint(filepath=checkpoint_path, monitor='loss', verbose=1, save_best_only=True, save_weights_only=True, mode='min')

            if model_exist:
                # If checkpoint exist, continue training with pretrained weights
                print("Continue Training with pre-trained weights")
                model.load_weights(checkpoint_path)

            print(f"Start Training with {s=} {bs=} for Q{i+1}")
            history = model.fit(x_train[i], t_train[i], epochs=200, batch_size=bs, callbacks=[checkpoint])

            print("Finished Training")

            # One explicit figure per run, closed afterwards, so curves from
            # different runs never share axes and figures do not accumulate.
            plot_name = f'Graph/lstm{i+1}-{s}s-{bs}bs.png'
            fig, ax = plt.subplots()
            ax.plot(history.history['loss'])
            ax.set_title('Model Loss')
            ax.set_ylabel('Loss')
            ax.set_xlabel('Epoch')
            ax.legend(['Train'], loc='upper left')
            fig.savefig(plot_name)
            plt.show()
            plt.close(fig)

In [None]:
# NOTE(review): `inference` is not part of the import cell at the top of the
# notebook and is not defined here — presumably a local module that must be
# importable from the working directory; confirm.
import inference as inf_mod
# Run the external predictor on a bundled example MIDI file. The meaning of
# the two returned values is not visible from this notebook.
xt, yt = inf_mod.predict("midi_like", "ar_va", "dataset/sample_data/example_generative.mid")

In [112]:
# Settings shared by the generation cells below.
seq_len = 100
batch_size = 128
# `x` is read as a notebook-level global by generate_prediction and
# generate_combination_prediction to pick a random seed window, so this cell
# must run before either of them is called. `t` holds the matching targets.
x, t = create_train_data(seq_len)

In [109]:
def load_model(q, seq_len, batch_size):
    """Rebuild the network for one quadrant and restore its saved weights.

    Args:
        q: Quadrant number used in the weight file name.
        seq_len: Input window length the model is built for (also part of
            the file name).
        batch_size: Batch size label that is part of the file name.

    Returns:
        The compiled model with weights loaded from
        ``Model/lstm{q}-{seq_len}s-{batch_size}bs.h5``.
    """
    weights_file = f'Model/lstm{q}-{seq_len}s-{batch_size}bs.h5'
    restored = create_LSTM_model(seq_len)
    restored.compile(loss='categorical_crossentropy', optimizer='rmsprop')
    restored.load_weights(weights_file)
    return restored

In [108]:
def generate_prediction(q, model, music_len):
    """Sample ``music_len`` tokens from one quadrant's model.

    A random training window of quadrant ``q`` (taken from the notebook-level
    ``x``) seeds the generation. At every step the model's most likely token
    is emitted, appended to the window, and the oldest entry is dropped.

    Args:
        q: 1-based quadrant number selecting the seed windows ``x[q-1]``.
        model: Model exposing ``predict``; its output is arg-maxed into an
            index of ``pitch_names``.
        music_len: Number of tokens to generate.

    Returns:
        List of ``music_len`` tokens taken from ``pitch_names``.
    """
    seed_windows = x[q-1]
    start = np.random.randint(0, len(seed_windows))
    # Stored windows are divided by n_vocab; scale back up to token ids.
    window = seed_windows[start] * float(n_vocab)
    generated = []
    for _ in range(music_len):
        net_input = np.reshape(window, (1, len(window), 1)) / float(n_vocab)
        probabilities = model.predict(net_input, verbose=0)
        index = np.argmax(probabilities)

        generated.append(pitch_names[index])
        print(index)
        # Slide the window: add the new id at the end, drop the oldest one.
        window = np.append(window, [index])[1:]
    return generated

In [13]:
def generate_music(pred, filename, qs=(0, 0, 0, 0)):
    """Turn predicted tokens into a MIDI file under ``Gen-Music/``.

    Args:
        pred: Iterable of string tokens. A token that contains '.' or is all
            digits is treated as a chord whose members are integers joined
            by '.'; any other token is passed to ``note.Note`` as a note.
        filename: Output name without extension; the file is written to
            ``Gen-Music/{filename}.mid``.
        qs: Four weights, one per quadrant, used to pick the time step after
            each token from ``[0.4, 0.3, 0.5, 0.5]``. When they sum to 0 the
            notebook-level variable ``q`` (1-based quadrant number) selects a
            single quadrant instead.

    Raises:
        NameError: If ``qs`` sums to 0 and no notebook-level ``q`` exists.
    """
    # Work on a private copy: the original mutated its list default in place,
    # so the first default call silently changed every later default call
    # (and a caller-supplied all-zero list was modified too).
    weights = list(qs)
    if sum(weights) == 0:
        # Relies on the notebook-level `q` set by the calling loop.
        weights[q-1] = 1
    step_choices = [0.4, 0.3, 0.5, 0.5]
    offset = 0
    output_notes = []
    for p in pred:
        # if a chord
        if ('.' in p) or p.isdigit():
            chord_notes = []
            for n in p.split('.'):
                new_note = note.Note(int(n))
                new_note.storedInstrument = instrument.Piano()
                chord_notes.append(new_note)
            new_chord = chord.Chord(chord_notes)
            new_chord.offset = offset
            output_notes.append(new_chord)
        # pattern is a note
        else:
            new_note = note.Note(p)
            new_note.offset = offset
            new_note.storedInstrument = instrument.Piano()
            output_notes.append(new_note)
        # increase offset each iteration by a step drawn with the quadrant weights
        offset += random.choices(step_choices, weights=weights)[0]
    music_stream = stream.Stream(output_notes)
    # Make sure the output folder exists before writing into it.
    os.makedirs('Gen-Music', exist_ok=True)
    music_stream.write('midi', fp=f'Gen-Music/{filename}.mid')

In [19]:
def generate_combination_prediction(qs, seq_len, batch_size, n):
    """Sample ``n`` tokens from a weighted blend of the quadrant models.

    At each step the four models' output distributions are combined as
    ``sum(qs[i] * p_i)`` and the most likely token of the blend is emitted,
    appended to the window, and the oldest entry is dropped. The seed window
    is a random training window (from the notebook-level ``x``) of the
    quadrant with the largest weight.

    Args:
        qs: Four weights, one per quadrant (Q1..Q4), that must sum to 1.
        seq_len: Window length of the saved models to load.
        batch_size: Batch size label of the saved models to load.
        n: Number of tokens to generate.

    Returns:
        List of ``n`` tokens from ``pitch_names``, or ``[]`` (after printing
        a message) when ``qs`` does not have four entries summing to 1.
    """
    # Compare with a tolerance: sum([0.2, 0.5, 0.2, 0.1]) can evaluate to
    # 0.9999999999999999 in floating point, which an exact `!= 1.0` test
    # would wrongly reject.
    if len(qs) != 4 or not np.isclose(sum(qs), 1.0):
        print("qs total should be == 1 and len(qs) == 4")
        return []
    # Load only the models that contribute; a zero weight adds nothing to the
    # blend, so its model does not need to be loaded or evaluated.
    weighted_models = [
        (weight, load_model(quadrant, seq_len, batch_size))
        for quadrant, weight in enumerate(qs, start=1)
        if weight != 0
    ]
    q = qs.index(max(qs)) # use the maximum as start
    start = np.random.randint(0, len(x[q]))
    # Stored windows are divided by n_vocab; scale back up to token ids.
    pattern = x[q][start] * float(n_vocab)
    output = []
    for _ in range(n):
        net_input = np.reshape(pattern, (1, len(pattern), 1)) / float(n_vocab)
        prediction = sum(
            weight * model.predict(net_input, verbose=0)
            for weight, model in weighted_models
        )
        index = np.argmax(prediction)

        output.append(pitch_names[index])
        print(index)
        # Slide the window: add the new id at the end, drop the oldest one.
        pattern = np.append(pattern, [index])[1:]
    return output

In [None]:
# Generate music from each model
seq_len = 100
batch_size = 128
# NOTE: the loop variable must stay named `q` — generate_music reads the
# notebook-level `q` when it is called without explicit weights, as it is here.
for q in range(1, 5):
    model = load_model(q, seq_len, batch_size)
    pred = generate_prediction(q, model, 200)
    generate_music(pred, f"q{q}-{seq_len}s-{batch_size}bs")

In [None]:
qs = [0.2, 0.5, 0.2, 0.1] # Change this to preferable emotion
pred = generate_combination_prediction(qs, seq_len, batch_size, 200)
# Label the blended result "q5" instead of reusing the leftover loop variable
# `q`: after the per-quadrant loop it equals 4, so the old name overwrote the
# Q4 output file (and raised NameError if that loop had not been run).
# An empty prediction means the weights were rejected, so nothing is written.
if pred:
    generate_music(pred, f"q5-{seq_len}s-{batch_size}bs", qs)

In [None]:
# Run the external predictor on the generated "q5" file.
# NOTE(review): this reads from "Music/", while generate_music writes to
# "Gen-Music/" — confirm the file is expected at this path.
y, xy = inf_mod.predict("midi_like", "ar_va", f"Music/q5-{seq_len}s-{batch_size}bs.mid")

In [21]:
def create_music_from_emotion(emotion, n):
    """Generate and save a song for a named emotion or custom quadrant weights.

    The emotion name is matched case-insensitively against the presets
    happy (Q1), angry (Q2), sad (Q3) and calm (Q4). Any other name prompts
    the user for four space-separated weights. The song is written to
    ``Gen-Music/{emotion}-generated-song.mid`` using models built for
    sequence length 100 and batch size 128.

    Args:
        emotion: Emotion name; lower-cased before use (also in the file name).
        n: Number of tokens to generate.

    Returns:
        None. Nothing is saved when the prediction comes back empty.
    """
    presets = {
        "happy": [1.0, 0, 0, 0],
        "calm": [0, 0, 0, 1.0],
        "sad": [0, 0, 1.0, 0],
        "angry": [0, 1.0, 0, 0],
    }
    emotion = emotion.lower()
    if emotion in presets:
        qs = presets[emotion]
    else:
        # Unknown emotion: fall back to weights typed in by the user.
        print("There are only Happy, Calm, Sad, Angry currently")
        t = input("Input your own quadrant value (e.g q1[happy] q2[angry] q3[sad] q4[calm]) (sum should be 1):")
        qs = [float(value) for value in t.split()]
    pred = generate_combination_prediction(qs, 100, 128, n)
    if pred == []:
        return
    generate_music(pred, f"{emotion}-generated-song", qs)
    print(f"File saved in Gen-Music/{emotion}-generated-song.mid")
    return

In [None]:
# Create an angry song with a length of 200 notes
create_music_from_emotion("angry", 200)

In [None]:
# If the emotion is not one of the presets, the function prompts for your own
# four quadrant weights (space-separated) instead.
create_music_from_emotion("scared", 200)

In [None]:
# Run all the code above to generate this prediction !!!
# IMPORTANT !!!

In [101]:
# Try to train on volume
# import pretty_midi
# def extract_features_from_midi(file_path):
#     midi_data = pretty_midi.PrettyMIDI(file_path)
    
#     # Extracting volume information
#     volumes = [note.velocity for instrument in midi_data.instruments for note in instrument.notes]
    
#     return volumes

# def volume_features_dataset():
#     vol_list = []
#     for q in ["Q1","Q2","Q3","Q4"]:
#         vol = []
#         for f in glob.glob(os.path.join(midi_path, q + "*")):
#             v = extract_features_from_midi(f)
#             vol.append(v)
#         vol_list.append(vol)
#     return vol_list

In [102]:
# volume_arrs = volume_features_dataset()
# for i in range(4):
#     volume_arr = np.concatenate(volume_arrs[i])
#     volume_arr = volume_arr / 127.0
#     x_vol_train = np.reshape(volume_arr, (len(volume_arr)//seq_len, seq_len, 1))
#     t_vol_train = volume_arr[seq_len:]
#     model = Sequential()
#     model.add(LSTM(64, input_shape=(seq_len, 1)))
#     model.add(Dense(1, activation='linear'))
#     model.compile(optimizer='adam', loss='mean_squared_error')
#     # Create a checkpoint path
#     cp = f"Model/vol-lstm{i+1}-{seq_len}s-{batch_size}bs.h5"
#     model_path = Path(checkpoint_path)
#     model_exist = model_path.is_file()
#     checkpoint = ModelCheckpoint(filepath=checkpoint_path, monitor='loss', verbose=1, save_best_only=True, save_weights_only=True, mode='min')
#     model.fit(x_vol_train, t_vol_train, epochs=10, batch_size=batch_size, callbacks=[checkpoint])
# Tried (no time)