In [1]:
import os
import librosa
import gc
import ast
import numpy as np
import pandas as pd
from pathlib import Path
# from tqdm import tqdm
import torchaudio  # torchaudio==2.9.0  torch==2.9.0 torchcodec==0.8
import sys
import datasets  # pip install datasets==3.6.0
from datasets import load_dataset
import evaluate
from transformers import AutoFeatureExtractor
from transformers import AutoModelForAudioClassification, TrainingArguments, Trainer

In [2]:
# Hub id of the pretrained backbone; reused for the feature extractor, the encoder and the output directory name.
model_checkpoint = "facebook/wav2vec2-base"
# NOTE(review): only the commented-out TrainingArguments cell reads `batch_size`;
# the active TrainingArguments cell hard-codes per-device batch sizes of 4.
batch_size = 32

In [3]:
# Accuracy metric loaded through `evaluate`; `compute_metrics` calls `metric.compute` on predictions vs. references.
metric = evaluate.load("accuracy")

Downloading builder script: 0.00B [00:00, ?B/s]

In [5]:
# Load the "for-norm" audio from a local folder (path is relative to the working directory)
# using the `audiofolder` builder. The summary printed further down shows train/validation/test
# splits, each with `audio` and `label` features.
dataset = load_dataset("audiofolder", data_dir="data/for-norm/for-norm/")

Resolving data files:   0%|          | 0/53868 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/10798 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/4634 [00:00<?, ?it/s]

In [5]:
# Display the DatasetDict summary: split names, feature columns and row counts.
dataset

DatasetDict({
    train: Dataset({
        features: ['audio', 'label'],
        num_rows: 53868
    })
    validation: Dataset({
        features: ['audio', 'label'],
        num_rows: 10798
    })
    test: Dataset({
        features: ['audio', 'label'],
        num_rows: 4634
    })
})

In [6]:
# Inspect one test example: a decoded `audio` dict (path, waveform array, sampling_rate) plus its integer `label`.
dataset["test"][88]

{'audio': {'path': 'D:\\OpenCV\\data\\for-norm\\for-norm\\testing\\fake\\file1078.wav_16k.wav_norm.wav_mono.wav_silence.wav',
  'array': array([ 0.12478638,  0.12738037,  0.12841797, ..., -0.13717651,
         -0.13647461, -0.1322937 ], shape=(19642,)),
  'sampling_rate': 16000},
 'label': 0}

In [7]:
# Load the "for-2seconds" folder with the same `audiofolder` builder.
# NOTE(review): `dataset2sec` is not referenced by any later cell visible in this notebook —
# confirm it is still needed before paying the loading cost.
dataset2sec = load_dataset("audiofolder", data_dir="data/for-2sec/for-2seconds/")

Resolving data files:   0%|          | 0/13956 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/2826 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/1088 [00:00<?, ?it/s]

In [6]:
# Show the ClassLabel feature so the integer-id -> class-name order is visible.
dataset["train"].features["label"]

ClassLabel(names=['fake', 'real'], id=None)

In [7]:
# labels = dataset["train"].features["label"].names
# label2id, pred_label = dict(), dict()
# for i, label in enumerate(labels):
#     label2id[label] = str(i)
#     pred_label[str(i)] = label

In [8]:
# Class-id (as a string) -> class-name lookup. Hard-coded to mirror the ClassLabel order
# displayed above (names=['fake', 'real']); `len(pred_label)` later sets the number of output classes.
pred_label = {'0': 'fake', '1': 'real'}

In [9]:
# Load the feature extractor that belongs to `model_checkpoint`, then display its configuration.
# `preprocess_function` reads `feature_extractor.sampling_rate` from this object.
feature_extractor = AutoFeatureExtractor.from_pretrained(model_checkpoint)
feature_extractor



Wav2Vec2FeatureExtractor {
  "do_normalize": true,
  "feature_extractor_type": "Wav2Vec2FeatureExtractor",
  "feature_size": 1,
  "padding_side": "right",
  "padding_value": 0.0,
  "return_attention_mask": false,
  "sampling_rate": 16000
}

In [10]:
# Upper bound on clip length: `preprocess_function` truncates each waveform to
# int(sampling_rate * max_duration) samples.
max_duration = 5.0  # seconds

In [11]:
def preprocess_function(examples):
    """Run the feature extractor over a batch of decoded audio examples.

    Args:
        examples: batch dict whose ``"audio"`` entry is a list of dicts, each
            holding the waveform under ``"array"``.

    Returns:
        The feature extractor's output for the batch, with every waveform
        truncated to at most ``max_duration`` seconds' worth of samples.
    """
    target_rate = feature_extractor.sampling_rate
    # Truncation length expressed in samples rather than seconds.
    max_samples = int(target_rate * max_duration)
    waveforms = [clip["array"] for clip in examples["audio"]]
    return feature_extractor(
        waveforms,
        sampling_rate=target_rate,
        max_length=max_samples,
        truncation=True,
    )

In [12]:
# Feature-extract every split in batches and drop the raw `audio` column,
# then make the dataset return torch tensors when indexed.
encoded_dataset = dataset.map(
    preprocess_function,
    batched=True,
    remove_columns=["audio"],
)
encoded_dataset.set_format(type="torch")
# Last expression of the cell: display the resulting splits and columns.
encoded_dataset

DatasetDict({
    train: Dataset({
        features: ['label', 'input_values'],
        num_rows: 53868
    })
    validation: Dataset({
        features: ['label', 'input_values'],
        num_rows: 10798
    })
    test: Dataset({
        features: ['label', 'input_values'],
        num_rows: 4634
    })
})

In [13]:
# Keep only the part of the checkpoint id after the last "/" (used to name the output directory).
model_name = model_checkpoint.rsplit("/", 1)[-1]

In [14]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModel  # or Wav2Vec2Model
# model_checkpoint = "facebook/wav2vec2-base"  # already defined in your notebook

class Wav2Vec2AudioEncoder(nn.Module):
    """
    Encodes raw wav input_values (B, T) into pooled embeddings (B, D).

    - Wraps the pretrained backbone loaded with ``AutoModel.from_pretrained``.
    - Mean-pools ``last_hidden_state`` over time; when an ``attention_mask`` is
      supplied, padded positions are excluded from the mean.
    - Optionally projects the pooled vector to ``projection_dim`` and can freeze
      the backbone's ``feature_extractor`` parameters.

    Args:
        model_checkpoint: checkpoint id/path handed to ``AutoModel.from_pretrained``.
        projection_dim: if not None, size of a linear projection applied after pooling.
        freeze_feature_extractor: if True, disable gradients for
            ``self.model.feature_extractor`` (when the backbone has that attribute).
    """
    def __init__(self, model_checkpoint="facebook/wav2vec2-base", projection_dim=None, freeze_feature_extractor=True):
        super().__init__()
        self.model = AutoModel.from_pretrained(model_checkpoint)
        self.hidden_size = self.model.config.hidden_size
        # Explicit None test so `out_dim` always agrees with the projector that is actually built.
        self.out_dim = self.hidden_size if projection_dim is None else projection_dim
        if projection_dim is not None:
            self.projector = nn.Linear(self.hidden_size, projection_dim)
        else:
            self.projector = None

        if freeze_feature_extractor:
            # Freeze convolutional feature extractor parameters to save memory/compute
            if hasattr(self.model, "feature_extractor"):
                for p in self.model.feature_extractor.parameters():
                    p.requires_grad = False

    def _frame_level_mask(self, attention_mask, num_frames):
        """Return a mask with one entry per output frame, shape (B, num_frames).

        The backbone emits fewer time steps than there are input samples, so a
        sample-level mask (B, T) cannot be multiplied with the hidden states
        directly. A mask that already has ``num_frames`` columns is returned
        unchanged.
        """
        if attention_mask.shape[-1] == num_frames:
            return attention_mask
        to_frames = getattr(self.model, "_get_feature_vector_attention_mask", None)
        if to_frames is None:
            raise ValueError(
                f"attention_mask has {attention_mask.shape[-1]} time steps but the model "
                f"produced {num_frames} frames, and the backbone offers no way to convert it."
            )
        # Same conversion the Hugging Face Wav2Vec2 heads apply before pooling.
        return to_frames(num_frames, attention_mask)

    def forward(self, input_values, attention_mask=None):
        """
        input_values: torch.FloatTensor of raw samples, shape (B, T)
        attention_mask: optional mask, 1 for real samples and 0 for padding.
            Normally sample-level, shape (B, T); a mask that already matches the
            model's output time steps is also accepted for pooling.
        returns: embeddings (B, out_dim)
        """
        outputs = self.model(input_values, attention_mask=attention_mask, return_dict=True)
        # last_hidden_state: (B, time, hidden)
        hidden = outputs.last_hidden_state

        if attention_mask is not None:
            frame_mask = self._frame_level_mask(attention_mask, hidden.shape[1])
            mask = frame_mask.unsqueeze(-1).to(hidden.dtype)  # (B, frames, 1)
            summed = (hidden * mask).sum(dim=1)  # (B, hidden)
            lengths = mask.sum(dim=1)  # (B, 1) number of valid frames
            # Counts are whole numbers, so a floor of 1 only affects fully padded rows
            # (which then pool to zeros). A tiny epsilon would underflow to 0 in fp16.
            pooled = summed / lengths.clamp(min=1.0)
        else:
            pooled = hidden.mean(dim=1)

        if self.projector is not None:
            pooled = self.projector(pooled)

        return pooled  # (B, out_dim)

In [15]:
class Wav2Vec2Classifier(nn.Module):
    """Classification head on top of a pooled audio encoder.

    The encoder's (B, D) embedding goes through dropout and a single linear
    layer that yields ``num_labels`` logits per example.
    """

    def __init__(self, encoder: Wav2Vec2AudioEncoder, num_labels: int, dropout=0.1):
        super().__init__()
        self.encoder = encoder
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(encoder.out_dim, num_labels)

    def forward(self, input_values, attention_mask=None, labels=None):
        """Return ``{"logits": ...}``, preceded by a cross-entropy ``"loss"`` when ``labels`` is given."""
        pooled = self.encoder(input_values, attention_mask=attention_mask)  # (B, D)
        logits = self.classifier(self.dropout(pooled))  # (B, num_labels)
        if labels is None:
            return {"logits": logits}
        return {"loss": F.cross_entropy(logits, labels), "logits": logits}

In [16]:
# num_labels = len(pred_label)
# model = AutoModelForAudioClassification.from_pretrained(
#     model_checkpoint,
#     num_labels=num_labels,
#     label2id=label2id,
#     id2label=pred_label,
# )

In [17]:
# Build the custom classifier: the pretrained encoder (no projection layer, feature extractor
# frozen) followed by a linear head with one output per entry in `pred_label`.
num_labels = len(pred_label)
encoder = Wav2Vec2AudioEncoder(model_checkpoint=model_checkpoint, projection_dim=None, freeze_feature_extractor=True)
model = Wav2Vec2Classifier(encoder, num_labels=num_labels)

# `model.forward` accepts the following keyword arguments:
# - "input_values": torch.FloatTensor (B, T)
# - "attention_mask": torch.LongTensor (B, T)  <-- optional; `encoded_dataset` has no such column
#   (the feature extractor config shown earlier has "return_attention_mask": false)
# - "labels": torch.LongTensor (B,)  (integer class ids)


In [18]:
from torch.nn.utils.rnn import pad_sequence

# Sanity check: pad a handful of variable-length test clips into one batch and
# build the matching attention mask by hand.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
batch = encoded_dataset["test"][0:4]

# Entries may already be tensors; anything else is converted to a float32 tensor.
seqs = [
    v if isinstance(v, torch.Tensor) else torch.tensor(v, dtype=torch.float32)
    for v in batch["input_values"]
]

# Original (unpadded) length of every clip, then right-pad with zeros to the longest one.
lengths = torch.tensor([s.size(0) for s in seqs], dtype=torch.long)
padded = pad_sequence(seqs, batch_first=True, padding_value=0.0)  # (B, T_max)

# Attention mask: 1 where the position index is below the clip length, 0 on padding.
max_len = padded.size(1)
positions = torch.arange(max_len).unsqueeze(0)  # (1, T_max)
mask = (positions < lengths.unsqueeze(1)).long()  # (B, T_max)

# Move both tensors to the selected device.
input_values = padded.to(device)
attention_mask = mask.to(device)

print("input_values.shape:", input_values.shape)
print("attention_mask.shape:", attention_mask.shape)

input_values.shape: torch.Size([4, 44073])
attention_mask.shape: torch.Size([4, 44073])


In [19]:
# args = TrainingArguments(
#     f"{model_name}-finetuned-ks",
#     eval_strategy = "epoch",
#     save_strategy = "epoch",
#     learning_rate=3e-5,
#     per_device_train_batch_size=batch_size,
#     gradient_accumulation_steps=4,
#     per_device_eval_batch_size=batch_size,
#     num_train_epochs=5,
#     warmup_ratio=0.1,
#     logging_steps=10,
#     load_best_model_at_end=True,
#     metric_for_best_model="accuracy",
    
# )

In [20]:
# Training configuration: evaluate and checkpoint once per epoch, and reload the
# checkpoint with the best accuracy when training finishes.
args = TrainingArguments(
    f"{model_name}-finetuned-ks",
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=3e-5,
    # Keep per-device batch small for 4GB GPUs; with 4 accumulation steps the
    # effective train batch per device is 16. Adjust as needed.
    per_device_train_batch_size=4,
    gradient_accumulation_steps=4,
    per_device_eval_batch_size=4,
    num_train_epochs=5,
    warmup_ratio=0.1,
    logging_steps=10,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    # Mixed precision reduces the memory footprint, but only request it when a CUDA
    # device is present so this cell also runs on CPU-only machines.
    # (Gradient checkpointing is NOT enabled here.)
    fp16=torch.cuda.is_available(),
)

In [21]:
def compute_metrics(eval_pred):
    """Score a set of predictions with the accuracy metric.

    Args:
        eval_pred: object exposing ``predictions`` (per-class scores, classes
            along axis 1) and ``label_ids`` (reference class ids).

    Returns:
        Whatever ``metric.compute`` returns for the arg-max predictions.
    """
    scores = eval_pred.predictions
    references = eval_pred.label_ids
    # The predicted class is the index of the highest score in each row.
    predicted_ids = np.argmax(scores, axis=1)
    return metric.compute(predictions=predicted_ids, references=references)

In [22]:
# Build the Trainer used by `trainer.train()`. This cell must be live code: when it is
# commented out, a fresh "Restart & Run All" fails with NameError on `trainer`.
trainer = Trainer(
    model,
    args,
    train_dataset=encoded_dataset["train"],
    eval_dataset=encoded_dataset["validation"],
    processing_class=feature_extractor,
    compute_metrics=compute_metrics,
)

In [23]:
# Launch fine-tuning; requires a `trainer` object to exist in the kernel.
# The saved output below shows this run was stopped by a KeyboardInterrupt before any epoch was logged.
trainer.train()

Epoch,Training Loss,Validation Loss


KeyboardInterrupt: 