### 配置相关环境

!pip install -r requirement.txt

In [1]:
import numpy as np
import pandas as pd

from pathlib import Path
from tqdm import tqdm

import torchaudio
from sklearn.model_selection import train_test_split

import librosa
import IPython.display as ipd

import os
import sys

### 确定文件路径，并采用单一特征向量进行学习

In [5]:
ravdess_directory_list = os.listdir('data/ravdess/Audio_Speech_Actors_01-24')

file_emotion = [] # emotion和path
file_path = []
for dir in ravdess_directory_list:
    # as their are 20 different actors in our previous directory we need to extract files for each actor.
    actor = os.listdir(os.path.join('data', 'ravdess', 'Audio_Speech_Actors_01-24', dir))
    for file in actor:
        part = file.split('.')[0]
        part = part.split('-')
        # third part in each file represents the emotion associated to that file.
        file_emotion.append(int(part[2]))
        file_path.append('data/ravdess/Audio_Speech_Actors_01-24' + dir + '/' + file)

# dataframe for emotion of files
emotion_df = pd.DataFrame(file_emotion, columns=['Emotions'])

# dataframe for path of files.
path_df = pd.DataFrame(file_path, columns=['Path'])
Ravdess_df = pd.concat([emotion_df, path_df], axis=1)

# changing integers to actual emotions.
Ravdess_df.Emotions.replace({1:'neutral', 2:'calm', 3:'happy', 4:'sad', 5:'angry', 6:'fear', 7:'disgust', 8:'surprise'}, inplace=True)
Ravdess_df.head(20)

FileNotFoundError: [Errno 2] No such file or directory: 'content/drive/MyDrivedata/ravdess/Audio_Speech_Actors_01-24/Actor_20'

In [None]:
data = []
emotion_map = {
    '01': 'neutral',
    '02': 'calm',
    '03': 'happy',
    '04': 'sad',
    '05': 'angry',
    '06': 'fear',
    '07': 'disgust',
    '08': 'surprise'
}
# intensity_map = {
#     '01': 'normal',
#     '02': 'strong'
# }
for path in tqdm(Path("data/ravdess/Audio_Speech_Actors_01-24").glob("**/*.wav")):

    identifiers = (str(path).split('.')[0]).split('/')[-1]
    label = identifiers .split('-')[2] #why not 2?
    # intensity = identifiers .split('-')[4]
    try:
        # There are some broken files
        s = torchaudio.load(path)
        data.append({
            "emotion": emotion_map[label],
            # "intensity": intensity_map[intensity],
            "path": path
        })
    except Exception as e:
        # print(str(path), e)
        pass
df = pd.DataFrame(data)
df.head(20)

1440it [00:02, 527.07it/s]


Unnamed: 0,emotion,path
0,fear,data/ravdess/Audio_Speech_Actors_01-24/Actor_0...
1,neutral,data/ravdess/Audio_Speech_Actors_01-24/Actor_0...
2,fear,data/ravdess/Audio_Speech_Actors_01-24/Actor_0...
3,neutral,data/ravdess/Audio_Speech_Actors_01-24/Actor_0...
4,fear,data/ravdess/Audio_Speech_Actors_01-24/Actor_0...
5,neutral,data/ravdess/Audio_Speech_Actors_01-24/Actor_0...
6,disgust,data/ravdess/Audio_Speech_Actors_01-24/Actor_0...
7,neutral,data/ravdess/Audio_Speech_Actors_01-24/Actor_0...
8,disgust,data/ravdess/Audio_Speech_Actors_01-24/Actor_0...
9,calm,data/ravdess/Audio_Speech_Actors_01-24/Actor_0...


In [None]:
idx = np.random.randint(0, len(df))
sample = df.iloc[idx]
path = sample["path"]
label = sample["emotion"]


print(f"ID Location: {idx}")
print(f"      Label: {label}")
print()

# It returns a tensor (waveform, sample_rate)
# containing waveform samples and an integer sample rate (sr).
speech, sr = torchaudio.load(path)
# waveform is a tensor of shape (channels, samples)
# speech[0] returns channel
speech = speech[0].numpy().squeeze()

# change the sampling rate from its original sr to 16 kHz.
# wav2vec expects 16 kHz audio.
speech = librosa.resample(y=speech, orig_sr=sr, target_sr=16_000)

# wraps the resulting NumPy array into an IPython display object
ipd.Audio(data=np.asarray(speech), autoplay=True, rate=16000)

ID Location: 1126
      Label: angry



In [None]:
save_path = "data/split_data"
Path(save_path).mkdir(parents=True, exist_ok=True)

train_df, test_df = train_test_split(df, test_size=0.2, random_state=101, stratify=df["emotion"])

train_df = train_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)

train_df.to_csv(f"{save_path}/train.csv", sep="\t", encoding="utf-8", index=False)
test_df.to_csv(f"{save_path}/test.csv", sep="\t", encoding="utf-8", index=False)


print(train_df.shape)
print(test_df.shape)

(1152, 2)
(288, 2)


In [None]:
from datasets import load_dataset


data_files = {
    "train": "data/split_data/train.csv",
    "validation": "data/split_data/test.csv",
}

dataset = load_dataset("csv", data_files=data_files, delimiter="\t", )
train_dataset = dataset["train"]
eval_dataset = dataset["validation"]

print(train_dataset)
print(eval_dataset)

Generating train split: 0 examples [00:00, ? examples/s]

Generating validation split: 0 examples [00:00, ? examples/s]

Dataset({
    features: ['emotion', 'path'],
    num_rows: 1152
})
Dataset({
    features: ['emotion', 'path'],
    num_rows: 288
})


In [None]:
input_column  = "path"
# output_columns = ["emotion", "speaker_id"]
output_column = 'emotion'

In [None]:
emotion_list = train_dataset.unique("emotion")
emotion_list.sort()  # Let's sort it for determinism
num_emotions = len(emotion_list)
print(f"A classification problem with {num_emotions} emotions: {emotion_list}")
# intensity_list = train_dataset.unique("intensity")
# intensity_list.sort()
# num_intensity = len(intensity_list)
# print(f"A multi-task classification problem with {num_emotions} emotions: {emotion_list} and {num_intensity} intensity levels: {intensity_list}")

A classification problem with 8 emotions: ['angry', 'calm', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']


### 确定8种情绪分类

In [None]:
!pip install transformers

Looking in indexes: http://mirrors.aliyun.com/pypi/simple


In [None]:
from transformers import AutoConfig, Wav2Vec2Processor

In [None]:
# 本地加载路径！
model_name_or_path = "wav2vec2-base-960h"
pooling_mode = "mean"

In [None]:
# 假设的情感标签列表和强度标签列表，实际使用时替换为真实数据
emotion_list = ["angry", "calm", "disgust", "fear", "happy", "neutral", "sad", "surprise"]
# intensity_list = ["normal", "strong"]

# Create separate label mappings for each task
emotion_label2id = {label: i for i, label in enumerate(emotion_list)}
emotion_id2label = {i: label for i, label in enumerate(emotion_list)}

# intensity_label2id = {label: i for i, label in enumerate(intensity_list)}
# intensity_id2label = {i: label for i, label in enumerate(intensity_list)}

# 打印情感标签映射字典
print("情感标签到 ID 的映射 (emotion_label2id):")
print(emotion_label2id)
print(emotion_id2label)


# # 打印强度标签映射字典
# print("强度标签到 ID 的映射 (intensity_label2id):")
# print(intensity_label2id)

情感标签到 ID 的映射 (emotion_label2id):
{'angry': 0, 'calm': 1, 'disgust': 2, 'fear': 3, 'happy': 4, 'neutral': 5, 'sad': 6, 'surprise': 7}
{0: 'angry', 1: 'calm', 2: 'disgust', 3: 'fear', 4: 'happy', 5: 'neutral', 6: 'sad', 7: 'surprise'}


In [None]:
# config
# loading all the default hyperparameters and architecture settings
# from a pre-trained Wav2Vec2 checkpoint
config = AutoConfig.from_pretrained(
    model_name_or_path,
    num_emotion_labels=num_emotions,
    # num_intensity_labels=num_intensity,
    emotion_label2id=emotion_label2id,
    emotion_id2label=emotion_id2label,
    # intensity_label2id=intensity_label2id,
    # intensity_id2label=intensity_id2label,
    finetuning_task="wav2vec2_clf",
)
setattr(config, 'pooling_mode', pooling_mode)

In [None]:
# 采样率
processor = Wav2Vec2Processor.from_pretrained(model_name_or_path,)
target_sampling_rate = processor.feature_extractor.sampling_rate
print(f"The target sampling rate: {target_sampling_rate}")

The target sampling rate: 16000


### 数据预处理

In [None]:
import numpy as np
import torchaudio
from datasets import Dataset

def speech_file_to_array_fn(path):
    try:
        # 1. 加载音频文件前检查文件存在性（需导入os）
        import os
        if not os.path.exists(path):
            print(f"File not found: {path}")
            return np.zeros(1000)  # 返回默认数组，是numpy数组类型

        # 2. 加载音频文件
        speech_array, sampling_rate = torchaudio.load(path)

        # 3. 检查音频数据是否为空
        if speech_array.numel() == 0:
            print(f"Empty audio data: {path}")
            return np.zeros(1000)  # numpy数组类型

        # 4. 检查音频维度和长度
        if speech_array.dim() < 2:
            print(f"Invalid audio dimensions: {path}, dim={speech_array.dim()}")
            return np.zeros(1000)  # numpy数组类型

        # 5. 确保音频是单声道
        if speech_array.shape[0] > 1:
            speech_array = torch.mean(speech_array, dim=0, keepdim=True)

        # 6. 检查重采样前的音频长度
        original_length = speech_array.shape[1]
        if original_length == 0:
            print(f"Zero-length audio: {path}")
            return np.zeros(1000)  # numpy数组类型

        # 7. 重采样
        resampler = torchaudio.transforms.Resample(sampling_rate, target_sampling_rate)
        speech = resampler(speech_array).squeeze().numpy()

        # 8. 检查重采样后的音频长度
        if len(speech) == 0:
            print(f"Zero-length audio after resampling: {path}")
            return np.zeros(1000)  # numpy数组类型

        return speech  # 正常情况返回的也是numpy数组类型

    except torchaudio.exceptions.TorchaudioError as te:
        print(f"Torchaudio format error in {path}: {te}")
        return np.zeros(1000)  # numpy数组类型
    except Exception as e:
        print(f"General error processing {path}: {e}")
        return np.zeros(1000)  # numpy数组类型

def label_to_id(label, emotion_list):
    if len(emotion_list) > 0:
        return emotion_list.index(label) if label in emotion_list else -1
    return label

# def preprocess_function(examples): # 必须保证是二维的（着重修改！）
#     # 处理音频
#     speech_list = []
#     for path in examples[input_column]:
#         speech = speech_file_to_array_fn(path)
#         # 移除 flatten：保持 2 维或显式转为 2 维
#         if speech.ndim == 1:
#             speech = speech[np.newaxis, :]  # 转为 [1, sequence_length]
#         speech_list.append(speech)

#     # 处理标签
#     target_list = [label_to_id(label, emotion_list) for label in examples[output_column]]

#     # 处理音频特征
#     result = processor(
#         speech_list,
#         sampling_rate=target_sampling_rate,
#         padding="max_length",
#         max_length=160000,
#         truncation=True,
#         return_attention_mask=True
#     )

#     result["labels"] = list(target_list)
#     return result
def preprocess_function(examples):
    speech_list = []
    for path in examples[input_column]:
        speech = speech_file_to_array_fn(path)
        print(speech)

        # 确保音频是二维数组 [1, sequence_length]
        if speech.ndim == 1:
            speech = speech[np.newaxis, :]  # 添加通道维度

        speech_list.append(speech)  # 这里应该是NumPy数组列表

    # 处理音频特征
    result = processor(
        speech_list,  # 传递NumPy数组列表
        sampling_rate=target_sampling_rate,
        padding="max_length",
        max_length=160000,
        truncation=True,
        return_attention_mask=True
    )

    result["labels"] = [label_to_id(label, emotion_list) for label in examples[output_column]]
    return result

# 增强版预处理函数，添加批次级异常统计
def preprocess_function_with_stats(examples):
    # 处理音频并收集异常信息
    speech_list = []
    path_errors = []
    for path in examples[input_column]:
        speech = speech_file_to_array_fn(path)
        if np.all(speech == 0):  # 检查是否返回了默认空数组
            path_errors.append(path)
        speech_list.append(speech)

    # 打印批次中的异常文件
    if path_errors:
        print(f"Batch errors: {len(path_errors)}/{len(examples[input_column])} files")
        print(f"Error paths: {path_errors[:5]} (total {len(path_errors)})")

    # 处理标签
    target_list = [label_to_id(label, emotion_list) for label in examples[output_column]]

    # 处理音频特征
    result = processor(
        speech_list,
        sampling_rate=target_sampling_rate,
        padding="max_length",
        max_length=160000,
        truncation=True,
        return_attention_mask=True
    )

    result["labels"] = list(target_list)
    return result

# 处理训练集，使用增强版预处理函数
train_dataset = train_dataset.map(
    preprocess_function_with_stats,
    batch_size=100,
    batched=True,
    # num_proc=4  # 暂时禁用多进程，便于调试
)

# 处理评估集
eval_dataset = eval_dataset.map(
    preprocess_function_with_stats,
    batch_size=100,
    batched=True,
    # num_proc=4  # 暂时禁用多进程，便于调试
)

Map:   0%|          | 0/1152 [00:00<?, ? examples/s]

Map:   0%|          | 0/288 [00:00<?, ? examples/s]

In [None]:
!df -h
# !rm -rf /tmp/*

Filesystem      Size  Used Avail Use% Mounted on
overlay          30G   30G  348M  99% /
/dev/md0        7.0T  6.4T  676G  91% /autodl-pub
AutoFS:fs1       10T  4.4T  5.7T  44% /autodl-pub/data
tmpfs            64M     0   64M   0% /dev
shm              60G  4.0K   60G   1% /dev/shm
/dev/sda2       438G   20G  396G   5% /usr/bin/nvidia-smi
tmpfs           504G   12K  504G   1% /proc/driver/nvidia
tmpfs           504G  4.0K  504G   1% /etc/nvidia/nvidia-application-profiles-rc.d
udev            504G     0  504G   0% /dev/nvidia1
tmpfs           504G     0  504G   0% /proc/asound
tmpfs           504G     0  504G   0% /proc/acpi
tmpfs           504G     0  504G   0% /proc/scsi
tmpfs           504G     0  504G   0% /sys/firmware


In [None]:
idx = 2
# print(f"Training input_values: {train_dataset[idx]['input_values']}")
# print(f"Training attention_mask: {train_dataset[idx]['attention_mask']}")
# print(f"Training emotion_labels: {train_dataset[idx]['labels']} - {train_dataset[idx]['emotion']}")
# 注意数据里面有labels对应emotion
# print(f"Training intensity_labels: {train_dataset[idx]['intensity_labels']} - {train_dataset[idx]['intensity']}")

### model阶段

In [None]:
from dataclasses import dataclass
from typing import Optional, Tuple
import torch
from transformers.file_utils import ModelOutput


@dataclass
class SpeechClassifierOutput(ModelOutput):
    loss: Optional[torch.FloatTensor] = None
    logits: torch.FloatTensor = None
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    attentions: Optional[Tuple[torch.FloatTensor]] = None


In [None]:
import torch
import torch.nn as nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
from transformers import Wav2Vec2Model, Wav2Vec2PreTrainedModel

class FixedWav2Vec2Model(Wav2Vec2Model):
    def forward(
        self,
        input_values,
        attention_mask=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        # 调试输入形状
        print(f"模型输入形状 (修正前): {input_values.shape}")

        # 严格的维度控制
        if input_values.dim() == 4:
            input_values = input_values.squeeze(1)  # [B,1,1,L] -> [B,1,L]
            print("执行了squeeze(1)操作")
        elif input_values.dim() == 2:
            input_values = input_values.unsqueeze(1)  # [B,L] -> [B,1,L]
            print("执行了unsqueeze(1)操作")

        # 最终维度验证
        assert input_values.dim() == 3, \
            f"输入必须是3D [B,C,L]，实际得到 {input_values.shape}"
        assert input_values.size(1) == 1, \
            f"通道数必须为1，实际得到 {input_values.shape}"

        # 传递参数给父类
        outputs = super().forward(
            input_values,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict
        )

        # 调试输出形状
        if return_dict:
            print(f"隐藏状态形状: {outputs.last_hidden_state.shape}")
        else:
            print(f"隐藏状态形状: {outputs[0].shape}")

        return outputs

class Wav2Vec2ForSpeechClassification(Wav2Vec2PreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        # 使用严格维度控制的模型
        self.wav2vec2 = FixedWav2Vec2Model(config)
        self.classifier = Wav2Vec2ClassificationHead(config)

        # 初始化参数
        self.num_labels = config.num_labels
        self.pooling_mode = config.pooling_mode
        self.ctc_loss = nn.CTCLoss(blank=config.pad_token_id, reduction='mean')
        self.init_weights()

    def freeze_feature_extractor(self):
        """
        冻结Wav2Vec2的特征提取器部分
        两种实现方式任选其一：
        """
        # 方式1：使用transformers内置方法
        self.wav2vec2.feature_extractor._freeze_parameters()

        # 方式2：手动冻结（更直观）
        for param in self.wav2vec2.feature_extractor.parameters():
            param.requires_grad = False

        # 验证冻结效果
        for name, param in self.wav2vec2.feature_extractor.named_parameters():
            assert not param.requires_grad, f"参数 {name} 未被正确冻结"

    def forward(
        self,
        input_values,
        attention_mask=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
        labels=None,
        input_lengths=None,
        label_lengths=None,
    ):
        # 调试原始输入
        print(f"\n===== 前向传播开始 =====")
        print(f"原始输入形状: {input_values.shape}")

        # 使用父类配置决定返回格式
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # 核心修改：添加维度断言
        assert input_values.dim() in [2, 3, 4], \
            f"非法输入维度: {input_values.shape} (允许2D/3D/4D)"

        # 传递参数给修正后的模型
        outputs = self.wav2vec2(
            input_values=input_values,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict
        )

        # 获取隐藏状态（兼容不同返回格式）
        hidden_states = outputs.last_hidden_state if return_dict else outputs[0]
        print(f"特征提取后形状: {hidden_states.shape}")

        # 分类处理
        pooled_output = self.merged_strategy(hidden_states, mode=self.pooling_mode)
        logits = self.classifier(pooled_output)
        print(f"分类输出形状: {logits.shape}")

        # 损失计算
        loss = None
        if labels is not None:
            if input_lengths is not None and label_lengths is not None:
                # CTC损失分支
                ctc_logits = hidden_states.transpose(0, 1)  # [T,N,C]
                ctc_logits = self.classifier(ctc_logits)
                print(f"CTC logits形状: {ctc_logits.shape}")

                loss = self.ctc_loss(
                    ctc_logits.log_softmax(-1),
                    labels,
                    input_lengths,
                    label_lengths
                )
            else:
                # 标准分类损失
                if self.config.problem_type is None:
                    if self.num_labels == 1:
                        self.config.problem_type = "regression"
                    elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                        self.config.problem_type = "single_label_classification"
                    else:
                        self.config.problem_type = "multi_label_classification"

                if self.config.problem_type == "regression":
                    loss = MSELoss()(logits.view(-1, self.num_labels), labels)
                elif self.config.problem_type == "single_label_classification":
                    loss = CrossEntropyLoss()(logits.view(-1, self.num_labels), labels.view(-1))
                elif self.config.problem_type == "multi_label_classification":
                    loss = BCEWithLogitsLoss()(logits, labels)

        # 返回格式处理
        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return {
            "loss": loss,
            "logits": logits,
            "hidden_states": outputs.hidden_states,
            "attentions": outputs.attentions,
        }

    def merged_strategy(self, hidden_states, mode="mean"):
        """严格的维度控制池化方法"""
        print(f"池化前形状: {hidden_states.shape}")

        if hidden_states.dim() == 3:  # [B,T,C]
            if mode == "mean":
                pooled = torch.mean(hidden_states, dim=1)
            elif mode == "sum":
                pooled = torch.sum(hidden_states, dim=1)
            elif mode == "max":
                pooled = torch.max(hidden_states, dim=1)[0]
            else:
                raise ValueError(f"不支持的池化模式: {mode}")
        elif hidden_states.dim() == 4:  # [B,T,L,C]
            print("检测到4D输入，自动降维")
            return self.merged_strategy(hidden_states.mean(dim=2), mode)
        else:
            raise ValueError(f"不支持的hidden_states维度: {hidden_states.dim()}")

        print(f"池化后形状: {pooled.shape}")
        return pooled

In [None]:
def test_forward_compatibility():
    config = Wav2Vec2Config()
    model = Wav2Vec2ForSpeechClassification(config)

    # 测试标准调用
    inputs = torch.rand(1, 16000)
    outputs = model(inputs, output_attentions=True)
    assert outputs.attentions is not None

    # 测试旧版兼容
    old_outputs = model(inputs)
    assert hasattr(old_outputs, "logits")

In [None]:
# import torch
# import torch.nn as nn
# from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

# from transformers.models.wav2vec2.modeling_wav2vec2 import (
#     Wav2Vec2PreTrainedModel,
#     Wav2Vec2Model
# )


# class Wav2Vec2ClassificationHead(nn.Module):
#     """Head for wav2vec classification task."""

#     def __init__(self, config):
#         super().__init__()
#         self.dense = nn.Linear(config.hidden_size, config.hidden_size)
#         self.dropout = nn.Dropout(config.final_dropout)
#         self.out_proj = nn.Linear(config.hidden_size, config.num_labels)

#     def forward(self, features, **kwargs):
#         x = features
#         x = self.dropout(x)
#         x = self.dense(x)
#         x = torch.tanh(x)
#         x = self.dropout(x)
#         x = self.out_proj(x)
#         return x


# class Wav2Vec2ForSpeechClassification(Wav2Vec2PreTrainedModel):
#     def __init__(self, config):
#         super().__init__(config)
#         self.num_labels = config.num_labels
#         self.pooling_mode = config.pooling_mode
#         self.config = config

#         self.wav2vec2 = Wav2Vec2Model(config)
#         self.classifier = Wav2Vec2ClassificationHead(config)

#         self.init_weights()

#     def freeze_feature_extractor(self):
#         self.wav2vec2.feature_extractor._freeze_parameters()

#     def merged_strategy(
#             self,
#             hidden_states,
#             mode="mean"
#     ):
#         if mode == "mean":
#             outputs = torch.mean(hidden_states, dim=1)
#         elif mode == "sum":
#             outputs = torch.sum(hidden_states, dim=1)
#         elif mode == "max":
#             outputs = torch.max(hidden_states, dim=1)[0]
#         else:
#             raise Exception(
#                 "The pooling method hasn't been defined! Your pooling mode must be one of these ['mean', 'sum', 'max']")

#         return outputs

#     def forward(
#             self,
#             input_values,
#             attention_mask=None,
#             output_attentions=None,
#             output_hidden_states=None,
#             return_dict=None,
#             labels=None,
#     ):
#         return_dict = return_dict if return_dict is not None else self.config.use_return_dict
#         outputs = self.wav2vec2(
#             input_values,
#             attention_mask=attention_mask,
#             output_attentions=output_attentions,
#             output_hidden_states=output_hidden_states,
#             return_dict=return_dict,
#         )
#         hidden_states = outputs[0]
#         hidden_states = self.merged_strategy(hidden_states, mode=self.pooling_mode)
#         logits = self.classifier(hidden_states)

#         loss = None
#         if labels is not None:
#             if self.config.problem_type is None:
#                 if self.num_labels == 1:
#                     self.config.problem_type = "regression"
#                 elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
#                     self.config.problem_type = "single_label_classification"
#                 else:
#                     self.config.problem_type = "multi_label_classification"

#             if self.config.problem_type == "regression":
#                 loss_fct = MSELoss()
#                 loss = loss_fct(logits.view(-1, self.num_labels), labels)
#             elif self.config.problem_type == "single_label_classification":
#                 loss_fct = CrossEntropyLoss()
#                 loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
#             elif self.config.problem_type == "multi_label_classification":
#                 loss_fct = BCEWithLogitsLoss()
#                 loss = loss_fct(logits, labels)

#         if not return_dict:
#             output = (logits,) + outputs[2:]
#             return ((loss,) + output) if loss is not None else output

#         return SpeechClassifierOutput(
#             loss=loss,
#             logits=logits,
#             hidden_states=outputs.hidden_states,
#             attentions=outputs.attentions,
#         )


### training阶段

In [None]:
import numpy as np
from transformers import EvalPrediction


def compute_metrics(p: EvalPrediction):
    preds = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
    preds = np.squeeze(preds) if is_regression else np.argmax(preds, axis=1)

    if is_regression:
        return {"mse": ((preds - p.label_ids) ** 2).mean().item()}
    else:
        return {"accuracy": (preds == p.label_ids).astype(np.float32).mean().item()}

In [None]:
model = Wav2Vec2ForSpeechClassification.from_pretrained(
    model_name_or_path,
    config=config,
)

Some weights of Wav2Vec2ForSpeechClassification were not initialized from the model checkpoint at wav2vec2-base-960h and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1', 'wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
model.freeze_feature_extractor()

### 数据收集器

In [None]:
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
import torch
from transformers import Wav2Vec2Processor

@dataclass
class DataCollatorCTCWithPadding:
    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    padding_value: float = 0.0

    def __call__(self, features: List[Dict[str, Union[List, np.ndarray, torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # 处理每个样本的input_values
        for feature in features:
            input_values = feature["input_values"]

            # 确保input_values是张量
            if isinstance(input_values, list):
                input_values = torch.tensor(input_values)
            elif isinstance(input_values, np.ndarray):
                input_values = torch.from_numpy(input_values)

            # 确保是二维张量 [batch_size, sequence_length]
            if input_values.dim() == 1:
                input_values = input_values.unsqueeze(0)  # 转为 [1, sequence_length]
            elif input_values.dim() >2:
                input_values = input_values.squeeze(1)  # 若为 [1, 1, sequence_length]，压缩到 [1, sequence_length]

            feature["input_values"] = input_values

        input_features = [{"input_values": feature["input_values"]} for feature in features]
        label_features = [feature["labels"] for feature in features]

        # 调试：打印输入形状和类型
        for i, feat in enumerate(input_features):
            arr = feat["input_values"]
            print(f"Input {i} shape: {arr.shape} (type: {type(arr)})")

        # 应用填充
        batch = self.processor.pad(
            input_features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )

        # 调试：打印填充后的形状
        print(f"Padded input_values shape: {batch['input_values'].shape}")

        # 修改DataCollatorCTCWithPadding类的__call__方法：
#         batch["input_values"] = batch["input_values"].squeeze(1) if batch["input_values"].dim() == 3 else batch["input_values"]
#         input_lengths = [x.size(-1) for x in batch["input_values"]]  # 使用-1获取最后一维

#         d_type = torch.long if isinstance(label_features[0], int) else torch.float

#         # 返回两个？
#         # batch["input_lengths"] = torch.tensor(input_lengths, dtype=torch.long)
#         batch["input_lengths"] = torch.tensor(input_lengths, dtype=torch.long)  # list转Tensor
#         batch["labels"] = torch.tensor(label_features, dtype=d_type)
#         # batch['label_lengths'] = 1
#         # 必须编码为tensor
#         batch["label_lengths"] = torch.ones(len(batch["labels"]), dtype=torch.long)

#         return batch



        # 关键修改1：input_lengths统一为Tensor
        input_lengths = torch.tensor(
            [x.size(-1) for x in batch["input_values"]],
            dtype=torch.long,
            device=batch["input_values"].device  # 保持设备一致
        )

        # 关键修改2：target_lengths改为Tensor（假设所有标签长度=1）
        target_lengths = torch.full(
            size=(len(features),),
            fill_value=1,  # 或根据实际标签长度调整
            dtype=torch.long,
            device=input_lengths.device
        )


        batch.update({
            "input_lengths": input_lengths.squeeze() if input_lengths.dim() > 1 else input_lengths,
            "label_lengths": target_lengths,# 目标和原来要一致？
            "labels": torch.full(
            (len(features),),
            1,  # 或 [len(str(l)) for l in label_features]
            dtype=torch.long,
            device=batch["input_values"].device)
        })
        print(batch)

        return batch


In [None]:
# print("原始样本形状:", train_dataset[0]["input_values"])
test_batch = data_collator([train_dataset[0], train_dataset[1]])
print("Collator输出:", test_batch["input_values"])

Input 0 shape: torch.Size([1, 160000]) (type: <class 'torch.Tensor'>)
Input 1 shape: torch.Size([1, 160000]) (type: <class 'torch.Tensor'>)
Padded input_values shape: torch.Size([2, 1, 160000])
{'input_values': tensor([[[ 5.0068e-05,  5.0068e-05,  5.0068e-05,  ...,  0.0000e+00,
           0.0000e+00,  0.0000e+00]],

        [[-9.0796e-04, -1.8261e-03, -7.2571e-04,  ...,  0.0000e+00,
           0.0000e+00,  0.0000e+00]]]), 'input_lengths': tensor([160000, 160000]), 'label_lengths': tensor([1, 1]), 'labels': tensor([1, 1])}
Collator输出: tensor([[[ 5.0068e-05,  5.0068e-05,  5.0068e-05,  ...,  0.0000e+00,
           0.0000e+00,  0.0000e+00]],

        [[-9.0796e-04, -1.8261e-03, -7.2571e-04,  ...,  0.0000e+00,
           0.0000e+00,  0.0000e+00]]])


In [None]:
# # 假设 processor 是正确初始化的 Wav2Vec2Processor 实例
# data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

In [None]:
# sample_features = [train_dataset[i] for i in range(2)]
# batch = data_collator(sample_features)

# print(f"input_values shape: {batch['input_values'].shape}")
# print(f"input_lengths: {batch['input_lengths']}")
# print(f"labels: {batch['labels']}")

In [None]:
# # 验证单个样本，为什么原来的样本中有labels和attention_mask processor之后都没有了
# sample = train_dataset[0]
# print((sample['labels'])) # 注意只有一个列表

# # 验证processor输出
# sample_processed = processor(
#     sample["input_values"],
#     labels = sample["labels"],  # 显式传递标签
#     sampling_rate=target_sampling_rate,
#     return_tensors="pt",
#     return_attention_mask=True  # 确保生成attention_mask
# )
# print(f"Processor output shape: {sample_processed['input_values'].shape}")

# # # 测试小批次
# sample_features = [train_dataset[i] for i in range(2)]
# batch = data_collator(sample_features)

# print(f"input_values shape: {batch['input_values'].shape}")
# print(f"input_lengths: {batch['input_lengths']}")
# # print(f"attention_mask: {batch['labels']}")
# print("Processor输出键:", sample_processed.keys())  # 应只有input_values

### 训练参数调整

In [None]:
from transformers import TrainingArguments

training_args = TrainingArguments(
    output_dir="/content/wav2vec2-0624",  # 模型保存路径
    per_device_train_batch_size=4,  # 训练批次大小，gpu内存不足可减少该值
    per_device_eval_batch_size=4,  # 评估批次大小
    gradient_accumulation_steps=2,  # 梯度累积步数（等效批次大小=4*2=8）
    evaluation_strategy="steps",  # 按步数评估
    num_train_epochs=1.0,  # 训练轮次，可能学习不足，可增加
    fp16=True,  # 启用混合精度训练
    save_steps=10,  # 每10步保存模型
    eval_steps=10,  # 每10步评估
    logging_steps=10,  # 每10步记录日志
    learning_rate=1e-4,  # 学习率
    save_total_limit=2,  # 最多保存2个检查点
)

### 训练器

In [None]:
from typing import Any, Dict, Union
import torch
import numpy as np
from packaging import version
from torch import nn
from transformers import Trainer, is_apex_available

if is_apex_available():
    from apex import amp

if version.parse(torch.__version__) >= version.parse("1.6"):
    _is_native_amp_available = True
    from torch.cuda.amp import autocast, GradScaler


class CTCTrainer(Trainer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.use_amp = False
        self.scaler = None
        if self.args.fp16 and _is_native_amp_available:
            self.use_amp = True
            self.scaler = GradScaler()

    def training_step(self, model: nn.Module, inputs: Dict[str, Union[torch.Tensor, Any]]) -> torch.Tensor:
        model.train()

        # 关键修改1：在准备输入前检查维度
        if inputs["input_values"].dim() == 4:
            inputs["input_values"] = inputs["input_values"].squeeze(1)

        inputs = self._prepare_inputs(inputs)

        if self.use_amp:
            with autocast():
                loss = self.compute_loss(model, inputs)
        else:
            loss = self.compute_loss(model, inputs)

        if self.args.gradient_accumulation_steps > 1:
            loss = loss / self.args.gradient_accumulation_steps

        if self.use_amp:
            self.scaler.scale(loss).backward()
        elif self.use_apex:
            with amp.scale_loss(loss, self.optimizer) as scaled_loss:
                scaled_loss.backward()
        elif self.deepspeed:
            self.deepspeed.backward(loss)
        else:
            loss.backward()

        return loss.detach()

    def optimizer_step(self, *args, **kwargs):
        if self.use_amp:
            self.scaler.step(self.optimizer)
            self.scaler.update()
            self.optimizer.zero_grad()
        else:
            super().optimizer_step(*args, **kwargs)

    def compute_loss(self, model, inputs, return_outputs=False):
        """
        修正后的CTC损失计算函数
        确保处理以下输入格式：
        {
            'input_values': [batch_size, 1, seq_len],
            'labels': [batch_size],
            'input_lengths': [batch_size],
            'label_lengths': [batch_size]
        }
        """
        # 1. 输入验证和维度修正
        required_keys = ["input_values", "labels", "input_lengths", "label_lengths"]
        for key in required_keys:
            if key not in inputs:
                raise ValueError(f"Missing required key '{key}' in inputs. Available keys: {list(inputs.keys())}")

        # 2. 设备转移和维度处理
        device = next(model.parameters()).device
        input_values = inputs["input_values"].to(device)

        # 关键修改2：强制输入为3D [B,1,L]
        if input_values.dim() == 4:
            input_values = input_values.squeeze(1)
        elif input_values.dim() == 2:
            input_values = input_values.unsqueeze(1)

        assert input_values.dim() == 3, f"输入应为3D [B,1,L]，实际得到 {input_values.shape}"

        # 3. 其他张量处理
        labels = inputs["labels"].to(device)
        input_lengths = inputs["input_lengths"].to(device) if isinstance(inputs["input_lengths"], torch.Tensor) \
                       else torch.tensor(inputs["input_lengths"], device=device)
        label_lengths = inputs["label_lengths"].to(device) if isinstance(inputs["label_lengths"], torch.Tensor) \
                       else torch.tensor(inputs["label_lengths"], device=device)

        # 4. 调试信息
        print("\n===== DEBUG INFO =====")
        print(f"Input values shape: {input_values.shape}")
        print(f"Labels: {labels.cpu().numpy()}")
        print(f"Input lengths: {input_lengths.cpu().numpy()}")
        print(f"Label lengths: {label_lengths.cpu().numpy()}")

        # 5. 模型前向传播
        outputs = model(input_values=input_values)
        logits = outputs.logits

        # 6. 形状调整和验证
        logits = logits.transpose(0, 1)  # [T,N,C]
        print(f"Logits shape after transpose: {logits.shape}")

        # 7. CTC损失计算（带增强的错误处理）
        try:
            loss = torch.nn.functional.ctc_loss(
                log_probs=logits.log_softmax(-1),
                targets=labels,
                input_lengths=input_lengths,
                target_lengths=label_lengths,
                blank=model.config.pad_token_id,
                reduction='mean',
                zero_infinity=True
            )
        except Exception as e:
            print("\n!!! CTC LOSS CALCULATION ERROR !!!")
            print(f"Logits shape: {logits.shape}")
            print(f"Logits range: [{logits.min().item():.4f}, {logits.max().item():.4f}]")
            print(f"Labels unique: {torch.unique(labels)}")
            print(f"Input lengths: {input_lengths.cpu().numpy()}")
            print(f"Label lengths: {label_lengths.cpu().numpy()}")
            raise

        print(f"Loss value: {loss.item():.4f}")
        print("=======================")

        return (loss, outputs) if return_outputs else loss

In [None]:
# import os
# os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

In [None]:
# # 示例输入
# input_values = torch.rand(2, 1, 160000)  # [batch_size, channels, seq_len]
# labels = torch.tensor([1, 3])  # 分类标签
# input_lengths = torch.tensor([160000, 160000])  # CTC需要
# label_lengths = torch.tensor([1, 1])  # CTC需要

# # 前向传播
# outputs = model(
#     input_values=input_values,
#     labels=labels,
#     input_lengths=input_lengths,
#     label_lengths=label_lengths
# )

# print(outputs["loss"])  # 打印损失值

In [None]:
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)
trainer = CTCTrainer(
    model=model,                # 待训练的模型
    data_collator=data_collator,  # 数据收集器
    args=training_args,          # 训练参数
    compute_metrics=compute_metrics,  # 评估指标函数（需自定义）
    train_dataset=train_dataset,    # 训练数据集
    eval_dataset=eval_dataset,      # 评估数据集
    tokenizer=processor.feature_extractor,  # 特征提取器
)

In [None]:
trainer.train()

Input 0 shape: torch.Size([1, 160000]) (type: <class 'torch.Tensor'>)
Input 1 shape: torch.Size([1, 160000]) (type: <class 'torch.Tensor'>)
Input 2 shape: torch.Size([1, 160000]) (type: <class 'torch.Tensor'>)
Input 3 shape: torch.Size([1, 160000]) (type: <class 'torch.Tensor'>)
Padded input_values shape: torch.Size([4, 1, 160000])
{'input_values': tensor([[[5.8648e-04, 6.0610e-04, 2.6287e-04,  ..., 0.0000e+00,
          0.0000e+00, 0.0000e+00]],

        [[3.1735e-06, 3.1735e-06, 3.1273e-06,  ..., 0.0000e+00,
          0.0000e+00, 0.0000e+00]],

        [[3.4287e-04, 3.4287e-04, 3.4287e-04,  ..., 0.0000e+00,
          0.0000e+00, 0.0000e+00]],

        [[2.4523e-04, 2.0184e-04, 4.9197e-04,  ..., 0.0000e+00,
          0.0000e+00, 0.0000e+00]]]), 'input_lengths': tensor([160000, 160000, 160000, 160000]), 'label_lengths': tensor([1, 1, 1, 1]), 'labels': tensor([1, 1, 1, 1])}
Input 0 shape: torch.Size([1, 160000]) (type: <class 'torch.Tensor'>)
Input 1 shape: torch.Size([1, 160000]) (type

RuntimeError: Expected 2D (unbatched) or 3D (batched) input to conv1d, but got input of size: [4, 1, 1, 160000]

In [None]:
# 检查原始数据
sample = train_dataset[0]
# print(f"原始样本形状: {sample['input_values']}")  # 应为 (seq_len,) 或 (1, seq_len)

# 检查DataCollator输出
collator = DataCollatorCTCWithPadding(processor)
batch = collator([train_dataset[i] for i in range(2)])
print(f"Collator输出形状: {batch['input_values'].shape}")  # 应为 [2,1,seq_len]

# 检查模型接收的输入
model = Wav2Vec2ForCTC.from_pretrained("wav2vec2-base-960h")
with torch.no_grad():
    print(f"模型输入形状: {batch['input_values'].shape}")
    outputs = model(batch["input_values"])

Input 0 shape: torch.Size([1, 160000]) (type: <class 'torch.Tensor'>)
Input 1 shape: torch.Size([1, 160000]) (type: <class 'torch.Tensor'>)
Padded input_values shape: torch.Size([2, 1, 160000])
{'input_values': tensor([[[ 5.0068e-05,  5.0068e-05,  5.0068e-05,  ...,  0.0000e+00,
           0.0000e+00,  0.0000e+00]],

        [[-9.0796e-04, -1.8261e-03, -7.2571e-04,  ...,  0.0000e+00,
           0.0000e+00,  0.0000e+00]]]), 'input_lengths': tensor([160000, 160000]), 'label_lengths': tensor([1, 1]), 'labels': tensor([1, 1])}
Collator输出形状: torch.Size([2, 1, 160000])


Some weights of the model checkpoint at wav2vec2-base-960h were not used when initializing Wav2Vec2ForCTC: ['wav2vec2.encoder.pos_conv_embed.conv.weight_g', 'wav2vec2.encoder.pos_conv_embed.conv.weight_v']
- This IS expected if you are initializing Wav2Vec2ForCTC from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing Wav2Vec2ForCTC from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of Wav2Vec2ForCTC were not initialized from the model checkpoint at wav2vec2-base-960h and are newly initialized: ['wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1', 'wav2vec2.masked_spec_embed']
You should probably TRAIN

模型输入形状: torch.Size([2, 1, 160000])


RuntimeError: Expected 2D (unbatched) or 3D (batched) input to conv1d, but got input of size: [2, 1, 1, 160000]

In [None]:
test_dataset = load_dataset("csv", data_files={"test": "/content/data/test.csv"}, delimiter="\t")["test"]
test_dataset

In [None]:
from dataclasses import dataclass
from typing import Optional, Tuple, Dict

@dataclass
class DualTaskOutput:
    """双任务统一输出格式"""
    loss: Optional[torch.FloatTensor] = None
    logits_emotion: torch.FloatTensor = None
    logits_intensity: torch.FloatTensor = None
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    attentions: Optional[Tuple[torch.FloatTensor]] = None
    metrics: Optional[Dict[str, float]] = None  # 可选：存储计算好的指标

In [None]:
class Wav2Vec2ForDualTask(Wav2Vec2PreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        # 初始化主干网络
        self.wav2vec2 = Wav2Vec2Model(config)

        # 双任务分类头
        self.emotion_head = nn.Linear(config.hidden_size, config.num_emotion_labels)
        self.intensity_head = nn.Linear(config.hidden_size, config.num_intensity_labels)

        # 初始化权重
        self.init_weights()

    def forward(self, input_values, attention_mask=None, labels_emotion=None, labels_intensity=None, return_dict=True):
        # 特征提取
        outputs = self.wav2vec2(
            input_values,
            attention_mask=attention_mask,
            return_dict=return_dict
        )

        # 池化特征
        pooled_output = outputs.last_hidden_state.mean(dim=1)

        # 双任务预测
        logits_emotion = self.emotion_head(pooled_output)
        logits_intensity = self.intensity_head(pooled_output)

        # 损失计算
        loss = 0
        if labels_emotion is not None:
            loss += F.cross_entropy(logits_emotion, labels_emotion)
        if labels_intensity is not None:
            if labels_intensity.dtype == torch.long:
                loss += F.cross_entropy(logits_intensity, labels_intensity)
            else:
                loss += F.mse_loss(logits_intensity.squeeze(), labels_intensity.float())

        # 统一输出格式
        if not return_dict:
            return (loss, logits_emotion, logits_intensity) + outputs[2:]

        return DualTaskOutput(
            loss=loss if loss != 0 else None,
            logits_emotion=logits_emotion,
            logits_intensity=logits_intensity,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions
        )

In [None]:
class DualTaskDataCollator:
    def __init__(self, processor):
        self.processor = processor

    def __call__(self, batch):
        # 音频特征处理
        processed = self.processor.pad(
            [{"input_values": x["input_values"]} for x in batch],
            return_tensors="pt"
        )

        # 标签处理
        processed["labels_emotion"] = torch.tensor(
            [x.get("emotion_label", -100) for x in batch],
            dtype=torch.long
        )

        processed["labels_intensity"] = torch.tensor(
            [x.get("intensity_label", 0.0) for x in batch],
            dtype=torch.float if any(isinstance(x.get("intensity_label"), float) for x in batch) else torch.long
        )

        return processed

In [None]:
# 训练器
class DualTaskTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        # 分离标签
        labels_emotion = inputs.pop("labels_emotion", None)
        labels_intensity = inputs.pop("labels_intensity", None)

        # 前向传播
        outputs = model(**inputs, labels_emotion=labels_emotion, labels_intensity=labels_intensity)

        # 统一输出处理
        if return_outputs:
            return (outputs.loss, outputs) if outputs.loss is not None else (0.0, outputs)
        return outputs.loss if outputs.loss is not None else 0.0

    def prediction_step(self, model, inputs, prediction_loss_only=False, ignore_keys=None):
        # 覆盖预测步骤以处理双任务
        inputs = self._prepare_inputs(inputs)
        with torch.no_grad():
            outputs = model(**inputs)

        if prediction_loss_only:
            return (outputs.loss, None, None)

        return (
            outputs.loss,
            {"emotion": outputs.logits_emotion, "intensity": outputs.logits_intensity},
            {"emotion": inputs.get("labels_emotion"), "intensity": inputs.get("labels_intensity")}
        )

In [None]:
# 评估指标
def compute_unified_metrics(pred):
    """处理双任务评估指标"""
    logits, labels = pred
    results = {}

    # 情感任务指标
    if "emotion" in logits:
        preds = logits["emotion"].argmax(-1)
        results.update({
            "emotion_acc": (preds == labels["emotion"]).float().mean().item(),
            "emotion_f1": f1_score(labels["emotion"].cpu(), preds.cpu(), average="macro")
        })

    # 强度任务指标
    if "intensity" in logits:
        if logits["intensity"].dim() > 1:  # 分类任务
            preds = logits["intensity"].argmax(-1)
            results["intensity_acc"] = (preds == labels["intensity"]).float().mean().item()
        else:  # 回归任务
            results.update({
                "intensity_mse": F.mse_loss(logits["intensity"], labels["intensity"].float()).item(),
                "intensity_mae": F.l1_loss(logits["intensity"], labels["intensity"].float()).item()
            })

    return results

In [None]:
# 初始化组件
processor = Wav2Vec2Processor.from_pretrained("wav2vec2-base-960h")
collator = DualTaskDataCollator(processor)
model = Wav2Vec2ForSpeechClassification.from_pretrained(
    "wav2vec2-base-960h",
    config=config,
)
# 训练配置
training_args = TrainingArguments(
    output_dir="./results",
    per_device_train_batch_size=8,
    evaluation_strategy="steps",
    eval_steps=500,
    logging_steps=100,
    learning_rate=5e-5,
    fp16=True,
    save_strategy="steps",
    save_steps=1000,
    metric_for_best_model="emotion_f1",
    load_best_model_at_end=True
)

# 创建训练器
trainer = DualTaskTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    data_collator=collator,
    compute_metrics=compute_unified_metrics
)

# 训练与评估
trainer.train()
eval_results = trainer.evaluate()

Some weights of Wav2Vec2ForSpeechClassification were not initialized from the model checkpoint at wav2vec2-base-960h and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1', 'wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  processed["labels_intensity"] = torch.tensor(


TypeError: forward() got an unexpected keyword argument 'labels_emotion'