In [1]:
import os
import sys
import numpy as np
import librosa
from math import ceil
import torch
from torch import nn, optim
import torch.nn.functional as F
from tqdm.notebook import tqdm
from fractions import Fraction
import pickle
from time import time
from torch.cuda.amp import autocast, GradScaler
import matplotlib.pyplot as plt
import random
from scipy.signal import find_peaks, windows, convolve
import shutil
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DIC={
    0:[0, 0, 0, 0], 1:[0, 0, 0, 1], 2:[0, 0, 1, 0], 3:[0, 0, 1, 1],
    4:[0, 1, 0, 0], 5:[0, 1, 0, 1], 6:[0, 1, 1, 0], 7:[0, 1, 1, 1],
    8:[1, 0, 0, 0], 9:[1, 0, 0, 1], 10:[1, 0, 1, 0], 11:[1, 0, 1, 1],
    12:[1, 1, 0, 0], 13:[1, 1, 0, 1], 14:[1, 1, 1, 0], 15:[1, 1, 1, 1]
}

In [2]:
## 构建模型类：MugGenModel_T判断音频的时间步上是否出现音符


class MugGenModel_T(nn.Module):
    def __init__(self):
        super(MugGenModel_T, self).__init__()
        self.train_loss=[0]
        self.test_loss=[0]
        self.train_F1=[]
        self.test_F1=[]
        self.train_Recall=[]
        self.test_Recall=[]
        self.train_Precision=[]
        self.test_Precision=[]

        self.tanh=nn.Tanh()
        self.relu=nn.ReLU()
        self.dropout = nn.Dropout(p=0.5)
        
        self.conv0 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=(5,3))
        self.pool0 = nn.MaxPool2d(kernel_size=(1, 3), stride=(1, 3))
        self.conv1 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=(5,3))
        self.pool1 = nn.MaxPool2d(kernel_size=(1, 3), stride=(1, 3))
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=120, kernel_size=(5,1))
        self.fc0 = nn.Linear(960, 512)
        self.fc1 = nn.Linear(512, 256)
        self.fc2 = nn.Linear(256, 1)

    def forward(self, Input):
        batch_size,_,seq_len,_ = Input.size()

        '''
        IN SHAPE:(BATCH,CHANNEL,SEQ,FEATURES)
        IN(BATCH,3,50,80)->CONV0->(BATCH,10,25,78)-POOL0->(BATCH,10,1,26)->CONV1->(BATCH,20,1,24)->POOL1->(BATCH,20,1,8)
        TRANSPOSE & RESIZE->(BATCH,1,20*8)->LINEAR0(BATCH_SIZE,1,256)->LINEAR1(BATCH_SIZE,1,128)->LINEAR2(BATCH_SIZE,1)
        ->OUT(BATCH_SIZE,2)
        '''
        conv_out0 = self.conv0(Input)
        conv_out0 = self.dropout(self.pool0(self.relu(conv_out0)))
        
        conv_out1 = self.conv1(conv_out0)
        conv_out1 = self.dropout(self.pool1(self.relu(conv_out1)))
        
        conv_out1 = self.relu(self.conv2(conv_out1))

        conv_out1 = conv_out1.transpose(1,2).contiguous()
        
        conv_out1 = conv_out1.reshape(batch_size,1,-1)
        
        
        liner_out0=self.fc0(conv_out1)
        liner_out1=self.fc1(liner_out0)
        output = self.fc2(liner_out1)
        

        return output.flatten()#,hidden#torch.sigmoid()
    

    
## 构建模型类：MugGenModel_N判断音频的时间步上音符的排列形状
class MugGenModel_N(nn.Module):
    def __init__(self):
        super(MugGenModel_N, self).__init__()
        self.relu=nn.ReLU()
        self.dropout=nn.Dropout(p=0.5)
        
        self.embed=nn.Embedding(16,4)
        self.embedfc0=nn.Linear(4*16+8,128)
        self.embedfc1=nn.Linear(128,64)
        
        self.conv=nn.Conv2d(in_channels=3, out_channels=16, kernel_size=(3,3),padding=(1,1))
        self.pool = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2),padding=(1,0))
        self.conv1=nn.Conv2d(in_channels=16, out_channels=32, kernel_size=(3,3),padding=(1,1))
        self.pool1 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2),padding=(1,0))
        self.conv2=nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(3,3),padding=(1,1))
        self.pool2 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2),padding=(1,0))
        self.conv3=nn.Conv2d(in_channels=64, out_channels=8, kernel_size=(3,3),padding=(1,1))
        self.fc0=nn.Linear(30*8+8,512)
        self.fc1=nn.Linear(512,128)
        
        self.outfc0=nn.Linear(128+64,256)
        self.outfc1=nn.Linear(256,16)
        
    def forward(self, Input,Last_note,note_distinct1,note_distinct2):
        conv_out=self.pool(self.relu(self.conv(Input)))
        conv_out=self.pool1(self.relu(self.conv1(conv_out)))
        conv_out=self.pool2(self.relu(self.conv2(conv_out)))
        conv_out=self.relu(self.conv3(conv_out))
        conv_out=conv_out.squeeze().reshape(conv_out.shape[0],-1)
        conv_out=torch.cat((note_distinct1,note_distinct2,conv_out),dim=-1)
        conv_out=self.relu(self.fc0(conv_out))
        conv_out=self.dropout(conv_out)
        conv_out=self.fc1(conv_out)
        
        ebd_out=self.embed(Last_note)
        ebd_out=torch.cat((note_distinct1,note_distinct2,ebd_out.reshape(ebd_out.shape[0],-1)),dim=-1)
        ebd_out=self.dropout(self.relu(self.embedfc0(ebd_out)))
        ebd_out=self.embedfc1(ebd_out)
        
        out=torch.cat((conv_out,ebd_out),dim=-1)
        out=self.relu(self.outfc0(out))
        out=self.outfc1(out)
        return out
    
class LSTM_T(nn.Module):
    def __init__(self):
        global DIC,MEL_DIM,HIDDEN_DIM
        super(LSTM_T, self).__init__()
        self.relu=nn.ReLU()
        
        self.lstm = nn.LSTM(240,200, num_layers=2,dropout=0.3,bidirectional=True,batch_first=True)
        self.fc0=nn.Linear(400,256)
        self.fc1=nn.Linear(256,128)
        self.fc2=nn.Linear(128,1)
        
    def forward(self,Input,hidden=None):
        batch_size=Input.shape[0]
        if hidden==None:
            lstmout,hidden=self.lstm(Input.transpose(1,2).reshape(batch_size,-1,240))
        else:
            lstmout,hidden=self.lstm(Input.transpose(1,2).reshape(batch_size,-1,240),hidden)
        #print(lstmout.shape)
        out=self.relu(self.fc0(lstmout))
        #out=self.relu(self.fc0(lstmout))
        out=self.relu(self.fc1(out))
        out=self.fc2(out)
        return out,hidden

In [3]:
def music_mel(y,sr,step=10,mel_dim=80):#默认step是10毫秒
    hop_length = ceil(sr / (22050/220))  # 计算hop_length
    # 提取音频特征
    spectrogram1 = librosa.feature.melspectrogram(y=y, sr=sr,n_mels=mel_dim,hop_length=hop_length,n_fft=512)
    spectrogram2 = librosa.feature.melspectrogram(y=y, sr=sr,n_mels=mel_dim,hop_length=hop_length,n_fft=1024)
    spectrogram3 = librosa.feature.melspectrogram(y=y, sr=sr,n_mels=mel_dim,hop_length=hop_length,n_fft=2048)
    log_spectrogram1 = Normalization(librosa.power_to_db(spectrogram1))
    log_spectrogram2 = Normalization(librosa.power_to_db(spectrogram1))
    log_spectrogram3 = Normalization(librosa.power_to_db(spectrogram1))
    return np.stack((log_spectrogram1,log_spectrogram2,log_spectrogram3),axis=2)
def Normalization(datas):
    Min = np.min(datas)
    Max = np.max(datas)
    return (datas-Min)/(Max-Min)



def get_frame_labels(inputs,peaks,seq_len=5):
    #将输入的音乐转化为音乐片段，只选取有音符部分的音乐和标签
    #返回当前note与上一个note的距离,分为4个类别，32~16分，16~8分，8~4分和4分以上的音符
    res=[]
    res_l=[]
    inupts=np.pad(inputs,((0,0),(seq_len,seq_len+1),(0,0)))
    distincts=[]
    di=0
    for i in range(seq_len,inputs.shape[1]-seq_len-1):
        if i in peaks:
            if i-di<=5:#约50毫秒以内，通常是32分音符
                di_type=[1,0,0,0]
            elif i-di<=10:#约100毫秒以内，通常是16分音符
                di_type=[0,1,0,0]
            elif i-di<=20:#约200毫秒以内，通常是8分音符
                di_type=[0,0,1,0]
            else:#其他距离视作同一种类别的音符
                di_type=[0,0,0,1]
            distincts.append(di_type)
            di=i
            res.append(inputs[:,i-seq_len:i+seq_len+1,:])
    res=np.array(res)
    distincts1=np.array(distincts)
    distincts.append([0,0,0,1])
    distincts=distincts[1:]
    distincts2=np.array(distincts)
    return res,distincts1,distincts2


def tensor2mc(data,music_name,steptime=10,bpm=120,offset=0):
    global DIC
    res_data={"meta":{"$ver":0,
                      "creator":"MugGenModel",
                      "background":"",
                      "version":"",
                      "id":0,
                      "mode":0,
                      "time":0,
                      "song":{"title":f"{music_name}","artist":"","id":0},
                      "mode_ext":{"column":4,"bar_begin":0}
                     },
               "time":[{"beat":[0,0,1],"bpm":bpm}],
               "effect":[],
               "note":[{"beat":[0,0,1],"sound":music_name,"vol":100,"offset":offset,"type":1}],
               "extra":{"test":{"divide":4,"speed":100,"save":0,"lock":0,"edit_mode":0}}}
    

    notes=[]
    for i in data:
        notes.append(DIC[i.item()])
        
    note=[]
    for i in range(len(notes)):
        for col in range(len(notes[i])):
            if notes[i][col]==1:
                b0=int((i*steptime+50-offset)/1000/60*bpm)
                fraction_result = Fraction((i*steptime+50-offset)/1000/60*bpm-b0).limit_denominator()
                b1,b2=fraction_result.numerator, fraction_result.denominator
                original_fraction = Fraction(b1, b2) # 原始分数
                target_denominator = 128 # 目标分母
                # 方法是先将分数转换为小数，然后乘以目标分母，并四舍五入得到新的分子
                new_numerator = round(original_fraction * target_denominator)
               # 创建新的分数
                converted_fraction = Fraction(new_numerator, target_denominator)
                b1,b2=converted_fraction.numerator, converted_fraction.denominator
                note.append({'beat':[b0,b1,b2],'column':col})
    res_data['note']=note+res_data['note']
    return res_data

def get_Bpm(peaks):
    #获取一个大概的bpm和offset，由于使用了（220/22050）ms作为一个时间步，所以并不是很准确
    dis=[]
    dis_dic={}
    for i in range(len(peaks)):#寻找相同间隔最多的一类，通常是16分音符
        if len(dis)==0:
            dis.append(peaks[i])
        else:
            dis.append(peaks[i]-peaks[i-1])
    for i in dis:
        if i not in dis_dic.keys():
            dis_dic[i]=1
        else:
            dis_dic[i]+=1
    dis_dic=sorted(dis_dic.items(),key=lambda x:x[1])
    i=-1
    time=(dis_dic[-1][0]*dis_dic[-1][1]+dis_dic[-2][0]*dis_dic[-2][1])/(dis_dic[-1][1]+dis_dic[-2][1])
    bpm=60/(220/22050*time*4) #16分音符的间隔*4就是一拍的时间
    #return bpm
    while bpm<150 or bpm>300:#缩放至150-300bpm之间
        if bpm<150:
            bpm*=2
        if bpm>300:
            bpm/=2
    bpm=round(bpm,3)
    
    offset_=0
    for i in dis:
        if i!=time:
            offset_+=i
        else:
            break
    offset=(offset_%time)*(220/22050)*1000
    return bpm,offset
        

In [4]:
T=MugGenModel_T().to(device)
N=MugGenModel_N().to(device)
checkpoint = torch.load('model/T/model .pth')
T.load_state_dict(checkpoint['model_state_dict'])
checkpoint = torch.load('model/N/model.pth')
N.load_state_dict(checkpoint['model_state_dict'])

<All keys matched successfully>

In [5]:
#N2 包含前后note距离信息
music_path='music/'
music_name='1684656672.ogg'
music_bpm=200

In [6]:
#Mug_Gen_T预测音符
threshold0=0
threshold1=0.8

test_music_temp=[]
res=[]
y,sr=librosa.load(music_path+music_name)
in_data=music_mel(y,sr,mel_dim=80).transpose(2,1,0)
for k in tqdm(range(6,in_data.shape[1]-7)):
    test_music_temp.append(in_data[:,k-6:(k+7),:])
    if len(test_music_temp)>=16 or k>=in_data.shape[1]-128:
        with torch.no_grad():
            test_music_temp=torch.tensor(np.array(test_music_temp), dtype=torch.float32).to(device)
            output = T(test_music_temp)
            res+=(output.cpu().tolist())
            test_music_temp=[]
sig_res=torch.sigmoid(torch.tensor(res)).numpy()
window = windows.hamming(5)
smoothed_pred = convolve(sig_res, window / window.sum(), mode='same')
peaks,_=find_peaks(smoothed_pred, height=threshold0*sum(sig_res)/len(sig_res),distance=3,prominence=threshold1*sum(sig_res)/len(sig_res))
bpm,offset=get_Bpm(peaks)
a,b,c=get_frame_labels(in_data,peaks+14,seq_len=8)
offset=0

  0%|          | 0/13218 [00:00<?, ?it/s]

In [7]:
last_note=torch.zeros((1,16),dtype=torch.long).to(device)
out=None
for i in tqdm(range(a.shape[0])):
    out_temp=N(torch.tensor(np.array([a[i]])).to(device),last_note,torch.tensor([b[i]]).to(device),torch.tensor([c[i]]).to(device))
    cur_note=torch.argmax(out_temp,dim=1)[None,:]
    last_note=torch.cat((cur_note,last_note),dim=1)[:,:-1]
    if type(out)==type(None):
        out=cur_note
    else:
        out=torch.cat((out,cur_note),dim=1)

  0%|          | 0/1326 [00:00<?, ?it/s]

  out_temp=N(torch.tensor(np.array([a[i]])).to(device),last_note,torch.tensor([b[i]]).to(device),torch.tensor([c[i]]).to(device))


In [8]:
res=torch.zeros(in_data.shape[1])
for i in range(len(peaks)):
    try:
        res[peaks[i]]=out[0][i]
    except:
        pass
    
mc=tensor2mc(res,music_name,steptime=1000/(22050/220),bpm=bpm,offset=offset)

dir_name=f"export/{int(time())} "+music_name
os.makedirs(dir_name, exist_ok=True)
filename = dir_name+"/0.mc"
with open(filename, "w") as file:
    file.write(str(mc).replace('\'','"'))
    

# 使用shutil.move()函数移动文件
shutil.copy(music_path+music_name, dir_name)

'export/1714831503 1684656672.ogg\\1684656672.ogg'