In [1]:
from google.colab import drive
drive.mount('/content/gdrive')
%cd ./gdrive/MyDrive/ML-KTH/birdclef-2023
!ls

Mounted at /content/gdrive
/content/gdrive/MyDrive/ML-KTH/birdclef-2023
Bird.ipynb		  pt			 test_soundscapes    val_loader.pth
data_preprocessing.ipynb  sample_submission.csv  train_audio	     wav2vec2-base-finetuned-ks
eBird_Taxonomy_v2021.csv  test			 train_loader.pth
mel_spectrograms	  test_loader.pth	 train_metadata.csv


In [None]:
!pip install evaluate
!pip install -U accelerate
!pip install -U transformers

In [3]:
import os

os.environ['KMP_DUPLICATE_LIB_OK']='True'
# Root directory containing bird sound folders
root_dir = './test'

# Get folder names (bird types)
bird_types = os.listdir(root_dir)
bird_types.sort()  # Ensure consistent order for label assignment
bird_types = bird_types[1:]
# Initialize lists for file paths and labels
audio_files = []
numeric_labels = []

# Label encoder - map bird type to a numeric value
label2id = {bird: idx for idx, bird in enumerate(bird_types)}
print(bird_types)
# Traverse each directory and collect file paths and labels
for bird_type in bird_types:
    bird_folder = os.path.join(root_dir, bird_type)
    for file in os.listdir(bird_folder):
        if file.endswith('.ogg'):
            file_path = os.path.join(bird_folder, file)
            audio_files.append(file_path)
            numeric_labels.append(label2id[bird_type])

import numpy as np

from torch.utils.data import Dataset, DataLoader
import torch
import librosa
def preprocess_function(example, feature_extractor, max_duration=15.0):
    audio_array = example["audio"]["array"]
    inputs = feature_extractor(
        audio_array,
        sampling_rate=16000,
        max_length=int(16000 * max_duration),
        truncation=True,
        padding=True,
        return_tensors="pt"
    )
    return inputs

def pad_audio_array(audio_array, target_length):
    # Calculate the number of zeros to add
    padding_length = target_length - audio_array.shape[0]
    if padding_length > 0:
        # Pad with zeros if the array is shorter than the target length
        padded_array = np.pad(audio_array, (0, padding_length), mode='constant')
    else:
        # Truncate the array if it is longer than the target length
        padded_array = audio_array[:target_length]
    return padded_array

from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    # 'batch' is a list of tuples with (input_values, label)
    input_values = [item[0] for item in batch]
    labels = [item[1] for item in batch]

    # Stack input values and labels into tensors
    input_values_tensor = torch.stack(input_values)
    labels_tensor = torch.tensor(labels)

    return input_values_tensor.squeeze(1), labels_tensor

class CustomDataset(Dataset):
    def __init__(self, file_paths, labels, label2id, id2label, feature_extractor, target_sampling_rate=16000):
        self.file_paths = file_paths
        self.labels = labels
        self.label2id = label2id
        self.id2label = id2label
        self.target_sampling_rate = target_sampling_rate
        self.feature_extractor = feature_extractor

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        file_path = self.file_paths[idx]
        file_name = file_path.split('/')[-2]
        label = self.label2id[file_name]

        # Load and resample the audio file
        audio_data, sampling_rate = librosa.load(file_path, sr=self.target_sampling_rate)
        audio_data_padded = pad_audio_array(audio_data, 120000)
        example = {"audio": {"array": audio_data_padded, "sampling_rate": self.target_sampling_rate}}
        processed_example = preprocess_function(example, self.feature_extractor)

        # label = self.label2id[self.labels[idx]]
        return processed_example["input_values"], label


labels = bird_types
label2id = {label: i for i, label in enumerate(labels)}
id2label = {i: label for i, label in enumerate(labels)}


from sklearn.model_selection import train_test_split
from transformers import Wav2Vec2Processor
from transformers import AutoFeatureExtractor, ASTForAudioClassification
# model_checkpoint = "facebook/wav2vec2-base"
# feature_extractor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base")

model_checkpoint = "MIT/ast-finetuned-audioset-10-10-0.4593"
feature_extractor = AutoFeatureExtractor.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")

# Assuming audio_files and numeric_labels are already defined
train_files, test_files, train_labels, test_labels = train_test_split(audio_files, numeric_labels, test_size=0.2, random_state=42)
val_files, test_files, val_labels, test_labels = train_test_split(test_files, test_labels, test_size=0.5, random_state=42)

train_dataset = CustomDataset(train_files, train_labels, label2id, id2label, feature_extractor)
val_dataset = CustomDataset(val_files, val_labels, label2id, id2label, feature_extractor)
test_dataset = CustomDataset(test_files, test_labels, label2id, id2label, feature_extractor)



custom_datasets = {"train": train_dataset, "validation": val_dataset, "test": test_dataset}


batch_size = 4
from torch.utils.data import DataLoader

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=batch_size, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=batch_size, collate_fn=collate_fn)



# from datasets import load_dataset, load_metric
# metric = load_metric("accuracy")
import evaluate
metric = evaluate.load("accuracy")
dataset = custom_datasets
print(dataset['train'])

from transformers import AutoModelForAudioClassification, TrainingArguments, Trainer

num_labels = len(id2label)
# model = AutoModelForAudioClassification.from_pretrained(
#     model_checkpoint,
#     num_labels=num_labels,
#     label2id=label2id,
#     id2label=id2label,
# )
model = ASTForAudioClassification.from_pretrained(
    model_checkpoint,
    num_labels=num_labels,
    label2id=label2id,
    id2label=id2label,
    ignore_mismatched_sizes=True
)
import torch.nn as nn

model.classifier = nn.Sequential(
    nn.Linear(768, 20),
    nn.LogSoftmax(dim=1)
)

model_name = model_checkpoint.split("/")[-1]

args = TrainingArguments(
    f"{model_name}-finetuned-ks",
    evaluation_strategy = "epoch",
    save_strategy = "epoch",
    learning_rate=3e-5,
    per_device_train_batch_size=batch_size,
    gradient_accumulation_steps=4,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=5,
    warmup_ratio=0.1,
    logging_steps=10,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    push_to_hub=False,
)

import numpy as np

# def compute_metrics(eval_pred):
#     """Computes accuracy on a batch of predictions"""
#     predictions = np.argmax(eval_pred.predictions, axis=1)
#     return metric.compute(predictions=predictions, references=eval_pred.label_ids)

def compute_metrics(pred_logits, true_labels):
    preds = np.argmax(pred_logits, axis=1)
    return metric.compute(predictions=preds, references=true_labels)



['blakit1', 'cohmar1', 'colsun2', 'combul2', 'combuz1', 'comsan', 'eaywag1', 'eubeat1', 'gnbcam2', 'greegr', 'hoopoe', 'litegr', 'rbsrob1', 'rerswa1', 'somgre1', 'thrnig1', 'wbrcha2', 'wlwwar', 'woosan']


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


preprocessor_config.json:   0%|          | 0.00/297 [00:00<?, ?B/s]

Downloading builder script:   0%|          | 0.00/4.20k [00:00<?, ?B/s]

<__main__.CustomDataset object at 0x78b0298d2f80>


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/26.8k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/346M [00:00<?, ?B/s]

Some weights of ASTForAudioClassification were not initialized from the model checkpoint at MIT/ast-finetuned-audioset-10-10-0.4593 and are newly initialized because the shapes did not match:
- classifier.dense.bias: found shape torch.Size([527]) in the checkpoint and torch.Size([19]) in the model instantiated
- classifier.dense.weight: found shape torch.Size([527, 768]) in the checkpoint and torch.Size([19, 768]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [4]:
len(train_loader)

1403

In [5]:
import torch

def save_checkpoint(model, optimizer, epoch, filename="checkpoint.pth"):
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict()
    }
    torch.save(checkpoint, filename)

def load_checkpoint(model, optimizer, filename="checkpoint.pth"):
    checkpoint = torch.load(filename)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    return checkpoint['epoch']

In [6]:
import torch
from torch.optim import AdamW
from torch.utils.data import DataLoader
optimizer = AdamW(model.parameters(), lr=3e-5)
criterion = torch.nn.CrossEntropyLoss()  # For classification task
filename = "./pt/AST_checkpoint.pth"
start_epoch = 0
if os.path.exists(filename):
    start_epoch = load_checkpoint(model, optimizer , filename = filename)
    print("Load checkpoint")

checkpoint_interval = 1
num_epochs = 5
classes = 20
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
for epoch in range(start_epoch, num_epochs):
    break
    print(f'now is {epoch}')
    model.train()  # Set model to training mode
    total_loss = 0
    num = 0
    for batch in train_loader:
        optimizer.zero_grad()  # Clear existing gradients

        inputs, labels = batch
        inputs = inputs.squeeze(1)
        inputs = inputs.to(device)
        labels = labels.to(device)
        # print(inputs.shape)
        # print(inputs.shape)
        outputs = model(inputs)  # Forward pass
        logits = outputs.logits
        loss = criterion(logits, labels)  # Compute loss

        loss.backward()  # Backpropagation
        optimizer.step()  # Update weights
        num += 1
        if num % 100 == 0:
            print(num)
        total_loss += loss.item()
        # break

    avg_train_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch + 1}/{num_epochs}, Training Loss: {avg_train_loss}")

    #Save checkpoint
    if epoch % checkpoint_interval == 0:
        pth_name = "./pt/"+'AST'+ '_' + 'checkpoint'+ ".pth"
        save_checkpoint(model, optimizer, epoch, filename = filename)
        print("Save the pt")
        print(pth_name)
    # Validation step
    model.eval()  # Set model to evaluation mode
    total_eval_accuracy = 0
    total_eval_loss = 0

    for batch in val_loader:
        with torch.no_grad():
            inputs, labels = batch
            inputs = inputs.squeeze(1)
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)

            logits = outputs.logits
            loss = criterion(logits, labels)

            total_eval_loss += loss.item()
            logits = logits.detach().cpu().numpy()
            label_ids = labels.to('cpu').numpy()
            total_eval_accuracy += compute_metrics(logits,label_ids)["accuracy"]
            # print(compute_metrics(logits,label_ids))
        # break


    avg_val_accuracy = total_eval_accuracy / len(val_loader)
    avg_val_loss = total_eval_loss / len(val_loader)

    print(f"Validation Loss: {avg_val_loss}, Accuracy: {avg_val_accuracy}")

# Testing step
model.eval()  # Set model to evaluation mode
total_test_accuracy = 0
total_test_loss = 0

for batch in test_loader:
    with torch.no_grad():
        inputs, labels = batch
        inputs = inputs.squeeze(1)
        inputs = inputs.to(device)
        labels = labels.to(device)

        outputs = model(inputs)
        logits = outputs.logits
        loss = criterion(logits, labels)

        total_test_loss += loss.item()
        logits = logits.detach().cpu().numpy()
        label_ids = labels.to('cpu').numpy()
        total_test_accuracy += compute_metrics(logits, label_ids)["accuracy"]
        # break

avg_test_accuracy = total_test_accuracy / len(test_loader)
avg_test_loss = total_test_loss / len(test_loader)

print(f"Test Loss: {avg_test_loss}, Test Accuracy: {avg_test_accuracy}")

Load checkpoint
Test Loss: 0.4150204716267829, Test Accuracy: 0.8934659090909091


In [8]:
#K-fold
from sklearn.model_selection import KFold
import numpy as np

# Assuming 'val_dataset' is your validation dataset
k_folds = 5
kfold = KFold(n_splits=k_folds, shuffle=True)


results = []

for fold, (train_ids, val_ids) in enumerate(kfold.split(val_dataset)):
    # Create data loader for the current fold
    val_subsampler = torch.utils.data.SubsetRandomSampler(val_ids)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, sampler=val_subsampler)

    # Evaluate the model on this validation fold
    total_eval_accuracy = 0
    total_eval_loss = 0

    with torch.no_grad():
        for batch in val_loader:
            inputs, labels = batch
            inputs = inputs.squeeze(1)
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            logits = outputs.logits
            loss = criterion(logits, labels)

            total_eval_loss += loss.item()
            logits = outputs.logits.detach().cpu().numpy()
            label_ids = labels.to('cpu').numpy()
            total_eval_accuracy += compute_metrics(logits, label_ids)["accuracy"]

    avg_val_accuracy = total_eval_accuracy / len(val_loader)
    avg_val_loss = total_eval_loss / len(val_loader)
    results.append((avg_val_loss, avg_val_accuracy))

# Calculate and print overall performance
avg_results = np.mean(results, axis=0)
print(f"Average Validation Loss: {avg_results[0]}, Average Validation Accuracy: {avg_results[1]}")


Average Validation Loss: 0.38943953966651673, Average Validation Accuracy: 0.9036111111111111


In [9]:
results

[(0.5107906798358373, 0.8680555555555556),
 (0.24768242638092489, 0.9357142857142857),
 (0.3458415704509077, 0.9142857142857143),
 (0.5317396703307168, 0.8785714285714286),
 (0.31114335133419707, 0.9214285714285714)]