## 导入必要的库

In [None]:
# -*- encoding: utf-8 -*-
'''
@Time    :   2024/06/14 14:17:48
@Author  :   Fenrir 
@Contact :   746055404@qq.com
@Description :   测试训练
'''
# Description (English): test training run.
import wandb
import random
from dotenv import load_dotenv 
import os
import datetime
import torch
# Load variables from a local .env file into the process environment.
load_dotenv()
# Weights & Biases API key read from the environment; None when the variable is unset.
WANDB_API_KEY = os.getenv('WANDB_API_KEY')
# Project-local configuration; its `features`, `train` and `data_sources`
# attributes are read further down in this notebook.
from utils import config

## wandb 启动

In [None]:
# Authenticate with Weights & Biases. The key must be passed by keyword:
# the first positional parameter of wandb.login() is `anonymous`, not `key`,
# so a positional call would not be treated as an API key.
wandb.login(key=WANDB_API_KEY)
# Start a new wandb run to track this script.
wandb.init(
    # wandb project where this run will be logged
    project="LKJ-test-training",
    # hyperparameters and run metadata recorded with the run
    config={
        "learning_rate": 0.001,
        "architecture": "My_CNN_10",
        "dataset": "省数据库",
        "epochs": 5,
    },
    # the run is named after the current date (YYYY-MM-DD)
    name=datetime.datetime.now().strftime("%Y-%m-%d"),
)

## 实例化config类，并加载配置参数

In [None]:
# Pull the three configuration sections off the shared config object.
FeatureParams, TrainsParams, DataSources = (
    config.features,
    config.train,
    config.data_sources,
)

# Use the GPU when CUDA is available; otherwise fall back to the CPU.
has_cuda = torch.cuda.is_available()
device = torch.device("cuda" if has_cuda else "cpu")
print(device, has_cuda)

***构建特征提取器***

In [None]:
import torch
import torchaudio
import torch.nn as nn
# Disabled alternative, kept for reference: a torchaudio MelSpectrogram
# configured from FeatureParams['MEL_SPECTROGRAM'].
# extractor = torchaudio.transforms.MelSpectrogram(sample_rate=FeatureParams['MEL_SPECTROGRAM']['sample_rate'], hop_length=FeatureParams['MEL_SPECTROGRAM']['hop_length'],
#                                                  n_fft=FeatureParams['MEL_SPECTROGRAM']['n_fft'],n_mels=FeatureParams['MEL_SPECTROGRAM']['n_mels'],f_min=FeatureParams['MEL_SPECTROGRAM']['fmin'],
#                                                  f_max=FeatureParams['MEL_SPECTROGRAM']['fmax'],mel_scale='slaney',center=True, pad_mode='reflect')
# Load the feature extractor
from utils.audio.extractor import Extractor, FeatureType

# Feature types requested from the extractor.
features = [
    FeatureType.SPECTROGRAM,
    FeatureType.MEL_SPECTROGRAM,
    FeatureType.MFCC   
]

# NOTE(review): sample_rate is 44100 here, while create_dataset(...) later in
# this notebook is called with target_sr=32000 -- confirm the two are meant
# to differ.
extractor = Extractor(sample_rate=44100, features=features)


## 加载数据集

***自由选取训练数据集，这里选用自然+交通数据集，其中鸟类数据集为混合数据集***

In [None]:

from datasets import DatasetFactory, Category, Label
from datasets.SupportedSources import NATURE, TRAFFIC, ESC50
# Label list for this run: four NATURE children (thunder, frog, cicada, dog
# bark), one combined '鸟叫' (bird call) label built from seven NATURE
# bird-call sources, followed by every child of TRAFFIC.
labels = [
    NATURE.get_child("雷声"),
    NATURE.get_child("蛙声"),
    NATURE.get_child("蝉鸣声"),
    NATURE.get_child("狗叫声"),
    Label(
        name='鸟叫',
        sources=NATURE.get_childs(
            ['北红尾鸲叫声', '叉尾太阳鸟叫声', '大鹰鹃叫声', '强脚树莺叫声', '普通夜鹰叫声', '棕颈钩嘴鹛叫声', '淡脚柳莺叫声']
        )
    )
] + TRAFFIC.childs
# Disabled alternative: the same label list, except that the bird-call label
# additionally includes ESC50 'chirping_birds'.
# labels = [
#     NATURE.get_child("雷声"),
#     NATURE.get_child("蛙声"),
#     NATURE.get_child("蝉鸣声"),
#     NATURE.get_child("狗叫声"),
#     Label(
#         name='鸟叫',
#         sources=NATURE.get_childs(
#             ['北红尾鸲叫声', '叉尾太阳鸟叫声', '大鹰鹃叫声', '强脚树莺叫声', '普通夜鹰叫声', '棕颈钩嘴鹛叫声', '淡脚柳莺叫声']
#         ) + ESC50.get_childs('chirping_birds')
#     )
# ] + TRAFFIC.childs


category = Category(name="自然+交通", labels=labels)
# Label names, taken from the 'name' field of each entry in category.labels_info.
LABELS = [item['name'] for item in category.labels_info]
len(labels),len(category) # author's note (translated): 25 classes; label count (11+20)*1200+40=37240

***使用 DatasetFactory 对所选数据集进行处理并重新排序，得到新的数据集，再随机划分为训练集和测试集***

In [None]:
# Build the factory for the selected category with a test ratio of 0.2 and a
# fixed seed (1202).
dataset_factory = DatasetFactory(category=category, test_ratio=0.2, seed=1202)
# Disabled inspection helpers:
# dataset_factory.count_train_test_data()
# dataset_factory.category.get_label(index=22), dataset_factory.category.get_label(index=7)

***生成训练数据集、测试数据集***

In [None]:
# Both splits are created with the same audio settings; only `train` differs.
dataset_options = {"target_sr": 32000, "duration": 10, "extractor": extractor}
train_data = dataset_factory.create_dataset(train=True, **dataset_options)
test_data = dataset_factory.create_dataset(train=False, **dataset_options)
# Size of each split.
len(train_data), len(test_data)

***创建 DataLoader***

In [None]:
from torch.utils.data import DataLoader

# Reshuffle the training samples at every epoch so batches are not presented
# in the same fixed order each time; the evaluation loader keeps a fixed order.
train_dataloader = DataLoader(train_data, batch_size=TrainsParams['batch_size'], shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=TrainsParams['batch_size'], shuffle=False)
# Number of batches per split and the configured batch size.
len(train_dataloader), len(test_dataloader), TrainsParams['batch_size']


'''测试少量数据集'''
# Title above, in English: "test with a small subset of the dataset".
# Disabled cell: builds DataLoaders over a 10% random subset of each split.
# from torch.utils.data import DataLoader
# from torch.utils.data.dataset import random_split

# # Size of the training subset to use
# train_size = int(len(train_data) * 0.1)

# # Use random_split to divide the dataset into two parts and keep only the first ten percent
# train_data_subset, _ = random_split(train_data, [train_size, len(train_data) - train_size])

# # DataLoader containing only that ten percent of the data
# train_dataloader_subset = DataLoader(train_data_subset, batch_size=8, shuffle=False)

# # The test dataset can be handled in the same way
# test_size = int(len(test_data) * 0.1)
# test_data_subset, _ = random_split(test_data, [test_size, len(test_data) - test_size])
# test_dataloader_subset = DataLoader(test_data_subset, batch_size=8, shuffle=False)

# # Show the number of batches in each subset DataLoader
# len(train_dataloader_subset), len(test_dataloader_subset)



***创建模型***

***初始化***

In [None]:
from models import MyCNN10
from utils import pytorch as pt

# Input height handed to the network (an earlier setting was 431).
# INPUT_HEIGHT = 431
INPUT_HEIGHT = 313

# One output per label name; the model is moved to the selected device.
model = MyCNN10(num_classes=len(LABELS), input_height=INPUT_HEIGHT)
model = model.to(device)

# Training helpers returned by the project's initialization routine.
optimizer, scheduler, scaler, loss_func = pt.initialization(train_dataloader, model)

***测试model形状***

In [None]:

# Print a layer-by-layer summary of the model for a (3, 512, 626) input.
# NOTE(review): 626 is twice INPUT_HEIGHT (313) -- confirm this matches the
# shape the extractor actually produces.
from torchsummary import summary
summary(model, input_size=(3,512,626))

In [None]:
'''测试特征提取完的shape'''
# Title above, in English: "check the shape after feature extraction".
# Disabled cell: prints the shape of each batch and runs it through the model.
# from tqdm.notebook import tqdm
# n = 0
# for idx, (features, label) in enumerate(tqdm(train_dataloader_subset,leave=False ,desc="[train]")):  # iterate over the data loader, showing a tqdm progress bar
#     # label = label.reshape(-1, len(LABELS))  # reshapes the labels
#     n += 1
#     features, label = features.to(device), label.to(device)
#     print(features.shape, label.shape)
#     model(features)  # model forward pass
#     # if n > 10:
#     #     break

In [None]:
# Output layout for this run, keyed by model name and the current date:
#   experiments/training/<model_name>-<YYYY-MM-DD>/models   (save_dir)
#   experiments/training/<model_name>-<YYYY-MM-DD>/latest   (latest_dir)
time_point = datetime.datetime.now().strftime("%Y-%m-%d")
save_dir_base = f"experiments/training/{TrainsParams['model_name']}-{time_point}"
save_dir = f"{save_dir_base}/models"

# Paths reserved for the most recent model / optimizer state.
latest_dir = f"{save_dir_base}/latest"
latest_model_path = f"{latest_dir}/model.pth"
latest_optimizer_path = f"{latest_dir}/optimizer.pth"

# exist_ok=True makes directory creation idempotent and avoids the
# check-then-create race of a separate os.path.exists() test.
for directory in (save_dir, latest_dir):
    os.makedirs(directory, exist_ok=True)
print(f"Save directory: {save_dir}")

In [None]:
from tqdm.notebook import tqdm
import timeit
import time
import gc
import warnings

# Suppress all Python warnings from here on.
warnings.filterwarnings("ignore")
tqdm_ = tqdm(range(TrainsParams['start_epoch'], TrainsParams['max_epochs']))
print(device)

# Running "best so far" metrics, seeded from the training configuration.
# NOTE(review): the comparison below assumes TrainsParams['best_loss'] starts
# at a large value (e.g. float("inf")) or at the loss of a previously saved
# checkpoint -- confirm against the config; a start value of 0 would mean no
# epoch is ever considered an improvement.
best_f1 = TrainsParams['best_f1']
best_auc = TrainsParams['best_auc']
best_loss = TrainsParams['best_loss']

for epoch in tqdm_:
    print(epoch)

    # Train for one epoch, timing the call that actually does the work.
    train_start = time.perf_counter()
    model, optimizer, scaler, scheduler, trainloss = pt.train_an_epoch(
        model=model,
        optimizer=optimizer,
        training_dataloader=train_dataloader,
        scheduler=scheduler,
        scaler=scaler,
        loss_func=loss_func,
        tag_wandb=False,
    )
    train_seconds = time.perf_counter() - train_start

    # Evaluate on the test split, timed the same way.
    test_start = time.perf_counter()
    validloss, auc, prec, rec, acc, f1, f1_micro = pt.test_an_epoch(
        model=model,
        testing_dataloader=test_dataloader,
        loss_func=loss_func,
        LABELS=LABELS,
        tag_wandb=False,
    )
    test_seconds = time.perf_counter() - test_start

    # Lower validation loss is better. Compare against the running best (not
    # the static config value) and write the checkpoint at the moment of
    # improvement, so the file on disk always holds the best weights seen so
    # far rather than whatever the model looks like after the latest epoch.
    if validloss < best_loss:
        best_f1 = f1
        best_auc = auc
        best_loss = validloss
        pt.save_model(model.state_dict(), TrainsParams['best_model_path'])
        print("best model saved")

    print(f"单次训练耗时：{train_seconds:.2f}")
    print(f"单次测试耗时：{test_seconds:.2f}")

In [None]:

# Report the best metrics to wandb, then free memory and close the run.
final_metrics = {
    "best_loss": best_loss,
    "best_f1": best_f1,
    "best_auc": best_auc,
}
wandb.log(final_metrics)

# del model, best_model
gc.collect()
torch.cuda.empty_cache()
print("--")

wandb.finish()
print("end")