## Import and Pip Install

In [None]:
!pip3 install music21
!pip3 install torch torchvision torchaudio
!pip3 install numPy
!pip3 install pretty_midi
!pip3 install seaborn
!pip3 install matplotlib
!pip3 install pydub
!pip3 install midi2audio
!pip3 install requests
!pip3 install tensorflow

In [None]:
import music21
import torch
import numpy as np
import pandas as pd
import pretty_midi
import seaborn as sns
import matplotlib.pyplot as plt
import pathlib
import glob
import collections
import tensorflow as tf
from scipy.io.wavfile import write
from IPython.display import Audio

In [None]:
import requests
import zipfile

url = 'https://keymusician01.s3.amazonaws.com/FluidR3_GM.zip'
# A timeout keeps the cell from hanging indefinitely on a stalled connection.
response = requests.get(url, timeout=60)

# Only unpack the archive when the download actually succeeded. Extracting
# unconditionally would either crash on a missing file or silently reuse a
# stale archive left over from an earlier run.
if response.status_code == 200:
    with open("FluidR3_GM.zip", 'wb') as f:
        f.write(response.content)

    with zipfile.ZipFile("FluidR3_GM.zip", 'r') as zip_ref:
        zip_ref.extractall("FluidR3_GM")
else:
    print("Failed to retrieve the file")


In [None]:
from midi2audio import FluidSynth

# Path to the SoundFont file (assumes the extracted archive contains
# FluidR3_GM.sf2 at its top level — TODO confirm)
soundfont = 'FluidR3_GM/FluidR3_GM.sf2'

# Initialize FluidSynth with the specified SoundFont; `fs` is the module-level
# synthesizer that play_midi uses to render MIDI files to audio
fs = FluidSynth(soundfont)

## Download the Maestro dataset

In [None]:
# Fetch and unpack the MAESTRO v2.0.0 MIDI archive only when the target
# directory does not exist yet, so re-running the cell skips the download.
data_dir = pathlib.Path('data/maestro-v2.0.0')
if not data_dir.exists():
  tf.keras.utils.get_file(
      'maestro-v2.0.0-midi.zip',
      origin='https://storage.googleapis.com/magentadata/datasets/maestro/v2.0.0/maestro-v2.0.0-midi.zip',
      extract=True,
      # Store the archive under ./data rather than the default Keras cache.
      cache_dir='.', cache_subdir='data',
  )

The dataset contains about 1,200 MIDI files.

In [None]:
# Collect every file whose extension starts with ".mid" (e.g. .mid, .midi).
# NOTE(review): glob.glob is called without recursive=True, so '**' matches a
# single directory level like '*' — confirm this matches the dataset layout.
# NOTE(review): the result is not sorted, so later index-based picks
# (filenames[0], filenames[1], filenames[2]) may differ between machines.
filenames = glob.glob(str(data_dir/'**/*.mid*'))
print('Number of files:', len(filenames))
print(filenames)

In [None]:
sample_file = filenames[2]
print(sample_file)
def play_midi(midi_filename, save_name='test.wav'):
    """
    Converts a MIDI file to a WAV file and returns a widget that plays it.

    Args:
    midi_filename (str): The path of the MIDI file to be played.
    save_name (str): The path the rendered WAV file is written to. Defaults to
        'test.wav' so the function can be called with only the MIDI path.

    Returns:
    IPython.display.Audio: An audio widget for playing the converted audio in Jupyter notebooks.
    """
    # Render the MIDI file to audio with the module-level FluidSynth instance `fs`.
    fs.midi_to_audio(midi_filename, save_name)

    # Return an audio widget that can play the generated WAV file.
    return Audio(save_name)
play_midi(sample_file, 'test.wav')

In [None]:
def extract_midi_data(midi_file):
    """
    Reads a MIDI file and flattens the notes of every instrument into one table.

    Each row describes a single note and repeats the attributes of the
    instrument it belongs to. Notes are ordered by start time within each
    instrument; instruments appear in the order pretty_midi reports them.

    Args:
    midi_file (str): The path to the MIDI file.

    Returns:
    pd.DataFrame: One row per note with the columns pitch, start, end, step,
    duration, velocity, program, is_drum and instrument_name.
    """
    midi = pretty_midi.PrettyMIDI(midi_file)

    # Column name -> list of values, filled one note at a time
    columns = collections.defaultdict(list)

    for inst in midi.instruments:
        inst_name = pretty_midi.program_to_instrument_name(inst.program)

        # Notes must be visited chronologically so that 'step' is meaningful
        ordered = sorted(inst.notes, key=lambda n: n.start)

        # The first note of an instrument gets a step of zero
        previous_start = ordered[0].start if ordered else 0

        for n in ordered:
            columns['pitch'].append(n.pitch)
            columns['start'].append(n.start)
            columns['end'].append(n.end)
            # Step: gap between this note's start and the previous note's start
            columns['step'].append(n.start - previous_start)
            columns['duration'].append(n.end - n.start)
            columns['velocity'].append(n.velocity)

            # Instrument attributes are repeated on every note row
            columns['program'].append(inst.program)
            columns['is_drum'].append(inst.is_drum)
            columns['instrument_name'].append(inst_name)

            previous_start = n.start

    return pd.DataFrame({col: np.array(vals) for col, vals in columns.items()})

# Example usage
midi_info = extract_midi_data(sample_file)
print(midi_info)



In [None]:
reversed_df = midi_info.iloc[::-1].reset_index(drop=True)
print(reversed_df)

In [None]:
def count_instrument(filenames):
    """
    Reports how many of the given MIDI files hold more than one instrument.

    Args:
    filenames (list of str): A list of paths to MIDI files.

    Returns:
    int: The number of MIDI files with more than one instrument.
    """
    # Each file is parsed once; it contributes 1 when it has several instruments
    return sum(
        1
        for path in filenames
        if len(pretty_midi.PrettyMIDI(path).instruments) > 1
    )
print(count_instrument(filenames[0:10]))

In [None]:
def plot_single_note(note, include_velocity=True):
    """
    Draw one MIDI note as a rectangle on the current matplotlib axes.

    The rectangle starts at (note.start, note.pitch), is as wide as the note is
    long and one pitch unit tall.

    Args:
    note (pretty_midi.Note): The note to plot.
    include_velocity (bool): If True, the fill colour blends from blue (low
        velocity) to red (high velocity); otherwise the note is plain blue.
    """
    if include_velocity:
        intensity = note.velocity / 127
        face = (intensity, 0, 1 - intensity)
    else:
        face = 'blue'

    rect = plt.Rectangle(
        (note.start, note.pitch),
        note.end - note.start,
        1,
        edgecolor='none',
        facecolor=face,
    )
    plt.gca().add_patch(rect)

def plot_midi(midi_file, include_drum=False, figure_size=(12, 6), color_map=None):
    """
    Draws a piano-roll style picture of a MIDI file.

    If the file cannot be loaded, a message is printed and nothing is plotted.

    Args:
    midi_file (str): The path to the MIDI file.
    include_drum (bool): Whether drum instruments are drawn as well.
    figure_size (tuple): The size of the figure.
    color_map (function): Optional callable that receives a note and returns
        the face colour to use for it. Without it, colour encodes velocity.
    """
    try:
        score = pretty_midi.PrettyMIDI(midi_file)
    except IOError:
        print(f"Error loading {midi_file}. Please check the file path.")
        return
    except ValueError:
        print(f"Invalid MIDI file: {midi_file}.")
        return

    plt.figure(figsize=figure_size)

    # Drum tracks are skipped unless they were explicitly requested
    visible = (
        inst for inst in score.instruments
        if include_drum or not inst.is_drum
    )
    for inst in visible:
        for note in inst.notes:
            if not color_map:
                plot_single_note(note)
                continue
            # Draw in the plain colour first, then recolour the patch just added
            plot_single_note(note, include_velocity=False)
            plt.gca().patches[-1].set_facecolor(color_map(note))

    plt.xlabel('Time (s)')
    plt.ylabel('Pitch')
    plt.title('MIDI Visualization')
    plt.grid(True)
    plt.xlim(0, score.get_end_time())
    plt.ylim(0, 128)  # MIDI pitches range from 0 to 127
    plt.show()

# Example usage
plot_midi(sample_file)


In [None]:
def create_midi_from_dataframe(midi_info, output_file):
    """
    Creates a MIDI file from a DataFrame containing MIDI data for multiple instruments.

    Rows are grouped by (program, is_drum, instrument_name); every group becomes
    one instrument track whose notes are built from the 'velocity', 'pitch',
    'start' and 'end' columns.

    Args:
    midi_info (pd.DataFrame): The DataFrame containing MIDI data.
    output_file (str): The path where the new MIDI file will be saved.
    """
    new_midi = pretty_midi.PrettyMIDI()

    # One group per distinct instrument
    grouped = midi_info.groupby(['program', 'is_drum', 'instrument_name'])

    for (program, is_drum, instrument_name), group in grouped:
        # Group keys come back as NumPy scalars; hand pretty_midi plain Python values.
        new_instrument = pretty_midi.Instrument(
            program=int(program),
            is_drum=bool(is_drum),
            name=str(instrument_name),
        )

        # Iterate the four needed columns in lockstep instead of iterrows(),
        # which builds a full Series object for every single note.
        note_columns = zip(
            group['velocity'], group['pitch'], group['start'], group['end']
        )
        new_instrument.notes.extend(
            pretty_midi.Note(
                velocity=int(velocity),
                pitch=int(pitch),
                start=float(start),
                end=float(end),
            )
            for velocity, pitch, start, end in note_columns
        )

        new_midi.instruments.append(new_instrument)

    # Write to a MIDI file
    new_midi.write(output_file)

# Example usage
create_midi_from_dataframe(midi_info, 'test_new.midi')
# play_midi takes the MIDI path and the WAV path to render to; the second
# argument was missing here, which raised a TypeError.
play_midi('test_new.midi', 'test_new.wav')


## Training dataset

In [None]:
# Pick the two pieces used for training: the "head" piece is read forwards,
# the "tail" piece is read backwards.
head = filenames[0]
tail = filenames[1]

head_midi = extract_midi_data(head)
tail_midi = extract_midi_data(tail).iloc[::-1].reset_index(drop=True)  # row order reversed, index renumbered from 0

# Ensure both DataFrames have the same length
min_length = min(len(head_midi), len(tail_midi))

# Keep only the first min_length rows of each (for the tail these are the
# first rows of the already-reversed table).
head_midi = head_midi.iloc[:min_length].reset_index(drop=True)
tail_midi = tail_midi.iloc[:min_length].reset_index(drop=True)
# Feature columns, in the order they are stacked into training tensors and
# used to name the label entries.
key_order = ['pitch', 'step', 'duration', 'velocity']

print(head_midi.shape)
print(tail_midi.shape)

In [None]:
def one_instrument(midi_info):
    """
    Turns a note table into a tf.data.Dataset with one element per note.

    Args:
    midi_info (pd.DataFrame): Note table containing the columns listed in the
        module-level `key_order`.

    Returns:
    tf.data.Dataset: Dataset whose elements hold the `key_order` features of
    one note, in that order.
    """
    feature_columns = [midi_info[column] for column in key_order]
    return tf.data.Dataset.from_tensor_slices(np.stack(feature_columns, axis=1))
head_midi_single = one_instrument(head_midi)
tail_midi_single = one_instrument(tail_midi)

print(head_midi_single.element_spec)
print(tail_midi_single.element_spec)

for element in tail_midi_single:
    for ele in element:
        print(ele)

In [None]:
def create_sequences(dataset: tf.data.Dataset, seq_length: int, vocab_size=128) -> tf.data.Dataset:
    """
    Converts a dataset of individual notes into a dataset of note sequences for model training.

    Args:
    dataset (tf.data.Dataset): The original dataset, where each element is a single note
        with the features ordered as in the module-level `key_order`.
    seq_length (int): The number of input notes per sequence.
    vocab_size (int): Divisor used to scale the pitch and velocity inputs. Default is 128.

    Returns:
    tf.data.Dataset: Elements are (inputs, labels) pairs. `inputs` holds
    `seq_length` scaled notes; `labels` is a dict keyed by `key_order` holding
    the unscaled features of the note that follows them.
    """

    # One extra note per window: the final note of each window becomes the label
    seq_length = seq_length + 1

    # Create overlapping windows, each shifted by one note
    windows = dataset.window(seq_length, shift=1, stride=1, drop_remainder=True)

    # `window` yields a dataset of datasets; batching each inner dataset turns
    # it into a single tensor holding the whole window.
    def flatten(window):
        return window.batch(seq_length, drop_remainder=True)

    sequences = windows.flat_map(flatten)

    def scale_features(x):
        # Pitch (first feature) and velocity (last feature) are divided by
        # vocab_size; step and duration are left unchanged.
        return x / [vocab_size, 1.0, 1.0, vocab_size]

    def split_labels(sequences):
        # The inputs are all but the last note in the sequence
        inputs = sequences[:-1]
        # The label is the last note in the sequence
        labels_dense = sequences[-1]

        # One label entry per feature, named after key_order
        labels = {key: labels_dense[i] for i, key in enumerate(key_order)}
        return scale_features(inputs), labels

    # Apply the splitting and scaling to each sequence
    return sequences.map(split_labels, num_parallel_calls=tf.data.AUTOTUNE)


In [None]:
seq_length = 25
vocab_size = 128
head_seq_ds = create_sequences(head_midi_single, seq_length, vocab_size)
head_seq_ds.element_spec

In [None]:
tail_seq_ds = create_sequences(tail_midi_single, seq_length, vocab_size)
tail_seq_ds.element_spec

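Each model input stacks a window of head-song notes on top of the matching window of tail-song notes, concatenated along the time axis:
 [H1, H2, H3,
  T1, T2, T3]
The target is the pair of notes that follow those two windows:
[H4, T4]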

In [None]:
def combine_datasets(ds1, ds2):
    """
    Pairs up two (input, label) datasets element by element and merges each pair.

    The two input tensors are concatenated along their first axis. The label
    dictionaries are merged into one: keys from ds1 get the suffix '0', keys
    from ds2 get the suffix '1'.

    Args:
    ds1 (tf.data.Dataset): The first dataset to combine. Each element is a tuple of (input, label).
    ds2 (tf.data.Dataset): The second dataset to combine. Each element is a tuple of (input, label).

    Returns:
    tf.data.Dataset: A TensorFlow Dataset with combined elements and labels.
    """

    def merge_pair(first, second):
        (inputs_a, labels_a), (inputs_b, labels_b) = first, second

        # Stack the second window below the first one
        stacked = tf.concat([inputs_a, inputs_b], axis=0)

        # Suffix the label names so both sets can live in one dictionary
        merged = {f'{name}0': value for name, value in labels_a.items()}
        merged.update({f'{name}1': value for name, value in labels_b.items()})

        return stacked, merged

    # Zip walks both datasets in lockstep; map merges each resulting pair
    return tf.data.Dataset.zip((ds1, ds2)).map(merge_pair)

# Example usage
combined_dataset = combine_datasets(head_seq_ds, tail_seq_ds)



In [None]:
def num_dataset(dataset):
    """
    Counts the elements of an iterable dataset, printing their contents on the way.

    Every component of every element is printed, followed by a "==" line.

    Args:
    dataset: An iterable whose elements are themselves iterable.

    Returns:
    int: The number of elements in the dataset.
    """
    count = 0
    for count, record in enumerate(dataset, start=1):
        for part in record:
            # Component first, then the separator on its own line
            print(part, "==", sep="\n")
    return count
print(num_dataset(combined_dataset))

In [None]:
for inputs, labels in combined_dataset.take(1):
    print("Label keys:", labels.keys())

In [None]:
def mse_with_positive_pressure(y_true: tf.Tensor, y_pred: tf.Tensor):
    """
    Mean squared error with an extra penalty on negative predictions.

    Args:
    y_true (tf.Tensor): Target values.
    y_pred (tf.Tensor): Predicted values.

    Returns:
    tf.Tensor: The mean over all elements of the squared error plus ten times
    the magnitude of any negative prediction.
    """
    error = y_true - y_pred
    # Zero for non-negative predictions, 10 * |y_pred| for negative ones
    negative_penalty = 10 * tf.maximum(-y_pred, 0.0)
    return tf.reduce_mean(error ** 2 + negative_penalty)

In [None]:
input_shape = (seq_length * 2, 4)  # Combined length of two songs, 4 features each
learning_rate = 0.005

inputs = tf.keras.Input(shape=input_shape)
x = tf.keras.layers.LSTM(128)(inputs)

# Flatten the output of LSTM to map it to the dense layers
x = tf.keras.layers.Flatten()(x)

# Outputs for two notes, each with 4 features
output_features = ['pitch', 'step', 'duration', 'velocity']
# Features predicted as a distribution over 128 classes; the remaining
# features are predicted as a single regression value.
categorical_features = ('pitch', 'velocity')
outputs = {}
for i in range(2):  # Two notes
    for feature in output_features:
        units = 128 if feature in categorical_features else 1
        outputs[f'{feature}{i}'] = tf.keras.layers.Dense(units, name=f'{feature}{i}')(x)

model = tf.keras.Model(inputs=inputs, outputs=outputs)

# Define losses for each output.
# The pitch and velocity checks must share one condition: with two separate
# `if` statements the trailing `else` also ran for pitch outputs and replaced
# their cross-entropy loss with the regression loss.
loss = {}

for key in outputs:
    if 'pitch' in key or 'velocity' in key:
        loss[key] = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    else:
        loss[key] = mse_with_positive_pressure

# optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=learning_rate)

model.compile(loss=loss, optimizer=optimizer)

model.summary()


In [None]:
batch_size = 64
# Count the training examples without printing them. The shuffle buffer covers
# the whole dataset so the shuffle is uniform, and is at least 1 because
# shuffle() rejects a non-positive buffer size.
buffer_size = max(1, sum(1 for _ in combined_dataset))
# cache() comes before shuffle(): a cache placed after the shuffle would replay
# the first epoch's order (and batch composition) on every later epoch.
train_dataset = (
    combined_dataset
    .cache()
    .shuffle(buffer_size)
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)
losses = model.evaluate(train_dataset, return_dict=True)
losses

In [None]:
# Keras matches loss weights to outputs by name, and the model outputs are
# named '<feature><index>' (e.g. 'pitch0', 'step1'). Expand the per-feature
# weights to every output name instead of passing bare feature names, which do
# not correspond to any output.
feature_loss_weights = {
    'pitch': 0.05,
    'step': 1.0,
    'duration': 1.0,
}
# NOTE(review): 'velocity' has no explicit weight and falls back to 1.0 here;
# consider down-weighting it like 'pitch' since it is also a 128-way
# classification loss.
loss_weights = {
    output_name: feature_loss_weights.get(output_name.rstrip('0123456789'), 1.0)
    for output_name in loss
}

model.compile(
    loss=loss,
    loss_weights=loss_weights,
    optimizer=optimizer,
)

In [None]:
losses = model.evaluate(train_dataset, return_dict=True)
losses

In [None]:
# Training callbacks passed to model.fit
callbacks = [
    # Save the model weights (not the full model); the epoch number is
    # substituted into the file name.
    tf.keras.callbacks.ModelCheckpoint(
        filepath='./training_checkpoints/ckpt_{epoch}',
        save_weights_only=True),
    # Stop when the training loss has not improved for 5 epochs and roll the
    # model back to the best weights seen.
    tf.keras.callbacks.EarlyStopping(
        monitor='loss',
        patience=5,
        verbose=1,
        restore_best_weights=True),
]

In [None]:
%%time
epochs = 50

history = model.fit(
    train_dataset,
    epochs=epochs,
    callbacks=callbacks,
)

In [None]:
plt.plot(history.epoch, history.history['loss'], label='total loss')
plt.show()

## Generate Note

In [None]:
def extract_last_sequence(dataset, seq_length):
    """
    Returns the final input sequence of a dataset, with a leading batch axis.

    Args:
    dataset: Iterable of (sequences, labels) pairs, e.g. a tf.data.Dataset.
        `sequences` is either a single (time, features) sequence or a
        (batch, time, features) batch.
    seq_length (int): Number of trailing time steps to keep.

    Returns:
    The last `seq_length` time steps of the final element, shaped
    (batch, time, features); a batch axis of size 1 is added for 2-D input.

    Raises:
    ValueError: If the dataset is empty or the final element is neither 2-D nor 3-D.
    """
    last_sequence = None

    # Walk the whole dataset and keep only the most recent element, so the one
    # left after the loop is the last. (take(1) would return the first element.)
    for sequences, _ in dataset:
        last_sequence = sequences

    if last_sequence is None:
        raise ValueError("No sequences were extracted from the dataset")

    rank = len(last_sequence.shape)
    if rank == 3:
        # Already batched: trim along the time axis only
        return last_sequence[:, -seq_length:, :]
    if rank == 2:
        # If there's no batch dimension, add it
        return tf.expand_dims(last_sequence[-seq_length:, :], axis=0)
    raise ValueError("Unexpected shape of the sequences tensor")

# Usage
# NOTE(review): this rebinds the notebook-wide `seq_length` (25 when the
# training windows and the model input shape were built) to 50, the length of
# one combined head+tail input. Cells that read `seq_length` and are re-run
# after this one will see 50 — consider a separate variable name.
seq_length = 50  # length of a combined input: two windows of 25 notes
last_seq = extract_last_sequence(combined_dataset, seq_length)
print("Extracted sequence shape:", last_seq.shape)


In [None]:
def print_model_input_shape(model: tf.keras.Model):
    """
    Prints the input shape reported by the first layer of a Keras model.

    Args:
    model (tf.keras.Model): The model to inspect. A falsy model or one without
        layers only produces a notice.
    """
    if not model or not model.layers:
        print("Model or model layers are not defined.")
        return

    # The first layer's input_shape is a list when it has several inputs
    expected = model.layers[0].input_shape

    if not isinstance(expected, list):
        print("Model expected input shape:", expected)
        return

    print("Model expects multiple inputs:")
    for shape in expected:
        print(shape)

# Example usage
print_model_input_shape(model)


In [None]:
def predict_next_note(
    notes: tf.Tensor, 
    model: tf.keras.Model, 
    temperature: float = 1.0):
    """
    Generates two notes, each as a tuple of (pitch, step, duration, velocity), using a trained sequence model.

    Args:
    notes (tf.Tensor): Model input with a leading batch axis. Only the
        prediction for the first batch row is used.
    model (tf.keras.Model): Model whose outputs are keyed 'pitch{i}', 'step{i}',
        'duration{i}' and 'velocity{i}' for i in (0, 1).
    temperature (float): Divisor applied to the pitch and velocity logits
        before sampling; must be positive.

    Returns:
    tuple: Two (pitch, step, duration, velocity) tuples of plain Python numbers.

    Raises:
    ValueError: If `temperature` is not positive.
    """
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if temperature <= 0:
        raise ValueError("temperature must be positive")

    # verbose=0 keeps repeated calls from printing a progress bar each time.
    predictions = model.predict(notes, verbose=0)

    def sample_index(logits):
        # Draw one class index from the temperature-scaled logits and return it
        # for the first batch row as a Python int.
        sampled = tf.random.categorical(logits / temperature, num_samples=1)
        return int(tf.squeeze(sampled, axis=-1).numpy()[0])

    def non_negative_scalar(values):
        # Drop the trailing unit axis, clamp at zero (times cannot be
        # negative) and return the first batch row as a Python float.
        clamped = tf.maximum(0.0, tf.squeeze(values, axis=-1))
        return float(clamped.numpy()[0])

    generated_notes = []
    for i in range(2):  # Two notes
        # Sampling order (pitch, then velocity) is kept per note so seeded
        # runs draw random numbers in the same sequence as before.
        pitch = sample_index(predictions[f'pitch{i}'])
        step = non_negative_scalar(predictions[f'step{i}'])
        duration = non_negative_scalar(predictions[f'duration{i}'])
        velocity = sample_index(predictions[f'velocity{i}'])

        generated_notes.append((pitch, step, duration, velocity))

    return generated_notes[0], generated_notes[1]

# # Usage
# # Assuming `last_seq` is your input tensor and `model` is your trained Keras model
predict_next_note(last_seq, model, temperature=1.0)


In [None]:
def generate_until_three_repeats(model, initial_sequence, seq_length, temperature=1.0, max_pairs=50, vocab_size=128):
    """
    Generates note pairs until both notes share a pitch three times in a row.

    The input is treated as two windows stacked along the time axis: the first
    half belongs to the first note stream, the second half to the second one.
    After every prediction each half is slid forward by one note.

    Args:
    model (tf.keras.Model): Trained model passed on to predict_next_note.
    initial_sequence (tf.Tensor): Seed input of shape (1, seq_length, 4).
    seq_length (int): Total number of time steps in the input (both halves).
    temperature (float): Sampling temperature passed on to predict_next_note.
    max_pairs (int): Upper bound on the number of generated pairs.
    vocab_size (int): Divisor applied to pitch and velocity before a generated
        note is fed back, matching the scaling of the training inputs.

    Returns:
    tuple: (note1_sequence, note2_sequence), two lists of
    (pitch, step, duration, velocity) tuples in generation order.
    """
    note1_sequence = []
    note2_sequence = []
    current_sequence = initial_sequence
    half = seq_length // 2
    # Same per-feature divisors that were applied to the training inputs
    feature_scale = tf.constant(
        [vocab_size, 1.0, 1.0, vocab_size], dtype=initial_sequence.dtype
    )
    repeat_count = 0

    for _ in range(max_pairs):
        # Generate next two notes
        note1, note2 = predict_next_note(current_sequence, model, temperature)
        note1_sequence.append(note1)
        note2_sequence.append(note2)

        # Count consecutive pairs whose pitches agree; any mismatch resets the run
        repeat_count = repeat_count + 1 if note1[0] == note2[0] else 0
        if repeat_count >= 3:
            break  # Exit if the condition is met for three consecutive pairs

        # Scale the raw notes the same way the model inputs were scaled
        new_rows = tf.convert_to_tensor(
            [note1, note2], dtype=current_sequence.dtype
        ) / feature_scale
        first_row = tf.reshape(new_rows[0], (1, 1, -1))
        second_row = tf.reshape(new_rows[1], (1, 1, -1))

        # Slide each half separately: drop its oldest note and append its own
        # new note, so both halves keep their length and their position.
        current_sequence = tf.concat(
            [
                current_sequence[:, 1:half, :],
                first_row,
                current_sequence[:, half + 1:seq_length, :],
                second_row,
            ],
            axis=1,
        )

    return note1_sequence, note2_sequence

# Example usage
note1_sequence, note2_sequence = generate_until_three_repeats(model, last_seq, seq_length, temperature=1.0, max_pairs=50)


In [None]:
print(note1_sequence)
print(note2_sequence)

In [None]:
def append_generated_notes_to_df(generated_notes, existing_df, program, is_drum, instrument_name):
    """
    Appends generated notes to an existing DataFrame.

    `step` is the time between the start of one note and the start of the
    next, so each generated note starts `step` after the previous note's start
    (the last existing note for the first generated one).

    Args:
    generated_notes (list of tuples): Generated notes in the format [(pitch, step, duration, velocity), ...].
    existing_df (pd.DataFrame): The existing DataFrame to append notes to.
    program (int): The MIDI program number.
    is_drum (bool): Whether the instrument is a drum.
    instrument_name (str): The name of the instrument.

    Returns:
    pd.DataFrame: A new DataFrame with the generated notes appended and the
    index renumbered; `existing_df` itself is not modified.
    """
    # Nothing to add: return a renumbered copy rather than concatenating an
    # empty, column-less frame.
    if not generated_notes:
        return existing_df.reset_index(drop=True)

    # Timing continues from the start of the last existing note (0 if none)
    previous_start = existing_df['start'].iloc[-1] if not existing_df.empty else 0

    note_data = []
    for pitch, step, duration, velocity in generated_notes:
        start = previous_start + step
        note_data.append({
            'pitch': pitch,
            'start': start,
            'end': start + duration,
            'step': step,
            'duration': duration,
            'velocity': velocity,
            'program': program,
            'is_drum': is_drum,
            'instrument_name': instrument_name
        })
        # The next step is measured from this note's start, not its end
        previous_start = start

    new_notes_df = pd.DataFrame(note_data)

    # Append new notes to the existing DataFrame
    return pd.concat([existing_df, new_notes_df], ignore_index=True)

head_appended_df = append_generated_notes_to_df(note1_sequence, head_midi, program=0, is_drum=False, instrument_name='Acoustic Grand Piano')

In [None]:
def prepend_generated_notes_to_df(generated_notes, existing_df, program, is_drum, instrument_name):
    """
    Prepends generated notes to an existing DataFrame.

    The generated notes are laid out from time 0, each starting `step` after
    the previous note's start. The existing notes are then shifted later by the
    latest end time among the generated notes, so the two parts do not overlap.

    Args:
    generated_notes (list of tuples): Generated notes in the format [(pitch, step, duration, velocity), ...].
    existing_df (pd.DataFrame): The existing DataFrame to prepend notes to.
    program (int): The MIDI program number.
    is_drum (bool): Whether the instrument is a drum.
    instrument_name (str): The name of the instrument.

    Returns:
    pd.DataFrame: A new DataFrame with the generated notes first, followed by
    the shifted existing notes; `existing_df` itself is not modified.
    """
    # Nothing to add: return a renumbered copy with unchanged timing
    if not generated_notes:
        return existing_df.reset_index(drop=True)

    note_data = []
    previous_start = 0  # Start from 0 for the first new note
    for pitch, step, duration, velocity in generated_notes:
        start = previous_start + step
        note_data.append({
            'pitch': pitch,
            'start': start,
            'end': start + duration,
            'step': step,
            'duration': duration,
            'velocity': velocity,
            'program': program,
            'is_drum': is_drum,
            'instrument_name': instrument_name
        })
        # The next step is measured from this note's start, not its end
        previous_start = start

    new_notes_df = pd.DataFrame(note_data)

    # Shift by the time span the new notes actually occupy. Summing only the
    # durations ignores the gaps between notes and lets the parts overlap.
    shift = new_notes_df['end'].max()

    # Work on a copy so the caller's DataFrame keeps its original times
    existing_df_shifted = existing_df.copy()
    if not existing_df_shifted.empty:
        existing_df_shifted['start'] += shift
        existing_df_shifted['end'] += shift

    # Prepend new notes to the shifted existing DataFrame
    return pd.concat([new_notes_df, existing_df_shifted], ignore_index=True)

# Restore the tail notes to forward order, then add the generated notes.
# NOTE(review): this calls append_generated_notes_to_df, so the generated notes
# land after the tail's last row, while prepend_generated_notes_to_df defined
# in this cell is never used — confirm which placement is intended.
tail_appended_df = append_generated_notes_to_df(note2_sequence, tail_midi.iloc[::-1].reset_index(drop=True), program=0, is_drum=False, instrument_name='Acoustic Grand Piano')


In [None]:
create_midi_from_dataframe(head_appended_df, 'head.midi')
play_midi('head.midi', 'final_head.wav')

In [None]:
create_midi_from_dataframe(tail_appended_df, 'tail.midi')
play_midi('tail.midi', 'final_tail.wav')