In [1]:
import os
import json
import numpy as np
import random
import glob
import music21 as m21
import djalgo as dj

def scan_midi_files(directory, max_files=None):
    """
    Scans the specified directory for MIDI files using glob with a while loop.

    Args:
        directory (str): The directory to scan for MIDI files.
        max_files (int, optional): The maximum number of files to scan. If None, all files are scanned.

    Returns:
        list: The list of MIDI files found.
    """
    search_pattern = os.path.join(directory, '**', '*.mid*')
    midi_files = []

    # Utiliser glob.iglob pour obtenir un itérateur
    for file in glob.iglob(search_pattern, recursive=True):
        midi_files.append(file)
        if max_files is not None and len(midi_files) >= max_files:
            break

    return midi_files

In [2]:
midi_files = scan_midi_files('_midi')

In [3]:
import mido
from mido import MidiFile, MidiTrack

def repair_midi(input_path, output_path, split_channels=False, change_instruments=None):
    """
    Repairs a MIDI file by splitting or merging tracks and optionally changing instruments.
    
    Args:
        input_path (str): Path to the input MIDI file.
        output_path (str): Path to save the repaired MIDI file.
        split_channels (bool): If True, splits tracks into separate tracks based on channel.
        change_instruments (dict): Optional dictionary mapping from channel to new instrument program number.
    """
    mid = MidiFile(input_path)
    new_mid = MidiFile()
    
    if split_channels:
        # Create a track for each channel (0-15)
        tracks_per_channel = [MidiTrack() for _ in range(16)]
        
        # Distribute messages to appropriate track based on channel
        for track in mid.tracks:
            for msg in track:
                if not msg.is_meta and hasattr(msg, 'channel'):
                    if change_instruments and msg.type == 'program_change' and msg.channel in change_instruments:
                        msg.program = change_instruments[msg.channel]
                    tracks_per_channel[msg.channel].append(msg)
                else:
                    # Append meta messages to all tracks
                    [t.append(msg) for t in tracks_per_channel]
                    
        # Add non-empty tracks to the new MIDI file
        for track in tracks_per_channel:
            if any(not msg.is_meta for msg in track):  # Ensuring the track is not empty
                new_mid.tracks.append(track)
    else:
        # Merge all tracks into one or keep as is, based on the MIDI file's structure
        combined_track = MidiTrack()
        for track in mid.tracks:
            for msg in track:
                if change_instruments and msg.type == 'program_change' and msg.channel in change_instruments:
                    msg.program = change_instruments[msg.channel]
                combined_track.append(msg)
        new_mid.tracks.append(combined_track)
        
    # Save the repaired MIDI file
    new_mid.save(output_path)

# Example usage:

for file in midi_files:
    repair_midi(file, file.replace('_midi/', '_midi-repaired/'), split_channels=True)


In [4]:
midi_files = scan_midi_files('_midi-repaired')

In [5]:
midi_files

['_midi-repaired/adams.mid',
 '_midi-repaired/mario.mid',
 '_midi-repaired/pinkpanther.mid',
 '_midi-repaired/rocky.mid',
 '_midi-repaired/tetris.mid']

In [6]:
for midi_file in midi_files:
    score = m21.converter.parse(midi_file)
    parts = score.getElementsByClass(m21.stream.Part)
    print(f'File: {midi_file}, length: {len(parts)} parts')

File: _midi-repaired/adams.mid, length: 2 parts
File: _midi-repaired/mario.mid, length: 2 parts
File: _midi-repaired/pinkpanther.mid, length: 2 parts
File: _midi-repaired/rocky.mid, length: 2 parts
File: _midi-repaired/tetris.mid, length: 2 parts


In [88]:
import numpy as np
import music21 as m21

def replace_none_with_weighted_value(data):
    # Copy data to avoid modifying the original array directly
    modified_data = np.array(data, dtype=object)
    for i in range(len(modified_data)):
        if modified_data[i] is None:
            nearest_prev = nearest_next = None
            dist_prev = dist_next = float('inf')
            
            for j in range(i - 1, -1, -1):
                if modified_data[j] is not None:
                    nearest_prev = modified_data[j]
                    dist_prev = i - j
                    break
            
            for k in range(i + 1, len(modified_data)):
                if modified_data[k] is not None:
                    nearest_next = modified_data[k]
                    dist_next = k - i
                    break
            
            if nearest_prev is not None and nearest_next is not None:
                total_weight = 1 / dist_prev + 1 / dist_next
                weighted_value = (nearest_prev * (1 / dist_prev) + nearest_next * (1 / dist_next)) / total_weight
            elif nearest_prev is not None:
                weighted_value = nearest_prev
            elif nearest_next is not None:
                weighted_value = nearest_next
            else:
                weighted_value = 0  # Default or handle as needed

            modified_data[i] = weighted_value
    return modified_data


def prepare_data_for_gpflow(midi_files, num_instruments=2):
    features = []  # Will hold the input features
    targets = []   # Will hold the targets with instrument index

    for midi_file in midi_files:
        score = m21.converter.parse(midi_file)
        parts = score.getElementsByClass(m21.stream.Part)
        for idx, part in enumerate(parts[:num_instruments]):
            key = part.analyze('key')
            scale_list = dj.harmony.Scale(key.tonic.name, key.mode).generate()            
            for element in part.flatten().notesAndRests:
                offset = float(element.offset)
                duration = float(element.duration.quarterLength)
                activity = 0 if isinstance(element, m21.note.Rest) else 1
                
                if isinstance(element, m21.note.Note):
                    pitch = element.pitch.midi
                elif isinstance(element, m21.chord.Chord):
                    pitch = element.pitches[0].midi
                else:
                    pitch = None
                
                degree = dj.utils.get_degree_from_pitch(pitch, scale_list=scale_list, tonic_pitch=key.tonic.midi) if pitch is not None else None
                
                features.append([offset, idx])  # Features (time offset)
                # Concatenating instrument index with each target
                targets.append([degree, duration, activity])

    # Replace None values in degrees with a weighted average of the nearest non-None values
    all_degrees = [t[0] for t in targets]
    all_degrees_noNone = replace_none_with_weighted_value(all_degrees)
    targets = [[all_degrees_noNone[i], t[1], t[2]] for i, t in enumerate(targets)]
        
    X = np.array(features).astype(np.float64)
    Y = np.array(targets).astype(np.float64)

    return X, Y

# Example usage
num_instruments=2
midi_files = scan_midi_files('_midi-repaired')
X, Y = prepare_data_for_gpflow(midi_files, num_instruments=num_instruments)


In [89]:
def scaling(x):
    mean = np.mean(x, axis=0)
    std = np.std(x, axis=0)
    return (x - mean) / std, mean, std


Y_sc = Y.copy()
Y_sc[:, :2], mean, std = scaling(Y[:, :2])

X_sc = X.copy()
X_sc[:, 0], mean_X, std_X = scaling(X[:, 0])

In [90]:
X_sc

array([[-1.28407026,  0.        ],
       [-1.26243537,  0.        ],
       [-1.24080048,  0.        ],
       ...,
       [-0.72156317,  1.        ],
       [-0.71615445,  1.        ],
       [-0.71074573,  1.        ]])

In [91]:
Y_sc

array([[-0.39304276,  2.28918835,  0.        ],
       [-0.39304276,  2.28918835,  0.        ],
       [-0.39304276,  0.70454098,  0.        ],
       ...,
       [-0.4499179 , -0.0877827 ,  1.        ],
       [-0.27929249, -0.0877827 ,  1.        ],
       [-0.27929249,  0.70454098,  0.        ]])

In [92]:
import gpflow
import tensorflow as tf
import numpy as np
from gpflow.utilities import print_summary
from gpflow.kernels import MultioutputKernel

In [93]:
import gpflow


# Define the base kernel for the GPs
base_kernel = gpflow.kernels.SquaredExponential()

# Number of outputs matches the number of instruments
num_outputs = int(np.max(X_sc[:, 1]) + 1)  # Assuming column 1 is the index of instruments

# Coregionalization kernel setup
coreg_kernel = gpflow.kernels.Coregion(output_dim=num_outputs, rank=num_outputs, active_dims=[1])
coreg_kernel.W.assign(np.random.rand(num_outputs, num_outputs))

# Combine the base kernel with the coregionalization kernel
# Ensure each model type (degrees, durations, activities) has its own combined kernel
kernel_degrees = base_kernel * coreg_kernel
kernel_durations = base_kernel * coreg_kernel
kernel_activities = base_kernel * coreg_kernel

# Setup the GP models for each output
# Use X_sc which contains the scaled features including time and instrument index
model_degrees = gpflow.models.SVGP(kernel=kernel_degrees, likelihood=gpflow.likelihoods.Gaussian(), inducing_variable=X_sc.copy())
model_durations = gpflow.models.SVGP(kernel=kernel_durations, likelihood=gpflow.likelihoods.Gaussian(), inducing_variable=X_sc.copy())
model_activities = gpflow.models.SVGP(kernel=kernel_activities, likelihood=gpflow.likelihoods.Bernoulli(), inducing_variable=X_sc.copy())

# Define training data for each model using the appropriate columns from Y
training_data_degrees = (X_sc, Y_sc[:, 0:1])  # First column of Y for degrees
training_data_durations = (X_sc, Y_sc[:, 1:2])  # Second column of Y for durations
training_data_activities = (X_sc, Y_sc[:, 2:3])  # Third column of Y for activities

# Optimization using GPflow's built-in optimizer for each model
optimizer = gpflow.optimizers.Scipy()
optimizer.minimize(model_degrees.training_loss_closure(training_data_degrees),
                   variables=model_degrees.trainable_variables,
                   options=dict(maxiter=100))
optimizer.minimize(model_durations.training_loss_closure(training_data_durations),
                   variables=model_durations.trainable_variables,
                   options=dict(maxiter=100))
optimizer.minimize(model_activities.training_loss_closure(training_data_activities),
                   variables=model_activities.trainable_variables,
                   options=dict(maxiter=100))


2