In [5]:
import numpy as np
import os,glob
import music21
from music21 import *
from pathlib import Path
from collections import Counter
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import torch
import torch.nn as nn
import torch.nn.functional as F


def note_to_int(note):
    """Convert a pitch name in letter form to its integer (MIDI-style) pitch value.

    The name is read as ``<letter><accidentals><octave>``, e.g. ``'C4'``,
    ``'F#5'``, ``'B-3'`` (``-`` is a flat) or ``'C#-1'``.  The value is
    ``semitone_of_letter + accidentals + 12 * (octave + 1)``, so ``'C4'``
    gives 60.

    Args:
        note: Pitch name as a string.

    Returns:
        The integer pitch value.

    Raises:
        ValueError: If the letter is not one of C, D, E, F, G, A, B or the
            octave part is not an integer.
        IndexError: If ``note`` is empty.
    """
    note_base_name = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
    base_value = note_base_name.index(note[0])

    # Split what follows the letter into leading sharps, then dashes, then
    # the octave digits.
    remainder = note[1:]
    after_sharps = remainder.lstrip('#')
    sharps = len(remainder) - len(after_sharps)
    digits = after_sharps.lstrip('-')
    dashes = len(after_sharps) - len(digits)

    if sharps and dashes:
        # A name is not expected to mix sharps and flats, so a dash that
        # follows a sharp is read as the sign of a negative octave ('C#-1').
        accidental = sharps
        octave = -int(digits)
    else:
        # Otherwise every dash is a flat: each lowers the pitch by a semitone,
        # each sharp raises it by one.
        accidental = sharps - dashes
        octave = int(digits)

    return base_value + accidental + 12 * (octave + 1)

# Value bands used to encode the piano-roll matrix.  Each kind of cell is
# drawn uniformly from its own [low, high] band:
#   rest          -> [min_value, lower_first]
#   continuation  -> [lower_second, upper_first]
#   first touch   -> [upper_second, max_value]
# With the values below every band collapses to a single number
# (0.0 / 0.5 / 1.0).  The trailing comments keep the alternative limits that
# were previously left commented out next to each constant.
min_value = 0.00
lower_first = 0.00    # alternative: 0.1

lower_second = 0.5    # alternative: 0.4
upper_first = 0.5     # alternative: 0.6

upper_second = 1.0    # alternative: 0.8
max_value = 1.0

def notes_to_matrix(notes, durations, offsets, min_value=min_value, lower_first=lower_first,
                    lower_second=lower_second,
                    upper_first=upper_first, upper_second=upper_second,
                    max_value=max_value):
    """Render parallel note/duration/offset lists into a 128-row piano-roll matrix.

    Rows are pitch values, columns are steps of a quarter of one offset unit
    (``column = int(offset * 4)``).  The matrix is first filled with "rest"
    values; every note then writes one "first touch" value at its onset
    column and "continuation" values on the following ``how_many - 1``
    columns, where ``how_many = int(float(duration) / 0.25)``.

    Args:
        notes: Per event either an integer pitch or a string of pitches joined
            with ``'.'`` (a chord).  A string holding a single pitch is
            accepted as well.
        durations: Per event a value accepted by ``float()``.
        offsets: Per event the onset position.
        min_value, lower_first: Band for rest cells.
        lower_second, upper_first: Band for continuation cells.
        upper_second, max_value: Band for first-touch cells.

    Returns:
        A ``(128, width)`` numpy array with
        ``width = int(max(offsets)) * 4 + 32``, or the tuple
        ``(None, None, None)`` when ``offsets`` is empty.
    """
    if len(offsets) == 0:
        print ('Index Error')
        return (None, None, None)

    # Size the time axis from the largest onset (not merely the last one in
    # the list) so that unsorted input cannot index past the matrix; 8 * 4
    # extra columns are kept as padding after it.
    last_offset = int(max(offsets))
    total_offset_axis = last_offset * 4 + (8 * 4)

    # Start with every cell holding a rest value.
    our_matrix = np.random.uniform(min_value, lower_first, (128, int(total_offset_axis)))

    for (pitch_spec, duration, offset) in zip(notes, durations, offsets):
        how_many = int(float(duration)/0.25) # number of columns covered by this event

        # Scalars (not size-1 arrays) so they can be stored in single cells.
        first_touch = np.random.uniform(upper_second, max_value)
        continuation = np.random.uniform(lower_second, upper_first)

        # A chord is a '.'-joined string; a chord with one pitch is a string
        # without any '.', so strings are always parsed instead of being used
        # as an index directly.
        if isinstance(pitch_spec, str):
            pitches = [int(part) for part in pitch_spec.split('.')]
        else:
            pitches = [int(pitch_spec)]

        for pitch in pitches:
            our_matrix[pitch, int(offset * 4)] = first_touch
            our_matrix[pitch, int((offset * 4) + 1) : int((offset * 4) + how_many)] = continuation

    return our_matrix

def check_float(duration):
    """Return ``duration`` with a fractional form such as ``'14/3'`` turned into a decimal string.

    A string without ``'/'`` is returned unchanged.  Otherwise the first two
    ``'/'``-separated fields are taken as numerator and denominator and the
    quotient is returned as ``str(float)``.
    """
    fields = duration.split('/')
    if len(fields) == 1:
        # No slash present: already in decimal form.
        return duration
    top, bottom = float(fields[0]), float(fields[1])
    return str(float(top / bottom))

def midi_to_matrix(filename, length=250):
    """Convert the piano part of a MIDI file into a fixed-width piano-roll matrix.

    The file is parsed, partitioned by instrument, and the part whose name
    is ``'Piano'`` is walked recursively.  Every note and chord contributes a
    pitch entry (via ``note_to_int``), its duration in quarter lengths and its
    offset; the three lists are rendered with ``notes_to_matrix`` and the
    result is cut to the first ``length`` columns.

    Args:
        filename: Path of the file handed to ``converter.parse``.
        length: Number of columns every returned matrix must have.

    Returns:
        The ``[:, :length]`` slice of the matrix, or ``None`` when the file
        cannot be partitioned, has no part named ``'Piano'``, contains no
        notes, or yields fewer than ``length`` columns.
    """
    midi = converter.parse(filename)

    try:
        parts = music21.instrument.partitionByInstrument(midi)
    except TypeError:
        print ('Type error.')
        return None

    try:
        # The name is taken from the last word of each part's string form,
        # minus its final character.
        instrument_names = [(str(part).split(' ')[-1])[:-1] for part in parts]
    except TypeError:
        print ('Type is not iterable.')
        return None

    # Just take piano part. For the future works, we can use different instrument.
    try:
        piano_index = instrument_names.index('Piano')
    except ValueError:
        print ('%s have not any Piano part' %(filename))
        return None

    notes_to_parse = parts.parts[piano_index].recurse()

    durations = []
    notes = []
    offsets = []

    for element in notes_to_parse:
        if isinstance(element, note.Note): # single note -> one integer
            pitch_entry = note_to_int(str(element.pitch))
        elif isinstance(element, chord.Chord): # chord -> '.'-joined integers
            pitch_entry = '.'.join(str(note_to_int(str(n)))
                                   for n in element.pitches)
        else:
            continue

        notes.append(pitch_entry)
        # Read the length from the duration object itself rather than slicing
        # it out of the object's printed form; notes_to_matrix passes each
        # entry through float().
        durations.append(element.duration.quarterLength)
        offsets.append(element.offset)

    our_matrix = notes_to_matrix(notes, durations, offsets)

    try:
        time = our_matrix.shape[1]
    except AttributeError:
        # notes_to_matrix hands back a tuple instead of an array when there
        # was nothing to render.
        print ("'tuple' object has no attribute 'shape'")
        return None

    if (time >= length):
        return (our_matrix[:,:length]) # We have to set all individual note matrix to same shape for Generative DL.

    print ('%s have not enough duration' %(filename))
    return None

def int_to_note(integer):
    """Convert an integer pitch value to a note name in letter form.

    The name is the sharp-based pitch class for ``integer % 12`` followed by
    the octave ``integer // 12 - 1``.  An octave that would be negative is
    written as ``0`` instead, so the resulting name never contains ``'-'``.
    """
    note_base_name = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
    octave_index, pitch_class = divmod(integer, 12)
    octave = int(octave_index) - 1
    if octave < 0:
        # Negative octaves are clamped to octave 0.
        octave = 0
    return note_base_name[pitch_class] + str(octave)


# Decision thresholds for turning a real-valued matrix back into the three
# symbols: the midpoint between the rest band and the continuation band, and
# the midpoint between the continuation band and the first-touch band.
lower_bound = (lower_first + lower_second) / 2
upper_bound = (upper_first + upper_second) / 2

def converter_func(arr,first_touch = 1.0, continuation = 0.0, lower_bound = lower_bound, upper_bound = upper_bound):
    """Quantise ``arr`` in place to the symbols -1.0 / 0.0 / 1.0 and return it.

    Cells below ``lower_bound`` become -1.0, cells in
    ``[lower_bound, upper_bound)`` become 0.0 and cells at or above
    ``upper_bound`` become 1.0.  All three masks are taken from the values
    as they were on entry, so a value written by one step is never
    re-classified by a later one.

    NOTE: the operation is not idempotent when ``lower_bound`` is positive:
    a 0.0 produced by a first call lies below ``lower_bound`` and would be
    turned into -1.0 by a second call on the same data.

    Args:
        arr: numpy array; it is modified in place (views write through to
            their base array).
        first_touch, continuation: Unused; the written symbols are fixed.
        lower_bound, upper_bound: Thresholds separating the three classes.

    Returns:
        The same ``arr`` object.
    """
    # Classify before writing anything.
    is_rest = arr < lower_bound
    is_continuation = (lower_bound <= arr) & (arr < upper_bound)
    is_first_touch = arr >= upper_bound

    np.place(arr, is_first_touch, 1.0)
    np.place(arr, is_continuation, 0.0)
    np.place(arr, is_rest, -1.0)
    return arr

def how_many_repetitive_func(array, from_where=0, continuation=0.0):
    """Return one plus the number of leading ``continuation`` values in ``array[from_where:]``.

    Counting stops at the first element that differs from ``continuation``;
    if no such element exists the whole tail is counted.
    """
    tail = array[from_where:]
    # Index of the first non-continuation element == length of the leading run.
    run_length = next(
        (position for position, value in enumerate(tail) if value != continuation),
        len(tail),
    )
    return run_length + 1

def matrix_to_midi(matrix, random=0):
    """Turn a piano-roll matrix back into a list of music21 notes.

    The matrix is quantised with ``converter_func`` (on a private copy, once),
    the leading columns that contain no first touch are dropped, and every row
    is scanned: each first touch starts a note whose length is one step plus
    the run of continuation cells that follows it.

    Args:
        matrix: 2-D numpy array, rows indexed by pitch value, columns by time
            step.  It is not modified.
        random: When truthy, every note's length is multiplied by a random
            integer drawn from ``np.random.randint(3, 6)`` and its offset is
            doubled.

    Returns:
        A list of ``note.Note`` objects with ``offset`` and
        ``storedInstrument`` set.
    """
    first_touch = 1.0
    continuation = 0.0

    # Quantise exactly once.  converter_func works in place and is not
    # idempotent with its default bounds (a 0.0 continuation would become a
    # -1.0 rest on a second pass), so it must not be re-applied to columns and
    # then again to rows of the same data.
    normalized = converter_func(np.array(matrix, dtype=float))
    y_axis, x_axis = normalized.shape
    output_notes = []

    # Count the leading columns that hold no first touch.
    how_many_in_start_zeros = 0
    for x_axis_num in range(x_axis):
        if first_touch in normalized[:, x_axis_num]:
            break
        how_many_in_start_zeros += 1

    # Same from the end (column 0 is not inspected); only reported below.
    how_many_in_end_zeros = 0
    for x_axis_num in range(x_axis-1,0,-1):
        if first_touch in normalized[:, x_axis_num]:
            break
        how_many_in_end_zeros += 1

    print ('How many rows for non-start note at beginning:', how_many_in_start_zeros)
    print ('How many rows for non-start note at end:', how_many_in_end_zeros)

    normalized = normalized[:,how_many_in_start_zeros:]
    y_axis, x_axis = normalized.shape
    print (y_axis, x_axis)

    for y_axis_num in range(y_axis):
        one_freq_interval_norm = normalized[y_axis_num,:] # Values in a row.

        i = 0
        while (i < len(one_freq_interval_norm)):
            if (one_freq_interval_norm[i] != first_touch):
                i += 1
                continue

            temp_i = i
            how_many_repetitive = how_many_repetitive_func(one_freq_interval_norm, from_where=i+1, continuation=continuation)
            i += how_many_repetitive

            if (random):
                length_factor = np.random.randint(3,6)
                offset_factor = 2
            else:
                length_factor = 1
                offset_factor = 1

            new_note = note.Note(int_to_note(y_axis_num),duration=duration.Duration(0.25*length_factor*how_many_repetitive))
            new_note.offset = 0.25*temp_i*offset_factor
            new_note.storedInstrument = instrument.Piano()
            output_notes.append(new_note)

    return output_notes



def visualize_adjacency_matrix(adjacency_matrix):
    """Show ``adjacency_matrix`` as a black-and-white image with 1-based tick labels.

    A tick is placed on every fourth row/column (indices 0, 4, 8, ...) and is
    labelled with the 1-based index of that row/column (1, 5, 9, ...).

    Args:
        adjacency_matrix: 2-D array-like accepted by ``Axes.imshow``.
    """
    fig, ax = plt.subplots()
    ax.imshow(adjacency_matrix, cmap=plt.get_cmap("binary"))

    n_rows, n_cols = np.shape(adjacency_matrix)[:2]

    # Set the x and y axis ticks.  Positions and labels are set together so
    # that each label stays attached to its own position; fixed labels are
    # assigned by tick order, so they must not be combined with a locator
    # that generates a different set of positions.
    x_positions = np.arange(0, n_cols, 4)
    y_positions = np.arange(0, n_rows, 4)
    ax.set_xticks(x_positions)
    ax.set_yticks(y_positions)
    ax.set_xticklabels(x_positions + 1)
    ax.set_yticklabels(y_positions + 1)

    plt.show()

def extract_bars_from_midi(midi_file):
    """Parse ``midi_file`` and collect the elements of ``makeMeasures()`` of every part.

    Returns:
        One flat list, parts in score order and elements in stream order.
    """
    parsed_score = converter.parse(midi_file)
    return [
        element
        for part in parsed_score.parts
        for element in part.makeMeasures()
    ]

def extract_bars_from_musicxml(xml_file):
    """Parse ``xml_file`` and collect the ``'Measure'`` elements of every part.

    Returns:
        One flat list, parts in score order and measures in stream order.
    """
    parsed_score = converter.parse(xml_file)
    return [
        measure
        for part in parsed_score.parts
        for measure in part.getElementsByClass('Measure')
    ]

def get_pitch_interval(note1, note2):
    """Return the chromatic interval from ``note1`` to ``note2`` in semitones.

    Returns ``None`` unless both arguments are ``note.Note`` instances.
    """
    both_are_notes = isinstance(note1, note.Note) and isinstance(note2, note.Note)
    if both_are_notes:
        chromatic = interval.notesToChromatic(note1, note2)
        return chromatic.semitones
    return None

def count_notes_in_bar(bar):
    """Accumulate, per note name, the quarter-length time sounded in ``bar``.

    Every element of ``bar.flat.notes`` is inspected.  A note adds its own
    quarter length under its ``nameWithOctave``.  A chord adds the largest
    quarter length among its member notes under each member's
    ``nameWithOctave``.  The total grows by that length once per note or
    chord.  Elements that are neither are ignored.

    Returns:
        ``(Counter mapping nameWithOctave -> summed length, total length)``.
    """
    per_name = Counter()
    elapsed = 0.0

    for element in bar.flat.notes:
        if element.isNote:
            length = element.duration.quarterLength
            members = (element,)
        elif element.isChord:
            length = max([member.duration.quarterLength for member in element.notes])
            members = element.notes
        else:
            continue

        for member in members:
            per_name[member.nameWithOctave] += length
        elapsed += length

    return per_name, elapsed

def are_bars_similar(bar1, bar2, threshold=0.9):
    """Tell whether two bars share enough sounded note time.

    For every note name present in both bars the smaller of the two
    accumulated lengths is taken; twice the sum of these, divided by the sum
    of both bars' total lengths, is the similarity.  When the combined total
    is zero the similarity is 0.

    Returns:
        ``similarity >= threshold``.
    """
    first_counts, first_total = count_notes_in_bar(bar1)
    second_counts, second_total = count_notes_in_bar(bar2)

    common_names = set(first_counts.keys()) & set(second_counts.keys())
    overlap = sum([min(first_counts[name], second_counts[name]) for name in common_names])

    combined_total = first_total + second_total
    similarity = (overlap * 2) / combined_total if combined_total != 0 else 0
    return similarity >= threshold


def create_adjacency_matrix(bars ,status):
    """Build a symmetric 0/1 bar-to-bar adjacency matrix.

    Args:
        bars: Sequence of bars.
        status: ``'repeat'`` links two bars when ``are_bars_similar`` says so.
            ``'rhythm'`` links two bars when the mean of all pairwise pitch
            intervals between their non-rest elements (pairs for which
            ``get_pitch_interval`` returns ``None`` are skipped), plus the
            absolute difference between each bar's first positive element
            duration, is greater than zero.  Any other value leaves the
            matrix all zeros.

    Returns:
        A ``(len(bars), len(bars))`` numpy array of floats with a zero
        diagonal.
    """
    num_bars = len(bars)
    adjacency_matrix = np.zeros((num_bars, num_bars))

    if status == 'repeat':

        for i in range(num_bars):
            for j in range(i + 1, num_bars):
                if are_bars_similar(bars[i], bars[j]):
                    adjacency_matrix[i, j] = 1
                    adjacency_matrix[j, i] = 1

    elif status == 'rhythm':
        # Per-bar data does not depend on the pairing, so it is gathered once
        # here instead of once per pair.
        sounding = [
            [n for n in bar.flat.notesAndRests if not isinstance(n, note.Rest)]
            for bar in bars
        ]
        # Quarter length of the first element with a positive duration
        # (0.0 when the bar has none).
        lead_durations = [
            next((n.duration.quarterLength for n in bar_notes
                  if n.duration.quarterLength > 0.0), 0.0)
            for bar_notes in sounding
        ]

        for i in range(num_bars):
            for j in range(i + 1, num_bars):
                distances = []
                for n1 in sounding[i]:
                    for n2 in sounding[j]:
                        dist = get_pitch_interval(n1, n2)
                        if dist is not None:
                            distances.append(dist)
                if len(distances) == 0:
                    continue

                mean_distance = sum(distances) / len(distances)
                # Weight for the rhythmic relationship between the bars.
                rhythm_weight = abs(lead_durations[i] - lead_durations[j])

                # Only show 1 or 0
                linked = 1 if mean_distance + rhythm_weight > 0 else 0
                adjacency_matrix[i][j] = linked
                adjacency_matrix[j][i] = linked

    return adjacency_matrix

def merge_metrix(adj_1,adj_2):
    """Place two n-by-n matrices on the diagonal of a 2n-by-2n list of lists.

    ``n`` is ``len(adj_1)``.  The top-left block holds ``adj_1``, the
    bottom-right block holds ``adj_2``, every other cell is ``0``.
    """
    size = len(adj_1)
    zero_block = [0] * size

    upper_rows = [
        [adj_1[row][col] for col in range(size)] + list(zero_block)
        for row in range(size)
    ]
    lower_rows = [
        list(zero_block) + [adj_2[row][col] for col in range(size)]
        for row in range(size)
    ]
    return upper_rows + lower_rows

def music_to_adjacency_metrix(file, bars_len):
    """Build the merged repeat/rhythm adjacency matrix for the first bars of a piece.

    The bars are extracted according to the file extension (matched without
    regard to case), cut to the first ``bars_len``, turned into a 'repeat'
    and a 'rhythm' adjacency matrix, and the two are merged block-diagonally.

    Args:
        file: Path of a MusicXML (``.mxl``, ``.musicxml``, ``.xml``) or MIDI
            (``.mid``, ``.midi``) file.
        bars_len: Maximum number of bars to use.

    Returns:
        The list of lists produced by ``merge_metrix``; its side is twice the
        number of bars actually used.

    Raises:
        ValueError: If the extension is not one of the supported ones.
    """
    midi_file = str(file)
    extension = os.path.splitext(midi_file)[1].lower()

    if extension in ('.mxl', '.musicxml', '.xml'):
        bars = extract_bars_from_musicxml(midi_file)[:bars_len]   #for xml files

    elif extension in ('.mid', '.midi'):
        bars = extract_bars_from_midi(midi_file)[:bars_len]  #for midi files

    else:
        # Fail with a clear message instead of hitting an unbound variable.
        raise ValueError('Unsupported music file extension: %s' % (midi_file))

    bars_repeat = create_adjacency_matrix(bars, 'repeat')
    bars_rhythm = create_adjacency_matrix(bars, 'rhythm')

    merge_adj = merge_metrix(bars_repeat, bars_rhythm)

    return merge_adj

def music_to_feature(midi_file):
    """Return ``midi_to_matrix(midi_file, length=500)`` wrapped in a numpy array."""
    return np.array(midi_to_matrix(midi_file, length=500))

def merge_list_and_tensor(list_data, tensor_data):
    """Append ``tensor_data`` to the right of ``list_data``, zero-padded below.

    ``list_data`` is converted to a float32 tensor of shape (R, C).  The
    result has shape (R, 2C): the left half is ``list_data``, the right half
    holds ``tensor_data`` in its first ``tensor_data.size(0)`` rows and zeros
    in the remaining rows.
    """
    left_half = torch.tensor(list_data, dtype=torch.float32)
    row_count, col_count = left_half.size(0), left_half.size(1)

    # Right half starts as zeros; the tensor rows overwrite its top.
    right_half = torch.zeros((row_count, col_count))
    right_half[:tensor_data.size(0)] = tensor_data

    return torch.cat([left_half, right_half], dim=1)

def reshape_feature(data ,dim):
    """Keep the ``dim`` rows of ``data`` with the largest variance.

    The variance is taken along axis 1.  The selected rows are returned as a
    float32 tensor, ordered from highest to lowest variance.
    """
    row_variances = np.var(data, axis=1)
    descending = np.flip(np.argsort(row_variances))
    kept_rows = descending[:dim]

    return torch.tensor(data[kept_rows, :], dtype=torch.float32)

class GCNLayer(nn.Module):
    """One graph-convolution step: aggregate, project, drop out, rectify."""

    def __init__(self, input_dim, output_dim, dropout_rate):
        super().__init__()
        self.linear = nn.Linear(input_dim, output_dim)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, adjacency_matrix, feature_matrix):
        """Return ``relu(dropout(linear(adjacency_matrix @ feature_matrix)))``."""
        # Compute the propagation rule: aggregate features over the adjacency.
        aggregated = adjacency_matrix @ feature_matrix
        projected = self.linear(aggregated)
        return F.relu(self.dropout(projected))

class GCNModel(nn.Module):
    """Two stacked ``GCNLayer`` modules: input -> hidden -> output."""

    def __init__(self, input_dim, hidden_dim, output_dim, dropout_rate):
        super().__init__()
        self.layer1 = GCNLayer(input_dim, hidden_dim, dropout_rate)
        self.layer2 = GCNLayer(hidden_dim, output_dim, dropout_rate)

    def forward(self, adjacency_matrix, feature_matrix):
        """Run both layers, passing the same adjacency matrix to each."""
        return self.layer2(
            adjacency_matrix,
            self.layer1(adjacency_matrix, feature_matrix),
        )



def GCN_trainning(merge_adj, f, hidden_dim=16, dropout_rate=0.22, epochs=300, learning_rate=0.01):
    """Fit a two-layer GCN to reconstruct ``f`` and return its reconstruction.

    A fresh ``GCNModel`` is trained with Adam to minimise the mean squared
    error between its output and the input features.  After training the
    model is switched to evaluation mode and run once more without gradient
    tracking; that output is returned.

    Args:
        merge_adj: Adjacency matrix, anything ``torch.tensor`` accepts, with
            as many rows/columns as ``f`` has rows.
        f: Feature tensor of shape (nodes, features).  It is cloned and
            detached, so the caller's tensor is not touched.
        hidden_dim: Width of the hidden layer.
        dropout_rate: Dropout probability of both layers.
        epochs: Number of optimisation steps.
        learning_rate: Adam learning rate.

    Returns:
        A tensor with the same shape as ``f`` that does not require grad.
    """
    adjacency_matrix = torch.tensor(merge_adj, dtype=torch.float32)
    # The features are both input and target; no gradient is needed on them.
    feature_matrix = f.clone().detach()

    input_dim = feature_matrix.shape[1]
    # The loss compares the output with the input element-wise, so the output
    # width has to equal the input width.
    output_dim = input_dim

    model = GCNModel(input_dim, hidden_dim, output_dim, dropout_rate)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()

        # Forward pass with the original features and adjacency matrix.
        reconstruction = model(adjacency_matrix, feature_matrix)

        # Reconstruction loss between the output and the original input.
        loss = F.mse_loss(reconstruction, feature_matrix)

        # Backward pass and optimization step.
        loss.backward()
        optimizer.step()

    # Produce the result with the final weights and with dropout disabled,
    # rather than returning the last (pre-update, dropout-perturbed) training
    # output.
    model.eval()
    with torch.no_grad():
        output = model(adjacency_matrix, feature_matrix)

    return output





In [6]:
# Build the training database: one (piano roll | GCN features) matrix per
# MIDI file, stacked into a single array and saved to `database_npy`.
database_npy = 'midis_array_schumann.npy'

print (os.getcwd())

root_dir = ('../bestekar/midi_files/')
all_midi_paths = glob.glob(os.path.join(root_dir, 'classic/schumann/*mid'))


print (all_midi_paths)
matrix_of_all_midis = []

BARS_LENGTH = 32 # Set the bars length

# All midi have to be in same shape.

for single_midi_path in all_midi_paths:

    print (single_midi_path)
    matrix_of_single_midi = midi_to_matrix(single_midi_path, length=500)

    # Nothing usable in this file: skip it before doing any graph work.
    if matrix_of_single_midi is None:
        continue

    # Build the adjacency matrix once and reuse it for the size check and
    # for training.
    merge_adj = music_to_adjacency_metrix(single_midi_path, BARS_LENGTH)
    if len(merge_adj) < (BARS_LENGTH * 2): # Skip the mismatch music
        continue

    # Reuse the matrix computed above as the feature source instead of
    # parsing the file a second time.
    f = np.array(matrix_of_single_midi)
    f = reshape_feature(f, BARS_LENGTH * 2)  # tensor size after dimensionality reduction
    GCN_feature = GCN_trainning(merge_adj, f)
    matrix_of_single_midi = merge_list_and_tensor(matrix_of_single_midi, GCN_feature)
    matrix_of_all_midis.append(matrix_of_single_midi)
    print (matrix_of_single_midi.shape)

# Stack on the CPU; no GPU round-trip is needed to obtain a numpy array, and
# none is available on CPU-only machines.
if matrix_of_all_midis:
    midis_array = np.stack([item.detach().cpu().numpy() for item in matrix_of_all_midis])
else:
    midis_array = np.empty((0,), dtype=np.float32)
cpu_tensor = torch.from_numpy(midis_array)  # same data as a CPU tensor
np.save(database_npy, midis_array)

midis_array_raw = midis_array
print ("[Midis_array_raw shape :]",(midis_array_raw.shape))

/home/zed/zed_code/music_gerenation
['../bestekar/midi_files/classic/schumann/scn16_6_format0.mid', '../bestekar/midi_files/classic/schumann/scn15_12.mid', '../bestekar/midi_files/classic/schumann/scn16_5_format0.mid', '../bestekar/midi_files/classic/schumann/scn15_4_format0.mid', '../bestekar/midi_files/classic/schumann/scn15_9_format0.mid', '../bestekar/midi_files/classic/schumann/scn15_8_format0.mid', '../bestekar/midi_files/classic/schumann/scn15_12_format0.mid', '../bestekar/midi_files/classic/schumann/scn16_3_format0.mid', '../bestekar/midi_files/classic/schumann/scn68_12_format0.mid', '../bestekar/midi_files/classic/schumann/scn16_7_format0.mid', '../bestekar/midi_files/classic/schumann/scn68_10_format0.mid', '../bestekar/midi_files/classic/schumann/scn15_7.mid', '../bestekar/midi_files/classic/schumann/scn15_11.mid', '../bestekar/midi_files/classic/schumann/scn15_1.mid', '../bestekar/midi_files/classic/schumann/scn15_10_format0.mid', '../bestekar/midi_files/classic/schumann/scn