In [None]:
import os
import sys
import copy
import librosa
import torch
import random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from tqdm.notebook import tqdm
from IPython.display import Audio, FileLink
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.models import resnet18
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.metrics import precision_score, recall_score, f1_score
from sklearn.metrics import confusion_matrix

# Считывание данных

In [None]:
# Collect the negative ("not dog") clips as records with label 0.
# sorted() fixes the row order: Path.glob yields files in arbitrary filesystem
# order, which would make the seeded train/test splits differ between machines.
# (The former try/except only wrapped list.append, which cannot fail here.)
data = [
    {"path": path, "label": 0}
    for path in sorted(Path("/kaggle/input/dog-wolf-nature/dog_wolf_nature/not_dog").glob("*.wav"))
]

In [None]:
# Append the dog clips as records with label 1.
# sorted() fixes the row order: Path.glob yields files in arbitrary filesystem
# order, which would make the seeded train/test splits differ between machines.
# (The former try/except only wrapped list.append, which cannot fail here.)
data.extend(
    {"path": path, "label": 1}
    for path in sorted(Path("/kaggle/input/dog-wolf-nature/dog_wolf_nature/dog").glob("*.wav"))
)

In [None]:
# Append the wolf clips as records with label 2.
# sorted() fixes the row order: Path.glob yields files in arbitrary filesystem
# order, which would make the seeded train/test splits differ between machines.
# (The former try/except only wrapped list.append, which cannot fail here.)
data.extend(
    {"path": path, "label": 2}
    for path in sorted(Path("/kaggle/input/dog-wolf-nature/dog_wolf_nature/wolf").glob("*.wav"))
)

In [None]:
# Turn the collected {"path", "label"} records into a DataFrame and preview it
data = pd.DataFrame(data=data)
data.head(n=5)

In [None]:
# Seed every random number generator used in this notebook (augmentations use
# `random`, weight init / DataLoader shuffling use torch) so runs are repeatable.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

target_rate = 16_000                                       # audio sample rate, Hz
target_length = target_rate * 10                           # clip length in samples (10 s at target_rate)
num_classes = 3                                            # number of classes to predict
device = "cuda:0" if torch.cuda.is_available() else "cpu"  # device used for training/inference

In [None]:
# Hold out half of the data as the final evaluation set `val` (optional step).
# The split is stratified by label and seeded.
# NOTE: `data` is overwritten by its own half, so re-running this cell halves it again.
data, val = train_test_split(
    data,
    test_size=0.5,
    random_state=42,
    stratify=data["label"],
)
data = data.reset_index(drop=True)
data.head(n=5)

In [None]:
# Split the remaining data into train (80%) and test (20%), stratified by label
train, test = train_test_split(
    data,
    test_size=0.2,
    random_state=42,
    stratify=data["label"],
)
train, test = train.reset_index(drop=True), test.reset_index(drop=True)
train.head(n=5)

# Предобработка данных

In [None]:
# Helper class that applies augmentations stochastically
class ComposeTransforms:
    """Chain of transforms where each one is applied with its own probability.

    Args:
        transforms_probs: iterable of ``(callable, probability)`` pairs. The
            callables are tried in order; each receives the output of the
            previously applied one.
    """

    def __init__(self, transforms_probs):
        self.transforms_probs = transforms_probs

    def __call__(self, waveform):
        """Return ``waveform`` after passing it through the randomly selected transforms."""
        result = waveform
        for apply_fn, probability in self.transforms_probs:
            # One uniform draw per transform; skip it when the draw is not below its probability.
            if random.random() >= probability:
                continue
            result = apply_fn(result)
        return result

In [None]:
# Speed-change augmentation
def change_speed(waveform, length=160_000):
    """Time-stretch ``waveform`` by a random rate and return exactly ``length`` samples.

    The rate is drawn from {0.8, 0.9, 1.1, 1.2}. A result longer than ``length``
    is truncated; a shorter one is zero-padded on both sides (the extra sample,
    if any, goes to the right).
    """
    rate = random.choice([0.8, 0.9, 1.1, 1.2])
    stretched = librosa.effects.time_stretch(waveform, rate=rate)
    n_samples = len(stretched)
    if n_samples > length:
        return stretched[:length]
    missing = length - n_samples
    left = missing // 2
    right = missing - left
    return np.pad(stretched, (left, right), "constant")

In [None]:
# Pitch-change augmentation
def change_pitch(waveform, sample_rate=16_000):
    """Shift the pitch of ``waveform`` by a randomly chosen number of steps.

    The step count is drawn from a fixed set of values between -1.75 and 1.75
    (zero excluded) and passed to ``librosa.effects.pitch_shift`` as ``n_steps``.
    """
    step_options = [-1.75, -1.5, -1.25, -1, 1, 1.25, 1.5, 1.75]
    shift = random.choice(step_options)
    shifted = librosa.effects.pitch_shift(waveform, sr=sample_rate, n_steps=shift)
    return shifted

In [None]:
# Time-shift augmentation
def time_shift(waveform):
    """Circularly shift ``waveform`` by a random offset of 40,000-60,000 samples.

    ``np.roll`` is used, so samples pushed past the end wrap around to the start.
    """
    offset = random.randint(40_000, 60_000)
    shifted = np.roll(waveform, offset)
    return shifted

In [None]:
# Waveform-level augmentation pipeline: each augmentation is applied
# independently with probability 0.25
transform_audio = ComposeTransforms(
    transforms_probs=[
        (change_pitch, 0.25),
        (change_speed, 0.25),
        (time_shift, 0.25),
    ]
)

In [None]:
# Spectrogram masking augmentation (frequency axis)
def frequency_mask(mel_sgram, max_width=15):
    """Return a copy of ``mel_sgram`` with 0, 1 or 2 random frequency bands set to 0.

    Args:
        mel_sgram: 2-D array; axis 0 is the one being masked.
        max_width: largest band width (in rows) that may be drawn.

    Returns:
        A masked deep copy; the input is not modified.
    """
    aug_sgram = copy.deepcopy(mel_sgram)
    n_bins = aug_sgram.shape[0]
    for _ in range(random.choice([0, 1, 2])):
        # Clamp the width to the axis size: otherwise randint(0, n_bins - width)
        # gets an empty range and raises ValueError on narrow spectrograms.
        width = min(random.randint(0, max_width), n_bins)
        start = random.randint(0, n_bins - width)
        aug_sgram[start:start + width, :] = 0
    return aug_sgram

In [None]:
# Spectrogram masking augmentation (time axis)
def time_mask(mel_sgram, max_width=20):
    """Return a copy of ``mel_sgram`` with 0, 1 or 2 random time spans set to 0.

    Args:
        mel_sgram: 2-D array; axis 1 is the one being masked.
        max_width: largest span width (in columns) that may be drawn.

    Returns:
        A masked deep copy; the input is not modified.
    """
    aug_sgram = copy.deepcopy(mel_sgram)
    n_frames = aug_sgram.shape[1]
    for _ in range(random.choice([0, 1, 2])):
        # Clamp the width to the axis size: otherwise randint(0, n_frames - width)
        # gets an empty range and raises ValueError on short spectrograms.
        width = min(random.randint(0, max_width), n_frames)
        start = random.randint(0, n_frames - width)
        aug_sgram[:, start:start + width] = 0
    return aug_sgram

In [None]:
# Spectrogram-level augmentation pipeline: each mask is applied
# independently with probability 0.25
transform_sgram = ComposeTransforms(
    transforms_probs=[
        (frequency_mask, 0.25),
        (time_mask, 0.25),
    ]
)

In [None]:
# Custom dataset: audio file -> standardized mel spectrogram tensor
class CustomAudioDataset(Dataset):
    """Dataset yielding ``(mel spectrogram tensor, label)`` pairs for audio files.

    Per item: load -> mono / resample / fix length / RMS-normalize -> optionally
    mix a random label-0 clip into label-1/2 clips -> optional waveform
    augmentation -> mel spectrogram in dB -> optional spectrogram augmentation ->
    per-frame standardization -> float tensor with a leading channel axis.

    Args:
        paths: audio file paths.
        labels: integer labels aligned with ``paths`` (0 is treated as the
            negative class used for overlays).
        target_rate: sample rate every clip is resampled to.
        target_length: number of samples every clip is trimmed/padded to.
        transform_audio: optional callable applied to the waveform.
        transform_sgram: optional callable applied to the spectrogram.
        overlay: when true, label-1/2 clips get a random label-0 clip added.
    """

    def __init__(self, paths, labels, target_rate, target_length, transform_audio=None, transform_sgram=None, overlay=False):
        self.paths = paths
        self.labels = labels
        self.target_rate = target_rate
        self.target_length = target_length
        self.transform_audio = transform_audio
        self.transform_sgram = transform_sgram
        self.overlay = overlay
        # Paths of label-0 clips, computed once instead of on every __getitem__.
        self._negative_paths = [p for p, lab in zip(paths, labels) if lab == 0]

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        # sr=None keeps the file's native rate, so preprocess_wave resamples once
        # (the librosa default would first resample everything to 22050 Hz).
        waveform, sr = librosa.load(self.paths[idx], sr=None)
        label = self.labels[idx]
        waveform = self.preprocess_wave(waveform, sr)
        # Skip the overlay when there is no label-0 clip to draw from.
        if self.overlay and label in (1, 2) and self._negative_paths:
            random_path = random.choice(self._negative_paths)
            zero_label_wave, zero_label_sr = librosa.load(random_path, sr=None)
            zero_label_wave = self.preprocess_wave(zero_label_wave, zero_label_sr)
            waveform = self.overlay_audio(waveform, zero_label_wave)
        if self.transform_audio is not None:
            waveform = self.transform_audio(waveform)
        # The waveform is at target_rate after preprocess_wave, so the mel
        # filterbank must be built for target_rate, not the file's rate.
        mel_sgram = self.get_spectrogram(waveform, self.target_rate)
        if self.transform_sgram is not None:
            mel_sgram = self.transform_sgram(mel_sgram)
        mel_sgram = self.standardization(mel_sgram)
        mel_sgram = self.to_tensor(mel_sgram)
        return mel_sgram, label

    # RMS normalization of the audio
    def rms_audio_normalization(self, wave, target_dBFS=-20):
        """Scale ``wave`` so its RMS equals ``target_dBFS`` (dB relative to full scale 1.0).

        A silent signal (RMS of 0) is returned unchanged.
        """
        rms = np.sqrt(np.mean(wave ** 2))
        if rms == 0:
            return wave
        # dBFS -> linear amplitude
        target_rms = 10 ** (target_dBFS / 20)
        return wave * (target_rms / rms)

    # Overlay augmentation: class-0 audio is added onto class-1/2 audio
    def overlay_audio(self, positive, negative):
        """Return the sample-wise sum of both signals, truncated to the shorter one."""
        min_len = min(len(positive), len(negative))
        return positive[:min_len] + negative[:min_len]

    # Standardization of the spectrogram per time frame
    def standardization(self, mel_sgram):
        """Standardize every time frame (column) to zero mean and unit std.

        Frames with zero std are only mean-centered.
        """
        mean = mel_sgram.mean(axis=0, keepdims=True)
        std = mel_sgram.std(axis=0, keepdims=True)
        # Dividing by 1 where std == 0 reduces to plain mean-centering.
        safe_std = np.where(std == 0, 1, std)
        return (mel_sgram - mean) / safe_std

    def to_tensor(self, mel_sgram):
        """Convert the spectrogram to a float32 tensor with a leading axis of size 1."""
        return torch.tensor(mel_sgram, dtype=torch.float32).unsqueeze(0)

    # Convert to mono, resample and fix the clip length
    def preprocess_wave(self, waveform, sr):
        """Return ``waveform`` as mono audio at ``target_rate`` with exactly
        ``target_length`` samples (trimmed or zero-padded at the end), RMS-normalized."""
        waveform = librosa.to_mono(waveform)
        waveform = librosa.resample(waveform, orig_sr=sr, target_sr=self.target_rate)
        n_samples = len(waveform)
        if n_samples > self.target_length:
            waveform = waveform[:self.target_length]
        elif n_samples < self.target_length:
            zeros = np.zeros(self.target_length - n_samples)
            waveform = np.concatenate((waveform, zeros))
        return self.rms_audio_normalization(waveform)

    # Compute the spectrogram of the audio
    def get_spectrogram(self, waveform, sr):
        """Return the mel spectrogram of ``waveform`` in dB, relative to its minimum.

        ``sr`` must be the sample rate of ``waveform``.
        """
        sgram = librosa.stft(waveform)
        magnitude = librosa.magphase(sgram)[0]
        mel_sgram = librosa.feature.melspectrogram(S=magnitude, sr=sr)
        mel_sgram_db = librosa.amplitude_to_db(mel_sgram, ref=np.min)
        return mel_sgram_db

In [None]:
# Optimizer step size and mini-batch size
learning_rate = 1e-5
batch_size = 16

In [None]:
# Dataset instances and loaders for train and test.
# Only the training set gets waveform/spectrogram augmentations.
# NOTE(review): overlay=True on the test set mixes in a randomly chosen
# label-0 clip on every access, so the evaluation loss is not deterministic —
# confirm this is intended.
train_audio_dataset = CustomAudioDataset(
    train["path"].tolist(),
    train["label"].tolist(),
    target_rate,
    target_length,
    transform_audio=transform_audio,
    transform_sgram=transform_sgram,
    overlay=True,
)
test_audio_dataset = CustomAudioDataset(
    test["path"].tolist(),
    test["label"].tolist(),
    target_rate,
    target_length,
    overlay=True,
)
train_dataloader = DataLoader(train_audio_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_audio_dataset, batch_size=batch_size, shuffle=False)

# Обучение модели

In [None]:
# ResNet18 adapted to this task
class CustomResNet(torch.nn.Module):
    """ResNet18 (no pretrained weights) with a 1-channel input conv and a
    ``num_classes``-way final linear layer."""

    def __init__(self, num_classes):
        super().__init__()
        backbone = resnet18(weights=None)
        # Replace the stem so the network accepts single-channel inputs.
        backbone.conv1 = torch.nn.Conv2d(
            1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False
        )
        # Replace the classification head, keeping its input width.
        backbone.fc = torch.nn.Linear(backbone.fc.in_features, num_classes)
        self.resnet = backbone

    def forward(self, x):
        """Run ``x`` through the wrapped network and return its output."""
        return self.resnet(x)

In [None]:
# Model, class-weighted loss (all weights equal to 1) and optimizer
custom_model = CustomResNet(num_classes).to(device)
weights = torch.tensor([1.0] * 3, device=device)
loss_function = torch.nn.CrossEntropyLoss(weight=weights)
optimizer = torch.optim.Adam(params=custom_model.parameters(), lr=learning_rate)

In [None]:
# Training loop for one epoch
def train_epoch(model, loss_fn, optimizer, dataloader):
    """Train ``model`` for one pass over ``dataloader``.

    Batches are moved to the global ``device``. Returns the sum of the batch
    losses divided by the number of batches.
    """
    model.train()
    batch_losses = []
    progress = tqdm(dataloader, ascii=True, desc="Train")
    for inputs, targets in progress:
        inputs = inputs.to(device)
        targets = targets.to(device)
        optimizer.zero_grad()
        loss = loss_fn(model(inputs), targets)
        # Record the scalar value before the backward pass.
        batch_losses.append(loss.item())
        loss.backward()
        optimizer.step()
    return sum(batch_losses) / len(dataloader)

In [None]:
# Evaluation loop for one epoch
def eval_epoch(model, loss_fn, dataloader):
    """Evaluate ``model`` on ``dataloader`` without gradient tracking.

    Prints a classification report for the classes Negative/Dog/Wolf
    (labels 0/1/2) and returns the sum of the batch losses divided by the
    number of batches. Batches are moved to the global ``device``.
    """
    model.eval()
    sum_loss = 0
    all_y_true = []
    all_y_pred = []
    pbar = tqdm(dataloader, ascii=True, desc="Validation")
    with torch.no_grad():
        for X, y in pbar:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            sum_loss += loss_fn(pred, y).item()
            # extend() avoids rebuilding the lists on every batch
            all_y_true.extend(y.tolist())
            all_y_pred.extend(pred.argmax(dim=1).tolist())
    target_names = ["Negative", "Dog", "Wolf"]
    # Pass the label ids explicitly: without them the report raises when a class
    # is missing from both y_true and y_pred (length mismatch with target_names).
    report = classification_report(
        all_y_true,
        all_y_pred,
        labels=list(range(len(target_names))),
        target_names=target_names,
        zero_division=0,
    )
    print(report)
    return sum_loss / len(dataloader)

In [None]:
# Training with early stopping: the weights with the lowest validation loss are
# saved to "best_model"; training stops after `patience` epochs without improvement.
loss_val_arr = []
loss_train_arr = []
counter = 0                    # epochs since the last improvement (early stopping)
patience = 5                   # allowed epochs without improvement before stopping
epochs = 15                    # maximum number of training epochs
best_val_loss = float("inf")
for i in range(epochs):
    print(f"Epoch {i+1}\n-------------------------------")
    train_loss = train_epoch(custom_model, loss_function, optimizer, train_dataloader)
    val_loss = eval_epoch(custom_model, loss_function, test_dataloader)
    loss_train_arr.append(train_loss)
    loss_val_arr.append(val_loss)
    print(f"Train loss: {train_loss}")
    print(f"Val loss: {val_loss}")
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        counter = 0
        torch.save(custom_model.state_dict(), "best_model")
    else:
        counter += 1
        # Compare against `patience` (the undefined name `early_stop` raised NameError here).
        if counter >= patience:
            print(f"Early stop on the epoch: {i + 1}, best validation loss: {best_val_loss}")
            break

In [None]:
# Loss curves for train and validation, one point per completed epoch
epoch_ticks = range(1, len(loss_train_arr) + 1)
plt.figure(figsize=(10, 6))
for series, series_label in ((loss_train_arr, "train loss"), (loss_val_arr, "validation loss")):
    plt.plot(epoch_ticks, series, label=series_label)
plt.title("Loss curves")
plt.legend(loc='upper right')
plt.xlabel("Epoch")
plt.ylabel("Value")

# Тестирование

In [None]:
# Dataset and loader for the held-out `val` split (no augmentation, no overlay)
val_audio_dataset = CustomAudioDataset(
    val["path"].tolist(),
    val["label"].tolist(),
    target_rate,
    target_length,
)
val_dataloader = DataLoader(val_audio_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Final evaluation on the held-out `val` split.
# Restore the checkpoint with the lowest validation loss written by the training
# loop; otherwise the weights of the last epoch (possibly past the best one)
# would be evaluated.
if os.path.exists("best_model"):
    custom_model.load_state_dict(torch.load("best_model", map_location=device))
custom_model.eval()
all_y_true = []
all_y_pred = []
pbar = tqdm(val_dataloader, ascii=True, desc="Validation")
with torch.no_grad():
    for X, y in pbar:
        X, y = X.to(device), y.to(device)
        pred = custom_model(X)
        # extend() avoids rebuilding the lists on every batch
        all_y_true.extend(y.tolist())
        all_y_pred.extend(pred.argmax(dim=1).tolist())
target_names = ["Negative", "Dog", "Wolf"]
# Explicit label ids keep the report valid even if a class never occurs.
print(classification_report(
    all_y_true,
    all_y_pred,
    labels=list(range(len(target_names))),
    target_names=target_names,
    zero_division=0,
))

In [None]:
# Build the confusion matrix.
# Explicit labels guarantee a 3x3 matrix (rows: actual, columns: predicted) even
# when a class is absent, so it always matches the three tick labels of the heatmap.
cm = confusion_matrix(all_y_true, all_y_pred, labels=[0, 1, 2])

In [None]:
# Draw the confusion matrix as an annotated heatmap
predicted_ticks = ['Predicted negative', 'Predicted dog', 'Predicted wolf']
actual_ticks = ['Actual negative', 'Actual dog', 'Actual wolf']
plt.figure(figsize=(10, 7))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=predicted_ticks,
    yticklabels=actual_ticks,
)
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.show()