In [None]:
# !wget https://github.com/karoldvl/ESC-50/archive/master.zip
# !mkdir -p data && cd data && unzip ../master.zip

In [1]:
import torch
import torchaudio
import numpy as np
from torch import nn
from torch.utils.data import DataLoader
from torch.utils.data import ConcatDataset

from utils.transform import UnsqueezeTransform, NoiseGeneratorTransform, FixedValueTransform
from utils.dataset import dump_dataset_info
from dataset.cry import CryDataset
from dataset.noise import NoiseDataset


# Hyperparameters.
classes = 50  # number of target classes handed to the model constructor
train_batch_size, test_batch_size = 300, 5

# Pre-processed dataset pickles, grouped by corpus.
train_data_esc50, valid_data_esc50 = (
    'e:/dataset/out/esc50pp/training128mel1.pkl',
    'e:/dataset/out/esc50pp/validation128mel1.pkl',
)
train_data_us8k, valid_data_us8k = (
    'e:/dataset/out/us8k/training128mel1.pkl',
    'e:/dataset/out/us8k/validation128mel1.pkl',
)
train_data_donateacry = 'e:/dataset/out/dnac/donateacry.pkl'

#
# Planned extension (not active): add an extra 'noise' class on top of the 50
# classes configured above.
# classes = 51
#   label[51] = 'noise'
# NOTE(review): nn.CrossEntropyLoss expects class indices in [0, classes - 1], so
# with classes = 51 the highest usable label would be 50, not 51 -- confirm the
# intended index before enabling get_noise_dataset() (it uses target_id=51).
#

# Single noise transform instance shared by the training dataset factories below.
# The training loop reads its current level with get_level() and advances it with
# step() once per epoch.
noise = NoiseGeneratorTransform()

def get_noise_dataset(shape=(1, 128, 256), num_samples=6000, target_id=51, noise_std=0.1):
    """Build a NoiseDataset of synthetic samples that all carry the same label.

    Args:
        shape: Forwarded to NoiseDataset as the per-sample shape.
        num_samples: Forwarded to NoiseDataset as the number of samples.
        target_id: Label given to NoiseDataset for its samples. The default (51)
            is the value that was previously hard-coded.
        noise_std: Forwarded to NoiseDataset; the default (0.1) is the value that
            was previously hard-coded.

    Returns:
        The constructed NoiseDataset.
    """
    # NOTE(review): a label is only valid for nn.CrossEntropyLoss when it is
    # smaller than the model's class count; the default 51 needs at least 52
    # output classes -- pass an explicit target_id if that is not the case.
    return NoiseDataset(shape, num_samples, target_id=target_id, noise_std=noise_std)

def get_donateacry_dataset():
    """Return a CryDataset over the donateacry pickle.

    Inputs pass through the shared ``noise`` transform followed by
    ``UnsqueezeTransform``; targets pass through ``FixedValueTransform(value=20)``.
    """
    # A NormalizeTransform() stage between the two steps is currently disabled.
    input_pipeline = torch.nn.Sequential(noise, UnsqueezeTransform())
    label_pipeline = FixedValueTransform(value=20)
    return CryDataset(
        train_data_donateacry,
        transform=input_pipeline,
        target_transform=label_pipeline,
    )

def get_esc50_train_dataset():
    """Return a CryDataset over the ESC-50 training pickle.

    Inputs pass through the shared ``noise`` transform followed by
    ``UnsqueezeTransform``. No target transform is supplied.
    """
    # Currently disabled options: a NormalizeTransform() input stage and
    # target_transform=ESC50LabelTransform().
    stages = [noise, UnsqueezeTransform()]
    return CryDataset(train_data_esc50, transform=torch.nn.Sequential(*stages))

def get_esc50_valid_dataset():
    """Return a CryDataset over the ESC-50 validation pickle.

    Only ``UnsqueezeTransform`` is applied to the inputs (no noise transform),
    and no target transform is supplied.
    """
    # Currently disabled options: a NormalizeTransform() input stage and
    # target_transform=ESC50LabelTransform().
    stages = [UnsqueezeTransform()]
    return CryDataset(valid_data_esc50, transform=torch.nn.Sequential(*stages))

def get_us8k_train_dataset():
    """Return a CryDataset over the us8k training pickle.

    Inputs pass through the shared ``noise`` transform followed by
    ``UnsqueezeTransform``. No target transform is supplied.
    """
    # Currently disabled options: a NormalizeTransform() input stage and
    # target_transform=US8KLabelTransform().
    stages = [noise, UnsqueezeTransform()]
    return CryDataset(train_data_us8k, transform=torch.nn.Sequential(*stages))

def get_us8k_valid_dataset():
    """Return a CryDataset over the us8k validation pickle.

    Only ``UnsqueezeTransform`` is applied to the inputs (no noise transform),
    and no target transform is supplied.
    """
    # Currently disabled options: a NormalizeTransform() input stage and
    # target_transform=US8KLabelTransform().
    stages = [UnsqueezeTransform()]
    return CryDataset(valid_data_us8k, transform=torch.nn.Sequential(*stages))
def get_big_valid_dataset():
    """Return the ESC-50 and us8k validation datasets concatenated, in that order."""
    validation_parts = [
        get_esc50_valid_dataset(),
        get_us8k_valid_dataset(),
    ]
    return ConcatDataset(validation_parts)

# Training data: only the ESC-50 training split is active. Further sources that
# can be appended to the list (currently disabled):
#   get_donateacry_dataset()
#   get_noise_dataset(num_samples=300)
train_sources = [get_esc50_train_dataset()]
train_loader = DataLoader(
    ConcatDataset(train_sources),
    batch_size=train_batch_size,
    shuffle=True,
)

# Evaluation data: the ESC-50 validation split, iterated in a fixed order.
test_sources = [get_esc50_valid_dataset()]
test_loader = DataLoader(
    ConcatDataset(test_sources),
    batch_size=test_batch_size,
    shuffle=False,
)

# Report how many samples each loader draws from.
print(f'train_dataset:{len(train_loader.dataset)}')
print(f'test_dataset:{len(test_loader.dataset)}')
# dump_dataset_info(train_loader.dataset)

train_dataset:11610
test_dataset:2894


In [2]:
import os
import torch
from tqdm.auto import tqdm

from torch.optim.lr_scheduler import MultiStepLR

# 混合精度
from torch.cuda.amp import autocast, GradScaler

from tensorboardX import SummaryWriter

from model.mobilenet_v3 import MobileNetV3
from model.mobilenet_v2 import MobileNetV2
from model.AudioClassifier import AudioClassifier

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Alternative architectures (disabled):
# model = AudioClassifier(w=256, h=128, classes=21, num_conv_layers=3).to(device)
# model = MobileNetV2((1, 128, 100), 21).to(device)
model = MobileNetV3((1, 128, 256), classes, width_multiplier=1.0, dropout_rate=0.2).to(device)
# print(model)
# model = torch.hub.load('pytorch/vision:v0.10.0', 'mobilenet_v2', pretrained=True)

# Training switches and schedule.
with_tqdm = False               # show a progress bar over the batches of each epoch
with_l2_regularization = True   # add l2_reg * (sum of parameter norms) to the loss
with_grad_scaler = True         # run backward/step through a GradScaler
start_epoch = 0                 # first epoch index; also selects the checkpoint to resume from
end_epoch = 100                 # training stops before this epoch index
learning_rate = 1e-3
save_model_start = 10           # earliest (1-based) epoch at which checkpoints are written
save_model_period = 10          # write a checkpoint every this many epochs

# L2 regularisation coefficient.
l2_reg = 0.01


# Checkpoints are stored as ./weights/class<classes>/epoch_<NNN>.pt
model_name = f'class{classes}'
model_folder = f'./weights/{model_name}'
model_pattern = f'{model_folder}/epoch_{{epoch:03d}}.pt'

# exist_ok avoids the check-then-create race of a separate os.path.exists() test.
os.makedirs(model_folder, exist_ok=True)

# Resume from the checkpoint of `start_epoch` when it exists.
resume_path = model_pattern.format(epoch=start_epoch)
if os.path.exists(resume_path):
    print(f'loading model: {resume_path}')
    # map_location lets a checkpoint written on a GPU load on a CPU-only machine
    # (and vice versa) instead of failing while deserialising.
    model.load_state_dict(torch.load(resume_path, map_location=device))

# Loss function and optimizer.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Learning-rate schedule: multiply the LR by 0.1 at epochs 10, 30 and 60.
# NOTE(review): the scheduler always starts from step 0, so resuming with
# start_epoch > 0 restarts the schedule -- confirm whether that is intended.
scheduler = MultiStepLR(optimizer, milestones=[10, 30, 60], gamma=0.1)

def evalute(model, dataloader, device=None):
    """Return the top-1 accuracy of ``model`` over ``dataloader``, in percent.

    The model is switched to evaluation mode and is left in that mode; callers
    that continue training must call ``model.train()`` again.

    Args:
        model: Module mapping a batch of inputs to per-class scores of shape
            (batch, num_classes).
        dataloader: Iterable yielding ``(inputs, targets)`` batches, where
            ``targets`` holds integer class indices.
        device: Device the batches are moved to. Defaults to the device of the
            model's first parameter (CPU for a parameter-free model).

    Returns:
        Accuracy as a float in [0, 100]. An empty ``dataloader`` yields 0.0
        instead of raising ZeroDivisionError.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            # Predicted class = index of the highest score in each row.
            predicted = model(inputs).argmax(dim=1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

    if total == 0:
        return 0.0
    return 100 * correct / total


# Create the GradScaler used for the scaled backward pass.
if with_grad_scaler:
    scaler = GradScaler()

writer = SummaryWriter()

# Train the model.
print(f'Training, lr={learning_rate}')
try:
    for epoch in range(start_epoch, end_epoch):
        model.train()
        # Wrap the loader itself so tqdm can read its length and show the batch total.
        batches = tqdm(train_loader) if with_tqdm else train_loader
        for inputs, targets in batches:
            inputs = inputs.to(device)
            targets = targets.to(device)

            # The forward pass and loss run under autocast.
            with autocast():
                logits = model(inputs)
                loss = criterion(logits, targets)

            # Weight penalty: l2_reg times the sum of the norms of all parameter tensors.
            # NOTE(review): this sums un-squared norms, which differs from the usual
            # squared-L2 weight decay -- confirm this is the intended penalty.
            if with_l2_regularization:
                l2_loss = sum(torch.norm(param) for param in model.parameters())
                loss = loss + l2_reg * l2_loss

            # The backward pass happens outside the autocast context.
            optimizer.zero_grad()
            if with_grad_scaler:
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()
            else:
                loss.backward()
                optimizer.step()

        # Periodic checkpoint, named after the 1-based epoch number.
        if (epoch + 1) >= save_model_start and (epoch + 1) % save_model_period == 0:
            torch.save(model.state_dict(), model_pattern.format(epoch=epoch + 1))

        train_acc = evalute(model, train_loader)
        test_acc = evalute(model, test_loader)

        # `loss` is the loss of the last batch of the epoch; log a plain float so the
        # writer does not receive a tensor that is still attached to the autograd graph.
        writer.add_scalar('Loss', loss.item(), epoch)
        writer.add_scalar('Train Accuracy', train_acc, epoch)
        writer.add_scalar('Test Accuracy', test_acc, epoch)
        print(f'Epoch [{epoch+1}/{end_epoch}], LR:{scheduler.get_last_lr()} NL:{noise.get_level()} Loss:{loss.item():.4f} Accuracy/Train:{train_acc:.2f}% Accuracy/Test:{test_acc:.2f}%')

        # Advance the noise level and the learning-rate schedule once per epoch.
        noise.step()
        scheduler.step()
finally:
    # Flush and close the event file even when training is interrupted.
    writer.close()


Training, lr=0.001
Epoch [1/100], LR:[0.001] NL:1e-06 Loss:7.9744 Train Accuracy:3.99% Test Accuracy:2.35%
Epoch [2/100], LR:[0.001] NL:1e-06 Loss:7.0543 Train Accuracy:39.15% Test Accuracy:27.16%
Epoch [3/100], LR:[0.001] NL:1e-06 Loss:6.4236 Train Accuracy:51.65% Test Accuracy:37.91%
Epoch [4/100], LR:[0.001] NL:1e-06 Loss:5.8408 Train Accuracy:66.03% Test Accuracy:47.13%
Epoch [5/100], LR:[0.001] NL:1e-06 Loss:5.4146 Train Accuracy:69.22% Test Accuracy:46.20%
Epoch [6/100], LR:[0.001] NL:1e-06 Loss:5.0196 Train Accuracy:72.98% Test Accuracy:46.20%
Epoch [7/100], LR:[0.001] NL:1e-06 Loss:4.8299 Train Accuracy:76.80% Test Accuracy:46.86%
Epoch [8/100], LR:[0.001] NL:1e-06 Loss:4.5591 Train Accuracy:84.22% Test Accuracy:51.35%
Epoch [9/100], LR:[0.001] NL:1e-06 Loss:4.3801 Train Accuracy:76.40% Test Accuracy:47.51%
Epoch [10/100], LR:[0.001] NL:1e-06 Loss:4.1711 Train Accuracy:81.94% Test Accuracy:48.31%
Epoch [11/100], LR:[0.0001] NL:9.999999999999999e-06 Loss:4.1197 Train Accuracy:99

In [None]:

# Optional: restore a saved checkpoint before evaluating (disabled).
# model.load_state_dict(torch.load('./model_epoch_40.pt'))
# Print the accuracy (in percent) of the current `model` on `test_loader`.
print(f'Final Test Accuracy: {evalute(model, test_loader):.2f}%')

In [None]:
from matplotlib import pyplot as plt

# Load one ESC-50 clip, resample it to 8 kHz and show its log-mel spectrogram.
waveform, sr = torchaudio.load('E:/dataset/ESC-50-master/audio/1-100032-A-0.wav')
resample = torchaudio.transforms.Resample(sr, 8000)
# NOTE(review): win_length / hop_length are given to MelSpectrogram in samples
# here (20 / 10), whereas extract_spectrogram() converts milliseconds to
# samples -- confirm which unit is intended.
spectrogram = torchaudio.transforms.MelSpectrogram(
    sample_rate=8000,
    n_fft=512,
    win_length=20,
    hop_length=10,
    n_mels=128)

# Small offset that keeps log() finite for empty mel bins.
eps = 1e-6

# torchaudio.load returns a (channels, frames) tensor, so the time axis is the
# second one. The previous `waveform[:44100]` sliced the channel axis and was a
# no-op; keep the first 44100 samples (one second when sr == 44100) instead.
out = spectrogram(resample(waveform[:, :44100]))
out = torch.log(out + eps)

# Show the first channel: rows are mel bins, columns are spectrogram frames.
plt.pcolormesh(out[0], cmap='gray')
plt.xlabel('frame')
plt.ylabel('mel bin')
plt.title('log-mel spectrogram')
plt.show()

In [None]:
import torch
from AudioClassifier import AudioClassifier

# NOTE: this cell rebinds the notebook-level `device` and `model` names that the
# training cells also use.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = AudioClassifier(w=128, h=100, classes=21, num_conv_layers=3).to(device)
# map_location lets a checkpoint written on a GPU load on a CPU-only machine.
model.load_state_dict(torch.load('models/audioclassifier3/epoch_100.pt', map_location=device))
# The model is only used for prediction from here on, so put it in evaluation
# mode (this affects layers such as dropout or batch-norm, if the model has any).
model.eval()


In [None]:
import torch
import torchaudio
import librosa
import numpy as np

def get_spec(waveform, sampling_rate=24000, n_fft=512, window_length=20, hop_length=10):
	"""Split the normalised log-mel spectrogram of ``waveform`` into overlapping windows.

	The waveform is converted to a 128-bin mel spectrogram, log-compressed,
	min-max normalised to [0, 1] over the whole spectrogram, and cut along
	axis 1 into windows of 100 frames taken every 50 frames.

	Args:
		waveform: Audio samples; converted with ``torch.Tensor(waveform)``.
			Assumes a 1-D signal so that axis 1 of the spectrogram is the
			frame axis -- TODO confirm for multi-channel input.
		sampling_rate: Passed to MelSpectrogram as ``sample_rate``.
		n_fft: Passed to MelSpectrogram as ``n_fft``.
		window_length: Passed to MelSpectrogram as ``win_length``.
		hop_length: Passed to MelSpectrogram as ``hop_length``.
			NOTE(review): both lengths are forwarded unchanged, whereas
			extract_spectrogram() converts milliseconds to samples first --
			confirm which unit callers intend.

	Returns:
		A list of numpy arrays, each exactly 100 frames wide. A trailing
		remainder shorter than 100 frames is dropped so that the windows can
		always be stacked into a single batch; a spectrogram with fewer than
		100 frames yields an empty list.
	"""
	window_frames = 100  # width of every returned window
	window_stride = 50   # distance between the starts of consecutive windows

	waveform = torch.Tensor(waveform)
	transform = torchaudio.transforms.MelSpectrogram(
		sample_rate=sampling_rate,
		n_fft=n_fft,
		win_length=window_length,
		hop_length=hop_length,
		n_mels=128)
	eps = 1e-6  # keeps log() finite for empty mel bins
	spec = np.log(transform(waveform).numpy() + eps)

	# Min-max normalisation; a constant spectrogram maps to zeros instead of NaN.
	x_min = spec.min()
	span = spec.max() - x_min
	if span > 0:
		spec = (spec - x_min) / span
	else:
		spec = np.zeros_like(spec)

	# Only start positions that leave room for a full window are used.
	last_start = spec.shape[1] - window_frames
	return [
		spec[:, start:start + window_frames]
		for start in range(0, last_start + 1, window_stride)
	]

def extract_spectrogram(values, clip, entries, sampling_rate=None):
	"""Append one record of log-mel windows per entry to ``values``.

	For every element of ``entries`` the clip is converted to two log-mel
	spectrograms (20 ms window / 10 ms hop and 40 ms window / 20 ms hop), each
	of which is cut along axis 1 into windows of 100 frames taken every 50
	frames. The windows of both resolutions are collected into one array.

	Args:
		values: List that receives the records; modified in place.
		clip: Audio samples of the clip. Assumes a 1-D signal so that axis 1
			of the spectrogram is the frame axis -- TODO confirm.
		entries: Iterable of mappings; only ``entry["target"]`` is read.
		sampling_rate: Sample rate in Hz. When omitted, the function falls back
			to the global ``args.sampling_rate`` it used before; that global is
			not defined anywhere in this notebook, so pass the rate explicitly
			when calling from here.

	Each appended record is ``{"values": <np.ndarray of windows>, "target": ...}``.
	A trailing remainder shorter than 100 frames is dropped so the windows can be
	stacked with ``np.array``.
	"""
	window_sizes = [20, 40]  # analysis window lengths in milliseconds
	hop_sizes = [10, 20]     # hop lengths in milliseconds
	window_frames = 100      # width of every extracted window
	window_stride = 50       # distance between the starts of consecutive windows
	eps = 1e-6               # keeps log() finite for empty mel bins

	clip_tensor = torch.as_tensor(clip, dtype=torch.float32)

	for data in entries:
		# Resolved inside the loop so that an empty `entries` never touches `args`.
		rate = args.sampling_rate if sampling_rate is None else sampling_rate

		specs = []
		for window_ms, hop_ms in zip(window_sizes, hop_sizes):
			# Convert milliseconds to a number of samples.
			# NOTE(review): win_length must not exceed n_fft (512), which limits
			# the usable sample rate for the 40 ms window -- confirm.
			window_length = int(round(window_ms * rate / 1000))
			hop_length = int(round(hop_ms * rate / 1000))

			transform = torchaudio.transforms.MelSpectrogram(
				sample_rate=rate,
				n_fft=512,
				win_length=window_length,
				hop_length=hop_length,
				n_mels=128)
			spec = np.log(transform(clip_tensor).numpy() + eps)

			# Only start positions that leave room for a full window are used.
			for start in range(0, spec.shape[1] - window_frames + 1, window_stride):
				specs.append(spec[:, start:start + window_frames])

		values.append({
			"values": np.array(specs),
			"target": data["target"],
		})

# Load the clip at 24 kHz.
clip, sr = librosa.load("d:\\code\\jupyter\\audio\\positive\\baby_cry_16bit_8k.wav", sr=24000)
# clip, sr = librosa.load("E:\\dataset\\bilibili\\cry2.m4s", sr=24000)
print(clip.shape, sr)
# Trim the clip to a whole multiple of 1000 samples.
clip = clip[:len(clip) // 1000 * 1000]
print(clip.shape)

# Cut the clip into spectrogram windows and keep at most the first 1000.
values = get_spec(clip, sampling_rate=sr)
if not values:
	raise ValueError('clip is too short to produce a single spectrogram window')
# Add a leading axis to every window and stack them into one batch. np.stack
# avoids the slow element-wise conversion of a list of arrays by torch.Tensor().
values = np.stack([np.expand_dims(value, 0) for value in values[:1000]])
values = torch.from_numpy(values).float().to(device)
print(values.size())

# Inference only: evaluation mode and no autograd bookkeeping.
model.eval()
with torch.no_grad():
	predict = model(values).cpu().numpy()
# Predicted class per window = index of the highest score in each row.
predict = predict.argmax(axis=1).tolist()
print(predict)

In [None]:
# print(f'audio length: {7659000 / 8000}s')
# print(7659000 / 1990)
# Index of the first window predicted as class 0. list.index() raises ValueError
# when the value is absent, so report that case explicitly instead.
print(predict.index(0) if 0 in predict else 'class 0 was not predicted for any window')
