In [4]:
import os
import torch
import torchaudio
import torchaudio.transforms as T
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from torch.optim import Adam
import torch.nn as nn
from tqdm import tqdm
import numpy as np

import gdown
import zipfile
from google.colab import drive

In [5]:
# Make Google Drive visible inside the Colab VM.
drive.mount("/content/drive")

# Dataset root on Drive, followed by its three per-split sub-folders.
dir_main = "/content/drive/MyDrive/Penelitian: Birdsound_Classification/dataset_12"
dir_train = f"{dir_main}/train_grouped"
dir_val = f"{dir_main}/val_grouped"
dir_test = f"{dir_main}/test_grouped"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [6]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Seed the NumPy and PyTorch generators up front so that sampling, weight
# initialisation, shuffling and dropout repeat on a fresh "Restart & Run All".
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Model / training hyper-parameters (consumed when the model and optimizer are built).
N_MELS = 96            # passed to the model as n_mels
N_CTX = 512            # passed to the model as n_ctx
N_STATE = 512          # passed to the model as n_state (channel / embedding width)
N_HEAD = 8             # attention heads per encoder layer
N_LAYER = 6            # number of encoder layers
EPOCHS = 10            # passes over the training loader
LEARNING_RATE = 0.001  # Adam step size

In [None]:
class BirdAudioDataset(Dataset):
    """Folder-per-class audio dataset that yields log-mel spectrograms.

    ``root_dir`` must contain one sub-directory per class; every ``.wav`` or
    ``.mp3`` file directly inside a class directory becomes one sample. Class
    indices follow the sorted order of the directory names.

    Args:
        root_dir: Directory holding the per-class sub-directories.
        target_time: Number of frames each spectrogram is cropped / zero-padded to.
        target_mels: Number of mel bins (also used for the mel filterbank).
        mean, std: Global statistics used for normalisation. When either is
            ``None`` every spectrogram is standardised with its own mean/std.

    Raises:
        RuntimeError: If no audio file is found under ``root_dir``.
    """

    # How many samples ``__getitem__`` tries before giving up on unreadable files.
    MAX_LOAD_RETRIES = 10

    def __init__(self, root_dir, target_time=512, target_mels=96, mean=None, std=None):
        self.target_time = target_time
        self.target_mels = target_mels
        self.n_mels = target_mels
        self.n_fft = 2048
        self.hop_length = 278
        self.mean = mean
        self.std = std

        self.classes = sorted(
            d for d in os.listdir(root_dir) if os.path.isdir(os.path.join(root_dir, d))
        )
        self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}

        self.file_paths = []
        self.labels = []
        for class_name in self.classes:
            class_dir = os.path.join(root_dir, class_name)
            # Sorted so that sample indices do not depend on the filesystem's listing order.
            for file in sorted(os.listdir(class_dir)):
                if file.endswith(('.wav', '.mp3')):
                    self.file_paths.append(os.path.join(class_dir, file))
                    self.labels.append(self.class_to_idx[class_name])

        if not self.file_paths:
            raise RuntimeError(f"No .wav/.mp3 files found under {root_dir}")

        # Probe the sample rate from an actual audio file; the first directory
        # entry may be a non-audio file that torchaudio cannot open.
        self.sample_rate = torchaudio.info(self.file_paths[0]).sample_rate

        # Built once and reused instead of being re-created for every sample.
        self.mel_transform = T.MelSpectrogram(
            sample_rate=self.sample_rate,
            n_fft=self.n_fft,
            hop_length=self.hop_length,
            n_mels=self.n_mels,
            window_fn=torch.hann_window
        )

    def _fit_to_target(self, spec):
        """Crop, then zero-pad, the last two axes to (target_mels, target_time)."""
        spec = spec[..., :self.target_mels, :self.target_time]
        pad_time = self.target_time - spec.shape[-1]
        pad_mels = self.target_mels - spec.shape[-2]
        if pad_time > 0 or pad_mels > 0:
            spec = F.pad(spec, (0, pad_time, 0, pad_mels))
        return spec

    def compute_log_mel_spectrogram(self, waveform, normalize=True):
        """Convert a waveform into a log-mel spectrogram.

        Args:
            waveform: Audio tensor accepted by ``MelSpectrogram``
                (presumably ``(channels, samples)`` - confirm against the loader).
            normalize: When ``True`` (default) the result is standardised and
                cropped / zero-padded to the target size. When ``False`` the raw
                log-mel values are returned, cropped but *not* padded, so that
                padding cannot distort statistics computed from them.
        """
        log_mel = torch.log(self.mel_transform(waveform) + 1e-6)

        if not normalize:
            return log_mel[..., :self.target_mels, :self.target_time]

        if self.mean is not None and self.std is not None:
            log_mel = (log_mel - self.mean) / (self.std + 1e-6)
        else:
            # No global statistics supplied: standardise this sample on its own.
            log_mel = (log_mel - log_mel.mean()) / (log_mel.std() + 1e-6)

        # Padding after normalisation means padded cells sit at the value 0.
        return self._fit_to_target(log_mel)

    def _load_waveform(self, idx):
        """Load file ``idx``, resample to the dataset rate and down-mix to one channel."""
        waveform, sr = torchaudio.load(self.file_paths[idx])
        if sr != self.sample_rate:
            waveform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=self.sample_rate)(waveform)
        if waveform.shape[0] > 1:
            waveform = torch.mean(waveform, dim=0, keepdim=True)
        return waveform

    def load_log_mel(self, idx, normalize=True):
        """Return the log-mel spectrogram of file ``idx``; errors propagate to the caller."""
        return self.compute_log_mel_spectrogram(self._load_waveform(idx), normalize=normalize)

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Return ``(spectrogram, label)`` for sample ``idx``.

        The spectrogram gets a trailing singleton axis appended; the label is a
        0-dim ``long`` tensor. If the file cannot be loaded, the error is printed
        and a randomly chosen sample (with its own label) is returned instead.

        Raises:
            RuntimeError: If ``MAX_LOAD_RETRIES`` consecutive samples fail to load.
        """
        last_error = None
        for _ in range(self.MAX_LOAD_RETRIES):
            try:
                log_mel_spec = self.load_log_mel(idx)
            except Exception as e:
                print(f"Error loading {self.file_paths[idx]}: {e}")
                last_error = e
                # randint's upper bound is exclusive, so every index is eligible.
                idx = int(np.random.randint(len(self)))
                continue
            label = torch.tensor(self.labels[idx], dtype=torch.long)
            return log_mel_spec.unsqueeze(-1), label
        raise RuntimeError(
            f"Failed to load a sample after {self.MAX_LOAD_RETRIES} attempts"
        ) from last_error


def compute_dataset_statistics(dataset, num_samples=None):
    """Estimate the mean and standard deviation of a dataset's spectrogram values.

    If ``dataset`` exposes ``load_log_mel(idx, normalize=False)`` the statistics
    are taken over the raw, un-normalised and un-padded log-mel values - the
    quantity the global ``mean``/``std`` are later subtracted from. Otherwise the
    first element of ``dataset[i]`` is used as-is.

    Sums are accumulated in a streaming fashion, so memory use does not grow
    with the number of samples.

    Args:
        dataset: Indexable dataset with ``__len__``.
        num_samples: Number of randomly chosen samples to use (default: all).

    Returns:
        ``(mean, std)`` as 0-dim float32 tensors; ``std`` is the unbiased estimate.

    Raises:
        ValueError: If fewer than two values were collected.
    """
    if num_samples is None:
        num_samples = len(dataset)

    indices = np.random.choice(len(dataset), min(num_samples, len(dataset)), replace=False)
    raw_loader = getattr(dataset, "load_log_mel", None)

    count = 0
    total = 0.0
    total_sq = 0.0
    for i in indices:
        i = int(i)
        if raw_loader is not None:
            try:
                spec = raw_loader(i, normalize=False)
            except Exception as e:
                # An unreadable file should not abort the whole statistics pass.
                print(f"Skipping sample {i} in statistics: {e}")
                continue
        else:
            spec, _ = dataset[i]

        spec = spec.double()  # float64 keeps the sum of squares numerically stable
        count += spec.numel()
        total += spec.sum().item()
        total_sq += (spec * spec).sum().item()

    if count < 2:
        raise ValueError("Need at least two values to compute dataset statistics")

    mean = total / count
    variance = max((total_sq - count * mean * mean) / (count - 1), 0.0)

    return (
        torch.tensor(mean, dtype=torch.float32),
        torch.tensor(variance ** 0.5, dtype=torch.float32),
    )

# First pass: an un-normalised view of the training split, used only to
# estimate the global mean/std. (The validation/test "temp" datasets that were
# built here before were never read, so they are no longer constructed.)
temp_train_dataset = BirdAudioDataset(dir_train)
train_mean, train_std = compute_dataset_statistics(temp_train_dataset)

# Every split is normalised with the *training* statistics.
train_dataset = BirdAudioDataset(dir_train, mean=train_mean, std=train_std)
val_dataset = BirdAudioDataset(dir_val, mean=train_mean, std=train_std)
test_dataset = BirdAudioDataset(dir_test, mean=train_mean, std=train_std)

# Each dataset derives its label indices from its own folder listing, so a
# missing or extra class folder in one split would silently shift the labels.
assert val_dataset.classes == train_dataset.classes, "validation classes differ from training classes"
assert test_dataset.classes == train_dataset.classes, "test classes differ from training classes"

BATCH_SIZE = 8

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

N_CLASSES = len(train_dataset.classes)

In [None]:
class AudioEncoder(nn.Module):
    """Convolutional front-end followed by a Transformer encoder and an MLP head.

    The input spectrogram is treated as a one-channel image. One stride-1 and
    three stride-2 3x3 convolutions reduce it to a grid of ``n_state``-channel
    cells; every cell becomes one token for the encoder layers, and the last
    token is classified.

    Args:
        n_mels: Number of mel bins of the input spectrogram.
        n_ctx: Number of time frames of the input spectrogram.
        n_state: Channel / embedding width.
        n_head: Attention heads per encoder layer.
        n_layer: Number of encoder layers.
        n_classes: Number of output logits.
    """

    def __init__(self, n_mels, n_ctx, n_state, n_head, n_layer, n_classes):
        super(AudioEncoder, self).__init__()
        self.conv1 = nn.Conv2d(1, n_state, kernel_size=(3,3), padding=1)
        self.conv2 = nn.Conv2d(n_state, n_state, kernel_size=(3,3), stride=(2,2), padding=1)
        self.conv3 = nn.Conv2d(n_state, n_state, kernel_size=(3,3), stride=(2,2), padding=1)
        self.conv4 = nn.Conv2d(n_state, n_state, kernel_size=(3,3), stride=(2,2), padding=1)

        # One positional vector per token that the conv stack produces for an
        # (n_mels x n_ctx) input; sizing it with n_ctx alone is too short.
        n_tokens = self._downsampled(n_mels) * self._downsampled(n_ctx)
        self.register_buffer("positional_embedding", torch.randn(1, n_tokens, n_state))

        self.blocks = nn.ModuleList([
            # batch_first=True: tokens are laid out as (batch, seq, dim). With the
            # default layout attention would run across the batch dimension.
            nn.TransformerEncoderLayer(d_model=n_state, nhead=n_head, activation="gelu", batch_first=True)
            for _ in range(n_layer)
        ])
        self.ln_post = nn.LayerNorm(n_state)
        self.fc1 = nn.Linear(n_state, n_state // 2)
        self.fc2 = nn.Linear(n_state // 2, n_classes)
        self.dropout = nn.Dropout(0.3)

    @staticmethod
    def _downsampled(size, n_strided=3):
        """Length of an axis after ``n_strided`` 3x3 / stride-2 / padding-1 convolutions."""
        for _ in range(n_strided):
            size = (size + 1) // 2
        return size

    def forward(self, x):
        """Classify a batch of spectrograms.

        Args:
            x: ``(batch, n_mels, time, 1)``; a ``(batch, 1, n_mels, time, 1)``
                tensor with an extra singleton axis is accepted as well.

        Returns:
            Logits of shape ``(batch, n_classes)``.

        Raises:
            ValueError: If the input yields more tokens than positional embeddings exist.
        """
        if x.dim() == 5 and x.size(1) == 1:
            x = x.squeeze(1)
        x = x.permute(0, 3, 1, 2)  # channels-last -> (batch, 1, n_mels, time)

        x = self.dropout(F.gelu(self.conv1(x)))
        x = self.dropout(F.gelu(self.conv2(x)))
        x = self.dropout(F.gelu(self.conv3(x)))
        x = self.dropout(F.gelu(self.conv4(x)))

        # (batch, C, H, W) -> (batch, H*W, C). A plain reshape would reinterpret
        # memory and mix channels with positions, hence flatten + transpose.
        x = x.flatten(2).transpose(1, 2)

        seq_len = x.shape[1]
        if seq_len > self.positional_embedding.shape[1]:
            raise ValueError(
                f"Input produces {seq_len} tokens but only "
                f"{self.positional_embedding.shape[1]} positional embeddings exist"
            )
        pos_emb = self.positional_embedding[:, :seq_len, :]
        x = (x + pos_emb).to(x.dtype)

        for block in self.blocks:
            x = block(x)

        # Only the last token feeds the head, so select it before the per-token
        # LayerNorm / fc1 instead of computing them for every token.
        x = self.ln_post(x[:, -1, :])
        x = self.dropout(F.gelu(self.fc1(x)))
        # The second GELU on the fc1 output is kept from the original head.
        x = self.fc2(F.gelu(x))
        return x

# Build the classifier and move it to the target device *before* creating the
# optimizer, so the optimizer is constructed over the parameters in their final location.
model = AudioEncoder(
    n_mels=N_MELS,
    n_ctx=N_CTX,
    n_state=N_STATE,
    n_head=N_HEAD,
    n_layer=N_LAYER,
    n_classes=N_CLASSES,
).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = nn.CrossEntropyLoss()

In [None]:
def train_one_epoch(epoch):
    """Run one optimisation pass over ``train_loader``.

    Uses the notebook-level ``model``, ``optimizer``, ``criterion`` and
    ``device``. The progress bar shows the running mean batch loss and the
    running accuracy over the samples seen so far.

    Args:
        epoch: Zero-based epoch index (displayed one-based in the progress bar).

    Returns:
        ``(mean_batch_loss, accuracy)`` for the epoch; both are ``nan`` when the
        loader yields no batches.
    """
    model.train()
    total_loss, total_correct, total_samples = 0.0, 0, 0
    num_batches = 0
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}")
    for num_batches, (batch, labels) in enumerate(progress_bar, start=1):
        batch, labels = batch.to(device), labels.to(device)

        optimizer.zero_grad()
        output = model(batch)
        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        total_correct += (output.argmax(1) == labels).sum().item()
        total_samples += labels.size(0)
        # Divide by what has been processed so far, not by the full loader length.
        progress_bar.set_postfix(loss=total_loss / num_batches, acc=total_correct / total_samples)

    if total_samples == 0:
        return float("nan"), float("nan")
    return total_loss / num_batches, total_correct / total_samples

def evaluate(loader, dataset_name):
    """Measure and print the classification accuracy of ``model`` on ``loader``.

    Uses the notebook-level ``model`` and ``device``; gradients are disabled.

    Args:
        loader: Iterable yielding ``(batch, labels)`` pairs.
        dataset_name: Label used in the progress bar and the printed summary.

    Returns:
        Accuracy in ``[0, 1]``, or ``nan`` when the loader yields no samples.
    """
    model.eval()
    total_correct, total_samples = 0, 0
    with torch.no_grad():
        for batch, labels in tqdm(loader, desc=f"Evaluating {dataset_name}"):
            batch, labels = batch.to(device), labels.to(device)
            output = model(batch)
            total_correct += (output.argmax(1) == labels).sum().item()
            total_samples += labels.size(0)

    if total_samples == 0:
        # Avoid a ZeroDivisionError on an empty split.
        print(f"{dataset_name} Accuracy: n/a (no samples)")
        return float("nan")

    accuracy = total_correct / total_samples
    print(f"{dataset_name} Accuracy: {accuracy:.4f}")
    return accuracy

In [None]:
# Train for EPOCHS passes; after each pass, report accuracy on the validation loader.
for epoch in range(EPOCHS):
    train_one_epoch(epoch)
    evaluate(val_loader, "Validation")

In [None]:
# Report accuracy on the test loader once training has finished.
evaluate(test_loader, "Test")