In [None]:
from music21 import corpus, stream, note, midi
import numpy as np
import math

class Encoding_BACH_From_Score(): #编码器
    
    def __init__(self, appointed_voice_part = 0, min_tick = 0.25, key_mode = 'major'):
        
        self.appointed_voice_part = appointed_voice_part #给定的声部
        self.min_tick = min_tick #最小时间单位
        self.key_mode = key_mode #调性（大调或小调）
        self.song_list = [] #输入曲目总表
        self.data_max = -100 #指定声部的音高最大值
        self.data_min = 100 #指定声部的音高最小值
        self.target_max = -100 #其他声部的音差最大值
        self.target_min = 100 #其他声部的音差最小值
        self.trans_pitch = {'A- major':4, 'E- major':-3, 'B- major':2, 'F major':7, 'C major':0, \
                            'G major':5, 'D major':-2, 'A major':3, 'E major':-4, } #移调字典
        
    def get_appointed_part(self, song): #对指定声部编码
        
        appointed_part = [] #准备提取给定声部
        now_tick = 0 #初始时间指针
        for note in song.parts[self.appointed_voice_part].flat.notes: #对每个音符逐步编码并加入指定声部列表
            while note.offset > now_tick: #处理休止符
                appointed_part.append([now_tick, 0, 1]) #利用三个维度表示音的状态
                now_tick += self.min_tick
            note_duration_tick = int(note.duration.quarterLength / self.min_tick) #计算每个音持续多少最小时间单位
            note_pitch = note.pitches[0].midi #得到该音符的音高数（返回值为整数，在0-127之间）
            #考虑遍历计算得到data集合音域宽度
            if note_pitch != 0: #0值pitch在music21中为休止，没有音高
                if note_pitch > self.data_max:
                    self.data_max = note_pitch
                if note_pitch < self.data_min:
                    self.data_min = note_pitch
            for tick in range(note_duration_tick): #以最小时间为单位添加音符状态
                if tick == 0:
                    appointed_part.append([now_tick, note_pitch, 1]) #新音符的第三维状态为1
                else:
                    appointed_part.append([now_tick, note_pitch, 0]) #若持续则第三维状态为0
                now_tick += self.min_tick #时间指针前进
        
        return appointed_part #返回被提取声部列表
    
    def pitch_diff(self, base_pitch, new_pitch): #比较两个音相差的半音数
        
        if base_pitch*new_pitch == 0: #休止符不比较
            return 0
        else:
            return new_pitch-base_pitch  
    
    def get_other_parts(self, song): #获取所有编码后的谱面信息
        
        appointed_part = self.get_appointed_part(song) #获取指定声部列表
        uncoded_part = list(range(4))
        uncoded_part.remove(self.appointed_voice_part) #得到还未编码的声部
        
        for voice_index in uncoded_part: #对其他声部循环
            part_note = song.parts[voice_index].flat.notes #得到其他某一个声部中所有音的信息
            note_index = 0 #初始化音符指针
            for encoded_tick in appointed_part: #对指定声部循环,更新列表信息
                now_tick = encoded_tick[0] #获取当前时间指针
                now_note = part_note[note_index] #获取当前时间所对应的音
                if now_note.offset + now_note.duration.quarterLength <= now_tick: #时间指针达到或超过音符指针
                    note_index += 1 #音符指针前进
                now_note = part_note[note_index] #查看是否音符指针是否前进，若是，则立即更新
                part_note_pitch = now_note.pitches[0].midi #获取当前时间点的音高
                if now_note.offset == now_tick: #新音符的第2个状态为1
                    encoded_tick.extend([self.pitch_diff(encoded_tick[1], part_note_pitch), 1])
                elif now_note.offset < now_tick: #若持续则第2个状态为0
                    encoded_tick.extend([self.pitch_diff(encoded_tick[1], part_note_pitch), 0])
                else: #补充休止符
                    encoded_tick.extend([0, 1])
                #考虑遍历计算得到target集合音差宽度，休止符不算
                if encoded_tick[1]*part_note_pitch != 0:
                    if self.pitch_diff(encoded_tick[1], part_note_pitch) > self.target_max:
                        self.target_max = self.pitch_diff(encoded_tick[1], part_note_pitch)
                    if self.pitch_diff(encoded_tick[1], part_note_pitch) < self.target_min:
                        self.target_min = self.pitch_diff(encoded_tick[1], part_note_pitch)
                    
        return appointed_part #返回全套列表（虽然名称叫appointed_part）
    
    def get_all_chorales(self):
        
        for chorale in corpus.chorales.Iterator(returnType='filename'): #这是巴赫众赞歌材料包，详情见官网
            song = corpus.parse(chorale) #加载该首合唱的全部信息
            if len(song.parts) == 4 and song.analyze('key').mode == self.key_mode:
                old_tune = str(song.analyze('key'))
                #若为指定的声部数量和调性才执行
                song.transpose(self.trans_pitch[str(song.analyze('key'))], inPlace = True) #调性归一化
                get_temp = self.get_other_parts(song)
                if len(get_temp) <= 450: #若过长则删去（选450的原因用matplotlib画一下分布就知道了）
                    self.song_list.append(get_temp) #加入新的乐曲
                    print('%4d: %20s, %10s → %6s'%(len(self.song_list), chorale, \
                                                        old_tune, song.analyze('key')))
                    
        return self.song_list #返回所有曲目编码列表
    
    def get_dataset(self): #数据集再编码后准备输入网络
        
        song_tick_length,X,Y = [],[],[] #分别定义时间长度列表，训练集{X,Y}
        for song in self.song_list: song_tick_length.append(len(song)) #获取每首乐曲的时间总长
        max_song_tick_length = int(np.max(np.array(song_tick_length)))
        print('最大长度为：%d 曲目量为：%d'%(max_song_tick_length,len(self.song_list)))
        #准备第二次编码
        for song in self.song_list:
            x,y = [],[]
            for song_tick in song:
                #利用局部oneHot编码对输入集合进行再次编码
                oneHot = [0]*(self.data_max - self.data_min + 1) #data音域宽度
                if song_tick[1] != 0: oneHot[song_tick[1] - self.data_min] = 1 #oneHot编码
                oneHot.append(song_tick[2])
                x.append(oneHot) #加入是否连续的信息
                #利用局部oneHot编码对其他声部集合进行再次编码
                other_parts = [] #准备填充其他声部的编码信息
                for part in range(3):
                    part_index = 3 + part*2 #参见原先第一次编码方式即可知
                    oneHot = [0]*(self.target_max - self.target_min + 1) #target音差宽度
                    if song_tick[part_index] != 0: oneHot[song_tick[part_index] - self.target_min] = 1
                    oneHot.append(song_tick[part_index + 1])
                    other_parts.append(oneHot)
                y.append(other_parts)
            #准备padding操作
            padding_length = max_song_tick_length - len(x) #序列需要padding的长度
            for turn in range(padding_length):
                x.append([0]*(self.data_max - self.data_min + 2)) #data集合的填充
                y.append([[0]*(self.target_max - self.target_min + 2)]*3) #target集合的填充
            X.append(x)
            Y.append(y)
        #此处进行格式转换（因为上面一些玄学的原因导致第三层是list类型的表，有人知道怎么直接弄请告诉我）
        npX,npY = [],[]
        for i in range(len(self.song_list)):
            for j in range(max_song_tick_length):
                for k in range(self.data_max - self.data_min + 2): #其实为种类数+1的值，只是计算种类有一个加1
                    npX.append(X[i][j][k])
        for i in range(len(self.song_list)):
            for j in range(max_song_tick_length):
                for k in range(3): #注意此处还有一维
                    for m in range(self.target_max - self.target_min + 2):
                        npY.append(Y[i][j][k][m])
        #制作词典，准备返回数据集
        dataset = {'data': np.array(npX).reshape(len(self.song_list),\
                            max_song_tick_length,(self.data_max - self.data_min + 2)), \
                   'target': np.array(npY).reshape(len(self.song_list),\
                            max_song_tick_length,3,(self.target_max - self.target_min + 2)), \
                   'seq_length': np.array(song_tick_length)}
        
        return dataset
    
    def load_new_chorales(self, song): 
        
        song = corpus.parse(song) #加载音乐信息
        song.transpose(self.trans_pitch[str(song.analyze('key'))], inPlace = True) #移调
        get_temp = self.get_other_parts(song) #得到四声部第一次编码结果
        self.song_list.append(get_temp) #加入song_list的尾部
    
    def get_new_input_and_length(self):
        
        new_dataset = self.get_dataset() #此时song_list已经更新
        in_put = new_dataset['data'][-5:] #由于是加入song_list的尾部，又每批是5个，故取后五个值
        seq_length = new_dataset['seq_length'][-5:]
        
        return in_put,seq_length #返回批次的信息，最后一个是需要预测的音乐
################################################################################################################
import torch
from torch import nn
from torch.autograd import Variable
import tensorflow as tf

class BACH_Net(nn.Module):
    
    #网络的基本结构较简单，3个有300个隐藏层的LSTM，然后加两个线性层，最后是一个激活层和归一化层
    #LSTM非双向，双向做下来效果和单向差不多，但时间增加了，不划算
    #损失函数是带有mask的交叉熵函数
    
    def __init__(self, input_size, hidden_size, output_size, num_layers, max_length, lengths=[]):
        super(BACH_Net,self).__init__()
        self.lstm_layer = nn.LSTM(input_size,hidden_size,num_layers) #这里一定要搞清楚输入输出是什么，很重要
        self.Softmax = nn.Softmax(dim=3) #在Pytorch的0.4.0版本及其之后的版本，Softmax必须指定维度
        self.Sigmoid = nn.Sigmoid() #Sigmoid层
        self.Linearmiddle = nn.Linear(hidden_size,hidden_size) #第一个线性层
        self.Linearnote = nn.Linear(hidden_size,3*(output_size-1)) #第二个线性层中的音符层
        self.Linearchangenote = nn.Linear(hidden_size,3*1) #第二个线性层中的换音层
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.lengths = lengths #长度为5个，与batch数一致
        self.max_length = max_length #是batch最大长度，不是序列padding后的，padding后长度都一样
        
    def forward(self,x):
        #下面其实就做了一件事，根据batch做动态rnn输入
        #tensorflow里面只要调用dynamic_rnn就可以，但Pytorch不行
        #注意Pytorch的pack_padded_sequence函数默认序列长度递减输入
        #################################################################################
        _, idx_sort = torch.sort(self.lengths, dim=0, descending=True)
        _, idx_unsort = torch.sort(idx_sort, dim=0)
        length = torch.tensor(list(self.lengths[idx_sort])) #降序排列
        x = x[:,:length[0]]
        x_zeros = torch.zeros(5,self.max_length-length[0],self.hidden_size) #padding的长度
        x = x.index_select(0, idx_sort)
        x_packed = nn.utils.rnn.pack_padded_sequence(x,length,batch_first=True)
        x_packed,_ = self.lstm_layer(x_packed) #pack后的序列再输入lstm网络
        x_padded = nn.utils.rnn.pad_packed_sequence(x_packed,batch_first=True)
        x = x_padded[0].index_select(0, idx_unsort) #还原顺序
        if self.max_length-length[0]!=0:
            if torch.cuda.is_available():
                x = torch.cat((x.cuda(),x_zeros.cuda()),1)
            else:
                x = torch.cat((x,x_zeros),1) #将原始序列与padding的0拼接起来
        #################################################################################
        x = torch.reshape(x, [-1,self.hidden_size]) #准备转入线性层
        x = self.Linearmiddle(x) #传入第一层线性层
        
        notes = self.Linearnote(x)
        #下面需要注意shape要匹配好
        notes = self.Softmax(torch.reshape(notes,[5,self.max_length,3,(self.output_size-1)]))
        changenote = self.Linearchangenote(x)
        #changenote最后是一维，和note不一样
        changenote = self.Sigmoid(torch.reshape(changenote,[5, self.max_length,3,1]))
        
        x = torch.cat((notes, changenote),3) #在最后一个维度进行拼接

        return x

class LossFunction(nn.Module):
    
    def __init__(self):
        super(LossFunction, self).__init__()

    def forward(self,p,t):
        
        #此处为交叉熵函数，定义很简单，但必须利用torch内置函数，才能自动求导反向传播
        #比如在tensorflow中有clip截断函数，但torch中只能自己用数学方法凑出来，如下是我想的一个办法
        c_e = t*torch.log(1e-10*(1-p>1e-10).float()+p.mul((p>1e-10).float()).mul((p<1.0).float())+\
                          1.0*(1-(p<1.0).float()).float())+ (1-t)*torch.log(1e-10*(1-(1-p)>1e-10).float()+\
        (1-p).mul(((1-p)>1e-10).float()).mul(((1-p)<1.0).float())+1.0*(1-((1-p)<1.0).float()).float())
        c_e = -torch.sum(torch.sum(c_e,3),2) #此步为止为交叉熵定义，但此处需要有mask作用，避免padding值也算入误差
        mask = torch.sign(torch.max(torch.max(torch.abs(t),3)[0],2)[0]) #此处维度需要看清楚，在哪一个维加
        c_e*= mask #套上mask
        c_e = torch.sum(c_e,1)
        c_e/= torch.sum(mask,1) #输出列表

        return torch.mean(c_e.float()) #输出实数
################################################################################################################
from music21 import corpus, stream, note, midi
from Encoder import Encoding_BACH_From_Score
import numpy as np
import math

class Decoding_BACH_To_Score(): #解码器
    
    def __init__(self, Encoder, new_short_code = []):
        
        self.new_short_code = new_short_code #存放第一次解码后的信息
        self.Encoder = Encoder #由于要用到前面编码器的内部信息，导入该实例（还是一个类）
        
    def decode_oneHot_to_short_code(self, new_song, new_result, continue_pro=0.5, change_pro=0.7):
        #该函数的功能是进行第一次解码
        #但是第一次解码的结果与第一次编码的结果是不一样的，后者是存储音差，前者是存储数组下标，需要第二次解码时再转换
        self.new_short_code = []
        old_pitches = [0]*3 #音符指针初始化
        now_tick = 0 #时间指针初始化

        for tick_index in range(len(new_song)): #对时间节点循环
            tick = [now_tick, new_song[tick_index][1], new_song[tick_index][2]] #先加载给定声部信息
            for part in range(3): #对声部循环
                old_pitch = old_pitches[part] #记录上一次的音高
                new_pitch = np.argmax(new_result[tick_index, part, :len(new_result[0][0])-1]) #返回下标
                #此处的想法是，如果换音的概率不是太大，那么就不要换
                whether_changenote = new_result[tick_index, part, -1] > change_pro
                #这里与上面想法类似，如果概率最大的音的概率并不高，那么就保留前面那个音，这里用到了前一个音的音高信息
                if new_result[tick_index, part, old_pitch] > continue_pro:
                    pitch = old_pitch
                elif not whether_changenote: #决定是不是changenote
                    pitch = old_pitch
                else: pitch = new_pitch #若上述判断都通过，那么此时就应当改变音的状态了
                tick.append(pitch) #加入“音高”信息（其实为下标，不是真的音高）
                tick.append(int(whether_changenote)) #加入连续信息
                old_pitches[part] = pitch #保留此轮信息进入下一轮
            self.new_short_code.append(tick)
            now_tick += self.Encoder.min_tick #时间指针前进一个单位
    
    def change_tick(self, L, index):
        #这个函数是用来处理get_score中的tick操作，目的是将给定声部的两个值调换至正确的位置
        L_temp = L[1:3].copy() #copy则不会复制变量地址
        L.pop(1)
        L.pop(1) #因为前面index为1的已经不见了，所以还是去除现在index为1的
        L.insert(index *2 + 1, L_temp[0]) #将给定声部信息插入到正确位置，以备第二次解码
        L.insert(index *2 + 2, L_temp[1])

        return L
    
    def get_score(self):
        #该函数的功能是进行第二次解码，并输出到乐谱Score类型中
        new_score = stream.Score() #建立空乐谱
        #准备音符指针与声部初始化
        new_notes = [] 
        for part in range(4):
            new_score.insert(0, stream.Part())
            new_notes.append('ready?')
        for tick in self.new_short_code:
            #调用函数交换至正确顺序，思考若不调换会有什么后果
            tick = self.change_tick(tick, self.Encoder.appointed_voice_part) 
            for part_index in range(4): #对所有声部循环
                part = new_score.parts[part_index] #加载声部信息
                #准备使用下标索引
                pitch_index = part_index *2 + 1
                changenote_index = part_index *2 + 2
                #若不换音，则延续上一个音
                if new_notes[part_index] != 'ready?' and tick[changenote_index] == 0:
                    new_notes[part_index].quarterLength += self.Encoder.min_tick
                #如果要换音了
                if tick[changenote_index] == 1:
                    if tick[pitch_index] > 0: #非休止
                        new_ready_note = note.Note() #创建新的音 
                        new_notes[part_index] = new_ready_note
                        new_ready_note.offset = tick[0] #offset为该音的位置，设置为当前时间指针
                        if part_index == self.Encoder.appointed_voice_part:
                            #直接索引到tick该音的midi音高
                            new_ready_note.pitch.midi = tick[pitch_index]
                        else:
                            #此处的逻辑可能要草稿纸上画一画，需要完全弄清楚第一、第二次编码和第一次解码的输入输出才可以
                            new_ready_note.pitch.midi = tick[self.Encoder.appointed_voice_part *2 + 1] \
                            + tick[pitch_index] + self.Encoder.target_min
                        new_ready_note.quarterLength = self.Encoder.min_tick #调用最小时间单位设置长度
                        part.append(new_ready_note)
                    else:
                        new_ready_note = note.Rest() #建立休止符
                        new_notes[part_index] = new_ready_note #更新音符指针信息
                        new_ready_note.offset = tick[0]
                        new_ready_note.quarterLength = self.Encoder.min_tick
                        part.append(new_ready_note) #休止符加入声部信息
        
        return new_score #返回Score类型乐谱

In [None]:
from music21 import corpus, stream, note, midi #music21包
from Encoder import Encoding_BACH_From_Score #编码器
from Decoder import Decoding_BACH_To_Score #解码器
from Network import BACH_Net,LossFunction #搭建的网络
from torch.autograd import Variable
import tensorflow as tf
from torch import nn
import numpy as np
import torch
import math

In [None]:
#将巴赫编码器导入（part为0-3的整数，对应SATB声部，默认0）
Encoder = Encoding_BACH_From_Score(appointed_voice_part=0)
Encoder.get_all_chorales() #加载所有指定的众赞歌信息并进行第一次编码
dataset = Encoder.get_dataset() #第二次编码后返回结果

In [None]:
'''
#保存训练数据，在服务器上运行时直接上传本地数据，免去安装music21包的过程
np.save(file="train_data.npy", arr=dataset['data'])
np.save(file="train_target.npy", arr=dataset['target'])
np.save(file="train_length.npy", arr=dataset['seq_length'])
'''

In [None]:
'''
#直接加载保存的numpy数组
dataset = {'data': np.load(file='train_data0.npy',allow_pickle=True) , \
           'target' : np.load(file='train_target0.npy',allow_pickle=True), \
           'seq_length' : np.load(file='train_length0.npy',allow_pickle=True)}
'''
var_x = Variable(torch.from_numpy(dataset['data'])).float() #加载输入数据
var_y = Variable(torch.from_numpy(dataset['target'])).float() #加载目标数据
var_s = Variable(torch.from_numpy(dataset['seq_length'])).int() #加载序列长度
max_song = var_x.size()[0] #曲目量
max_length = var_x.size()[1] #时间单位长度

In [None]:
#torch.cuda.set_device(0) #貌似服务器上不加这句会报错，因为没有指定哪一块GPU（要运行两次，第一次会报错）
if torch.cuda.is_available(): #判断是否能用GPU加速
    #定义搭建的模型与损失函数并初始化
    model = BACH_Net(np.shape(dataset['data'])[2],300,np.shape(dataset['target'])[3],3,max_length).cuda()
    criterion = LossFunction().cuda() #定义损失函数
else:
    model = BACH_Net(np.shape(dataset['data'])[2],300,np.shape(dataset['target'])[3],3,max_length)
    criterion = LossFunction() 
optimizer = torch.optim.RMSprop(model.parameters(), lr=3e-3) #定义优化器与学习率

In [None]:
#服务器训练用，但本地测试也可以用
'''
for epoch in range(1000):
    print('第%d轮'%(epoch+1))
    L=[]
    for batchnum in range(math.floor(max_song / 5)): #分批训练，每批5个
        batch_start = batchnum * 5
        batch_end = batchnum * 5 + 5
        batch_end = min(max_song, batch_end)
        if torch.cuda.is_available():
            batch_x = var_x[batch_start : batch_end].cuda()
            batch_y = var_y[batch_start : batch_end].cuda()
            batch_s = var_s[batch_start : batch_end].cuda()
            #若Pytorch的版本为1.0+那么此处应当跳过某一个批次
            #因为它含有具备最大有效长度的序列，在高版本中不允许出现，构建某一维为0的张量
            #if batch_start == 50: continue
        else:
            batch_x = var_x[batch_start : batch_end]
            batch_y = var_y[batch_start : batch_end]
            batch_s = var_s[batch_start : batch_end]
        model.lengths = batch_s #输入此批序列的有效
        #torch.cuda.set_device(0) #运行两次，理由上面已经讲过了
        out = model(batch_x) #该轮输出的预测结果
        loss = criterion(out, batch_y) #计算损失偏差
        #loss如果一直在某一个值周围徘徊，可以考虑重启网络，可能运气不好，初始化状态较差
        #Pytorch的lstm是均匀分布初始化的，与tensorflow不同，详情见官网
        print('%d~%d:'%(batch_start,batch_end),round(loss.item(),6))
        optimizer.zero_grad() #梯度清零
        loss.backward() #反向传播
        optimizer.step() #优化器迭代
        L.append(loss.item())
    print('该轮平均误差为：%f'%round(np.mean(np.array(L)),6))
'''

In [None]:
#torch.save(model,'model.pkl')
model = torch.load('Model_S.pkl', map_location='cpu') #由于服务器是GPU训练的模型，本机若是CPU应如前备注
#如果有Warning不用管，正常

In [None]:
song = 'bach/new.mxl' #新的旋律线，注意待填充的声部要和编码器初始化时的一致
Encoder.load_new_chorales(song) #加载后进行第一次编码
new_input, new_length = Encoder.get_new_input_and_length() #第二次编码后返回序列信息
new_input = Variable(torch.from_numpy(new_input)).float()
new_length = Variable(torch.from_numpy(new_length)).int()
model.lengths = new_length
new_result = model(new_input) #再次走一遍网络

In [None]:
Decoder = Decoding_BACH_To_Score(Encoder) #Encoder是最先定义的解码器，其中储存有先前的信息，此部进行第一次解码
Decoder.decode_oneHot_to_short_code(Encoder.get_appointed_part(corpus.parse(song))\
                                    , new_result[4].detach().numpy()) #进行第二次解码，这里是取batch的最后一个
new_score = Decoder.get_score() #生成Score类型的最终谱面

In [None]:
#new_score.write('path...') 保存谱面用这个
new_score.show() #显示谱面
new_score.show('midi') #显示音乐播放器