In [5]:
# 导入相关模块
import os 
import glob 
import time
import subprocess 
import pickle
import numpy as np
from pickle import dump, load 
from music21 import converter, instrument, note, chord, stream

import torch
import torch.utils.data as DataSet
import torch.nn as nn
import torch.optim
from torch.autograd import Variable
import torch.nn.functional as F

# Load the pickled inputs of the composition task.
# NOTE: pickle.load can execute arbitrary code while deserializing, so these
# files must come from a trusted source.
# Each file is opened in a `with` block so the handle is closed right after
# loading instead of being left to the garbage collector.
with open('./data/musicians', 'rb') as musicians_file:
    musicians = load(musicians_file)
with open('./data/namelist', 'rb') as namelist_file:
    namelist = load(namelist_file)
with open('./data/seqs', 'rb') as seqs_file:
    seqs = load(seqs_file)

In [11]:
# Number of sequences loaded from ./data/seqs.
len(seqs)

589

In [6]:
# Sequence encoding helper
def seq_encode(seqs):
    """Assign an integer id to every distinct symbol and encode the sequences.

    Ids are handed out from 1 upwards in order of first appearance, so the
    value 0 is never assigned by this function (the notebook later uses 0 as
    the padding value).

    Args:
        seqs: a re-iterable collection (e.g. a list) of sequences, each a
            collection of hashable symbols.

    Returns:
        tuple: ``(seq2idx, seqs_digit)`` where ``seq2idx`` maps symbol -> id
        and ``seqs_digit`` is a list of lists holding the encoded sequences
        in the same order as ``seqs``.
    """
    seq2idx = {}
    # First pass: build the vocabulary.
    for seq in seqs:
        for s in seq:
            # Membership test instead of `get(s) == None`.
            if s not in seq2idx:
                seq2idx[s] = len(seq2idx) + 1

    # Second pass: replace every symbol by its id.
    seqs_digit = [[seq2idx[s] for s in seq] for seq in seqs]
    return seq2idx, seqs_digit

# Encode the whole corpus; seq2idx maps symbol -> id, seqs_digit holds the encoded sequences.
seq2idx, seqs_digit = seq_encode(seqs)
# Show elements 1..99 of sequence 123 before and after encoding (element 0 is skipped by the slice).
print("原始序列")
print(seqs[123][1:100])
print("\n 编码后的结果")
print(seqs_digit[123][1:100])

原始序列
['2.5', '1.4', 'D2', '4.7', '2.5', 'D2', 'A3', 'G3', 'D2', 'B-3', 'A3', 'A3', 'D2', '2.5.9', '1.4.7', 'D2', '1.4.7.9', '2.5.9', 'D2', '2.5.9', '1.4.7.10', 'D2', '1.4.7.10', '2.5.9', '2.5.9', 'D2', 'D4', 'C#4', 'E4', 'C#3', 'D4', 'D3', 'D4', 'C#4', 'E4', 'C#4', 'D4', 'D4', 'D4', 'C4', 'B-3', 'A3', 'G3', 'A3', 'B-3', 'B3', 'A3', 'A2', 'A2', '2.5', '4.7', '5.9', 'A2', '4.7', '2.5', '1.4', 'A2', '2.5', '4.7', '2.5', 'A2', '10.2.5', '9.1.4', 'A2', '8.11.2', '9.1.4', 'A2', 'B-4', 'A4', 'G4', 'F4', 'E4', 'F4', 'G4', 'C4', 'F4', 'B-4', 'B-4', 'A4', 'G4', 'A4', 'B-4', 'B4', 'C5', 'C3', '0.4', '11.2', 'C3', '2.5', '0.4', '0.4', 'C3', '0.4.7', '5.8.0', 'C3', '5.8.0', '0.4.7', '0.4.7', 'C3']

 编码后的结果
[113, 150, 19, 155, 113, 19, 3, 2, 19, 22, 3, 3, 19, 89, 276, 19, 279, 89, 19, 89, 88, 19, 88, 89, 89, 19, 5, 48, 37, 77, 5, 4, 5, 48, 37, 48, 5, 5, 5, 10, 22, 3, 2, 3, 22, 78, 3, 45, 45, 113, 155, 56, 45, 155, 113, 150, 45, 113, 155, 113, 45, 58, 145, 45, 231, 145, 45, 29, 11, 15, 30, 37, 30, 15

In [13]:
### Composer-name encoding helper
def musician_encode(namelist):
    """Assign an integer id to every distinct composer name and encode the list.

    Ids start at 0 and follow the order of first appearance in ``namelist``.

    Args:
        namelist: a re-iterable collection (e.g. a list) of hashable names,
            one entry per piece.

    Returns:
        tuple: ``(name2idx, namelist_digit)`` where ``name2idx`` maps
        name -> id and ``namelist_digit`` is the list of ids in the same
        order as ``namelist``.
    """
    # Build the composer dictionary.
    name2idx = {}
    for name in namelist:
        # Membership test instead of `get(name) == None`.
        if name not in name2idx:
            name2idx[name] = len(name2idx)

    # Encode the list of composers.
    namelist_digit = [name2idx[name] for name in namelist]
    return name2idx, namelist_digit

# Encode the composer of every piece; name2idx maps name -> id, namelist_digit holds the ids.
name2idx, namelist_digit = musician_encode(namelist)
# Show entries 25..44 before and after encoding.
print("原始序列")
print(namelist[25:45])
print("\n 编码后的结果")
print(namelist_digit[25:45])

原始序列
['schubert', 'schubert', 'schubert', 'haydn', 'chopin', 'chopin', 'vivaldi', 'schumann', 'mendelsonn', 'schubert', 'schubert', 'schubert', 'schubert', 'schubert', 'schubert', 'mendelsonn', 'schubert', 'schubert', 'schubert', 'liszt']

 编码后的结果
[6, 6, 6, 7, 1, 1, 4, 0, 2, 6, 6, 6, 6, 6, 6, 2, 6, 6, 6, 8]


In [14]:
# Convert the composer ids into one-hot vectors.
# The ids are rebuilt from name2idx/namelist rather than read from
# namelist_digit itself, so re-running this cell always gives the same result
# (the previous form one-hot encoded its own output on a second run).
# num_classes is fixed to the number of known composers instead of being
# inferred from the largest id present.
namelist_digit = F.one_hot(
    torch.tensor([name2idx[name] for name in namelist], dtype=torch.long),
    num_classes=len(name2idx),
)
namelist_digit.shape

torch.Size([589, 9])

In [15]:
### Build the training inputs and targets
def generate_XY(seqs_digit, namelist, max_len):
    """Turn encoded pieces into shuffled (input, target) training pairs.

    For every non-empty piece the last note is the target and the preceding
    notes, right-padded with 0 to ``max_len - 1`` entries and followed by the
    piece's composer vector, form the input row.

    Args:
        seqs_digit: list of encoded pieces (lists of int ids).
        namelist: per-piece composer vectors aligned with ``seqs_digit``
            (e.g. the rows of the one-hot tensor). Entries may be tensors /
            arrays (anything with ``.tolist()``) or plain sequences.
        max_len: maximum number of notes kept per piece, target included.
            Longer pieces are truncated to their first ``max_len`` notes so
            that every input row has the same length.

    Returns:
        tuple: ``(X, Y)`` lists in the same random order; ``X[k]`` has
        ``max_len - 1 + len(composer vector)`` ints and ``Y[k]`` is an int.

    Raises:
        ValueError: if ``seqs_digit`` and ``namelist`` differ in length.
    """
    if len(seqs_digit) != len(namelist):
        raise ValueError("seqs_digit and namelist must have the same length")

    X = []
    Y = []
    for seq_digit, composer in zip(seqs_digit, namelist):
        # Pieces without any note cannot provide a target.
        if len(seq_digit) < 1:
            continue

        # Truncate over-long pieces; otherwise the padding count below would
        # be negative and the row would be longer than the others.
        seq_digit = list(seq_digit[:max_len])

        # The last note of the (possibly truncated) piece is the target.
        Y.append(seq_digit[-1])
        # Everything before it is the input, padded with 0 to max_len - 1.
        notes = seq_digit[:-1]
        notes = notes + [0] * (max_len - 1 - len(notes))
        # The composer vector comes from the argument, not from a global.
        if hasattr(composer, "tolist"):
            composer = composer.tolist()
        X.append(notes + list(composer))

    # Shuffle inputs and targets with the same permutation.
    order = np.random.permutation(len(X))
    X = [X[k] for k in order]
    Y = [Y[k] for k in order]
    return X, Y

# The composer argument must be the one-hot encoded list, not the raw names.
X, Y = generate_XY(seqs_digit, namelist_digit, 1000)
# Preview of one training sample.
# NOTE: generate_XY shuffles its output, so X[123] / Y[123] are not
# necessarily built from seqs[123]; the "original piece" printed first may
# belong to a different sample than the X and Y printed below it.
print("原始乐曲（部分）: ")
print(seqs[123][1:50])
print("变量X（音符序列）: ")
# First 999 entries of the row: the padded note ids.
print(X[123][0:999])
print("变量X（作曲家）: ")
# Last 9 entries of the row: the composer vector appended by generate_XY.
print(X[123][-9:])
print("变量Y: ")
print(Y[123])

原始乐曲（部分）: 
['2.5', '1.4', 'D2', '4.7', '2.5', 'D2', 'A3', 'G3', 'D2', 'B-3', 'A3', 'A3', 'D2', '2.5.9', '1.4.7', 'D2', '1.4.7.9', '2.5.9', 'D2', '2.5.9', '1.4.7.10', 'D2', '1.4.7.10', '2.5.9', '2.5.9', 'D2', 'D4', 'C#4', 'E4', 'C#3', 'D4', 'D3', 'D4', 'C#4', 'E4', 'C#4', 'D4', 'D4', 'D4', 'C4', 'B-3', 'A3', 'G3', 'A3', 'B-3', 'B3', 'A3', 'A2', 'A2']
变量X（音符序列）: 
[24, 18, 2, 46, 31, 34, 31, 34, 15, 2, 36, 10, 36, 10, 41, 15, 63, 28, 41, 15, 36, 10, 36, 10, 15, 2, 31, 34, 31, 34, 2, 46, 4, 18, 71, 47, 5, 34, 5, 34, 69, 32, 27, 10, 27, 10, 60, 30, 104, 28, 60, 30, 27, 10, 27, 10, 69, 32, 5, 34, 5, 34, 71, 47, 32, 18, 10, 75, 30, 34, 30, 34, 28, 71, 33, 10, 33, 10, 70, 69, 62, 28, 70, 69, 33, 10, 33, 10, 28, 71, 30, 34, 30, 34, 10, 75, 24, 18, 2, 46, 31, 34, 31, 34, 15, 2, 36, 10, 36, 10, 41, 15, 63, 28, 41, 15, 36, 10, 36, 10, 15, 2, 31, 34, 31, 34, 2, 46, 4, 18, 71, 47, 5, 34, 5, 34, 69, 32, 27, 10, 27, 10, 60, 30, 104, 28, 60, 30, 27, 10, 27, 10, 69, 32, 5, 34, 5, 34, 71, 47, 24, 18, 71,

In [17]:
# Mini-batch size
batch_size = 64
# Wrap inputs and targets as int tensors and bundle them into a TensorDataset
inputs_tensor = torch.IntTensor(np.array(X, dtype=int))
targets_tensor = torch.IntTensor(np.array(Y, dtype=int))
ds = DataSet.TensorDataset(inputs_tensor, targets_tensor)
# Shuffling data loader over the dataset
loader = DataSet.DataLoader(
    dataset=ds,
    batch_size=batch_size,
    shuffle=True,
    num_workers=1,
)

In [18]:
### LSTM model whose initial state is conditioned on a composer vector
class LSTMNetwork(nn.Module):
    """Embedding -> LSTM -> linear classifier over the next token.

    Args:
        input_size: stored on the instance; not used by ``forward`` or
            ``initHidden``.
        output_size: number of output classes of the final linear layer.
        word_num: number of rows of the embedding table.
        embedding_size: dimension of each embedding vector.
        hidden_size: LSTM hidden-state dimension.
        num_layers: number of stacked LSTM layers.
    """

    def __init__(self, input_size, output_size, word_num, embedding_size, hidden_size, num_layers=1):
        super().__init__()
        # Token embedding layer
        self.embedding = nn.Embedding(word_num, embedding_size)
        # batch_first=True: tensors are laid out as (batch, step, feature)
        self.lstm = nn.LSTM(embedding_size, hidden_size, num_layers, batch_first=True)
        # Output projection
        self.fc = nn.Linear(hidden_size, output_size)
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.input_size = input_size
        self.output_size = output_size
        self.embedding_size = embedding_size

    ### Forward pass
    def forward(self, x2, hidden):
        """Score the next token for every sequence of the batch.

        Args:
            x2: integer tensor of token ids, shape (batch, steps).
            hidden: ``(h0, c0)`` initial LSTM state, e.g. from ``initHidden``.

        Returns:
            Tensor of shape (batch, output_size) with unnormalised scores.
        """
        embedded = self.embedding(x2)
        # `output` holds the LSTM output of every time step.
        output, _ = self.lstm(embedded, hidden)
        # Keep only the last time step.
        # NOTE(review): callers in this notebook right-pad with 0, so the last
        # step is usually a padding position - confirm this is intended.
        output = output[:, -1, ...]
        return self.fc(output)

    ### Conditional initialisation of the hidden state
    def initHidden(self, x1, x1_size, batch_size):
        """Run the conditioning vector through the LSTM and return its final state.

        Tensors are created on the device that holds the model parameters,
        so the method works on CPU as well as on CUDA (it used to call
        ``.cuda()`` unconditionally).

        Args:
            x1: integer tensor of shape (batch, k); looked up in the same
                embedding table as the tokens.
                NOTE(review): callers pass a 0/1 one-hot composer vector here,
                so each entry is embedded as token id 0 or 1 - confirm intent.
            x1_size: unused; kept for backward compatibility.
            batch_size: batch dimension of the zero state fed to the LSTM.

        Returns:
            ``(h, c)`` tuple, each of shape (num_layers, batch_size, hidden_size).
        """
        device = self.embedding.weight.device
        embedded = self.embedding(x1.to(device))
        # Zero initial hidden and cell states (same shape).
        h1 = torch.zeros(self.num_layers, batch_size, self.hidden_size, device=device)
        c1 = torch.zeros(self.num_layers, batch_size, self.hidden_size, device=device)
        # The state reached after reading the conditioning vector becomes the
        # initial state for the note sequence.
        _, out = self.lstm(embedded, (h1, c1))
        return out

In [19]:
# Vocabulary size: one id per known symbol plus one extra slot for id 0
seq_size = len(seq2idx) + 1
# Learning rate and number of training epochs
lr = 1e-2
epochs = 50
# Maximum sequence length
max_len = 1000
# Build the LSTM (input size 999, output size seq_size) and move it to the GPU
lstm = LSTMNetwork(
    input_size=max_len - 1,
    output_size=seq_size,
    word_num=seq_size,
    embedding_size=256,
    hidden_size=128,
).cuda()
# Cross-entropy loss
criterion = torch.nn.CrossEntropyLoss()
# Adam optimiser
optimizer = torch.optim.Adam(lstm.parameters(), lr=lr)
# Show the model structure
print(lstm)

LSTMNetwork(
  (embedding): Embedding(732, 256)
  (lstm): LSTM(256, 128, batch_first=True)
  (fc): Linear(in_features=128, out_features=732, bias=True)
)


In [20]:
### Prediction accuracy
def accuracy(pre, label):
    """Return the fraction of rows of ``pre`` whose arg-max equals ``label``.

    Args:
        pre: score tensor of shape (batch, classes).
        label: tensor of shape (batch,) holding the expected class ids.

    Returns:
        0-dim float tensor with the share of correct predictions.
    """
    # Index of the highest score in every row
    predicted = pre.detach().argmax(dim=1)
    # Number of rows where prediction and label agree
    n_correct = (predicted == label.detach()).sum()
    # Share of correct predictions
    return (n_correct / len(label)).float()

In [21]:
### Split a batch into note ids and composer vector
def split_x1_x2(x, seq_len=999, name_dim=9):
    """Split every row of ``x`` into its leading note ids and trailing composer vector.

    The split is done with tensor slicing (no round trip through Python
    lists / numpy). The results are moved to CUDA when it is available and
    otherwise stay on the device of ``x``.

    Args:
        x: 2-D tensor (batch, features).
        seq_len: number of leading columns returned as the note part
            (default 999, the value previously hard-coded).
        name_dim: number of trailing columns returned as the composer part
            (default 9, the value previously hard-coded).

    Returns:
        tuple: ``(x1, x2)`` int32 tensors of shape (batch, seq_len) and
        (batch, name_dim).
    """
    x = torch.as_tensor(x)
    # .contiguous() so the results do not keep views into the full batch.
    x1 = x[:, :seq_len].to(torch.int32).contiguous()
    x2 = x[:, -name_dim:].to(torch.int32).contiguous()
    if torch.cuda.is_available():
        x1, x2 = x1.cuda(), x2.cuda()
    return x1, x2

In [22]:
# Training-log printer
def print_log(epoch, train_time, train_loss, train_acc, epochs=10):
    """Print a one-line summary of a finished epoch.

    Args:
        epoch: number of the epoch that just finished.
        train_time: duration of the epoch in seconds.
        train_loss: average loss of the epoch.
        train_acc: average accuracy of the epoch.
        epochs: total number of epochs.
    """
    progress = f"Epoch [{epoch}/{epochs}]"
    stats = f"time: {train_time:.2f}s, loss: {train_loss:.4f}, acc: {train_acc:.4f}"
    print(f"{progress}, {stats}")

In [23]:
### Training loop
def train(model, optimizer, train_loader, epochs=1, loss_fn=None):
    """Train ``model`` and return the per-epoch average loss and accuracy.

    Args:
        model: network exposing ``initHidden(x2, size, batch)`` and
            ``forward(x1, hidden)``.
        optimizer: optimiser already bound to ``model``'s parameters.
        train_loader: iterable of ``(x, y)`` batches; ``x`` is split into note
            ids and composer vector with ``split_x1_x2``.
        epochs: number of passes over ``train_loader``.
        loss_fn: loss callable ``(outputs, targets) -> scalar tensor``.
            Defaults to the notebook-level ``criterion``.

    Returns:
        tuple: ``(train_losses, train_accs)``, two lists of Python floats with
        one entry per epoch (plain floats, so the history can be plotted or
        pickled without holding on to device tensors).
    """
    if loss_fn is None:
        # Fall back to the loss object created in the model-setup cell.
        loss_fn = criterion
    # Run on whatever device the model lives on instead of forcing CUDA.
    device = next(model.parameters()).device

    train_losses = []
    train_accs = []

    for epoch in range(epochs):
        running_loss = 0.0
        running_acc = 0.0
        model.train()
        # Start time of the epoch
        start = time.time()
        for x, y in train_loader:
            x1, x2 = split_x1_x2(x)
            x1, x2 = x1.to(device), x2.to(device)
            y = y.to(device).long()
            # Condition the initial LSTM state on the composer vector.
            init_hidden = model.initHidden(x2, x2.shape[1], x1.shape[0])
            optimizer.zero_grad()
            outputs = model(x1, init_hidden)
            loss = loss_fn(outputs, y)
            loss.backward()
            optimizer.step()
            # Accumulate plain floats rather than tensors.
            running_loss += loss.item()
            running_acc += float(accuracy(outputs, y))

        # Duration of the epoch
        train_time = time.time() - start
        # Average over the batches; guard against an empty loader.
        n_batches = max(len(train_loader), 1)
        train_loss = running_loss / n_batches
        train_acc = running_acc / n_batches
        train_losses.append(train_loss)
        train_accs.append(train_acc)
        # Report progress
        print_log(epoch + 1, train_time, train_loss, train_acc, epochs=epochs)

    return train_losses, train_accs

In [24]:
# Train the model; history receives the (train_losses, train_accs) tuple returned by train().
history = train(lstm, optimizer, loader, epochs=epochs)  

Epoch [1/50], time: 0.61s, loss: 6.0757, acc: 0.0122
Epoch [2/50], time: 0.21s, loss: 4.4390, acc: 0.0874
Epoch [3/50], time: 0.23s, loss: 3.5908, acc: 0.2727
Epoch [4/50], time: 0.23s, loss: 2.8582, acc: 0.4361
Epoch [5/50], time: 0.21s, loss: 2.4330, acc: 0.5264
Epoch [6/50], time: 0.21s, loss: 2.2569, acc: 0.5383
Epoch [7/50], time: 0.21s, loss: 2.2053, acc: 0.5432
Epoch [8/50], time: 0.20s, loss: 2.1686, acc: 0.5354
Epoch [9/50], time: 0.22s, loss: 2.1604, acc: 0.5453
Epoch [10/50], time: 0.21s, loss: 2.1625, acc: 0.5412
Epoch [11/50], time: 0.22s, loss: 2.1451, acc: 0.5387
Epoch [12/50], time: 0.22s, loss: 2.1481, acc: 0.5403
Epoch [13/50], time: 0.21s, loss: 2.1353, acc: 0.5448
Epoch [14/50], time: 0.21s, loss: 2.1066, acc: 0.5385
Epoch [15/50], time: 0.21s, loss: 2.0903, acc: 0.5466
Epoch [16/50], time: 0.22s, loss: 2.0687, acc: 0.5538
Epoch [17/50], time: 0.22s, loss: 2.0369, acc: 0.5464
Epoch [18/50], time: 0.21s, loss: 2.0260, acc: 0.5594
Epoch [19/50], time: 0.22s, loss: 2.0

In [14]:
### Choose the composer and a seed fragment for generation
import random

# Composer whose pieces are used for conditioning and seeding
musicianname = 'beethoven'
# One-hot vector of the composer; the width follows the number of known
# composers instead of a hard-coded 9.
name_digit = F.one_hot(torch.tensor(name2idx[musicianname]), num_classes=len(name2idx))

# Indices of all pieces written by the chosen composer
candidate_pieces = [i for i in range(len(seqs)) if namelist[i] == musicianname]
if not candidate_pieces:
    # Previously this case ended in a NameError on `temp`.
    raise ValueError(f"no piece by {musicianname!r} found in namelist")

# Pick one piece uniformly at random (the earlier coin-flip scan favoured
# pieces that appear early in the list).
seed_piece = random.choice(candidate_pieces)
# All notes of the chosen piece; later sampled as auxiliary material
vocab = list(seqs_digit[seed_piece])
# The first 20 notes of the piece start the model input
input_index = list(seqs_digit[seed_piece][0:20])

In [15]:
### Generate a piece with the trained model
# Generated symbols
output_word = []
# Number of generation steps
length = 500

# Reverse vocabulary built once, instead of scanning seq2idx at every step
idx2seq = {idx: symbol for symbol, idx in seq2idx.items()}
# Work on whatever device the model lives on
device = next(lstm.parameters()).device
# Work on a copy so that re-running this cell starts from the same seed
context = [int(token) for token in input_index]
# Composer vector as a (1, k) int tensor
composer = name_digit.reshape(1, -1).to(device=device, dtype=torch.int32)

lstm.eval()
# Inference only: no autograd graph is needed
with torch.no_grad():
    # The conditioning state depends only on the composer, so compute it once.
    init_hidden = lstm.initHidden(composer, composer.shape[1], 1)
    for step in range(length):
        # Long pieces tend to degrade (e.g. repeated melodies), so every 25th
        # step injects a few notes sampled from the seed piece instead of
        # predicting a note.
        if step % 25 == 0:
            context.extend(random.sample(vocab, min(5, len(vocab))))
            continue

        # Feed at most max_len - 1 of the most recent notes, right-padded with 0.
        window = context[-(max_len - 1):]
        padded = window + [0] * (max_len - 1 - len(window))
        x1 = torch.tensor([padded], dtype=torch.int32, device=device)
        pre = lstm(x1, init_hidden)
        # Id 0 is the padding value and has no symbol; never select it.
        pre[0, 0] = float('-inf')
        # Most likely next note, as a plain int
        index = int(torch.argmax(pre))
        # Store the symbol and extend the context with its id
        output_word.append(idx2seq[index])
        context.append(index)

# Show the generated sequence
print(output_word)

['A2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '9.2', '10.3', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', 'B-2', '5.10', '5.10', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', '0.3.7', 'G#1', 'F1', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', '0.4.7', 'A3', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', '5.8.0', 'A3', 'A3', 'G#5', '5', '

In [16]:
### Convert a predicted symbol sequence into a MIDI file
def seq_to_mid(prediction, fp='output.mid', offset_step=0.5):
    """Write ``prediction`` to a MIDI file using music21.

    Args:
        prediction: iterable of symbol strings. A symbol containing '.' or
            consisting only of digits is treated as a chord whose parts are
            integers separated by '.'; any other symbol is passed to
            ``note.Note`` as a note name.
        fp: path of the MIDI file to write (default 'output.mid', the value
            previously hard-coded).
        offset_step: offset distance between consecutive symbols
            (default 0.5, the value previously hard-coded).
    """
    output_notes = []
    for position, data in enumerate(prediction):
        # Each symbol gets its own offset so that events do not overlap.
        # Computed by multiplication to avoid accumulating rounding error.
        offset = position * offset_step
        if ('.' in data) or data.isdigit():
            # Chord: build one Note per '.'-separated integer
            notes = []
            for current_note in data.split('.'):
                new_note = note.Note(int(current_note))
                # Piano is used as the instrument
                new_note.storedInstrument = instrument.Piano()
                notes.append(new_note)
            new_chord = chord.Chord(notes)
            new_chord.offset = offset
            output_notes.append(new_chord)
        else:
            # Single note given by name
            new_note = note.Note(data)
            new_note.offset = offset
            # Piano is used as the instrument
            new_note.storedInstrument = instrument.Piano()
            output_notes.append(new_note)

    # Only one instrument is involved, so all events go into a single stream.
    midi_stream = stream.Stream(output_notes)
    # Write the stream as a MIDI file
    midi_stream.write('midi', fp=fp)
    
# Convert the generated symbol sequence to MIDI; seq_to_mid writes it to output.mid.
seq_to_mid(output_word)