In [4]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import music21 as mc
import pickle
import os
import random
import copy
import pygame as pg

pygame 1.9.6
Hello from the pygame community. https://www.pygame.org/contribute.html


In [5]:
#读取指定后缀的所有文件
def find_all_file(base,suffix):
    for root,ds,fs in os.walk(base):
        for f in fs:
            if f.endswith(suffix):
                fullname=os.path.join(root,f)
                yield fullname

In [6]:
#读取单个midi文件并返回notes
def read_midi_file(file):
    notes=[]
    notes_merge_rest=[]
    print('读取文件:'+file)
    try:
        stream=mc.converter.parse(file)
        instruments=mc.instrument.partitionByInstrument(stream)
        for instrument in instruments.parts:
            print(str(instrument))
            if 'Piano' in str(instrument):
                elements=instrument.recurse()
                for element in elements:
                    if isinstance(element,mc.note.Note):
                        #机智的操作要来了
                        ###############
                        if element.duration.quarterLength>1:
                            element.duration.quarterLength=int(element.duration.quarterLength)
                        elif element.duration.quarterLength>0.3 and element.duration.quarterLength<0.4:
                            element.duration.quarterLength=0.25
                        elif element.duration.quarterLength>0.6 and element.duration.quarterLength<0.7:
                            element.duration.quarterLength=0.75
                        if element.duration.quarterLength>4.0:
                            element.duration.quarterLength=4.0
                        ###############
                        notes.append(str(element.pitch)+'|'+str(float(element.duration.quarterLength)))
                    elif isinstance(element,mc.chord.Chord):
                        s='^'.join(str(i) for i in element.normalOrder)
                        #机智的操作要来了
                        ###############
                        if element.duration.quarterLength>1:
                            element.duration.quarterLength=int(element.duration.quarterLength)
                        elif element.duration.quarterLength>0.3 and element.duration.quarterLength<0.4:
                            element.duration.quarterLength=0.25
                        elif element.duration.quarterLength>0.6 and element.duration.quarterLength<0.7:
                            element.duration.quarterLength=0.75
                        if element.duration.quarterLength>4.0:
                            element.duration.quarterLength=4.0
                        ###############
                        notes.append(s+'|'+str(float(element.duration.quarterLength)))
                    elif isinstance(element,mc.note.Rest):
                        if notes:
                            #机智的操作要来了
                            ###############
                            if element.duration.quarterLength>1:
                                element.duration.quarterLength=int(element.duration.quarterLength)
                            elif element.duration.quarterLength>0.3 and element.duration.quarterLength<0.4:
                                element.duration.quarterLength=0.25
                            elif element.duration.quarterLength>0.6 and element.duration.quarterLength<0.7:
                                element.duration.quarterLength=0.75
                            if element.duration.quarterLength>4.0:
                                element.duration.quarterLength=4.0
                            ###############
                            #已经存在音符才放入休止，防止开头的一堆休止符号
                            notes.append(' '+'|'+str(float(element.duration.quarterLength)))
        #处理Rest，合并冗余的rest，防止模型产生rest的几率飙升
        i=0
        while(i<len(notes)):
            a,b=notes[i].split('|')
            if a==' ':
                j=i+1
                duration=float(b)
                while(j<len(notes)):
                    c,d=notes[j].split('|')
                    if c==' ':
                        duration+=float(d)
                        j+=1
                    else:
                        i=j-1
                        #机智的操作要来了
                        ###############
                        if duration>1:
                            duration=int(duration)
                        elif duration>0.3 and duration<0.4:
                            duration=0.25
                        elif duration>0.6 and duration<0.7:
                            duration=0.75
                        if duration>4.0:
                            duration=4.0
                        ###############
                        notes_merge_rest.append(' |'+str(float(duration)))
                        break
            else:
                notes_merge_rest.append(notes[i])
            i+=1    
    except Exception:
        print('出错:'+file)
        return notes_merge_rest
    
    return notes_merge_rest

In [9]:
def save_data():
    sum_notes=[]
    note2num={}
    num2note={}
    base = 'Files/jazz/'
    for file in find_all_file(base,'.mid'):
        filenames=file.split('/')
        filename=filenames[len(filenames)-1]
        filename=filename.rstrip('.mid')
        notes = read_midi_file(file)
        for note in notes:
            sum_notes.append(note)
        if len(notes)>0:
            if not os.path.exists("data"):
                os.mkdir("data")
            with open('data/'+filename+'.bin', 'wb') as f:
                pickle.dump(notes, f)
    notes_member=sorted(set(sum_notes))
    for i in range(len(notes_member)):
        note2num[notes_member[i]]=i
        num2note[str(i)]=notes_member[i]
    print(note2num)    
    if not os.path.exists("data"):
        os.mkdir("data")
    with open('data/'+'note2num'+'.bin', 'wb') as f:
        pickle.dump(note2num, f)
    with open('data/'+'num2note'+'.bin', 'wb') as f:
        pickle.dump(num2note, f)

In [10]:
def load_data():
    base = 'data/'
    notes=[]
    sum_notes=[]
    note2num={}
    num2note={}
    for file in find_all_file(base,'.bin'):
        if file!='data/note2num.bin' and file!='data/num2note.bin':
            with open(file,'rb') as f:
                note_group=pickle.load(f)
                notes.append(note_group)
                for note in note_group:
                    sum_notes.append(note)
#     notes_member=sorted(set(sum_notes))
#     for i in range(len(notes_member)):
#         note2num[notes_member[i]]=i
#         num2note[str(i)]=notes_member[i]
    with open('data/note2num.bin','rb') as f:
        note2num=pickle.load(f)
    with open('data/num2note.bin','rb') as f:
        num2note=pickle.load(f)
    return notes,note2num,num2note

In [11]:
def data_processing(notes,note2num,num2note):
    predict_length=128
    train_x=[]
    train_y=[]
    for note_group in notes:
        for i in range(len(note_group)-predict_length-1):
            note_list=note_group[i:i+predict_length]
            for j in range(len(note_list)):
                note_list[j]=note2num[note_list[j]]
            train_x.append(note_list)
            train_y.append(note2num[note_group[i+predict_length]])
    n=len(train_x)
    train_x=np.reshape(train_x,(n,predict_length,1))
    train_x=train_x/float(len(note2num))
    train_y=tf.keras.utils.to_categorical(train_y)
    return train_x,train_y    

In [12]:
#写一个LossHistory类，保存loss和acc
class LossHistory(tf.keras.callbacks.Callback):
    def on_train_begin(self, logs={}):
        self.losses = {'batch':[], 'epoch':[]}
        self.accuracy = {'batch':[], 'epoch':[]}
        self.val_loss = {'batch':[], 'epoch':[]}
        self.val_acc = {'batch':[], 'epoch':[]}
    def on_batch_end(self, batch, logs={}):
        self.losses['batch'].append(logs.get('loss'))
        self.accuracy['batch'].append(logs.get('acc'))
        self.val_loss['batch'].append(logs.get('val_loss'))
        self.val_acc['batch'].append(logs.get('val_acc'))
    def on_epoch_end(self, batch, logs={}):
        self.losses['epoch'].append(logs.get('loss'))
        self.accuracy['epoch'].append(logs.get('acc'))
        self.val_loss['epoch'].append(logs.get('val_loss'))
        self.val_acc['epoch'].append(logs.get('val_acc'))
    def loss_plot(self, loss_type):
        iters = range(len(self.losses[loss_type]))
        plt.figure()
        # loss
        plt.plot(iters, self.losses[loss_type], 'g', label='train loss')
        plt.grid(True)
        plt.xlabel(loss_type)
        plt.ylabel('acc-loss')
        plt.legend(loc="upper right")
        plt.show()

def train(file=None):
    notes,note2num,num2note=load_data()
    train_x,train_y=data_processing(notes,note2num,num2note)
    print(train_x.shape)
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.LSTM(
        256,  
        input_shape=(train_x.shape[1], train_x.shape[2]),
        return_sequences=True  # 返回所有的输出序列（Sequences）
    ))
    model.add(tf.keras.layers.BatchNormalization())
    model.add(tf.keras.layers.Dropout(0.3))
    model.add(tf.keras.layers.Dense(512))
    model.add(tf.keras.layers.BatchNormalization())
    model.add(tf.keras.layers.Dropout(0.3))
    model.add(tf.keras.layers.Dense(512))
    model.add(tf.keras.layers.BatchNormalization())
    model.add(tf.keras.layers.Dropout(0.3))
    model.add(tf.keras.layers.LSTM(256, return_sequences=False))
    model.add(tf.keras.layers.BatchNormalization())
    model.add(tf.keras.layers.Dropout(0.3))
    model.add(tf.keras.layers.Dense(len(note2num)))
    model.add(tf.keras.layers.Activation('softmax'))
    # 计算误差（先用 Softmax 计算百分比概率，再用 Cross entropy（交叉熵）来计算百分比概率和对应的独热码之间的误差）
    model.compile(loss='categorical_crossentropy', optimizer='adam')
    filepath = "model/weights.{epoch:02d}-{loss:.4f}.hdf5"
 
    # 用 Checkpoint（检查点）文件在每一个 Epoch 结束时保存模型的参数（Weights）
    # 不怕训练过程中丢失模型参数。可以在我们对 Loss（损失）满意了的时候随时停止训练
    checkpoint = tf.keras.callbacks.ModelCheckpoint(
        filepath,  # 保存的文件路径
        monitor='loss',  # 监控的对象是 损失（loss）
        verbose=0,
        period=50
    )
    callbacks_list = [checkpoint]
    if file is not None:
        model.load_weights(file)
    else:
        #创建一个实例history
        history = LossHistory()
        # model.load_weights('model/weights.100-3.7315.hdf5')
        # 用 fit 方法来训练模型
        print(model.summary())
        model.fit(train_x, train_y, epochs=3, batch_size=128, callbacks=[history])
        #绘制acc-loss曲线
        history.loss_plot('epoch')
    return model    

In [14]:
train()

(49347, 128, 1)
Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
lstm_2 (LSTM)                (None, 128, 256)          264192    
_________________________________________________________________
batch_normalization_4 (Batch (None, 128, 256)          1024      
_________________________________________________________________
dropout_4 (Dropout)          (None, 128, 256)          0         
_________________________________________________________________
dense_3 (Dense)              (None, 128, 512)          131584    
_________________________________________________________________
batch_normalization_5 (Batch (None, 128, 512)          2048      
_________________________________________________________________
dropout_5 (Dropout)          (None, 128, 512)          0         
_________________________________________________________________
dense_4 (Dense)              (None, 12

KeyboardInterrupt: 

In [113]:
def generate(choice):
    predict_length=128
    file='model/轻音乐.hdf5'
    notes,note2num,num2note=load_data()
    model=train(file)
    #notes_len=len(notes)
    #choice=int(random.random()*notes_len)
    #选择某首歌作为引子
    list_x=read_midi_file(choice)
    
    
    #选择全空做引子
#     list_x=[]
#     for i in range(128):
#         list_x.append(' ')
        
        
    for i in range(len(list_x)):
        list_x[i]=note2num[list_x[i]]
    start=len(list_x)-128-1
    test_x=[]
    res=[]
    for i in range(start,start+predict_length):
        test_x.append(list_x[i])  
    for i in range(300):
        print(i,end=' ')
        input_x=copy.deepcopy(test_x)
        input_x=np.reshape(input_x,(1,len(input_x),1))
        input_x=input_x/float(len(num2note))
        test_y=model.predict(input_x)
        new_num=np.argmax(test_y)
        new_note=num2note[str(new_num)]
        res.append(new_note)
        test_x.append(new_num)
        test_x=test_x[1:]
    #生成midi文件
    offset = 0
    music = []
    # 生成 Note（音符）或 Chord（和弦）对象
    for note in res:
        note,duration=note.split('|')
        if '^' in note or note.isdigit():
            chord_list = note.split('^')
            music_notes = []
            for i in chord_list:
                music_note = mc.note.Note(int(i))
                music_note.duration=mc.duration.Duration(float(duration))
                music_note.storedInstrument = mc.instrument.Piano()  # 乐器用钢琴 (piano)
                music_notes.append(music_note)
            chord = mc.chord.Chord(music_notes)
            chord.offset = offset
            music.append(chord)
        #是停顿符rest
        elif note==' ':
            music_note = mc.note.Rest()
            music_note.offset = offset
            music_note.duration=mc.duration.Duration(float(duration))
            music.append(music_note)
        # 是 Note
        else:
            music_note = mc.note.Note(note)
            music_note.offset = offset
            music_note.duration=mc.duration.Duration(float(duration))
            music_note.storedInstrument = mc.instrument.Piano()
            music.append(music_note)
 
        # 每次迭代都将偏移增加，这样才不会交叠覆盖
        offset += 0.5
 
    # 创建音乐流（Stream）
    res_stream = mc.stream.Stream(music)
    
    path_list=choice.split('/')
    path=path_list[-1]
    
    # 写入 MIDI 文件
    res_stream.write('midi', fp='生成音乐/'+path)
    

In [116]:
generate('Files/轻音乐/你离开的真相-The Truth That You Leave.mid')

(22072, 128, 1)
读取文件:Files/轻音乐/你离开的真相-The Truth That You Leave.mid
<music21.stream.Part Piano>
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253

In [62]:
base = 'Files/pink floyd/'
for file in find_all_file(base,'.mid'):
    try:
        generate(file)
    except Exception:
        print('出错'+file)
        continue

(61395, 128, 1)
读取文件:Files/pink floyd/bluesky.mid
<music21.stream.Part Acoustic Guitar>
<music21.stream.Part Electric Bass>
<music21.stream.Part Voice>
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 

KeyboardInterrupt: 

In [None]:
def test_generate(choice):
    notes,note2num,num2note=load_data()
    res=read_midi_file(choice)
    #生成midi文件
    offset = 0
    music = []
    # 生成 Note（音符）或 Chord（和弦）对象
    for note in res:
        #休止符直接生成不设置duration
        if note==' ':
            music_note = mc.note.Rest()
            music_note.offset = offset
            music.append(music_note)
        # 是 Chord。格式例如： 4^15^7
        else:
            note,duration=note.split('|')
            if '^' in note:
                chord_list = note.split('^')
                music_notes = []
                for i in chord_list:
                    music_note = mc.note.Note(int(i))
                    music_note.duration=mc.duration.Duration(duration)
                    music_note.storedInstrument = mc.instrument.Piano()  # 乐器用钢琴 (piano)
                    music_notes.append(music_note)
                chord = mc.chord.Chord(music_notes)
                chord.offset = offset
                music.append(chord)
            # 是 Note
            else:
                music_note = mc.note.Note(note)
                music_note.offset = offset
                music_note.duration=mc.duration.Duration(duration)
                music_note.storedInstrument = mc.instrument.Piano()
                music.append(music_note)
 
        # 每次迭代都将偏移增加，这样才不会交叠覆盖
        offset += 0.5
    # 创建音乐流（Stream）
    res_stream = mc.stream.Stream(music)
 
    # 写入 MIDI 文件
    res_stream.write('midi', fp='example.mid')
    

In [30]:
test_generate()

读取文件:Files/AbbeyRoad/ComeTogether.mid
<music21.stream.Part Fretless E.Bass>
<music21.stream.Part Abbey Road 1969>
<music21.stream.Part Fretless Bass>
<music21.stream.Part Electric Guitar>
<music21.stream.Part Electric Organ>
<music21.stream.Part Elect Piano 2 Bnk 16>
<music21.stream.Part Piano>


In [21]:
import pygame as pg
def play_music(music_file):
  '''
  stream music with mixer.music module in blocking manner
  this will stream the sound from disk while playing
  '''
  clock = pg.time.Clock()
  try:
    pg.mixer.music.load(music_file)
    print("Music file {} loaded!".format(music_file))
  except pygame.error:
    print("File {} not found! {}".format(music_file, pg.get_error()))
    return
  pg.mixer.music.play()
  # check if playback has finished
  while pg.mixer.music.get_busy():
    clock.tick(30)

In [117]:
# pick a midi or MP3 music file you have in the working folder
# or give full pathname
music_file = "生成音乐/你离开的真相-The Truth That You Leave.mid"
#music_file = "Drumtrack.mp3"
freq = 44100  # audio CD quality
bitsize = -16  # unsigned 16 bit
channels = 2  # 1 is mono, 2 is stereo
buffer = 2048  # number of samples (experiment to get right sound)
pg.mixer.init(freq, bitsize, channels, buffer)
# optional volume 0 to 1.0
pg.mixer.music.set_volume(0.8)
try:
  play_music(music_file)
except KeyboardInterrupt:
  # if user hits Ctrl/C then exit
  # (works only in console mode)
  pg.mixer.music.fadeout(1000)
  pg.mixer.music.stop()
  raise SystemExit

Music file 生成音乐/你离开的真相-The Truth That You Leave.mid loaded!
