In [69]:
import datetime
import os

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
import torchvision
from torch.utils.data import DataLoader, Dataset, random_split
from tqdm.notebook import tqdm

# Dataset

In [None]:
# Resolve the dataset location relative to the notebook's working directory.
data_root = os.path.abspath(os.path.join(os.getcwd(), ".."))  # parent of the current working directory
aud_path = os.path.join(data_root, "dataset", "bird")  # <data_root>/dataset/bird
assert os.path.exists(aud_path), "{} path does not exist.".format(aud_path)  # fail fast if the folder is missing

In [None]:
def get_meta(path):
    """Generate metadata for a dataset laid out as ``path/<label>/<file>``.

    Each sub-directory of ``path`` is treated as one class label and every
    regular file inside it as one sample of that class.

    Args:
        path: Root directory containing one sub-directory per label.

    Returns:
        pandas.DataFrame with one row per file and the columns
        ``filename``, ``path``, ``label`` and ``label_id``. ``label_id`` is the
        position of the label in the alphabetically sorted list of label
        directories, so it is stable across runs and machines.
    """
    records = []
    # os.listdir returns entries in arbitrary order; sort so that label ids are
    # reproducible. Stray top-level files (e.g. .DS_Store) are not labels.
    labels = sorted(
        name for name in os.listdir(path)
        if os.path.isdir(os.path.join(path, name))
    )
    for label_id, label in enumerate(labels):
        label_dir = os.path.join(path, label)
        for filename in sorted(os.listdir(label_dir)):
            file_path = os.path.join(label_dir, filename)
            # Nested directories are not samples; keep regular files only.
            if os.path.isfile(file_path):
                records.append([filename, file_path, label, label_id])

    return pd.DataFrame(records, columns=['filename', 'path', 'label', 'label_id'])

In [None]:
# Build the metadata table for the bird dataset and preview its first rows.
aud_meta = get_meta(aud_path)
aud_meta.head()

In [None]:
# Audio preprocessing configuration consumed by load_data and BirdClefDataset.
DURATION = 2000  # target clip length passed to AudioUtil.pad_trunc; presumably milliseconds — TODO confirm
SR = 44100  # sample rate passed to AudioUtil.resample
CHANNEL = 2  # channel count passed to AudioUtil.rechannel
N_MELS = 64  # number of mel bands in the spectrogram
N_FFT = 512  # FFT window size
F_MIN = 150  # lower frequency bound; presumably Hz — TODO confirm
F_MAX = 15_000  # upper frequency bound; presumably Hz — TODO confirm
# The hop length is a sample count and must be an integer; 0.75 * N_FFT alone yields the float 384.0.
HOP_LEN = int(0.75 * N_FFT)
# Default for BirdClefDataset(num_classes=...). Matches the 20 outputs of LeNet's final layer;
# TODO confirm it equals the number of label folders in the dataset.
NUM_CLASSES = 20

In [None]:
def load_data(df):
    """Load every audio file listed in ``df`` and convert it to a spectrogram.

    Rows are processed in parallel by 4 joblib workers with a tqdm progress bar.

    Args:
        df: Metadata frame whose rows expose ``path`` and ``filename``.

    Returns:
        dict mapping ``filename`` to the spectrogram returned by
        ``AudioUtil.spectro_gram``. Rows that share a filename overwrite each
        other because the filename is the dictionary key.
    """
    def load_row(row):
        """Run the preprocessing chain for a single metadata row."""
        signal = torchaudio.load(str(row.path))
        signal = AudioUtil.resample(signal, SR)          # bring to the common sample rate
        signal = AudioUtil.rechannel(signal, CHANNEL)    # convert to CHANNEL channels
        signal = AudioUtil.pad_trunc(signal, DURATION)   # pad/truncate to a uniform length
        spectrogram = AudioUtil.spectro_gram(
            signal, n_mels=N_MELS, n_fft=N_FFT, f_min=F_MIN, f_max=F_MAX, hop_len=HOP_LEN
        )
        return row.filename, spectrogram

    # One delayed job per metadata row; itertuples(False) yields rows without the index.
    jobs = [joblib.delayed(load_row)(record) for record in df.itertuples(False)]
    runner = joblib.Parallel(4)
    return dict(runner(tqdm(jobs)))

In [None]:
# Preprocess every file listed in aud_meta and keep the resulting spectrograms in memory.
dataset = load_data(aud_meta)

In [None]:
class BirdClefDataset(Dataset):
    """Map-style dataset that serves pre-computed spectrograms with integer labels.

    Args:
        dataset: Mapping from filename to spectrogram (as produced by load_data).
        meta: Metadata frame with at least ``filename`` and ``label_id`` columns.
        sr: Sample rate, stored on the instance.
        is_train: Flag stored on the instance; ``__getitem__`` does not read it.
        num_classes: Number of classes, stored on the instance.
        duration: Clip duration, stored on the instance.
    """

    def __init__(self, dataset, meta, sr=SR, is_train=True, num_classes=NUM_CLASSES, duration=DURATION):
        self.dataset = dataset
        # Private copy with a fresh 0..n-1 index so positional lookups never touch the caller's frame.
        self.meta = meta.copy().reset_index(drop=True)
        self.sr, self.duration = sr, duration
        self.is_train = is_train
        self.num_classes = num_classes
        # NOTE(review): product of duration and sample rate; only a sample count if duration is in seconds.
        self.audio_length = duration * sr

    def __len__(self):
        """Number of samples, i.e. number of metadata rows."""
        return len(self.meta)

    def __getitem__(self, idx):
        """Return ``(normalized spectrogram, label_id)`` for the row at position ``idx``."""
        record = self.meta.iloc[idx]
        spectrogram = self.normalize(self.dataset[record.filename])
        return spectrogram, record.label_id

    @staticmethod
    def normalize(image):
        """Scale the input by dividing it by 255.0."""
        return image / 255.0

    @staticmethod
    def Augment(image):
        """Apply one frequency mask and one time mask (max 10% each) via spectro_augment."""
        return spectro_augment(image, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1)

In [None]:
# Wrap the in-memory spectrograms and their metadata in a Dataset.
# The metadata frame is `aud_meta`; `df` only exists as a local inside get_meta.
ds = BirdClefDataset(dataset, meta=aud_meta, sr=SR, duration=DURATION)
len(ds)

# Dataloader

In [None]:
SPLIT = 0.8  # fraction of samples assigned to the training split
BATCH_SIZE = 64  # batch size passed to both DataLoaders
NUM_WORKERS = 8  # worker count passed to both DataLoaders

In [None]:
# train_size/test_size are absolute sample counts (not ratios): SPLIT of the data for training, the rest for testing.
train_size = int(len(ds) * SPLIT)
test_size = len(ds) - train_size

# Seed the split so the same train/test partition is produced on every run.
SPLIT_SEED = 42
train_dataset, test_dataset = random_split(
    ds, [train_size, test_size], generator=torch.Generator().manual_seed(SPLIT_SEED)
)
# NOTE(review): random_split returns Subset wrappers around the same `ds`, so these flags live on the
# wrappers only and do not change `ds.is_train`.
train_dataset.is_train = True
test_dataset.is_train = False
print(train_dataset.is_train,test_dataset.is_train)

# Shuffle only the training data; keep evaluation order fixed.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, shuffle=False)

# Model


![这是图片](./LeNet.png "LeNet 结构")

In [None]:
class LeNet(nn.Module):
    """LeNet-5 style CNN for multi-channel 2-D inputs such as spectrograms.

    Two conv + max-pool stages are followed by an adaptive pooling layer that
    brings the feature map to 16 x 5 x 5 regardless of the input's spatial
    size, then three fully connected layers. For a 32 x 32 input the feature
    map is already 5 x 5 at that point, so the adaptive pooling is a no-op and
    the network matches the classic LeNet layout.

    Args:
        in_channels: Number of input channels (default 2).
        num_classes: Number of output logits (default 20).
    """

    def __init__(self, in_channels=2, num_classes=20):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, 6, 5)
        self.pool1 = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.pool2 = nn.MaxPool2d(2, 2)
        # Fix the spatial size fed to the classifier so fc1's input width is known.
        self.adapt = nn.AdaptiveAvgPool2d((5, 5))

        # conv2 emits 16 channels, so the flattened feature vector has 16*5*5 = 400 entries.
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, num_classes)

    def forward(self, x):
        """Map a batch of shape (N, in_channels, H, W) to logits of shape (N, num_classes)."""
        x = F.relu(self.conv1(x))    # (N, 6, H-4, W-4)
        x = self.pool1(x)            # spatial size halved
        x = F.relu(self.conv2(x))    # (N, 16, ...)
        x = self.pool2(x)            # spatial size halved
        x = self.adapt(x)            # (N, 16, 5, 5)
        x = torch.flatten(x, 1)      # (N, 400); keeps the batch dimension intact
        x = F.relu(self.fc1(x))      # (N, 120)
        x = F.relu(self.fc2(x))      # (N, 84)
        x = self.fc3(x)              # (N, num_classes)
        return x

In [None]:
# Instantiate the LeNet classifier with its default constructor arguments.
net = LeNet()

In [None]:
# Use the first GPU when available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# `net` is the LeNet instance created in the previous cell; `model` is the name the training cells use.
model = net.to(device)
# Display the device of the first parameter to confirm where the model lives.
next(model.parameters()).device

# Train

In [61]:
def training(model, train_dl, test_dl, num_epochs, log_path="1.txt"):
    """Train ``model`` with Adam and a one-cycle LR schedule, evaluating after every epoch.

    Each batch is standardized with its own mean and standard deviation before
    the forward pass. After every epoch the model is evaluated with
    ``testing`` and a summary line is printed and appended to ``log_path``.

    Args:
        model: Network to train; it must already be on the target device.
        train_dl: Iterable of ``(inputs, labels)`` training batches that supports ``len()``.
        test_dl: Iterable of ``(inputs, labels)`` evaluation batches, forwarded to ``testing``.
        num_epochs: Number of passes over ``train_dl``.
        log_path: Text file the per-epoch summary is appended to.

    Returns:
        Tuple ``(train_accs, train_losses, test_accs, test_losses)`` of lists
        holding one value per epoch.
    """
    # Run on whatever device the model already lives on instead of relying on a notebook-level global.
    device = next(model.parameters()).device

    # Criterion, optimizer and scheduler
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001,
                                                    steps_per_epoch=len(train_dl),
                                                    epochs=num_epochs,
                                                    anneal_strategy='cos')

    train_accs, train_losses = [], []
    test_accs, test_losses = [], []
    begin = datetime.datetime.now()

    for epoch in range(num_epochs):
        # Evaluation may leave the model in eval mode; re-enable training behavior every epoch.
        model.train()
        running_loss = 0.0
        correct_prediction = 0
        total_prediction = 0

        for data in train_dl:
            inputs, labels = data[0].to(device), data[1].to(device)

            # Standardize the batch with its own statistics.
            inputs = (inputs - inputs.mean()) / inputs.std()

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # OneCycleLR is stepped once per batch, not once per epoch.
            scheduler.step()

            running_loss += loss.item()
            prediction = outputs.argmax(dim=1)
            correct_prediction += (prediction == labels).sum().item()
            total_prediction += prediction.shape[0]

        # Epoch-level training statistics.
        train_loss = running_loss / len(train_dl)
        train_acc = correct_prediction / total_prediction
        train_accs.append(train_acc)
        train_losses.append(train_loss)

        test_acc, test_loss = testing(model, test_dl)
        test_accs.append(test_acc)
        test_losses.append(test_loss)

        # Build the summary once so the log file and the console report the same elapsed time.
        elapsed = datetime.datetime.now() - begin
        train_msg = f'Epoch: {epoch}, Loss: {train_loss:.2f}, Accuracy: {train_acc:.2f}'
        test_msg = f'Test Accuracy: {test_acc:.2f}, Test loss: {test_loss:.2f}, Time: {elapsed}'
        # The context manager closes the file; no explicit close() is needed.
        with open(log_path, 'a') as f:
            f.write(f'{train_msg}, {test_msg}\n')
        print(train_msg, test_msg)

    print('Finished Training')
    return train_accs, train_losses, test_accs, test_losses

def testing (model, test_dl):
    criterion = nn.CrossEntropyLoss()
    
    correct_prediction = 0
    total_prediction = 0
    running_loss = 0.0

    with torch.no_grad():
        for data in test_dl:
            inputs, labels = data[0].to(device), data[1].to(device)
            
            inputs_m, inputs_s = inputs.mean(), inputs.std()
            inputs = (inputs - inputs_m) / inputs_s
            
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            running_loss += loss.item()

            _, prediction = torch.max(outputs,1)
            correct_prediction += (prediction == labels).sum().item()
            total_prediction += prediction.shape[0]
    
    num_batches = len(test_dl)
    avg_loss = running_loss / num_batches
    acc = correct_prediction/total_prediction
    
    return acc, avg_loss

In [None]:
EPOCHS = 30  # number of training epochs passed to training()

In [None]:
# Train for EPOCHS epochs and keep the per-epoch accuracy/loss histories for plotting.
train_accs, train_losses, test_accs, test_losses= training(model,train_loader,test_loader,EPOCHS)

In [None]:
def result_plot(train_accs, train_losses, test_accs, test_losses):
    """Plot accuracy (left) and loss (right) curves for training and test data.

    Args:
        train_accs: Per-epoch training accuracies.
        train_losses: Per-epoch training losses.
        test_accs: Per-epoch test accuracies.
        test_losses: Per-epoch test losses.
    """
    n_epochs = len(train_accs)
    # Shared x axis: epoch numbers 1..n_epochs.
    xs = np.linspace(1, n_epochs, n_epochs)

    # (panel title, [(curve label, values), ...]) in left-to-right order.
    panels = (
        ("ACC Plot", (("train_acc", train_accs), ("test_acc", test_accs))),
        ("LOSS Plot", (("train_loss", train_losses), ("test_loss", test_losses))),
    )
    for position, (title, curves) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        for label, values in curves:
            plt.plot(xs, values, label=label)
        plt.title(title)
        plt.legend()

    plt.show()

In [None]:
# Visualize the accuracy and loss histories returned by training().
result_plot(train_accs, train_losses, test_accs, test_losses)