In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
os.chdir("/content/drive/MyDrive/hj_data/train")

In [None]:
import pandas as pd 
from pathlib import Path 
import math, random 
import torch 
import torchaudio 
from torchaudio import transforms 
from IPython.display import Audio 
from torch.utils.data import DataLoader, Dataset, random_split 
import torch.nn.functional as F 
from torch.nn import init 
import torch.nn as nn
import numpy as np
from sklearn.metrics import f1_score

#读取csv文件 
download_path = Path.cwd()/'' 
 
# Read metadata file 
metadata_file = download_path/'data.csv' 
df = pd.read_csv(metadata_file) 
df.head() 
 
# Construct file path by concatenating fold and file name 
df['relative_path'] = '/' + df['fold'].astype(str) + '/' + df['slice_file_name'].astype(str) 
 
# Take relevant columns 
df = df[['relative_path', 'classID']] 
df.head()

#读取文件中的音频
# import math, random 
# import torch 
# import torchaudio 
# from torchaudio import transforms 
# from IPython.display import Audio 
 
class AudioUtil(): 
  # ---------------------------- 
  # Load an audio file. Return the signal as a tensor and the sample rate 
  # ---------------------------- 
  @staticmethod 
  #audio_file = download_path/'fold1'/'101415-3-0-2.wav'
  def open(audio_file): 
    sig, sr = torchaudio.load(audio_file) 
    return (sig, sr)

#转换为立体声
# ---------------------------- 未执行
  # Convert the given audio to the desired number of channels 
  # ---------------------------- 
  @staticmethod 
  def rechannel(aud, new_channel): 
    sig, sr = aud 
 
    if (sig.shape[0] == new_channel): 
      # Nothing to do 
      return aud 
 
    if (new_channel == 1): 
      # Convert from stereo to mono by selecting only the first channel 
      resig = sig[:1, :] 
    else: 
      # Convert from mono to stereo by duplicating the first channel 
      resig = torch.cat([sig, sig]) 
 
    return ((resig, sr))

#标准化采样率
# ---------------------------- 
  # Since Resample applies to a single channel, we resample one channel at a time 
  # ---------------------------- 
  @staticmethod 
  def resample(aud, newsr): 
    sig, sr = aud 
 
    if (sr == newsr): 
      # Nothing to do 
      return aud 
 
    num_channels = sig.shape[0] 
    # Resample first channel 
    resig = torchaudio.transforms.Resample(sr, newsr)(sig[:1,:]) 
    if (num_channels > 1): 
      # Resample the second channel and merge both channels 
      retwo = torchaudio.transforms.Resample(sr, newsr)(sig[1:,:]) 
      resig = torch.cat([resig, retwo]) 
 
    return ((resig, newsr))

#调整为相同长度
# ---------------------------- 
  # Pad (or truncate) the signal to a fixed length 'max_ms' in milliseconds 
  # ---------------------------- 
  @staticmethod 
  def pad_trunc(aud, max_ms): 
    sig, sr = aud 
    num_rows, sig_len = sig.shape 
    max_len = sr//1000 * max_ms 
 
    if (sig_len > max_len): 
      # Truncate the signal to the given length 
      sig = sig[:,:max_len] 
 
    elif (sig_len < max_len): 
      # Length of padding to add at the beginning and end of the signal 
      pad_begin_len = random.randint(0, max_len - sig_len) 
      pad_end_len = max_len - sig_len - pad_begin_len 
 
      # Pad with 0s 
      pad_begin = torch.zeros((num_rows, pad_begin_len)) 
      pad_end = torch.zeros((num_rows, pad_end_len)) 
 
      sig = torch.cat((pad_begin, sig, pad_end), 1) 
 
    return (sig, sr)

#数据扩充增广（时移）
# ---------------------------- 
  # Shifts the signal to the left or right by some percent. Values at the end 
  # are 'wrapped around' to the start of the transformed signal. 
  # ---------------------------- 
  @staticmethod 
  def time_shift(aud, shift_limit): 
    sig,sr = aud 
    _, sig_len = sig.shape 
    shift_amt = int(random.random() * shift_limit * sig_len) 
    return (sig.roll(shift_amt), sr)

#梅尔谱图
# ---------------------------- 
  # Generate a Spectrogram 
  # ---------------------------- 
  @staticmethod 
  def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None): 
    sig,sr = aud 
    top_db = 80 
 
    # spec has shape [channel, n_mels, time], where channel is mono, stereo etc 
    spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig) 
 
    # Convert to decibels 
    spec = transforms.AmplitudeToDB(top_db=top_db)(spec) 
    return (spec)

#数据扩充：时间和频率屏蔽
# ---------------------------- 
  # Augment the Spectrogram by masking out some sections of it in both the frequency 
  # dimension (ie. horizontal bars) and the time dimension (vertical bars) to prevent 
  # overfitting and to help the model generalise better. The masked sections are 
  # replaced with the mean value. 
  # ---------------------------- 
  @staticmethod 
  def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1): 
    _, n_mels, n_steps = spec.shape 
    mask_value = spec.mean() 
    aug_spec = spec 
 
    freq_mask_param = max_mask_pct * n_mels 
    for _ in range(n_freq_masks): 
      aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value) 
 
    time_mask_param = max_mask_pct * n_steps 
    for _ in range(n_time_masks): 
      aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value) 
 
    return aug_spec

#自定义数据加载器
# from torch.utils.data import DataLoader, Dataset, random_split 
# import torchaudio 
 
# ---------------------------- 
# Sound Dataset 
# ---------------------------- 
# SoundDS：MyDataset类
class SoundDS(Dataset): 
  def __init__(self, df, data_path): 
    self.df = df #df为csv文件
    self.data_path = str(data_path) 
    self.duration = 6000 
    self.sr = 22050 #采样率
    self.channel = 2 #通道数
    self.shift_pct = 0.4        
 
  # ---------------------------- 
  # Number of items in dataset 
  # ---------------------------- 
  def __len__(self): 
    return len(self.df)     
 
  # ---------------------------- 
  # Get i'th item in dataset 
  # ---------------------------- 
  def __getitem__(self, idx): 
    # Absolute file path of the audio file - concatenate the audio directory with 
    # the relative path 
    audio_file = self.data_path + self.df.loc[idx, 'relative_path']  #.loc:取idx对应行的所有数据，.loc[idx, 'relative_path']:取idx对应行的relative_path
    # Get the Class ID 
    class_id = self.df.loc[idx, 'classID'] #取idx对应行的classID
 
    aud = AudioUtil.open(audio_file) 
    # 有些声音有更高的采样率，或者比大多数声音更少的通道。所以让所有声音都有相同数量的通道和相同的采样率。除非采样速率相同，否则pad_trunc仍然会产生不同长度的数组，即使声音持续时间相同。
    reaud = AudioUtil.resample(aud, self.sr) #标准化采样率
    rechan = AudioUtil.rechannel(reaud, self.channel) #转换为立体声，统一为两个声道
 
    dur_aud = AudioUtil.pad_trunc(rechan, self.duration) #调整为相同长度
    shift_aud = AudioUtil.time_shift(dur_aud, self.shift_pct) #时移
    sgram = AudioUtil.spectro_gram(shift_aud, n_mels=64, n_fft=1024, hop_len=None) #转换为mel谱图
    aug_sgram = AudioUtil.spectro_augment(sgram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2) #时间和频率屏蔽
 
    return aug_sgram, class_id

#使用数据加载器准备一批数据
from torch.utils.data import random_split 
 
myds = SoundDS(df,download_path) 
 
# Random split of 90:10 between training and validation 
num_items = len(myds) 
num_train = round(num_items * 0.9) 
num_val = num_items - num_train 
train_ds, val_ds = random_split(myds, [num_train, num_val]) 
 
# Create training and validation data loaders 
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=64, shuffle=True) 
val_dl = torch.utils.data.DataLoader(val_ds, batch_size=64, shuffle=False)

#建立模型
# import torch.nn.functional as F 
# from torch.nn import init 
 
# ---------------------------- 
# Audio Classification Model 
# ---------------------------- 
class AudioClassifier (nn.Module): 
    # ---------------------------- 
    # Build the model architecture 
    # ---------------------------- 
    def __init__(self): 
        super().__init__() 
        conv_layers = [] 
 
        # First Convolution Block with Relu and Batch Norm. Use Kaiming Initialization 
        self.conv1 = nn.Conv2d(2, 8, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2)) #in_channels=2,out_channels=8
        self.relu1 = nn.ReLU() 
        self.bn1 = nn.BatchNorm2d(8) 
        init.kaiming_normal_(self.conv1.weight, a=0.1) #a-此层之后使用的整流器的负斜率(仅与 'leaky_relu' 一起使用)
        self.conv1.bias.data.zero_() 
        conv_layers += [self.conv1, self.relu1, self.bn1] 
 
        # Second Convolution Block 
        self.conv2 = nn.Conv2d(8, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1)) 
        self.relu2 = nn.ReLU() 
        self.bn2 = nn.BatchNorm2d(16) 
        init.kaiming_normal_(self.conv2.weight, a=0.1) 
        self.conv2.bias.data.zero_() 
        conv_layers += [self.conv2, self.relu2, self.bn2] 
 
        # third Convolution Block 
        self.conv3 = nn.Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1)) 
        self.relu3 = nn.ReLU() 
        self.bn3 = nn.BatchNorm2d(32) 
        init.kaiming_normal_(self.conv3.weight, a=0.1) 
        self.conv3.bias.data.zero_() 
        conv_layers += [self.conv3, self.relu3, self.bn3] 
 
        # fourth Convolution Block 
        self.conv4 = nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1)) 
        self.relu4 = nn.ReLU() 
        self.bn4 = nn.BatchNorm2d(64) 
        init.kaiming_normal_(self.conv4.weight, a=0.1) 
        self.conv4.bias.data.zero_() 
        conv_layers += [self.conv4, self.relu4, self.bn4] 
 
        # Linear Classifier 
        self.ap = nn.AdaptiveAvgPool2d(output_size=1) 
        self.lin = nn.Linear(in_features=64, out_features=6) 
 
        # Wrap the Convolutional Blocks 打包卷积块
        self.conv = nn.Sequential(*conv_layers) 
 
    # ---------------------------- 
    # Forward pass computations 
    # ---------------------------- 
    def forward(self, x): 
        # Run the convolutional blocks 
        x = self.conv(x) 
 
        # Adaptive pool and flatten for input to linear layer 
        x = self.ap(x) 
        x = x.view(x.shape[0], -1) #=reshape 重新定义矩阵形状，-1代表动态调整这个维度上的元素个数，以保证元素的总数不变
 
        # Linear layer 
        x = self.lin(x) 
 
        # Final output 
        return x 
 
# Create the model and put it on the GPU if available 
myModel = AudioClassifier() 
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") 
myModel = myModel.to(device) 
# Check that it is on Cuda 
next(myModel.parameters()).device

#训练
# ---------------------------- 
# Training Loop 
# ---------------------------- 
def training(model, train_dl, num_epochs): 
  # Loss Function, Optimizer and Scheduler 
  criterion = nn.CrossEntropyLoss() 
  optimizer = torch.optim.Adam(model.parameters(),lr=0.001) 
  scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001, 
                                                steps_per_epoch=int(len(train_dl)), 
                                                epochs=num_epochs, 
                                                anneal_strategy='linear') #退火策略，'cos'和'liner'分别表示余弦退火和线性退火
 
  expdir = Path.cwd()/'/content/drive/MyDrive/hj_data/experiments'
  version = 'v1'
  logfilepath = expdir / version / 'logs_{}.txt'.format(version)
  logfilepath_eval = expdir / version / 'logs_eval_{}.txt'.format(version)
  model_savedir = expdir / version 
  def init_logger(self):
        self.logFileName = str(self.logfilepath)
        if self.logfilepath.exists == True:
            self.logfilepath.exists.unlink()  ## 删除已经存在的log文件 (方便重启实验)
        if self.logfilepath_eval.exists == True:
            self.logfilepath_eval.exists.unlink()  ## 删除已经存在的log文件 (方便重启实验)
        self.logFileName_eval = str(self.logfilepath_eval)

        ## 存储training log文件
        with open(self.logFileName, 'a', encoding='utf-8') as wf:
            # #将参数类中的所有参数写入log
            # for k, v in self.hp.__dict__.items():
            #     wf.write("{} : {}\n".format(k, v))
            # wf.write('-' * 50 + "Experiment & Hparams Created" + "-" * 50 + "\n")
            wf.write("*" * 100 + "\n")
            wf.close()

    ## 该方法将一个字典  ，按kv的顺序，写入一行到 log.txt
  def write_line2log(log_dict: dict, filedir, isprint: True):
      strp = ''
      with open(filedir, 'a', encoding='utf-8') as f:
          for key, value in log_dict.items():
              witem = '{}'.format(key) + ':{},'.format(value)
              strp += witem
          f.write(strp)
          #f.write('当前进程的内存使用：%.4f GB' % (psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024 / 1024))
          f.write('\n')
      if isprint:
          print(strp)
      pass

  def print_network(model, name):
      """Print out the network information."""
      num_params = 0
      for p in model.parameters():
          num_params += p.numel()
      print("Model {},the number of parameters: {}".format(name, num_params))

  #保存模型
  def save_model(i):
      pdict = {"model":model.state_dict(),
                }
      path = model_savedir / "{:04}.pth".format(i)
      torch.save(pdict, str(path))
      print("---------------- model saved ------------------- ")
  
  def micro_f1(preds, truths):
    """
    Micro f1 on binary arrays.

    Args:
        preds (list of lists of ints): Predictions.
        truths (list of lists of ints): Ground truths.

    Returns:
        float: f1 score.
    """
    # Micro : aggregating over all instances
    # device = torch.device('cpu')
    # # preds = preds.to(device)
    # # truths = preds.to(device)
    # preds.cpu()
    # truths.cpu()
    preds = np.concatenate(preds)
    truths = np.concatenate(truths)
    return f1_score(truths, preds,average='micro')

  #测试
  def test_acc (model, val_dl):
    correct_prediction = 0
    total_prediction = 0

    truths = []
    preds = []
    # Disable gradient updates
    with torch.no_grad():
      for data in val_dl:
        # Get the input features and target labels, and put them on the GPU
        inputs, labels = data[0].to(device), data[1].to(device)

        # Normalize the inputs
        inputs_m, inputs_s = inputs.mean(), inputs.std()
        inputs = (inputs - inputs_m) / inputs_s

        # Get predictions
        outputs = model(inputs)

        # Get the predicted class with the highest score
        _, prediction = torch.max(outputs,1)
        # Count of predictions that matched the target label
        correct_prediction += (prediction == labels).sum().item()
        total_prediction += prediction.shape[0]
        print(f'labes: {labels},preds: {prediction}')
        # state = torch.device('cpu')
        # labels = labels.to(device)
        # prediction = prediction.to(device)
        labels = labels.cpu()
        prediction = prediction.cpu()
        print(f'labes: {labels},preds: {prediction}')
        truths.append(labels)
        preds.append(prediction)

    
    acc = correct_prediction/total_prediction
    f1 = micro_f1(truths,preds)
    losse_curves  = {"eval_step--":"",
                    "Epoch":epoch,
                    "Accuracy":acc,
                    "Micro_f1":f1,
                    "Total items":total_prediction}
    write_line2log(losse_curves, logfilepath_eval, isprint=True)


  # Repeat for each epoch 
  for epoch in range(1,num_epochs): 
    running_loss = 0.0 
    correct_prediction = 0 
    total_prediction = 0 
 
    # Repeat for each batch in the training set 
    for i, data in enumerate(train_dl): 
        # Get the input features and target labels, and put them on the GPU 
        inputs, labels = data[0].to(device), data[1].to(device) 
 
        # Normalize the inputs 
        inputs_m, inputs_s = inputs.mean(), inputs.std() #标准差
        inputs = (inputs - inputs_m) / inputs_s 
 
        # Zero the parameter gradients 
        optimizer.zero_grad() 
 
        # forward + backward + optimize 
        outputs = model(inputs) 
        loss = criterion(outputs, labels) 
        loss.backward() 
        optimizer.step() 
        scheduler.step() 
 
        # Keep stats for Loss and Accuracy 
        running_loss += loss.item() 
 
        # Get the predicted class with the highest score 
        _, prediction = torch.max(outputs,1) 
        # Count of predictions that matched the target label 
        correct_prediction += (prediction == labels).sum().item() 
        total_prediction += prediction.shape[0] 
 
        # if i % 10 == 0:    # print every 10 mini-batches 
        #     print('[%d, %5d] loss: %.3f' % (epoch + 1, i + 1, running_loss / 10)) 
 
    # Print stats at the end of the epoch 
    num_batches = len(train_dl)
    avg_loss = running_loss / num_batches 
    acc = correct_prediction/total_prediction 
    losse_curves  = {"Epoch":epoch,
                            "Loss":avg_loss,
                             "Accuracy":acc,
                             }
    if epoch == 1:
      print("create loss dict")
      loss_log_dict = {}
      for k, v in losse_curves.items():
        loss_log_dict[k] = []
      print("loss dict created")
    
    for k, v in loss_log_dict.items():
      loss_log_dict[k].append(losse_curves[k])  # 把每batch的loss数据加入到 loss curves中
    write_line2log(losse_curves, logfilepath, isprint=True)

    ## 模型保存
    if epoch % 50 == 0:
      test_acc(myModel,val_dl)
      save_model(epoch)
    # print(f'Epoch: {epoch}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}') 
 
  print('Finished Training') 
 
num_epochs=251   # Just for demo, adjust this higher. 
training(myModel, train_dl, num_epochs)

create loss dict
loss dict created
Epoch:1,Loss:1.7692620356877644,Accuracy:0.2111913357400722,
Epoch:2,Loss:1.7481220960617065,Accuracy:0.26895306859205775,
Epoch:3,Loss:1.729433496793111,Accuracy:0.2527075812274368,
Epoch:4,Loss:1.708642601966858,Accuracy:0.26534296028880866,
Epoch:5,Loss:1.6821375687917073,Accuracy:0.3231046931407942,
Epoch:6,Loss:1.65176522731781,Accuracy:0.34296028880866425,
Epoch:7,Loss:1.6347358491685655,Accuracy:0.34115523465703973,
Epoch:8,Loss:1.616469489203559,Accuracy:0.351985559566787,
Epoch:9,Loss:1.5883971982532077,Accuracy:0.37906137184115524,
Epoch:10,Loss:1.5659896002875433,Accuracy:0.3916967509025271,
Epoch:11,Loss:1.5445939302444458,Accuracy:0.41335740072202165,
Epoch:12,Loss:1.5255808432896931,Accuracy:0.44223826714801445,
Epoch:13,Loss:1.495904525121053,Accuracy:0.43501805054151627,
Epoch:14,Loss:1.474489410718282,Accuracy:0.4602888086642599,
Epoch:15,Loss:1.461493624581231,Accuracy:0.4584837545126354,
Epoch:16,Loss:1.4376066393322415,Accuracy:0.4

# **推理**

In [None]:
import csv #调用数据保存文件
import pandas as pd #用于数据输出
from pathlib import Path 

#读取csv文件 
test_path = Path.cwd()/'../test' 
#model_savedir = '/content/drive/MyDrive/hj_data/experiments/v0' 
 
# Read metadata file 
testdata_file = test_path/'test.csv' 
tf = pd.read_csv(testdata_file) 
tf.head() 
 
# Construct file path by concatenating fold and file name 
tf['relative_path'] = '/' + tf['slice_file_name'].astype(str) 
 
# Take relevant columns 
tf = tf[['relative_path', 'classID']] 
tf.head()


#加载测试数据
testds = SoundDS(tf,test_path) 

# Create test data loaders 
test_dl = torch.utils.data.DataLoader(testds, batch_size=1, shuffle=False) 

# def __getitem__(self, idx):
#   id = self.tf['slice_file_name'].astype(str)

# ---------------------------- 
# Inference 
# ---------------------------- 
def inference (model, test_dl): 
  correct_prediction = 0 
  total_prediction = 0 

  # Disable gradient updates 
  with torch.no_grad(): 
    list_classid = []
    list1 = []
    id = 0
    for data in test_dl: 
      id = id + 1
      # Get the input features and target labels, and put them on the GPU 
      inputs, labels = data[0].to(device), data[1].to(device) 
 
      # Normalize the inputs 
      inputs_m, inputs_s = inputs.mean(), inputs.std() 
      inputs = (inputs - inputs_m) / inputs_s 
 
      # Get predictions 
      outputs = model(inputs) 
 
      # Get the predicted class with the highest score 
      _, prediction = torch.max(outputs,1) 
      ClassId = prediction.item() + 1
      list_classid.append(ClassId)
      # Count of predictions that matched the target label 
      #correct_prediction += (prediction == labels).sum().item() 
      #total_prediction += prediction.shape[0] 
      list1.append(id)
      list=[]
      list.append(list1)
      list.append(list_classid)
      #print(list)
      column=['id','label'] #列表头名称
      test=pd.DataFrame(zip(list1,list_classid),columns=['id','label'])#将数据放进表格
      test.to_csv('result.csv') #数据存入csv,存储位置及文件名称
      print(f'{data[0]}, ClassId: {ClassId}')



 
  #acc = correct_prediction/total_prediction 
  #print(f'Accuracy: {acc:.2f}, Total items: {total_prediction}') 
 
# Run inference on trained model with the validation set
i = 100 #用于指定加载哪个模型
#model_path = model_savedir / "{:04}.pth".format(i)
model_path = '/content/drive/MyDrive/hj_data/experiments/v0/0250.pth'
model = myModel
state = torch.load(model_path)
model.load_state_dict(state['model'])
inference(model, test_dl)