# Parameters

In [None]:
# NOTE(review): presumably the audio sampling rate in Hz — it matches the
# sr=16000 defaults used further down, but none of the visible cells reference
# FS directly. Confirm whether those literals should use this constant.
FS = 16000

# Import and install libraries


In [None]:
# Mount Google Drive at /content/gdrive so the notebook can read and write
# files stored there.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
%cd /content/gdrive/MyDrive/Colab\ Notebooks/project #change to ur dir

In [None]:
# Re-mount Google Drive (force_remount=True) and switch into a project folder.
# NOTE(review): the %cd path below is relative, so it resolves against whatever
# working directory earlier cells left behind — confirm the intended cwd.
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)
%cd gdrive/MyDrive/audio_recognition/main

Mounted at /content/gdrive
/content/gdrive/MyDrive/audio_recognition/main


In [None]:
# imports:
import os
import time
import fnmatch
import glob
import random
import numpy as np
import librosa
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
import torch.nn.functional as F
import matplotlib.pyplot as plt
from functools import partial
from tqdm import tqdm

# PEFT & Whisper related
from peft import get_peft_model, LoraConfig
from transformers import WhisperModel
from transformers import WhisperProcessor

# Optional: for argument parsing if args is used (you may already have this in your full code)
import argparse

# Data Processing

In [None]:
# Root folder with one sub-directory of .wav files per genre
# (change this to your own directory).
base_dir = "/content/gdrive/MyDrive/Colab Notebooks/project/Data/genres_original"

# os.listdir() and glob.glob() return entries in an arbitrary,
# filesystem-dependent order. Both listings are sorted so that the seeded
# shuffle below yields the same 80/20 split on every machine and every run.
genre_dirs = sorted(
    os.path.join(base_dir, genre)
    for genre in os.listdir(base_dir)
    if os.path.isdir(os.path.join(base_dir, genre))
)

train_files = []
val_files = []

for genre_dir in genre_dirs:
    genre_files = sorted(glob.glob(os.path.join(genre_dir, "*.wav")))
    print(f"Found {len(genre_files)} files in {os.path.basename(genre_dir)}")
    # Re-seeding per genre keeps each genre's permutation independent of how
    # many genres were processed before it.
    random.seed(42)
    random.shuffle(genre_files)
    # First 80% of each genre goes to training, the remainder to validation.
    split_idx = int(0.8 * len(genre_files))
    train_files.extend(genre_files[:split_idx])
    val_files.extend(genre_files[split_idx:])

print(f"Total train files: {len(train_files)}")
print(f"Total validation files: {len(val_files)}")

Found 100 files in pop
Found 100 files in reggae
Found 100 files in country
Found 100 files in rock
Found 100 files in classical
Found 100 files in disco
Found 100 files in blues
Found 100 files in hiphop
Found 100 files in jazz
Found 100 files in metal
Total train files: 800
Total validation files: 200


In [None]:
def load_audio(file_path, sr=16000):
    """Load an audio file as a mono waveform resampled to ``sr``.

    Args:
        file_path: Path of the audio file to read.
        sr: Sampling rate passed to ``librosa.load``.

    Returns:
        The waveform returned by ``librosa.load``, or ``None`` when loading
        raised an exception (the error is printed, not re-raised).
    """
    try:
        waveform, _ = librosa.load(file_path, sr=sr, mono=True)
        return waveform
    except Exception as e:
        # Best effort: report the unreadable file and let the caller skip it.
        print(f"Error loading {file_path}: {e}")
        return None

In [None]:
def _load_split(paths):
    """Decode every file in ``paths`` into an {"array", "label"} record.

    The label is the part of the file name before the first '.'; files that
    ``load_audio`` could not read (it returned None) are skipped.
    """
    records = []
    for wav_path in paths:
        waveform = load_audio(wav_path)
        if waveform is None:
            continue
        genre = os.path.basename(wav_path).split('.')[0]
        records.append({"array": waveform, "label": genre})
    return records


train_data = _load_split(train_files)
val_data = _load_split(val_files)

print(f"Train data samples: {len(train_data)}")
print(f"Validation data samples: {len(val_data)}")

  audio, _ = librosa.load(file_path, sr=sr, mono=True)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)


Error loading ../dataset/GTZAN/genres_original/jazz/jazz.00054.wav: 
Train data samples: 799
Validation data samples: 200


In [None]:
# Sanity check: decode the second training file and run it through the Whisper
# processor; the resulting features are displayed as the cell output.
sample_path = train_files[1]
audio, _ = librosa.load(sample_path, sr=16000, mono=True)
processor = WhisperProcessor.from_pretrained("openai/whisper-tiny")
processor([audio], sampling_rate=16000, return_tensors="pt")

{'input_features': tensor([[[ 0.3245, -0.1991,  0.3658,  ...,  1.0984,  1.1399,  1.0821],
         [ 0.5686,  0.5699,  0.5394,  ...,  0.9421,  1.1574,  0.9613],
         [ 0.8102,  0.7244,  0.6339,  ...,  0.9872,  1.0744,  0.9841],
         ...,
         [-0.0594, -0.2086, -0.2773,  ...,  0.2655,  0.2060,  0.0535],
         [-0.0990, -0.3853, -0.3440,  ...,  0.1312,  0.0165,  0.0591],
         [ 0.0312, -0.3994, -0.4800,  ...,  0.0150, -0.0579, -0.0040]]])}

In [None]:
class AudioDataset(Dataset):
    """Dataset over {"array", "label"} records that featurises audio on access.

    Args:
        data: Sequence of dicts with an "array" (waveform) and a "label" key.
        processor: Callable invoked as ``processor(array, sampling_rate=...,
            return_tensors="pt")``; its result must expose ``input_features``.
        label_to_idx: Mapping from label value to integer class index.
        sr: Sampling rate forwarded to the processor.
    """

    def __init__(self, data, processor, label_to_idx, sr=16000):
        self.data, self.processor = data, processor
        self.label_to_idx, self.sr = label_to_idx, sr

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        record = self.data[idx]
        features = self.processor(
            record["array"], sampling_rate=self.sr, return_tensors="pt"
        ).input_features
        return {
            # Drop the leading dimension the processor adds for a single clip.
            "input_features": features.squeeze(0),
            "label": self.label_to_idx[record["label"]],
        }

def collate_fn(batch):
    """Stack per-sample features and labels into batch tensors.

    Args:
        batch: List of dicts with "input_features" (tensor) and "label" (int).

    Returns:
        Tuple ``(features, targets)``: the stacked feature tensor and a
        ``torch.long`` tensor of labels, in the same order as ``batch``.
    """
    feature_list = [sample["input_features"] for sample in batch]
    target_list = [sample["label"] for sample in batch]
    return torch.stack(feature_list), torch.tensor(target_list, dtype=torch.long)

In [None]:
# Genre name -> class index; a genre's position in this list is its index.
_GENRES = ["pop", "reggae", "country", "rock", "classical",
           "disco", "blues", "hiphop", "jazz", "metal"]
genre_dict = {name: idx for idx, name in enumerate(_GENRES)}

train_dataset = AudioDataset(train_data, processor, genre_dict)
val_dataset = AudioDataset(val_data, processor, genre_dict)

# Both loaders share every setting except shuffling.
_loader_kwargs = dict(batch_size=32, collate_fn=collate_fn)
train_dataloader = DataLoader(train_dataset, shuffle=True, **_loader_kwargs)
validation_dataloader = DataLoader(val_dataset, shuffle=False, **_loader_kwargs)

# Models and Loss Functions

In [None]:
class audio_classifier(nn.Module):
    """Encoder followed by a mean-pool and an MLP classification head.

    Args:
        encoder: Module whose ``config.d_model`` gives the hidden size and
            whose call result exposes ``last_hidden_state``.
        num_classes: Number of output logits.
        freeze_encoder: When True (the default, matching the previous
            behaviour) the encoder runs without gradient tracking, so only the
            head can be trained. Pass False to let gradients reach trainable
            encoder parameters (for example adapter weights).
    """

    def __init__(self, encoder, num_classes=10, freeze_encoder=True):
        super().__init__()
        self.encoder = encoder
        # Plain attribute (not a parameter or buffer), so existing state dicts
        # still load unchanged.
        self.freeze_encoder = freeze_encoder
        hidden_dim = encoder.config.d_model
        self.classifier = nn.Sequential(
            nn.Linear(hidden_dim, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        """Return class logits for a batch of encoder inputs."""
        # Combine with the ambient grad mode so an outer torch.no_grad()
        # (e.g. during evaluation) is never overridden.
        track_grad = torch.is_grad_enabled() and not self.freeze_encoder
        with torch.set_grad_enabled(track_grad):
            x = self.encoder(x).last_hidden_state
        x = x.mean(dim=1)  # [batch, seq_len, hidden_dim] -> [batch, hidden_dim]
        return self.classifier(x)

class EarlyStopper:
    """Tracks the best validation loss and signals when to stop training.

    Args:
        patience: Number of counted "worse" epochs after which to stop.
        min_delta: Margin above the best loss that a loss must exceed to be
            counted as worse.
    """

    def __init__(self, patience=1, min_delta=0):
        self.patience, self.min_delta = patience, min_delta
        self.counter = 0
        self.min_validation_loss = float('inf')

    def early_stop(self, validation_loss):
        """Update the tracker with a new loss; return True when training should stop."""
        # A new best resets the counter.
        if validation_loss < self.min_validation_loss:
            self.min_validation_loss = validation_loss
            self.counter = 0
            return False

        # Losses within min_delta of the best neither reset nor advance the counter.
        threshold = self.min_validation_loss + self.min_delta
        if validation_loss > threshold:
            self.counter += 1
            return self.counter >= self.patience
        return False

# Train

In [None]:
from collections import Counter
def save_models(results_path, peft_model, dense_layers, lora_weights_path, dense_layers_path):
    """Move both sub-models to CPU and save them under ``<results_path>/models``.

    Args:
        results_path: Base output directory, with or without a trailing slash.
        peft_model: Object exposing ``to(device)`` and ``save_pretrained(path)``.
        dense_layers: Module whose ``state_dict()`` is written with ``torch.save``.
        lora_weights_path: Name of the sub-directory passed to ``save_pretrained``.
        dense_layers_path: File name for the dense layers' state dict.
    """
    peft_model.to("cpu")
    dense_layers.to("cpu")

    # os.path.join instead of string concatenation: a results_path without a
    # trailing slash used to produce ".../resultsmodels/...".
    models_dir = os.path.join(results_path, 'models')
    # torch.save does not create missing parent directories.
    os.makedirs(models_dir, exist_ok=True)

    peft_model.save_pretrained(os.path.join(models_dir, lora_weights_path))
    torch.save(dense_layers.state_dict(), os.path.join(models_dir, dense_layers_path))

def train(model, train_loader, val_loader, optimizer, criterion, device, num_epochs, results_path, checkpoint_path, model_name, writer, save_paths):
    """Train ``model`` with per-epoch validation, checkpointing and early stopping.

    Args:
        model: Module with ``encoder`` and ``classifier`` attributes.
        train_loader, val_loader: Iterables of ``(inputs, labels)`` batches.
        optimizer: Optimizer stepped once per training batch.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the model, loss and batches are moved to.
        num_epochs: Maximum number of epochs.
        results_path: Directory for the "best_*" artefacts; also forwarded to
            ``save_models`` at the end.
        checkpoint_path: File receiving the full model state dict whenever the
            validation loss improves.
        model_name: Prefix for the scalars logged to ``writer``.
        writer: Object exposing ``add_scalar(tag, value, step)``.
        save_paths: Dict with 'lora_weights' and 'dense_layers' entries.

    Returns:
        Tuple ``(train_losses, val_losses, val_aucs, train_times)`` of
        per-epoch lists.
    """
    model.to(device)
    criterion = criterion.to(device)

    best_lora_weights_path = os.path.join(results_path, f"best_{save_paths['lora_weights']}")
    best_dense_layers_path = os.path.join(results_path, f"best_{save_paths['dense_layers']}")

    history = {"train_loss": [], "val_loss": [], "val_auc": [], "seconds": []}
    lowest_val_loss = float('inf')
    stopper = EarlyStopper(patience=15)

    for epoch in range(num_epochs):
        model.train()
        tick = time.time()

        running_loss = 0.0
        progress = tqdm(train_loader, desc=f"Epoch {epoch+1}", unit="batch")
        for batch_inputs, batch_labels in progress:
            batch_inputs = batch_inputs.to(device)
            # Flatten to a 1-D tensor of long class indices.
            batch_labels = batch_labels.view(-1).long().to(device)

            optimizer.zero_grad()
            batch_loss = criterion(model(batch_inputs), batch_labels)
            batch_loss.backward()
            optimizer.step()

            running_loss += batch_loss.item()

        # Mean of the per-batch losses.
        epoch_train_loss = running_loss / len(train_loader)
        history["train_loss"].append(epoch_train_loss)
        # Only the training pass is timed; validation is excluded.
        history["seconds"].append(time.time() - tick)

        metrics = evaluate(model, val_loader, device, criterion)
        epoch_val_loss, epoch_val_auc = metrics['loss'], metrics['auc']
        history["val_loss"].append(epoch_val_loss)
        history["val_auc"].append(epoch_val_auc)

        for tag, value in (("train_loss", epoch_train_loss),
                           ("val_loss", epoch_val_loss),
                           ("val_auc", epoch_val_auc)):
            writer.add_scalar(f'{model_name}/{tag}', value, epoch)

        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_train_loss:.4f}, Val Loss: {epoch_val_loss:.4f}, Val AUC: {epoch_val_auc:.4f}")

        # Keep a copy of the best-so-far weights (full model, encoder, head).
        if epoch_val_loss < lowest_val_loss:
            lowest_val_loss = epoch_val_loss
            torch.save(model.state_dict(), checkpoint_path)
            model.encoder.save_pretrained(best_lora_weights_path)
            torch.save(model.classifier.state_dict(), best_dense_layers_path)

        if stopper.early_stop(epoch_val_loss):
            print(f"Early stopping at epoch {epoch+1}")
            break

    # Final (last-epoch) weights; save_models also moves both parts to CPU.
    save_models(
        results_path,
        peft_model=model.encoder,
        dense_layers=model.classifier,
        lora_weights_path=save_paths['lora_weights'],
        dense_layers_path=save_paths['dense_layers']
    )

    return history["train_loss"], history["val_loss"], history["val_auc"], history["seconds"]

def _save_curve(out_path, curves, xlabel, ylabel, x_values=None, title=None):
    """Plot the given ``(values, label)`` curves on a new figure and save it to ``out_path``."""
    plt.figure()
    plt.rc('font', family='serif')
    for values, label in curves:
        if x_values is None:
            plt.plot(values, label=label)
        else:
            plt.plot(x_values, values, label=label)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    if any(label is not None for _, label in curves):
        plt.legend()
    if title is not None:
        plt.title(title)
    plt.savefig(out_path)


def main(log_dir, train_data, valid_data, batch_size, num_workers, num_class, num_epochs, results_path,
         method="frozen_encoder", encoder_size="tiny", learning_rate=8e-4):
    """Build the data loaders and model, train it, and save loss/AUC figures.

    Args:
        log_dir: TensorBoard log directory.
        train_data, valid_data: Datasets yielding items accepted by ``collate_fn``.
        batch_size, num_workers: Forwarded to both ``DataLoader`` objects.
        num_class: Number of output classes of the classifier head.
        num_epochs: Maximum number of training epochs.
        results_path: Output directory; figures go to ``<results_path>/figures``.
        method: ``'frozen_encoder'`` (plain encoder, all weights frozen) or
            ``'lora'`` (encoder wrapped with DoRA/LoRA adapters on the
            attention projections; only adapter parameters stay trainable).
        encoder_size: Whisper checkpoint suffix, i.e. ``openai/whisper-<size>``.
        learning_rate: AdamW learning rate.

    Raises:
        ValueError: If ``method`` is not one of the supported values.

    Notes:
        * ``encoder_size`` and ``learning_rate`` replace the earlier reads of
          a module-level ``args`` object; the defaults are the values that
          object held, and ``args`` is no longer consulted.
        * With ``method='lora'`` the adapters only learn if
          ``audio_classifier.forward`` lets gradients reach the encoder;
          otherwise they keep their initial values and only the head trains.
    """
    if method not in ('frozen_encoder', 'lora'):
        raise ValueError(f"Unknown method {method!r}; expected 'frozen_encoder' or 'lora'")

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    writer = SummaryWriter(log_dir=log_dir)
    try:
        train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=num_workers, pin_memory=True, collate_fn=collate_fn)
        # Validation order does not affect the metrics, so keep it deterministic.
        valid_loader = DataLoader(valid_data, batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=True, collate_fn=collate_fn)

        # Count individual labels: a Counter over the batch tensors themselves
        # would count tensor objects, not classes.
        print("Train labels:", Counter(label for _, ys in train_loader for label in ys.tolist()))
        print("Val   labels:", Counter(label for _, ys in valid_loader for label in ys.tolist()))

        encoder = WhisperModel.from_pretrained(f"openai/whisper-{encoder_size}").encoder

        if method == 'lora':
            # Attach adapters to the q/k/v/o projections of every encoder layer.
            module_names = [name for name, _ in encoder.named_modules()]
            patterns = ["layers.*.self_attn.q_proj", "layers.*.self_attn.k_proj", "layers.*.self_attn.v_proj", "layers.*.self_attn.o_proj"]
            matched_modules = []
            for pattern in patterns:
                matched_modules.extend(fnmatch.filter(module_names, pattern))

            lora_config = LoraConfig(use_dora=True, r=8, lora_alpha=32, target_modules=matched_modules)
            encoder = get_peft_model(encoder, lora_config)
            for name, param in encoder.named_parameters():
                param.requires_grad = 'lora' in name
        else:
            for param in encoder.parameters():
                param.requires_grad = False

        model_name = method
        model = audio_classifier(encoder.to(device), num_class)
        model.to(device)

        criterion = nn.CrossEntropyLoss().to(device)
        optimizer = torch.optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=learning_rate, betas=(0.9, 0.999), eps=1e-08)
        save_paths = {
            'lora_weights': 'lora_weights_8_32',
            'dense_layers': 'dense_layers_8_32.pth'
        }

        print(f"Training {model_name} model from scratch...")
        train_losses, val_losses, val_aucs, train_times = train(
            model, train_loader, valid_loader, optimizer, criterion, device, num_epochs, results_path,
            f'{model_name}_checkpoint.pt', model_name, writer, save_paths)

        # Figures live next to the other results; savefig does not create
        # missing directories.
        figures_dir = os.path.join(results_path, 'figures')
        os.makedirs(figures_dir, exist_ok=True)

        elapsed = np.cumsum(train_times)
        loss_curves = [(train_losses, 'Train Loss'), (val_losses, 'Validation Loss')]
        auc_curves = [(val_aucs, None)]

        # The 'lr-8e-4' suffix is a fixed part of the existing file names and
        # does not track ``learning_rate``.
        _save_curve(os.path.join(figures_dir, f'{model_name}_loss_lr-8e-4.png'),
                    loss_curves, 'Epoch', 'Loss')
        _save_curve(os.path.join(figures_dir, f'{model_name}_loss_time_lr-8e-4.png'),
                    loss_curves, 'Time (s)', 'Loss',
                    x_values=elapsed, title=f'{model_name} Loss vs Time')
        _save_curve(os.path.join(figures_dir, f'{model_name}_val_auc_lr-8e-4.png'),
                    auc_curves, 'Epoch', 'Validation AUC')
        _save_curve(os.path.join(figures_dir, f'{model_name}_val_auc_time_lr-8e-4.png'),
                    auc_curves, 'Time (s)', 'Validation AUC',
                    x_values=elapsed, title=f'{model_name} Val AUC vs Time')
    finally:
        # Flush and close the TensorBoard writer even if training fails.
        writer.close()

In [None]:
import argparse

# Experiment settings collected in one namespace (attribute access, e.g.
# args.learning_rate).
_ARG_DEFAULTS = {
    "log_dir": "./runs/audio_exp1",
    "results_path": "./results/",
    "encoder": "tiny",
    "learning_rate": 8e-4,
    "batch_size": 16,
    "num_epochs": 20,
    "num_workers": 0,
    "method": "frozen_encoder",
}
args = argparse.Namespace(**_ARG_DEFAULTS)

In [None]:
from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import label_binarize
def evaluate(model, data_loader, device, criterion):
    """Compute the mean loss and macro one-vs-rest ROC AUC of ``model``.

    Args:
        model: Module mapping a batch of inputs to ``(batch, num_classes)`` logits.
        data_loader: Iterable of ``(inputs, labels)`` batches.
        device: Device the batches are moved to.
        criterion: Loss called as ``criterion(logits, labels)``; assumed to
            return a batch-mean loss, which is re-weighted by batch size here.

    Returns:
        Dict with ``"loss"`` (sample-weighted mean loss) and ``"auc"`` (mean of
        the per-class one-vs-rest AUCs). Classes with no positive or no
        negative sample are skipped, because their AUC is undefined; if no
        class qualifies the AUC is ``nan``.

    Raises:
        ValueError: If ``data_loader`` yields no samples.
    """
    model.eval()
    total_loss = 0.0
    num_samples = 0
    all_logits = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs = inputs.to(device)
            labels = labels.view(-1).long().to(device)  # (batch,)

            logits = model(inputs)                      # (batch, num_classes)
            loss = criterion(logits, labels)

            batch_size = inputs.size(0)
            total_loss += loss.item() * batch_size
            # Count samples actually seen rather than relying on
            # len(data_loader.dataset).
            num_samples += batch_size

            all_logits.append(logits.cpu().numpy())
            all_labels.append(labels.cpu().numpy())

    if num_samples == 0:
        raise ValueError("evaluate() received an empty data_loader")

    avg_loss = total_loss / num_samples

    all_logits = np.concatenate(all_logits, axis=0)  # (N, num_classes)
    all_labels = np.concatenate(all_labels, axis=0)  # (N,)

    # Take the class count from the model output instead of hard-coding 10.
    num_classes = all_logits.shape[1]
    y_true_oh = np.eye(num_classes, dtype=int)[all_labels]              # (N, num_classes)
    y_prob = F.softmax(torch.from_numpy(all_logits), dim=1).numpy()     # (N, num_classes)

    # Macro average over the classes for which a binary AUC is defined.
    per_class_auc = []
    for cls in range(num_classes):
        positives = int(y_true_oh[:, cls].sum())
        if 0 < positives < num_samples:
            per_class_auc.append(roc_auc_score(y_true_oh[:, cls], y_prob[:, cls]))

    auc = float(np.mean(per_class_auc)) if per_class_auc else float("nan")

    return {"loss": avg_loss, "auc": auc}

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Settings for this run; change both Drive paths to your own directories.
run_kwargs = dict(
    log_dir="/content/gdrive/MyDrive/Colab Notebooks/project/runs/audio_exp1",
    train_data=train_dataset,
    valid_data=val_dataset,
    batch_size=16,
    num_workers=0,
    num_class=10,
    num_epochs=20,
    results_path="/content/gdrive/MyDrive/Colab Notebooks/project/results",
    method="frozen_encoder",
)
main(**run_kwargs)

After the files have been generated, rename "config.json" and "model.safetensors" to "adapter_config.json" and "adapter_model.safetensors", respectively.

# Test

In [None]:
!pip install datasets transformers evaluate

In [None]:
# Install system audio tooling through apt.
# NOTE(review): `libav-tools` may not be available on current Colab/Ubuntu
# images (ffmpeg is the usual replacement), and there is no `-y` flag to
# auto-confirm the install — confirm this cell still runs.
!sudo apt-get install libav-tools

In [None]:
from datasets import load_dataset
# Load the FMA genre-classification dataset and keep its validation split,
# which the cells below use as a test set.
fma    = load_dataset("rpmon/fma-genre-classification")
val_ds = fma["validation"]
# Genre names for the "genre" feature, indexed by the integer genre id.
fma_genres = val_ds.features["genre"].names

In [None]:
fma_genres = val_ds.features["genre"].names  # ['Electronic', ..., 'Rock']
genre_list = [
    "blues", "classical", "country", "disco", "hiphop",
    "jazz", "metal", "pop", "reggae", "rock",
]

# FMA genre name -> GTZAN genre name; FMA genres without an entry are dropped.
GTZAN_MAP = dict(
    [
        ("Electronic", "disco"),
        ("Experimental", "metal"),
        ("Folk", "country"),
        ("Hip-Hop", "hiphop"),
        ("Instrumental", "classical"),
        ("International", "reggae"),
        ("Pop", "pop"),
        ("Rock", "rock"),
    ]
)

# Collect {"array", "label"} records for every sample that can be read and
# whose genre has a GTZAN counterpart.
data = []
num_samples = len(val_ds)
for i in range(num_samples):
    try:
        example = val_ds[i]
        samples = example["audio"]["array"]
    except Exception as e:
        print(f"Skipping sample {i} due to error: {e}")
        continue

    mapped_label = GTZAN_MAP.get(fma_genres[example["genre"]])
    if mapped_label is not None:
        data.append({"array": samples, "label": mapped_label})

In [None]:
import torch
import torch.nn as nn
from transformers import WhisperModel, WhisperProcessor
from datasets import load_dataset
from torch.utils.data import DataLoader
from peft import PeftModel

# Rebuild the classifier around a fresh pretrained Whisper-tiny encoder and
# restore only the classification head from the saved state dict.
num_classes = 10
encoder = WhisperModel.from_pretrained("openai/whisper-tiny").encoder
model = audio_classifier(encoder, num_classes)
ckpt = torch.load("/content/gdrive/MyDrive/Colab Notebooks/project/results/best_dense_layers_8_32.pth", map_location="cpu") # change to your own directory
model.classifier.load_state_dict(ckpt)

# Collect the names of the attention projection modules (q/k/v/o) in the encoder.
module_names = [name for name, module in model.encoder.named_modules()]
patterns = ["layers.*.self_attn.q_proj", "layers.*.self_attn.k_proj", "layers.*.self_attn.v_proj", "layers.*.self_attn.o_proj"]
matched_modules = []
for pattern in patterns:
    matched_modules.extend(fnmatch.filter(module_names, pattern))

# Wrap the encoder with adapters on the matched modules.
# NOTE(review): get_peft_model creates new adapters here and this cell never
# loads saved adapter weights — confirm whether trained adapters were meant to
# be restored (e.g. via the PeftModel import above).
lora_config = LoraConfig(use_dora=True, r=8, lora_alpha=32, target_modules=matched_modules)
model.encoder = get_peft_model(model.encoder, lora_config)

In [None]:
# Genre name -> class index; a genre's position in the list is its index.
_TEST_GENRES = ["pop", "reggae", "country", "rock", "classical",
                "disco", "blues", "hiphop", "jazz", "metal"]
label_to_idx = {name: idx for idx, name in enumerate(_TEST_GENRES)}

processor = WhisperProcessor.from_pretrained("openai/whisper-tiny")
test_dataset = AudioDataset(data, processor, label_to_idx)
test_loader = DataLoader(
    test_dataset,
    batch_size=32,
    shuffle=False,
    collate_fn=collate_fn,
)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device).eval()
criterion = nn.CrossEntropyLoss().to(device)

# Score the model on the mapped FMA samples and report loss and AUC.
metrics = evaluate(model, test_loader, device, criterion)

print(f"Test Loss: {metrics['loss']:.4f},   Test AUC: {metrics['auc']:.4f}")

Because no well-matched test dataset is available, the AUC may come out as NaN and the loss will be higher.