In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Reshape, Dropout, LSTM, Bidirectional, Flatten, Concatenate, BatchNormalization, LeakyReLU, Concatenate
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from keras import utils  
from music21 import converter, instrument, note, chord, stream
from pathlib import Path
import matplotlib.pyplot as plt
import librosa
import random

2024-12-19 15:44:32.191604: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
def get_notes(path_dataset):
    """ Get all the notes and chords from the midi files """
    notes = []

    for file in Path(path_dataset).glob("*.mid"):
        midi = converter.parse(file)

        print("Parsing %s" % file)

        notes_to_parse = midi.flat.notes

        for element in notes_to_parse:
            if isinstance(element, note.Note):
                notes.append(str(element.pitch))
            elif isinstance(element, chord.Chord):
                notes.append('.'.join(str(n) for n in element.normalOrder))

    return notes


In [3]:
def prepare_sequences(notes, n_vocab, sequence_length=10):
    """ Prepare the sequences used by the Neural Network """
    # Create a dictionary to map pitches to integers
    pitch_names = sorted(set(notes))
    
    note_to_int = dict((note, number) for number, note in enumerate(pitch_names))
    # Create input sequences and corresponding outputs
    network_input = []
    network_output = []
    
    # print(notes)
    # Create input sequences and the one-hot encoded output sequence
    for i in range(0, len(notes) - sequence_length, 1):
        sequence_in = notes[i:i + sequence_length]
        sequence_out = notes[i + sequence_length]
        
        # Convert input sequence to integer representation
        network_input.append([note_to_int[char] for char in sequence_in])
        
        # Convert output note to integer
        network_output.append(note_to_int[sequence_out])
    
    # Reshape input to be compatible with LSTM layers
    n_patterns = len(network_input)
    
    # Normalize input
    network_input = np.reshape(network_input, (n_patterns, sequence_length, 1))
    network_input = network_input / float(n_vocab)
    
    # One-hot encode the output
    network_output = utils.to_categorical(network_output)
    
    return network_input, network_output


In [4]:
# def create_midi(prediction_output, filename):
#     """ convert the output from the prediction to notes and create a midi file
#         from the notes """
#     offset = 0
#     output_notes = []

#     # create note and chord objects based on the values generated by the model
#     for item in prediction_output:
#         pattern = item[0]
#         # pattern is a chord
#         if ('.' in pattern) or pattern.isdigit():
#             notes_in_chord = pattern.split('.')
#             notes = []
#             for current_note in notes_in_chord:
#                 new_note = note.Note(int(current_note))
#                 new_note.storedInstrument = instrument.Piano()
#                 notes.append(new_note)
#             new_chord = chord.Chord(notes)
#             new_chord.offset = offset
#             output_notes.append(new_chord)
#         # pattern is a note
#         else:
#             new_note = note.Note(pattern)
#             new_note.offset = offset
#             new_note.storedInstrument = instrument.Piano()
#             output_notes.append(new_note)

#         # increase offset each iteration so that notes do not stack
#         offset += 0.5

#     midi_stream = stream.Stream(output_notes)
#     midi_stream.write('midi', fp='{}.mid'.format(filename))

In [9]:
class ConditionalMusicGAN:
    def __init__(self, rows, acapella_feature_dim=128):
        self.seq_length = rows
        self.seq_shape = (self.seq_length, 1)
        self.latent_dim = 1000
        self.acapella_feature_dim = acapella_feature_dim
        
        self.disc_loss = []
        self.gen_loss = []
        
        # Optimizers with different learning rates
        d_lr, d_beta_1 = 2.5932849036781864e-05, 0.467
        g_lr, g_beta_1 = 3.2136735964331895e-05, 0.390
        
        self.d_optimizer = Adam(d_lr, d_beta_1)
        self.g_optimizer = Adam(g_lr, g_beta_1)

        # Build condition embedding
        self.condition_input, self.condition_embedding = self.build_condition_embedding()

        # Build and compile the discriminator
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(
            loss='binary_crossentropy', 
            optimizer=self.d_optimizer, 
            metrics=['accuracy']
        )

        # Build the generator
        self.generator = self.build_generator()

        # Combined model for training generator
        z = Input(shape=(self.latent_dim,))
        condition_z = Input(shape=(self.acapella_feature_dim,))
        
        self.discriminator.trainable = False
        generated_seq = self.generator([z, condition_z])
        validity = self.discriminator([generated_seq, condition_z])

        self.combined = Model([z, condition_z], validity)
        self.combined.compile(
            loss='binary_crossentropy', 
            optimizer=self.g_optimizer
        )

    def build_condition_embedding(self):
        condition_input = Input(shape=(self.acapella_feature_dim,))
        condition_embedding = Dense(128, activation='relu')(condition_input)
        return condition_input, condition_embedding

    def build_discriminator(self):
        seq_input = Input(shape=self.seq_shape)
        condition_input = Input(shape=(self.acapella_feature_dim,))
        condition_embedding = Dense(128, activation='relu')(condition_input)
        
        x = Concatenate()([Flatten()(seq_input), condition_embedding])
        x = Dense(512)(x)
        x = LeakyReLU(alpha=0.2)(x)
        x = Dense(256)(x)
        x = LeakyReLU(alpha=0.2)(x)
        x = Dense(100)(x)
        x = LeakyReLU(alpha=0.2)(x)
        x = Dropout(0.5)(x)
        validity = Dense(1, activation='sigmoid')(x)
        
        return Model([seq_input, condition_input], validity)

    def build_generator(self):
        noise_input = Input(shape=(self.latent_dim,))
        condition_input = Input(shape=(self.acapella_feature_dim,))
        x = Concatenate()([noise_input, condition_input])
        x = Dense(256)(x)
        x = LeakyReLU(alpha=0.2)(x)
        x = BatchNormalization(momentum=0.8)(x)
        x = Dense(512)(x)
        x = LeakyReLU(alpha=0.2)(x)
        x = BatchNormalization(momentum=0.8)(x)
        x = Dense(1024)(x)
        x = LeakyReLU(alpha=0.2)(x)
        x = BatchNormalization(momentum=0.8)(x)
        x = Dense(np.prod(self.seq_shape), activation='tanh')(x)
        seq = Reshape(self.seq_shape)(x)
        
        return Model([noise_input, condition_input], seq)

    def extract_acapella_features(self, audio_path):
        # Load the audio file
        y, sr = librosa.load(audio_path)
        
        # Extract 128 MFCCs (instead of 13)
        mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=128)
        
        # Extract additional spectral features (can also increase dimensionality here if needed)
        spectral_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)
        
        # Flatten the features to a 1D vector and ensure consistent dimensionality
        mfcc_means = np.mean(mfccs, axis=1)  # Average MFCC coefficients across time
        spectral_centroid_mean = np.mean(spectral_centroids)
        
        # Concatenate features to make a single vector
        features = np.concatenate([mfcc_means, [spectral_centroid_mean]])
        
        # Normalize the features (optional)
        features = (features - np.mean(features)) / np.std(features)
        
        # Ensure the features have exactly 128 elements (you can add more features or pad if necessary)
        while len(features) < 128:
            features = np.concatenate([features, np.zeros(128 - len(features))])
        
        # If the features exceed 128, you can trim them
        features = features[:128]
        
        return features

    def train(self, epochs, acapella_directory, batch_size=128, sample_interval=50):
        # Load acapella features
        acapella_paths = list(Path(acapella_directory).glob('*.wav'))
        
        if not acapella_paths:
            raise ValueError("No acapella files found in the specified directory")
    
        # Load music data (you'll need to implement these functions)
        notes = get_notes("dataset/instrumental")
        n_vocab = len(set(notes))
        X_train, y_train = prepare_sequences(notes, n_vocab)

        real = np.ones((batch_size, 1))
        fake = np.zeros((batch_size, 1))

        for epoch in range(epochs):
            current_acapella_path = np.random.choice(acapella_paths)
            acapella_features = self.extract_acapella_features(current_acapella_path)
            
            # Expand features to batch size
            acapella_features_batch = np.tile(acapella_features, (batch_size, 1))
        
            # Train Discriminator
            idx = np.random.randint(0, X_train.shape[0], batch_size)
            real_seqs = X_train[idx]

            noise = np.random.normal(0, 1, (batch_size, self.latent_dim))
            gen_seqs = self.generator.predict([noise, acapella_features_batch])

            # Discriminator training
            d_loss_real = self.discriminator.train_on_batch(
                [real_seqs, acapella_features_batch], real
            )
            d_loss_fake = self.discriminator.train_on_batch(
                [gen_seqs, acapella_features_batch], fake
            )
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

            # Train Generator
            noise = np.random.normal(0, 1, (batch_size, self.latent_dim))
            g_loss = self.combined.train_on_batch(
                [noise, acapella_features_batch], real
            )

            # Logging
            if epoch % sample_interval == 0:
                print(f"{epoch} [D loss: {d_loss[0]}, acc.: {100*d_loss[1]}%] [G loss: {g_loss}]")
                self.disc_loss.append(d_loss[0])
                self.gen_loss.append(g_loss)

        # Save the generator model to an .h5 file after training
        self.generator.save('conditional_music_gan_generator.h5')
        print("Generator model saved as 'conditional_music_gan_generator.h5'")

        # Optionally save the discriminator as well
        self.discriminator.save('conditional_music_gan_discriminator.h5')
        print("Discriminator model saved as 'conditional_music_gan_discriminator.h5'")

        # Return the saved models if needed
        return self.generator, self.discriminator

    def generate(self, input_notes, acapella_features, filename='generated_music'):
        pitchnames = sorted(set(input_notes))
        int_to_note = dict((number, note) for number, note in enumerate(pitchnames))
        
        noise = np.random.normal(0, 1, (1, self.latent_dim))
        condition_input = np.tile(acapella_features, (1, 1))
        
        # Generate sequence using the trained generator
        predictions = self.generator.predict([noise, condition_input])
        
        # Convert the generated sequence into music notes
        pred_notes = [x*242+242 for x in predictions[0]]
        
        pred_notes_mapped = []
        for x in pred_notes:
            index = int(x)
            if index in int_to_note:
                pred_notes_mapped.append(int_to_note[index])
            else:
                pred_notes_mapped.append('C5')

        # Create the MIDI file
        self.create_midi(pred_notes_mapped, filename)

    def create_midi(self, prediction_output, filename):
        offset = 0
        output_notes = []

        # Define a list of instrument classes to choose from
        instrument_classes = [
            instrument.AcousticBass, instrument.AcousticGuitar,instrument.Alto, instrument.Baritone, instrument.Bass,
            instrument.BassDrum, instrument.BassTrombone, instrument.BrassInstrument,
            instrument.Choir, instrument.Clarinet, instrument.Contrabass,
            instrument.ElectricBass, instrument.ElectricGuitar, instrument.ElectricOrgan, instrument.ElectricPiano,
            instrument.Flute, instrument.FretlessBass, instrument.Guitar
        ]

        # Track the last instrument used to avoid redundant instrument changes
        last_instrument = None

        # Create note and chord objects based on the values generated by the model
        for pattern in prediction_output:
            # Choose an instrument class for this note or chord
            instr_class = random.choice(instrument_classes)
            instr = instr_class()

            # Add the instrument change only if it is different from the last instrument
            if type(last_instrument) != type(instr):
                output_notes.append(instr)
                last_instrument = instr

            # Pattern is a chord
            if ('.' in pattern) or pattern.isdigit():
                notes_in_chord = pattern.split('.')
                notes = []
                for current_note in notes_in_chord:
                    new_note = note.Note(int(current_note))
                    notes.append(new_note)
                new_chord = chord.Chord(notes)
                new_chord.offset = offset
                output_notes.append(new_chord)
            # Pattern is a note
            else:
                new_note = note.Note(pattern)
                new_note.offset = offset
                output_notes.append(new_note)

            # Increase offset each iteration so that notes do not stack
            offset += 0.5

        midi_stream = stream.Stream(output_notes)
        midi_stream.write('midi', fp='{}.mid'.format(filename))

    def plot_loss(self):
        plt.figure(figsize=(10, 5))
        plt.plot(self.disc_loss, c='red', label='Discriminator Loss')
        plt.plot(self.gen_loss, c='blue', label='Generator Loss')
        plt.title("Conditional GAN Loss per Epoch")
        plt.legend()
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.savefig('Conditional_GAN_Loss.png', transparent=True)
        plt.close()

In [10]:
# Main execution
SEQUENCE_LENGTH = 10
EPOCHS = 50
BATCH_SIZE = 16
SAMPLE_INTERVAL = 1

# Run the Conditional GAN
gan = ConditionalMusicGAN(rows=SEQUENCE_LENGTH)
gan.train(
    epochs=EPOCHS, 
    acapella_directory='dataset/vocals',
    batch_size=BATCH_SIZE, 
    sample_interval=SAMPLE_INTERVAL
)

ValueError: zero-size array to reduction operation maximum which has no identity

In [None]:
# Generate music after training
# Example acapella path and features
acapella_path = 'test.wav'  # Replace with an actual path
acapella_features = gan.extract_acapella_features(acapella_path)

# Generate music using the trained generator
input_notes = get_notes_from_midi('AnyConv.com__test.mid')
gan.generate(input_notes, acapella_features,)