In [None]:
import os

def count_files_in_directory(directory):
    total_files = 0

    for root, dirs, files in os.walk(directory):
        total_files += len(files)

    return total_files

path_all_data = 'data/all_data_binarylabel'  # 替换为你要统计的目录的路径

total_files = count_files_in_directory(path_all_data)

print("询问者语音数:", total_files)

In [None]:
%env LC_ALL=C.UTF-8
%env LANG=C.UTF-8
%env TRANSFORMERS_CACHE=cache
%env HF_DATASETS_CACHE=cache
%env CUDA_LAUNCH_BLOCKING=1

In [None]:
import pandas as pd

from pathlib import Path
from tqdm import tqdm

import torchaudio
from sklearn.model_selection import train_test_split

import os

In [None]:
data = []
i=0
for path in tqdm(Path("data/all_data_binarylabel/").glob("**/*.wav")):
    name = str(path).split('/')[-1].split('.')[0]
    label = str(path).split('/')[-2]
    # print('name:',name,'label:',label)
    i=i+1

    # 尝试使用torchaudio.load加载音频文件，果加载成功，将音频文件的名称、路径和情感标签存储为一个字典，并将该字典添加到data列表中
    try:
        # There are some broken files
        s = torchaudio.load(path)
        data.append({
            "name": name,
            "path": path,
            "emotion": label
        })
    except Exception as e:
        print(str(path), e)
        pass

print(len(data),i)
    # break

In [None]:
df = pd.DataFrame(data)
df.head()

In [None]:
# Filter broken and non-existed paths

print(f"Step 0: {len(df)}")

df["status"] = df["path"].apply(lambda path: True if os.path.exists(path) else None)
df = df.dropna(subset=["path"])
df = df.drop(labels="status", axis=1)
print(f"Step 1: {len(df)}")

df = df.sample(frac=1)
df = df.reset_index(drop=True)
df.head()

Let's explore how many labels (emotions) are in the dataset with what distribution.

In [None]:
print("Labels: ", df["emotion"].unique())
print()
df.groupby("emotion").count()[["path"]]

Let's display some random sample of the dataset and run it a couple of times to get a feeling for the audio and the emotional label.

In [None]:
import torchaudio
import librosa
import IPython.display as ipd
import numpy as np

idx = np.random.randint(0, len(df))
sample = df.iloc[idx]
path = sample["path"]
label = sample["emotion"]


print(f"ID Location: {idx}")
print(f"      Label: {label}")
print()

speech, sr = torchaudio.load(path)
speech = speech[0].numpy().squeeze()
speech = librosa.resample(np.asarray(speech),orig_sr= sr, target_sr=16_000)
ipd.Audio(data=np.asarray(speech), autoplay=True, rate=16000)

For training purposes, we need to split data into train test sets; in this specific example, we break with a 20% rate for the test set.

In [None]:
save_path = "data/all_data_binarylabel"

train_df, test_df = train_test_split(df, test_size=0.2, random_state=101, stratify=df["emotion"])

train_df = train_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)

train_df.to_csv(f"{save_path}/train.csv", sep="\t", encoding="utf-8", index=False)
test_df.to_csv(f"{save_path}/test.csv", sep="\t", encoding="utf-8", index=False)

print(train_df.shape)
print(test_df.shape)

Prepare Data for Training

In [None]:
# Loading the created dataset using datasets
from datasets import load_dataset, load_metric


data_files = {
    "train": "data/all_data_binarylabel/train.csv",
    "validation": "data/all_data_binarylabel/test.csv",
}

dataset = load_dataset("csv", data_files=data_files, delimiter="\t", )
train_dataset = dataset["train"]
eval_dataset = dataset["validation"]

print(train_dataset)
print(eval_dataset)

In [None]:
# We need to specify the input and output column
input_column = "path"
output_column = "emotion"

In [None]:
# we need to distinguish the unique labels in our SER dataset
label_list = train_dataset.unique(output_column)
label_list.sort()  # Let's sort it for determinism
num_labels = len(label_list)
print(f"A classification problem with {num_labels} classes: {label_list}")

In [None]:
# !source /etc/network_turbo

from transformers import AutoConfig, AutoFeatureExtractor
model_name_or_path = "models/openai-whisper-large-v3"
pooling_mode = "mean"

# config
config = AutoConfig.from_pretrained(
    model_name_or_path,
    num_labels=num_labels,
    label2id={label: i for i, label in enumerate(label_list)},
    id2label={i: label for i, label in enumerate(label_list)},
    finetuning_task="whisper_clf",
)
setattr(config, 'pooling_mode', pooling_mode)

In [None]:
feature_extractor = AutoFeatureExtractor.from_pretrained(model_name_or_path,)
target_sampling_rate = feature_extractor.sampling_rate
print(f"The target sampling rate: {target_sampling_rate}")

In [None]:
def speech_file_to_array_fn(path):
    speech_array, sampling_rate = torchaudio.load(path)
    resampler = torchaudio.transforms.Resample(sampling_rate, target_sampling_rate)
    speech = resampler(speech_array).squeeze().numpy()
    return speech

def label_to_id(label, label_list):
    if len(label_list) > 0:
        return label_list.index(label) if label in label_list else -1

    return label

def preprocess_function(examples):
    speech_list = [speech_file_to_array_fn(path) for path in examples[input_column]]
    target_list = [label_to_id(label, label_list) for label in examples[output_column]]

    res = {}
    result = feature_extractor(speech_list, return_tensors="pt", sampling_rate=target_sampling_rate)
    res['input_features'] = result.input_features
    res["labels"] = list(target_list)

    return res

In [None]:
train_dataset = train_dataset.map(
    preprocess_function,
    batch_size=5,
    batched=True,
    num_proc=4
)
eval_dataset = eval_dataset.map(
    preprocess_function,
    batch_size=5,
    batched=True,
    num_proc=4
)

In [None]:
idx = 6
print(len(train_dataset))
print(f"Training input_values: ", len(train_dataset[idx]['input_features']))
print(f"Training labels: {train_dataset[idx]['labels']} - {train_dataset[idx]['emotion']}")

Great, now we've successfully read all the audio files, resampled the audio files to 16kHz, and mapped each audio to the corresponding label.

In [None]:
from dataclasses import dataclass
from typing import Optional, Tuple
import torch
from transformers.file_utils import ModelOutput


@dataclass
class SpeechClassifierOutput(ModelOutput):
    loss: Optional[torch.FloatTensor] = None
    logits: torch.FloatTensor = None
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    attentions: Optional[Tuple[torch.FloatTensor]] = None


In [None]:
import torch
import torch.nn as nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

from transformers.models.whisper.modeling_whisper import WhisperModel, WhisperPreTrainedModel

# 这个头部模块负责将从Whisper模型的特征中提取的音频表示转化为用于进行分类任务的输出
class WhisperClassificationHead(nn.Module):
    """Head for wav2vec classification task."""

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.dropout = nn.Dropout(0.)
        self.out_proj = nn.Linear(config.hidden_size, config.num_labels)

    # 这个方法定义了数据在模块中的前向传播过程。
    # 它首先通过self.dropout应用丢弃以随机丢弃一些特征，然后通过self.dense进行线性映射
    # 并应用tanh激活函数，最后通过self.out_proj进行线性映射，以生成分类任务的输出分数。
    def forward(self, features, **kwargs):
        x = features
        x = self.dropout(x)
        x = self.dense(x)
        x = torch.tanh(x)
        x = self.dropout(x)
        x = self.out_proj(x)
        return x

# 将Whisper的特征提取器和分类头部组合在一起，以创建一个端到端的语音分类模型。
class WhisperForSpeechClassification(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.num_labels = config.num_labels
        self.pooling_mode = config.pooling_mode
        self.config = config

        self.whisper = WhisperModel.from_pretrained(model_name_or_path)
        self.classifier = WhisperClassificationHead(config)

        self._init_weights(self)
    
    def _init_weights(self, module):
        std = self.config.init_std
        if isinstance(module, (nn.Linear, nn.Conv1d)):
            module.weight.data.normal_(mean=0.0, std=std)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=std)
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()

    def freeze_feature_extractor(self):
        self.whisper.encoder._freeze_parameters()

    def merged_strategy(
            self,
            hidden_states,
            mode="mean"
    ):
        if mode == "mean":
            outputs = torch.mean(hidden_states, dim=1)
        elif mode == "sum":
            outputs = torch.sum(hidden_states, dim=1)
        elif mode == "max":
            outputs = torch.max(hidden_states, dim=1)[0]
        else:
            raise Exception(
                "The pooling method hasn't been defined! Your pooling mode must be one of these ['mean', 'sum', 'max']")

        return outputs

    def forward(
            self,
            input_features,
            labels=None
    ):
        decoder_input_ids = torch.ones([input_features.shape[0], 1], dtype=torch.long) * self.whisper.config.decoder_start_token_id
        decoder_input_ids = decoder_input_ids.cuda()
        hidden_states = self.whisper(input_features, decoder_input_ids=decoder_input_ids).last_hidden_state
        hidden_states = self.merged_strategy(hidden_states, mode=self.pooling_mode)
        logits = self.classifier(hidden_states)
        
        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)
        
        output = (logits,)
        return ((loss,) + output) if loss is not None else output

In [None]:
from dataclasses import dataclass
from typing import Dict, List, Union
import torch


@dataclass
class DataCollatorCTCWithPadding:
    """
    Data collator that will dynamically pad the inputs received.
    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`)
            The processor used for proccessing the data.
        padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:
            * :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a single
              sequence if provided).
            * :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
              maximum acceptable input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of
              different lengths).
        max_length (:obj:`int`, `optional`):
            Maximum length of the ``input_values`` of the returned list and optionally padding length (see above).
        max_length_labels (:obj:`int`, `optional`):
            Maximum length of the ``labels`` returned list and optionally padding length (see above).
        pad_to_multiple_of (:obj:`int`, `optional`):
            If set will pad the sequence to a multiple of the provided value.
            This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
            7.5 (Volta).
    """
    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # input_features = [{"input_features": feature["input_features"]} for feature in features]
        input_features = [feature["input_features"] for feature in features]
        label_features = [feature["labels"] for feature in features]

        d_type = torch.long if isinstance(label_features[0], int) else torch.float

        batch = {}
        batch['input_features'] = torch.tensor(input_features, dtype=torch.half)
        batch["labels"] = torch.tensor(label_features, dtype=d_type)

        return batch

In [None]:
data_collator = DataCollatorCTCWithPadding()

Next, the evaluation metric is defined. There are many pre-defined metrics for classification/regression problems, but in this case, we would continue with just Accuracy for classification and MSE for regression. You can define other metrics on your own.

In [None]:
is_regression = False

In [None]:
import numpy as np
from transformers import EvalPrediction
from sklearn.metrics import f1_score, recall_score

def compute_metrics(p: EvalPrediction):
    preds = p.predictions[0] if (isinstance(p.predictions, tuple) or isinstance(p.predictions, list)) else p.predictions
    preds = np.squeeze(preds) if is_regression else np.argmax(preds, axis=1)

    if is_regression:
        mse = ((preds - p.label_ids) ** 2).mean().item()
        return {"mse": mse}
    else:
        accuracy = (preds == p.label_ids).astype(np.float32).mean().item()
        f1 = f1_score(p.label_ids, preds, average='weighted')
        recall = recall_score(p.label_ids, preds, average='weighted')
        print({"accuracy": accuracy, "f1": f1, "recall": recall})

        return {"accuracy": accuracy, "f1": f1, "recall": recall}

In [None]:
model = WhisperForSpeechClassification(config)

In [None]:
model.freeze_feature_extractor()

In [None]:
from transformers import TrainingArguments

training_args = TrainingArguments(
    output_dir="outputs/all_data_whisper_large_output",
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=2,
    evaluation_strategy="steps",
    num_train_epochs=1.0,
    fp16=True,
    save_steps=10,
    eval_steps=10,
    logging_steps=10,
    learning_rate=1e-4,
    save_total_limit=2,
)

For future use we can create our training script, we do it in a simple way. You can add more on you own.

In [None]:
from typing import Any, Dict, Union

import torch
from packaging import version
from torch import nn

from transformers import (
    Trainer,
    is_apex_available,
)

if is_apex_available():
    from apex import amp

if version.parse(torch.__version__) >= version.parse("1.6"):
    _is_native_amp_available = True
    from torch.cuda.amp import autocast


class CTCTrainer(Trainer):
    def training_step(self, model: nn.Module, inputs: Dict[str, Union[torch.Tensor, Any]]) -> torch.Tensor:
        """
        Perform a training step on a batch of inputs.

        Subclass and override to inject custom behavior.

        Args:
            model (:obj:`nn.Module`):
                The model to train.
            inputs (:obj:`Dict[str, Union[torch.Tensor, Any]]`):
                The inputs and targets of the model.

                The dictionary will be unpacked before being fed to the model. Most models expect the targets under the
                argument :obj:`labels`. Check your model's documentation for all accepted arguments.

        Return:
            :obj:`torch.Tensor`: The tensor with training loss on this batch.
        """
        model.train()
        inputs = self._prepare_inputs(inputs)

        with self.compute_loss_context_manager():
            loss = self.compute_loss(model, inputs)
        
        if self.args.gradient_accumulation_steps > 1:
            loss = loss / self.args.gradient_accumulation_steps

        if self.use_apex:
            with amp.scale_loss(loss, self.optimizer) as scaled_loss:
                scaled_loss.backward()
        else:
            self.accelerator.backward(loss)

        return loss.detach()


Now, all instances can be passed to Trainer and we are ready to start training!

In [None]:
trainer = CTCTrainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    tokenizer=feature_extractor,
)

In [None]:
trainer.train()

In [None]:
metrics = trainer.evaluate()
max_eval_samples = len(eval_dataset)
metrics["eval_samples"] = min(max_eval_samples, len(eval_dataset))

trainer.log_metrics("eval", metrics)
trainer.save_metrics("eval", metrics)