In [2]:
import pypianoroll as ppr 
import numpy as np
import os
from random import random
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.layers import Dense, LSTM, Flatten, Input, Bidirectional, TimeDistributed, Activation, Concatenate, Embedding, MaxPooling1D, CategoryEncoding
from tensorflow.keras.utils import pad_sequences, timeseries_dataset_from_array, split_dataset
from tensorflow.keras.models import Model 
from tensorflow.keras.optimizers import Adam

import calendar
import time



In [45]:
def resample(empties_boolean, prune_percent=0.7):
    """Decide, beat by beat, which time steps survive rest-pruning.

    Non-empty beats are always kept. Each empty beat is dropped when a fresh
    ``random()`` draw is below ``prune_percent`` (the default therefore drops
    about 70% of the empty beats) and kept otherwise.

    Args:
        empties_boolean: iterable of truthy/falsy flags; truthy marks an empty beat.
        prune_percent: probability with which an empty beat is dropped.

    Returns:
        list of bool with one entry per input flag; True means "keep this beat".
    """
    # `and` short-circuits, so random() is only consumed for empty beats.
    return [not (is_empty and random() < prune_percent) for is_empty in empties_boolean]


def store_dataset_as_batches(dataset, dataset_name, timestamp, root_dir='lpd_5_batched'):
    """Write every (inputs, outputs) batch of ``dataset`` to a pair of ``.npy`` files.

    Batch ``k`` is stored as ``<root_dir>/inputs_<timestamp>/<dataset_name>_<k>.npy``
    and ``<root_dir>/outputs_<timestamp>/<dataset_name>_<k>.npy``. Both
    directories are created if they do not exist yet.

    Args:
        dataset: iterable yielding ``(inputs, outputs)`` pairs accepted by ``np.save``.
        dataset_name: file-name prefix shared by all batches of this dataset.
        timestamp: run identifier embedded in the directory names.
        root_dir: directory under which the two batch directories live.

    A batch that cannot be saved is reported with ``print`` and skipped; any
    file already written for that batch is removed so that the inputs and
    outputs directories always contain the same set of file names.
    """
    inputs_dir = os.path.join(root_dir, f'inputs_{timestamp}')
    outputs_dir = os.path.join(root_dir, f'outputs_{timestamp}')
    os.makedirs(inputs_dir, exist_ok=True)
    os.makedirs(outputs_dir, exist_ok=True)

    for batch_id, (inputs, outputs) in enumerate(dataset):
        file_name = f'{dataset_name}_{batch_id}.npy'
        input_path = os.path.join(inputs_dir, file_name)
        output_path = os.path.join(outputs_dir, file_name)
        try:
            np.save(input_path, inputs)
            np.save(output_path, outputs)
        except Exception as E:
            print(E)
            # Never leave half of a pair behind: a reader that walks both
            # directories would otherwise see an input without its target.
            for partial_path in (input_path, output_path):
                if os.path.exists(partial_path):
                    os.remove(partial_path)



def process_dataset_in_chunks(dir, input_sequence_len=2400, output_sequence_len=None, batch_size=64, topn=-1, resolution=24, prune_rest_note_percent=0.3, encoder_decoder=False):
    """Convert the piano-roll archives in ``dir`` into windowed batches on disk.

    Every file is loaded with pypianoroll, binarized, resampled to
    ``resolution`` and stacked. Each time step is then reduced to one class
    index per track: 0 means "rest", otherwise the index is pitch + 1 (argmax
    keeps only the lowest sounding pitch, so chords are ignored). Time steps
    where every track rests are randomly thinned by ``resample``. The result
    is cut into sliding windows (stride 1) and handed to
    ``store_dataset_as_batches``.

    Args:
        dir: directory containing the archives readable by ``ppr.load``.
        input_sequence_len: number of time steps per input window.
        output_sequence_len: number of time steps per target window; only used
            (and then required to be >= 1) when ``encoder_decoder`` is True.
        batch_size: number of windows per stored batch.
        topn: how many directory entries to process; ``-1`` or ``None`` means all.
        resolution: time steps per beat passed to ``set_resolution``.
        prune_rest_note_percent: probability of dropping an all-rest time step.
        encoder_decoder: if True, targets are the ``output_sequence_len`` steps
            that follow each input window; otherwise targets are the input
            window shifted by one step.

    Returns:
        The integer UTC timestamp that names the output directories.

    Raises:
        ValueError: if ``encoder_decoder`` is True without a valid ``output_sequence_len``.
    """
    if encoder_decoder and (output_sequence_len is None or output_sequence_len < 1):
        # Previously this surfaced as a swallowed TypeError and every track was skipped silently.
        raise ValueError('encoder_decoder=True requires output_sequence_len >= 1')

    samples = os.listdir(dir)
    # `[:-1]` would silently drop the last file, so -1/None are treated as "no limit".
    if topn is not None and topn != -1:
        samples = samples[:topn]

    # Roughly 100 progress steps; at least 1 so that range() gets a valid step for small folders.
    chunk_size = max(1, len(samples) // 100)

    # Target windows have their own length only in encoder-decoder mode.
    target_sequence_len = output_sequence_len if encoder_decoder else input_sequence_len

    gmt = time.gmtime()
    ts = calendar.timegm(gmt)
    for chunkid in tqdm(range(0, len(samples), chunk_size)):

        # Slicing (rather than indexing chunkid+trackid) keeps the last, shorter chunk in range.
        for trackid, sample in enumerate(samples[chunkid:chunkid + chunk_size]):
            track = ppr.load(os.path.join(dir, sample)).binarize().set_resolution(resolution).stack()

            # Swap the first two axes — presumably (tracks, time, pitch) -> (time, tracks, pitch); TODO confirm
            track = np.moveaxis(track, (0, 1, 2), (1, 0, 2))

            # Prepend a "rest" class at index 0 and switch it on wherever no pitch is active
            track = np.concatenate([np.zeros(track.shape[:-1] + (1,)), track], axis=-1)
            track[np.any(track, axis=-1)==False, 0] = 1

            # Argmax results, one pitch at a time step for an instrument, ignores chords
            track = track.argmax(axis=-1)

            # Randomly drop time steps in which every track is at rest
            empty_beats = (np.sum(track, axis=1) == 0)
            inclusion_beats = resample(empty_beats, prune_percent=prune_rest_note_percent)
            track = track[inclusion_beats]

            if encoder_decoder:
                input_track = track[:-output_sequence_len]
                output_track = track[input_sequence_len:]
            else:
                input_track = track[:-1]
                output_track = track[1:]

            # Tracks too short to produce a single window are skipped.
            if len(input_track) < input_sequence_len or len(output_track) < target_sequence_len:
                continue

            try:
                input_dataset = timeseries_dataset_from_array(input_track, None, sequence_length=input_sequence_len, sequence_stride=1, batch_size=batch_size)
                output_dataset = timeseries_dataset_from_array(output_track, None, sequence_length=target_sequence_len, sequence_stride=1, batch_size=batch_size)
                dataset_len = len(input_dataset)
            except Exception as E:
                # Best effort: one bad track must not abort the run, but say which one it was.
                tqdm.write(f'Skipping {sample}: {E}')
                continue

            if not dataset_len:
                continue

            dataset = zip(input_dataset, output_dataset)
            store_dataset_as_batches(dataset, dataset_name=f'Pr{os.path.basename(dir)}-Ch{chunkid}-Tr{trackid}', timestamp=ts)

    return ts

                        


def format_targets(y):
    """Split a stacked target tensor into a tuple of tensors, one per last-axis index.

    Axes 0, 1, 2 of ``y`` are moved to positions 1, 2, 0, and the result is
    unstacked along its new leading axis.
    """
    last_axis_first = tf.experimental.numpy.moveaxis(y, (0, 1, 2), (1, 2, 0))
    return tuple(tf.unstack(last_axis_first))

def load_music_batches(input_dir, output_dir, encoder_decoder=True):
    """Endlessly yield training batches from paired ``.npy`` files.

    For every file name found in ``input_dir`` the file with the *same name*
    in ``output_dir`` is loaded as its target (the two directory listings are
    not zipped, because ``os.listdir`` gives no ordering guarantee). The
    directory is re-listed at the start of every pass.

    Args:
        input_dir: directory holding the input batches.
        output_dir: directory holding the target batches, using the same file names.
        encoder_decoder: if True, yield ``([inputs, prompt_inputs], targets)``
            where ``prompt_inputs`` is the teacher-forcing sequence: the last
            input step followed by all target steps but the last. If False,
            yield ``(inputs, targets)``.

    Yields:
        Tuples whose second element is ``format_targets(targets)``.

    Raises:
        RuntimeError: if a complete pass over ``input_dir`` produced no batch
            (instead of looping forever without yielding).
    """
    while True:
        produced_any = False
        for batch_file in os.listdir(input_dir):
            try:
                inputs = np.load(os.path.join(input_dir, batch_file))
                targets = np.load(os.path.join(output_dir, batch_file))

                if encoder_decoder:
                    # Start the prompt with the step right before the first target,
                    # i.e. the last input step (matches cue[:, -1] used at generation time).
                    prompt_inputs = np.concatenate([inputs[:, -1:], targets[:, :-1]], axis=1)
                    batch = ([inputs, prompt_inputs], format_targets(targets))
                else:
                    batch = (inputs, format_targets(targets))
            except Exception as E:
                # Best effort: unreadable or unpaired files are skipped.
                continue

            # Yield outside the try so exceptions thrown into the generator are not swallowed.
            produced_any = True
            yield batch

        if not produced_any:
            raise RuntimeError(f'No loadable batch pairs found in {input_dir} / {output_dir}')


import subprocess

def midi_to_wav(midi_path, output_wav_path):
    """Render a MIDI file to WAV by invoking the ``timidity`` command.

    Args:
        midi_path: path of the MIDI file to convert.
        output_wav_path: path of the WAV file to write.

    Returns:
        True if the command ran and exited with status 0, False otherwise.
        Failures (non-zero exit status, or the executable not being
        available) are reported with ``print`` rather than raised.
    """
    try:
        # Run Timidity++ command to convert MIDI to WAV
        subprocess.run(["timidity", midi_path, "-Ow", "-o", output_wav_path], check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable `timidity` binary,
        # which check=True alone does not turn into CalledProcessError.
        print(f"Error: {e}")
        print("Conversion failed.")
        return False

    print("Conversion completed successfully.")
    return True


def multitrack_to_midi(multitrack, output_path):
    """Write ``multitrack`` to ``output_path`` after normalising its track types.

    Any entry of ``multitrack.tracks`` that is neither a ``ppr.BinaryTrack``
    nor a ``ppr.StandardTrack`` is replaced in place before writing.
    """
    for position in range(len(multitrack.tracks)):
        current = multitrack.tracks[position]
        if isinstance(current, (ppr.BinaryTrack, ppr.StandardTrack)):
            continue
        print(f"Converting track {position} to StandardTrack...")
        # NOTE(review): confirm that the installed pypianoroll track type really
        # exposes to_pianoroll()/to_track(); this path is not exercised by the notebook.
        multitrack.tracks[position] = current.to_pianoroll().to_track()

    # Write the multitrack to a MIDI file
    multitrack.write(output_path)

from random import random, randint
def compose_music(music_model, original_source, topn=5, print_gen=False, encoder_decoder=False):  #cue-shape : (cue_len, 5)
    """Generate music one time step at a time, seeded from a random training window.

    A random number of batches (1 to 100) is pulled from ``original_source``
    and one random window of the last batch becomes the cue. The composition
    starts with the cue's final time step. On every iteration the model is
    run on the composition so far, and for each instrument the next pitch is
    sampled from the ``topn`` most probable classes of the last predicted step.

    Args:
        music_model: callable returning one array-like per instrument, each
            presumably shaped (1, steps, pitches) — TODO confirm against the model.
        original_source: iterator of batches; element ``[0]`` holds the inputs
            (or ``[inputs, prompt]`` when ``encoder_decoder`` is True).
        topn: number of highest-probability classes kept for sampling.
        print_gen: print a generation counter on every step.
        encoder_decoder: if True the model is called as ``music_model([cue, composition])``.

    Yields:
        The whole composition so far, one row per time step and one column
        per instrument; it grows by one row per ``next()``.
    """
    # Pull at least one batch: with randint(0, ...) a draw of 0 left `x` unassigned.
    until = randint(1, 100)
    for _ in range(until):
        batch_inputs = next(original_source)[0]
        x = batch_inputs[0] if encoder_decoder else batch_inputs

    # Pick one window of the batch and restore the batch axis.
    cue = np.expand_dims(x[randint(0, x.shape[0]-1)], axis=0)

    composition = [cue[:, -1]]      # list of (1, instruments) rows
    gen = 1
    while True:
        if print_gen:
            print("Generation : ", gen)
        gen += 1
        pcomp = np.expand_dims(np.concatenate(composition), axis=0)

        if encoder_decoder:
            pred = np.concatenate(music_model([cue, pcomp]))
        else:
            pred = np.concatenate(music_model(pcomp))
        # pred stacks the per-instrument outputs on axis 0.

        preds = []
        for instrument in range(pred.shape[0]):
            # Work on a float64 copy so normalisation is accurate and `pred` is not mutated.
            probs = np.array(pred[instrument, -1], dtype=np.float64)
            exclude_pred = np.argsort(probs)[:-topn]
            probs[exclude_pred] = 0.
            probs = probs/np.sum(probs)
            preds += [np.random.choice(probs.shape[0], (1,), p=probs)]

        currcomp = np.array(preds).T    # (1, instruments)
        composition += [currcomp]

        yield np.concatenate(composition)
    

def get_avg_tempo(dir='lpd_5/lpd_5_full/0', topn=1000):
    """Return the mean of all ``tempo`` values over the first ``topn`` archives in ``dir``.

    Each directory entry is opened with ``np.load``; its ``'tempo'`` array
    contributes its sum and its length (first axis) to the running totals.
    """
    tempo_total, n_values = 0., 0
    for file_name in os.listdir(dir)[:topn]:
        archive_path = os.path.join(dir, file_name)
        with np.load(archive_path) as archive:
            tempo_values = archive['tempo']
            tempo_total = tempo_total + np.sum(tempo_values)
            n_values = n_values + tempo_values.shape[0]
    return tempo_total / n_values

def make_track(composition, tempo=120, resolution=8):
    """Build a ``ppr.Multitrack`` from a generated composition.

    Args:
        composition: 2-D integer array with one row per time step and one
            column per instrument; 0 means rest, any other value ``v`` is
            written to piano-roll column ``v - 1``.
        tempo: constant tempo applied to every time step.
        resolution: time steps per beat given to ``ppr.Multitrack``; it should
            match the resolution the training data was resampled to.

    Returns:
        A ``ppr.Multitrack`` with one ``ppr.BinaryTrack`` per entry of ``track_data``.
    """
    tracks = []
    tempo = np.full(composition.shape[0], tempo)  # one tempo value per time step

    # column index -> [track name, MIDI program]
    # NOTE(review): assumes the columns of `composition` follow this Drums/Piano/Guitar/Bass/Strings order — confirm against the dataset.
    track_data = {0 : ['Drums', 0], 1: ['Piano', 0], 2: ['Guitar', 24], 3:['Bass', 32], 4:['Strings', 48]}

    # Create a Track object for each track in the multitrack representation
    for i, (track_name, program) in track_data.items():

        # One-hot encode the class indices, then drop column 0 (the rest class)
        # so the remaining 128 columns line up with the pitches.
        piano_roll = CategoryEncoding(129, output_mode='one_hot')(composition[:, i]).numpy()[:, 1:]

        # Create a Track object without providing the piano_roll argument
        track = ppr.BinaryTrack(name=track_name)

        # Assign piano roll data and program number to the Track object
        track.pianoroll = piano_roll
        track.program = program

        if track_name == 'Drums':
            track.is_drum = True
        # Append the Track object to the list
        tracks.append(track)

    # Create a Multitrack object and assign the tracks to it
    multitrack = ppr.Multitrack(tracks=tracks, tempo=tempo, resolution=resolution)

    return multitrack



In [22]:


# Number of classes per instrument head.
pitches = 129 #including rest note
# Number of instrument tracks; also used by the evaluation cell to index the model outputs.
instruments = 5

def recurrent_encoder_decoder(pitches=129, instruments=5):
    """Build and compile the encoder-decoder LSTM model.

    The encoder (bidirectional LSTM followed by an LSTM) reads the first
    input and its final state initialises the decoder LSTM, which reads the
    second (prompt) input. The decoder output passes through a shared
    50-unit ReLU layer and then one softmax head of size ``pitches`` per
    instrument, named ``instrument_1`` ... ``instrument_<instruments>``.

    Returns:
        A compiled Keras ``Model`` taking ``[encoder_input, prompt_input]``.
    """
    encoder_in = Input((None, instruments))
    prompt_in = Input((None, instruments))

    # Encoder: only the final LSTM state is kept.
    encoded = Bidirectional(LSTM(100, return_sequences=True))(encoder_in)
    _, *encoder_state = LSTM(100, return_state=True)(encoded)

    # Decoder: starts from the encoder state and emits one vector per prompt step.
    decoded = LSTM(100, return_sequences=True)(prompt_in, initial_state=encoder_state)
    decoded = TimeDistributed(Dense(50, 'relu'))(decoded)

    heads = [
        TimeDistributed(Dense(pitches, 'softmax'), name=f'instrument_{instrument+1}')(decoded)
        for instrument in range(instruments)
    ]

    model = Model([encoder_in, prompt_in], heads)
    model.compile(Adam(1e-3), loss=['sparse_categorical_crossentropy']*instruments, metrics=['accuracy'])
    return model

def recurrent(pitches=129, instruments=5):
    """Build and compile the plain stacked-LSTM model.

    Two sequence-returning LSTMs (200 then 100 units) feed a shared 50-unit
    ReLU layer, followed by one softmax head of size ``pitches`` per
    instrument, named ``instrument_1`` ... ``instrument_<instruments>``.

    Returns:
        A compiled Keras ``Model`` with a single sequence input.
    """
    sequence_in = Input((None, instruments))

    hidden = LSTM(200, return_sequences=True)(sequence_in)
    hidden = LSTM(100, return_sequences=True)(hidden)
    hidden = TimeDistributed(Dense(50, 'relu'))(hidden)

    heads = [
        TimeDistributed(Dense(pitches, 'softmax'), name=f'instrument_{instrument+1}')(hidden)
        for instrument in range(instruments)
    ]

    model = Model(sequence_in, heads)
    model.compile(Adam(1e-3), loss=['sparse_categorical_crossentropy']*instruments, metrics=['accuracy'])
    return model



# Instantiate the plain (non encoder-decoder) model with its default sizes and show its layers.
model = recurrent()
model.summary()






Model: "model_3"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_6 (InputLayer)        [(None, None, 5)]            0         []                            
                                                                                                  
 lstm_8 (LSTM)               (None, None, 200)            164800    ['input_6[0][0]']             
                                                                                                  
 lstm_9 (LSTM)               (None, None, 100)            120400    ['lstm_8[0][0]']              
                                                                                                  
 time_distributed_3 (TimeDi  (None, None, 50)             5050      ['lstm_9[0][0]']              
 stributed)                                                                                 

In [36]:
# Preprocess the first 600 files into next-step prediction batches (windows of 8*100 steps).
# output_sequence_len is left unset because encoder_decoder=False; it must be >= 1 when encoder_decoder=True.
# The returned timestamp names the batch directories read by the following cells.
timestamp = process_dataset_in_chunks('./lpd_5/lpd_5_full/0', input_sequence_len=8*100, batch_size=128, topn=600, resolution=8, prune_rest_note_percent=0.5, encoder_decoder=False)

  2%|▏         | 2/100 [00:02<02:08,  1.31s/it]


KeyboardInterrupt: 

In [38]:
# Show the run identifier used in the batch directory names.
# NOTE(review): the preprocessing cell's recorded output ends in KeyboardInterrupt, so this value presumably comes from an earlier run — confirm.
timestamp

1712735727

In [10]:
#timestamp = 1712393018

In [41]:
# Endless batch generator over the directories written for `timestamp` (plain, non encoder-decoder format).
music_loader = load_music_batches(f'lpd_5_batched/inputs_{timestamp}', f'lpd_5_batched/outputs_{timestamp}', encoder_decoder=False)
# One epoch = number of stored input batches minus one.
steps = len(os.listdir(f'lpd_5_batched/inputs_{timestamp}'))-1

In [42]:
# Train for a single pass over the stored batches; the generator never ends, so steps_per_epoch bounds the epoch.
history = model.fit(music_loader, steps_per_epoch=steps, epochs=1)

 366/9289 [>.............................] - ETA: 9:33:34 - loss: 11.4563 - instrument_1_loss: 1.2591 - instrument_2_loss: 2.4645 - instrument_3_loss: 2.1594 - instrument_4_loss: 2.4989 - instrument_5_loss: 3.0744 - instrument_1_accuracy: 0.7762 - instrument_2_accuracy: 0.4894 - instrument_3_accuracy: 0.5335 - instrument_4_accuracy: 0.4286 - instrument_5_accuracy: 0.3069

KeyboardInterrupt: 

In [283]:
# Persist the current model.
# NOTE(review): the model cell visible above builds recurrent(), while this name says encoder-decoder — confirm which architecture this checkpoint holds.
model.save('encoder-decoder-bilstm-500topn-ep1')

INFO:tensorflow:Assets written to: encoder-decoder-bilstm-500topn-ep1/assets


INFO:tensorflow:Assets written to: encoder-decoder-bilstm-500topn-ep1/assets


In [6]:
# Reload the saved checkpoint; this rebinds `model` for all following cells.
model = tf.keras.saving.load_model('encoder-decoder-bilstm-500topn-ep1')



In [12]:
# Grab a single (inputs, targets) batch for the manual inspection cell further down.
one_batch = next(music_loader)

In [59]:
# Create the generator that extends a composition by one time step per next(); sample from the 6 most likely pitches.
composer = compose_music(music_model=model, original_source=music_loader, topn=6)


In [61]:
# Number of generation steps to run (50*8 time steps).
composition_length = 50*8
# Each next() returns the whole composition so far; only the final one is kept.
for _ in range(composition_length):
    composition = next(composer)

generated_track = make_track(composition, tempo=60)


# Render the generated piece to MIDI and then to WAV.
output_midi_path = f'generated_track_{timestamp}.mid'
output_audio_path = f'generated_track_{timestamp}.wav'

multitrack_to_midi(generated_track, output_midi_path)
midi_to_wav(output_midi_path, output_audio_path)


# Render one original dataset track the same way, for comparison by ear.
original_track = ppr.load('lpd_5/lpd_5_full/0/0a0a2b0e4d3b7bf4c5383ba025c4683e.npz').binarize().set_resolution(8)

# The output path variables are reused for the original track.
output_midi_path = 'original_track.mid'
output_audio_path = 'original_track.wav'


multitrack_to_midi(original_track, output_midi_path)
midi_to_wav(output_midi_path, output_audio_path)

Playing generated_track_1712735727.mid
MIDI file: generated_track_1712735727.mid
Format: 1  Tracks: 6  Divisions: 220
Track name: Drums
Track name: Piano
Track name: Guitar
Track name: Bass
Track name: Strings
Playing time: ~55 seconds
Notes cut: 0
Notes lost totally: 0
Conversion completed successfully.
Playing original_track.mid
MIDI file: original_track.mid
Format: 1  Tracks: 6  Divisions: 220
Track name: Drums
Track name: Piano
Track name: Guitar
Track name: Bass
Track name: Strings
Playing time: ~176 seconds
Notes cut: 0
Notes lost totally: 0
Conversion completed successfully.


In [249]:
# Inspect the generated composition (one row per time step, one column per instrument).
composition

array([[ 0, 62,  0,  0,  0],
       [ 0, 57,  0,  0,  0],
       [ 0, 52,  0,  0,  0],
       ...,
       [ 0,  0, 58, 46,  0],
       [ 0,  0, 60, 46,  0],
       [ 0, 48, 58, 34,  0]])

In [230]:
# Compare the model's most likely pitches with the ground truth for one window of `one_batch`.
ind = 60
# NOTE(review): iterating over one_batch[0] presumably expects the [inputs, prompt] list of the encoder-decoder loader — confirm `one_batch` was drawn with encoder_decoder=True.
pred_batch = model.predict([x[ind:ind+1] for x in one_batch[0]])
# Per instrument: the highest-scoring class at every time step.
pred_composition = [pred_batch[inst][0].argmax(-1) for inst in range(instruments)]
# Per instrument: the target sequence of the same window.
true_composition = [one_batch[1][inst][ind].numpy() for inst in range(instruments)]
pred_composition



[array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 

In [231]:
# Ground-truth sequences for the same window, to compare with pred_composition.
true_composition

[array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 