In [1]:
# from google.colab import drive
# drive.mount('/content/drive')

In [2]:
!cp ./drive/MyDrive/datasets/data.zip ./data.zip

'cp' 不是内部或外部命令，也不是可运行的程序
或批处理文件。


In [3]:
!unzip data.zip >> /dev/null

系统找不到指定的路径。


In [4]:
!rm data.zip

'rm' 不是内部或外部命令，也不是可运行的程序
或批处理文件。


In [5]:
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from torch import nn
from torch import optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import torch.utils as utils
import seaborn as sns
# 音频处理库
import librosa
import librosa.display

import os
import logging

# logging.basicConfig(filename='train_ae.log', encoding='utf-8', level=logging.DEBUG)

ModuleNotFoundError: No module named 'torch'

In [None]:
def plot_spectogram(spectogram, ax, fig=None, title="Spectogram"):
    # Convert to log scale (dB). We'll use the peak power as reference.
    log_spectogram = librosa.amplitude_to_db(np.abs(spectogram), ref=np.max)
    ax.set_title(title)
    img = librosa.display.specshow(log_spectogram, ax=ax, x_axis='time', y_axis='log')
    fig.colorbar(img, ax=ax, format="%+2.0f dB")

def plot_wave(wave, ax):
    ax.set_title('Wave')
    ax.plot(wave, label='wave')

In [None]:
def transform_wave_to_spectogram(wave, n_fft=2048, hop_length=256):
    spectogram = librosa.stft(wave, n_fft=n_fft, hop_length=hop_length)
    spectogram = np.abs(spectogram) # 此时数据还是比较极端，在对齐做log处理，再做均一化
    spectogram = librosa.amplitude_to_db(spectogram, ref=np.max)
    spectogram = spectogram.astype(np.float32)
    spectogram = (spectogram - np.min(spectogram)) / (np.max(spectogram) - np.min(spectogram))
    # normalize
    return spectogram

In [None]:
# config.py

configuration = {
    "datadirs": "./data",
    "optim": {
        "config": {
            "lr": 0.01,
            "weight_decay": 0.0001
        },
        "name": "Adam"
    }
}

In [None]:
# audio dataset
class ChordDataSet(utils.data.Dataset):
    def __init__(self, datadir, transform=transform_wave_to_spectogram):
        self.datadir = datadir
        self.transform = transform
        self.paths = []
        self.labels = []
        self.classes = []
        self._load_data()
    
    def _load_data(self):
        for root, dirs, files in os.walk(self.datadir):
            for file in files:
                if file.endswith(".wav"):
                    self.paths.append(os.path.join(root, file))
                    label = root.split("\\")[-1]
                    self.labels.append(label)
                    if label not in self.classes:
                        self.classes.append(label)
        self.num_class = len(self.classes)

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, index):
        path = self.paths[index]
        label = self.labels[index]
        label = self.classes.index(label)
        label = torch.zeros(self.num_class).scatter_(0, torch.tensor(label), 1)
        wave, sr = librosa.load(path, sr=16000)
        spectogram = torch.from_numpy(self.transform(wave)).unsqueeze(0)
        return spectogram, label
    def get_class(self, ind):
        return self.classes[ind]

In [None]:
class ContentEncoder(nn.Module):
    def __init__(self, in_channels=1):
        super().__init__()
        self.net_l = nn.Sequential(
            nn.Conv2d(in_channels, 64, 3, 1, 1),
            nn.ReLU(),
            nn.InstanceNorm2d(64),
            nn.Conv2d(64, 128, 3, 1, 1),
            nn.ReLU(),
            nn.InstanceNorm2d(128),
            nn.Conv2d(128, 256, 3, 1, 1),
            nn.ReLU(),
            nn.InstanceNorm2d(256),
        )
        self.net_r = nn.Sequential(
            nn.Conv2d(256, 128, 3, 1, 1),
            nn.ReLU(),
            nn.InstanceNorm2d(128),
            nn.Conv2d(128, 64, 3, 2, 1),
            nn.ReLU(),
            nn.InstanceNorm2d(64),
            nn.Conv2d(64, 1, 3, 2, 1),
            nn.ReLU(),
            nn.InstanceNorm2d(1),
            nn.Flatten()
        )

    def forward(self, x):
        res1 = self.net_l(x)
        res2 = x + res1
        res3 = self.net_r(res2)
        # print(res1.shape, res2.shape, res3.shape)
        return res3        

class StyleEncoder(nn.Module):
    def __init__(self, in_channels=1):
        super().__init__()
        self.net_l = nn.Sequential(
            nn.Conv2d(in_channels, 64, 3, 1, 1),
            nn.ReLU(),
            nn.Conv2d(64, 128, 3, 1, 1),
            nn.ReLU(),
            nn.Conv2d(128, 256, 3, 1, 1),
            nn.ReLU(),
        )
        self.net_r = nn.Sequential(
            nn.Conv2d(256, 128, 3, 1, 1),
            nn.ReLU(),
            nn.Conv2d(128, 64, 3, 2, 1),
            nn.ReLU(),
            nn.Conv2d(64, 1, 3, 2, 1),
            nn.ReLU(),
            nn.Flatten()
        )

    def forward(self, x):
        res1 = self.net_l(x)
        res2 = x + res1
        res3 = self.net_r(res2)
        # print(res1.shape, res2.shape, res3.shape)
        # 257 * 4
        return res3        


def adaIn(x, y):
    std_x = torch.std(x, dim=(2, 3), keepdim=True)
    mean_x = torch.mean(x, dim=(2, 3), keepdim=True)
    std_y = torch.std(y, dim=(2, 3), keepdim=True)
    mean_y = torch.mean(y, dim=(2, 3), keepdim=True)
    return std_y * (x - mean_x) / std_x + mean_y

class Decoder(nn.Module):
    def __init__(self):
        super().__init__()
        ### 两个decoder层级，一个是style，一个是content
        ### style decoder 中解构出来的与 content融合
        self.net = nn.Sequential(
            nn.ConvTranspose2d(1, 64, 3, 2, 1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 128, 3, 2, 1),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 1, 3, 1, 1),
            nn.ReLU(),
        )
    
    def forward(self, content, style):
        # resize content to 257 * 4
        # resize style to 257 * 4
        content = content.view(-1, 1, 257, 4)
        style = style.view(-1, 1, 257, 4)
        res = adaIn(content, style)
        return self.net(res)

In [None]:
def get_device():
    return torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# ae train


ae_configuration = {
    "optim": {
        "name": "Adam",
        "config": {
            "lr": 0.001,
        }
    },
    "epoch": 100,
    "batch_size": 8
}

def train_ae(content_net, style_net, decoder_net, device = get_device(), alpha = 0.5, encoder_name="all"):
    
    content_net.to(device)
    style_net.to(device)
    decoder_net.to(device)
    print("加载数据集")
    dataset = ChordDataSet("./data")
    print("加载数据完成")
    dataloader = utils.data.DataLoader(dataset, batch_size=ae_configuration["batch_size"], shuffle=True)
    optim = getattr(torch.optim, ae_configuration["optim"]["name"])
    optim = optim(list(content_net.parameters()) + list(style_net.parameters()) + list(decoder_net.parameters()), **ae_configuration["optim"]["config"])
    criterion = nn.MSELoss()
    print("开始训练")
    for epoch in range(ae_configuration["epoch"]):
        for i, (data, label) in enumerate(dataloader):
            data = data.to(device)
            optim.zero_grad()
            content = content_net(data)
            style = style_net(data)
            output = decoder_net(content, style)
            loss = (1 - alpha) * torch.mean(content) + alpha * criterion(output, data)
            loss.backward()
            optim.step()
            if i % 20 == 0:
                print(f"epoch: {epoch}, iter: {i}, loss: {loss.item()}")
        torch.save(content_net.state_dict(), f"./model/content_net_{encoder_name}_{epoch}.pth")
        torch.save(style_net.state_dict(), f"./model/style_net_{encoder_name}_{epoch}.pth")
        torch.save(decoder_net.state_dict(), f"./model/decoder_net_{encoder_name}_{epoch}.pth")

In [None]:
content_encoder = ContentEncoder()
style_encoder = StyleEncoder()
decoder = Decoder()

# train_ae(content_encoder, style_encoder, decoder)

content_encoder.load_state_dict(torch.load('./model/all_c_encoder.pth'))
style_encoder.load_state_dict(torch.load('./model/all_s_encoder.pth'))
decoder.load_state_dict(torch.load('./model/all_decoder.pth'))
content_encoder.to(get_device())
style_encoder.to(get_device())
decoder.to(get_device())

In [None]:
def restart():
    import os
    os._exit(00)

In [None]:
# restart()

In [None]:
dataset = ChordDataSet('./data/V3')

In [None]:
content_encoder.eval()
style_encoder.eval()
decoder.eval()

#### 测试一下 AE 的效果

In [None]:
sample_x = dataset[100][0].to(get_device())
content_feature = content_encoder(sample_x)
style_feature = style_encoder(sample_x)
decoder_ans = decoder(content_feature, style_feature)

In [None]:
sns.heatmap(decoder_ans.detach().cpu()[0][0])
# sns.heatmap(content_feature.detach().cpu()[0].view(257, 4))

In [None]:
sns.heatmap(sample_x.cpu()[0])

In [None]:
sns.heatmap(decoder_ans.detach().cpu()[0][0] - sample_x.cpu()[0])

* 可以发现encoder和decoder的效果还是非常不错的

### 如果交换一下顺序效果就会差很多

In [None]:
decoder_ans = decoder(content_feature, torch.rand_like(style_feature))

In [None]:
sns.heatmap(decoder_ans.detach().cpu()[0][0])

In [None]:
decoder_ans = decoder(torch.rand_like(content_feature), style_feature)
sns.heatmap(decoder_ans.detach().cpu()[0][0])

!!!!!!!!!!!!! 真的真的真的出现了，
真的提取出来了！！！！！
太棒了！！！！
开始训练分类器

同一个音频的音色特征是否相同呢??

In [None]:
def get_label(label, dataset=ChordDataSet('./data/Error')):
    index = torch.argmax(label)
    return dataset.labels[index]
get_label(dataset[10][0])

In [None]:
sample_D_0_1 = style_encoder(dataset[10][0].to(get_device()))
sample_D_0_2 = style_encoder(dataset[400][0].to(get_device()))

fig, axes = plt.subplots(1, 5, figsize=(20, 10))


sns.heatmap(sample_D_0_1[0].detach().cpu().view(257, 4), ax=axes[0])
sns.heatmap(sample_D_0_2[0].detach().cpu().view(257, 4), ax=axes[1])
sns.heatmap(dataset[10][0][0].detach().cpu(), ax=axes[2])
sns.heatmap(dataset[400][0][0].detach().cpu(), ax=axes[3])
sns.heatmap((sample_D_0_1[0].detach().cpu() - sample_D_0_2[0].detach().cpu()).view(257, 4), ax=axes[4])

可以发现，的的确确是提取出特征来了

In [None]:
class ChordRecognizerWithFull(nn.Module):
    def __init__(self, content_net, style_net, dataset = ChordDataSet('./data/Error'), device=get_device()):
        super().__init__()
        self.content_net = content_net.to(device)
        self.content_net.eval()
        self.style_net = style_net.to(device)
        self.style_net.eval()
        self.net = nn.Sequential(
            nn.InstanceNorm1d(num_features=1),
            nn.Linear(257 * 4 * 2, 2048),
            nn.ReLU(),
            nn.Linear(2048, 512),
            nn.ReLU(),
            nn.Linear(512, dataset.num_class),
        )
    
    def forward(self, x):
        content = self.content_net(x)
        style = torch.flatten(self.style_net(x))
        style = style.view(-1, 1028)
        res = torch.cat([content, style], dim=1)
        res = self.net(res)
        return res

class ChordRecognizerWithContent(nn.Module):
    def __init__(self, content_net, dataset = ChordDataSet('./data/Error'), device=get_device()):
        super().__init__()
        self.content_net = content_net.to(device)
        self.content_net.eval()
        self.net = nn.Sequential(
            nn.InstanceNorm1d(num_features=1),
            nn.Linear(257 * 4, 2048),
            nn.ReLU(),
            nn.Linear(2048, 512),
            nn.ReLU(),
            nn.Linear(512, dataset.num_class),
        )
    
    def forward(self, x):
        content = self.content_net(x)
        res = self.net(content)
        return res

class ChordRecognizerWithStyle(nn.Module):
    def __init__(self, style_net, dataset = ChordDataSet('./data/Error'), device=get_device()):
        super().__init__()
        self.style_net = style_net.to(device)
        self.style_net.eval()
        self.net = nn.Sequential(
            nn.InstanceNorm1d(num_features=1),
            nn.Linear(257 * 4, 2048),
            nn.ReLU(),
            nn.Linear(2048, 512),
            nn.ReLU(),
            nn.Linear(512, dataset.num_class),
        )
    
    def forward(self, x):
        style = torch.flatten(self.style_net(x))
        style = style.view(-1, 1028)
        res = self.net(style)
        return res


In [None]:
from utils import load_chord_predictor

content_predictor, style_predictor, all_predictor, dataset = load_chord_predictor()

loss_record = {
    "cs_net": [],
    "c_net": [],
    "s_net": [],
    "cs_acc": [],
    "c_acc": [],
    "s_acc": [],
}

cls_configuration = {
    "optim": {
        "cs_net": {
            "name": "SGD",
            "config": {
                "lr": 0.01,
            }
        },
        "c_net": {
            "name": "SGD",
            "config": {
                "lr": 0.01,
            }
        },
        "s_net": {
            "name": "SGD",
            "config": {
                "lr": 0.01,
            }
        }
    },
    "epoch": 100,
    "batch_size": 8,
    # "random_seed": 42,
    "alpha": 0.5,
}

In [None]:
def train_cls(cs_net, c_net, s_net, configuration, record, device=get_device()):
    cs_optim = getattr(torch.optim, configuration["optim"]["cs_net"]["name"])(cs_net.parameters(), **configuration["optim"]["cs_net"]["config"])
    c_optim = getattr(torch.optim, configuration["optim"]["c_net"]["name"])(c_net.parameters(), **configuration["optim"]["c_net"]["config"])
    s_optim = getattr(torch.optim, configuration["optim"]["s_net"]["name"])(s_net.parameters(), **configuration["optim"]["s_net"]["config"])
    criterion = nn.CrossEntropyLoss()

    

    # set random seed
    # torch.manual_seed(configuration["random_seed"])
    # torch.cuda.manual_seed(configuration["random_seed"])
    # np.random.seed(configuration["random_seed"])

    # split train and test
    epoch = configuration["epoch"]
    batch_size = configuration["batch_size"]
    alpha = configuration["alpha"]
    dataset = ChordDataSet("./data/V3")
    ano_dataset = ChordDataSet("./data")
    dataset.classes = ano_dataset.classes
    dataset.num_class = ano_dataset.num_class
    train_data, test_data = utils.data.random_split(dataset, [int(len(dataset) * 0.8), len(dataset) - int(len(dataset) * 0.8)])
    train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_data, batch_size=batch_size, shuffle=True)

    # to cuda
    cs_net = cs_net.to(device)
    c_net = c_net.to(device)
    s_net = s_net.to(device)


    # train
    print("开始训练")
    for epoch in range(epoch):
        for i, (data, label) in enumerate(train_dataloader):
            data = data.to(device)
            label = label.to(device)
            cs_optim.zero_grad()
            c_optim.zero_grad()
            s_optim.zero_grad()
            cs_output = cs_net(data)
            c_output = c_net(data)
            s_output = s_net(data)
            cs_loss = criterion(cs_output, label)
            c_loss = criterion(c_output, label)
            s_loss = criterion(s_output, label)
            cs_loss.backward()
            c_loss.backward()
            s_loss.backward()
            cs_optim.step()
            c_optim.step()
            s_optim.step()
            # print(f'cs: {cs_output.argmax(dim=1)},\nc: {c_output.argmax(dim=1)}, \ns: {s_output.argmax(dim=1)}, label: {label.argmax(dim=1)}')
            if i % 10 == 0:
                print("epoch: {}, batch: {}, cs_loss: {}, c_loss: {}, s_loss: {}".format(epoch, i, cs_loss, c_loss, s_loss))
                record["cs_net"].append(cs_loss)
                record["c_net"].append(c_loss)
                record["s_net"].append(s_loss)
            if i % 100 == 0 and i != 0:
                # check acc
                print("开始测试")
                with torch.no_grad():
                    cs_acc = 0
                    c_acc = 0
                    s_acc = 0
                    for i, (data, label) in enumerate(test_dataloader):
                        data = data.to(device)
                        label = label.to(device)
                        cs_output = cs_net(data)
                        c_output = c_net(data)
                        s_output = s_net(data)
                        cs_acc += (torch.argmax(cs_output, dim=1) == torch.argmax(label, dim=1)).sum()
                        c_acc += (torch.argmax(c_output, dim=1) == torch.argmax(label, dim=1)).sum()
                        s_acc += (torch.argmax(s_output, dim=1) == torch.argmax(label, dim=1)).sum()
                    cs_acc = cs_acc / len(test_data)
                    c_acc = c_acc / len(test_data)
                    s_acc = s_acc / len(test_data)
                    print("epoch: {}, batch: {}, cs_acc: {}, c_acc: {}, s_acc: {}".format(epoch, i, cs_acc, c_acc, s_acc))
                    record["cs_acc"].append(cs_acc)
                    record["c_acc"].append(c_acc)
                    record["s_acc"].append(s_acc)
        torch.save(cs_net.state_dict(), "./model/all_cs_net_fine_{}.pth".format(epoch))
        torch.save(c_net.state_dict(), "./model/all_c_net_fine_{}.pth".format(epoch))
        torch.save(s_net.state_dict(), "./model/all_s_net_fine_{}.pth".format(epoch))

In [None]:
train_cls(all_predictor, content_predictor, style_predictor, cls_configuration, loss_record)