In [1]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from transformers import Wav2Vec2ForCTC, Wav2Vec2FeatureExtractor, Wav2Vec2Processor, BertModel, BertTokenizer
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from torch.utils.data import Dataset, TensorDataset, DataLoader  # 确保导入 DataLoader
import librosa

# 检查是否能够使用GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

Using device: cuda


In [2]:
# 加载模型（一次加载）
model_english = Wav2Vec2ForCTC.from_pretrained("./wav2vec2-base-960h").to(device)
processor_english = Wav2Vec2Processor.from_pretrained("./wav2vec2-base-960h")
model_chinese = Wav2Vec2ForCTC.from_pretrained("./wav2vec2-large-xlsr-53-chinese-zh-cn").to(device)
processor_chinese = Wav2Vec2Processor.from_pretrained("./wav2vec2-large-xlsr-53-chinese-zh-cn")

_model = Wav2Vec2ForCTC.from_pretrained("./wav2vec2-xls-r-300m").to(device)
feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained("./wav2vec2-xls-r-300m")
tokenizer = BertTokenizer.from_pretrained("./bert-base-multilingual-cased")
bert_model = BertModel.from_pretrained("./bert-base-multilingual-cased").to(device)

# 读取CSV文件
attributes_file = "CBU0521DD_stories_attributes.csv"
df = pd.read_csv(attributes_file)
print("Dataframe loaded successfully.")
print(df.head())

Some weights of Wav2Vec2ForCTC were not initialized from the model checkpoint at ./wav2vec2-base-960h and are newly initialized: ['wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  return self.fget.__get__(instance, owner)()
Some weights of Wav2Vec2ForCTC were not initialized from the model checkpoint at ./wav2vec2-xls-r-300m and are newly initialized: ['lm_head.bias', 'lm_head.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Dataframe loaded successfully.
    filename Language  Story_type
0  00001.wav  Chinese  True Story
1  00002.wav  Chinese  True Story
2  00003.wav  Chinese  True Story
3  00004.wav  Chinese  True Story
4  00005.wav  Chinese  True Story


In [3]:
# 定义函数来提取音频特征
def extract_audio_features(waveform, model, processor):
    inputs = processor(waveform, return_tensors="pt", sampling_rate=16000, padding=True).to(device)
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits  # 获取模型的logits输出
    # 压平，忽略 batch_size 维度
    return logits.flatten(start_dim=1).cpu().numpy()  # 如果需要保存为numpy数组，可以进行转换

# 定义函数来转录音频文本
def transcribe_audio(waveform, model, processor):
    inputs = processor(waveform, sampling_rate=16000, return_tensors="pt", padding=True).to(device)
    with torch.no_grad():
        logits = model(**inputs).logits
    # 获取预测的ID
    predicted_ids = torch.argmax(logits, dim=-1)
    # 使用分词器将预测的ID转换为文本
    transcription = processor.batch_decode(predicted_ids)
    return transcription[0]

# 定义函数来提取文本特征
def extract_text_features(text, tokenizer, model):
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True).to(device)
    with torch.no_grad():
        outputs = model(**inputs)
    return outputs.last_hidden_state[:, 0, :].cpu().numpy()

In [4]:
# 创建数据集类
class AudioDataset(Dataset):
    def __init__(self, dataframe, audio_dir):
        self.dataframe = dataframe
        self.audio_dir = audio_dir
        self.count = 0

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        row = self.dataframe.iloc[idx]
        file_name = row['filename']
        language = row['Language']
        story_type = row['Story_type']
        file_path = os.path.join(self.audio_dir, file_name)

        # 检查文件是否存在
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"文件 {file_path} 不存在")

        # 加载音频文件并重采样到16kHz
        waveform, sampling_rate = librosa.load(file_path, sr=16000)

        # 提取音频特征
        audio_feature = extract_audio_features(waveform, _model, feature_extractor)

        # 转录音频文本
        if language == "English":
            transcription = transcribe_audio(waveform, model_english, processor_english)
        else:
            transcription = transcribe_audio(waveform, model_chinese, processor_chinese)
        
        if self.count < 3:
            self.count += 1
            print(file_name, " ", language)
            print("语音转文字结果:", transcription)

        # 提取文本特征
        text_feature = extract_text_features(transcription, tokenizer, bert_model)

        # 将音频特征和文本特征填充到统一大小
        audio_feature = self.pad_or_truncate_audio(audio_feature, 16384)

        # print(audio_feature.shape, text_feature.shape)

        label = 1 if story_type == "True Story" else 0

        return audio_feature, text_feature, label
    
    def pad_or_truncate_audio(self, audio_feature, target_length):
        """
        对音频特征进行填充或裁剪，确保其长度一致。
        """
        current_length = audio_feature.shape[-1]
        
        if current_length < target_length:
            # 填充
            padding = target_length - current_length
            audio_feature = np.pad(audio_feature, ((0, 0), (0, padding)), mode='constant')
        elif current_length > target_length:
            # 裁剪操作：使用均值池化，按目标长度进行聚合
            # 假设audio_feature的形状是 (batch_size, feature_dim)，可以通过在最后一维做池化
            step_size = current_length // target_length
            pooled_audio = []
            for i in range(target_length):
                start = i * step_size
                end = (i + 1) * step_size
                pooled_audio.append(np.mean(audio_feature[:, start:end], axis=1))  # 沿着特征维度求均值
            audio_feature = np.stack(pooled_audio, axis=1)  # 将均值结果堆叠在一起

        return audio_feature

In [5]:
df['stratify_col'] = df['Language'] + "_" + df['Story_type']

# 使用train_test_split进行分层抽样
train_df, test_df = train_test_split(df, test_size=0.2, stratify=df['stratify_col'], random_state=42)

# 打印分层结果
print("训练集语言和故事类型分布：")
print(train_df['stratify_col'].value_counts())

print("\n测试集语言和故事类型分布：")
print(test_df['stratify_col'].value_counts())

训练集语言和故事类型分布：
stratify_col
English_True Story         20
English_Deceptive Story    20
Chinese_True Story         20
Chinese_Deceptive Story    20
Name: count, dtype: int64

测试集语言和故事类型分布：
stratify_col
English_Deceptive Story    5
Chinese_Deceptive Story    5
Chinese_True Story         5
English_True Story         5
Name: count, dtype: int64


In [6]:
# 创建训练集和测试集的DataLoader
audio_dir = 'CBU0521DD_stories'
train_dataset = AudioDataset(train_df, audio_dir)
test_dataset = AudioDataset(test_df, audio_dir)

batch_size = 8
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# 提取训练集特征
train_audio_features = []
train_text_features = []
train_labels = []

for audio_features, text_features, label in train_dataloader:
    audio_features = audio_features.squeeze()  # 如果是 (batch_size, 1, feature_size)，会去掉1维
    text_features = text_features.squeeze()
    train_audio_features.append(audio_features.numpy())
    train_text_features.append(text_features.numpy())
    train_labels.append(label.numpy())

train_audio_features_array = np.concatenate(train_audio_features, axis=0)
train_text_features_array = np.concatenate(train_text_features, axis=0)
train_labels_array = np.concatenate(train_labels, axis=0)

print("训练集特征和标签提取完成。")
print("训练集音频特征形状:", train_audio_features_array.shape)
print("训练集文本特征形状:", train_text_features_array.shape)
print("训练集标签形状:", train_labels_array.shape)

# 提取测试集特征
test_audio_features = []
test_text_features = []
test_labels = []

for audio_features, text_features, label in test_dataloader:
    audio_features = audio_features.squeeze()  # 如果是 (batch_size, 1, feature_size)，会去掉1维
    text_features = text_features.squeeze()
    test_audio_features.append(audio_features.numpy())
    test_text_features.append(text_features.numpy())
    test_labels.append(label.numpy())

# 将特征和标签拼接成 NumPy 数组
test_audio_features_array = np.concatenate(test_audio_features, axis=0)
test_text_features_array = np.concatenate(test_text_features, axis=0)
test_labels_array = np.concatenate(test_labels, axis=0)
print(test_audio_features_array.shape)

print("测试集特征和标签提取完成。")
print("测试集音频特征形状:", test_audio_features_array.shape)
print("测试集文本特征形状:", test_text_features_array.shape)
print("测试集标签形状:", test_labels_array.shape)

# 保存训练集和测试集的特征和标签
np.save("train_audio_features.npy", train_audio_features_array)
np.save("train_text_features.npy", train_text_features_array)  # 保存文本特征
np.save("train_labels.npy", train_labels_array)

np.save("test_audio_features.npy", test_audio_features_array)
np.save("test_text_features.npy", test_text_features_array)  # 保存文本特征
np.save("test_labels.npy", test_labels_array)

00097.wav   Chinese
语音转文字结果: 上洲我相望一家顾而怨是<unk>望我大学时期志愿服务时认识的小难骸好号那谁安静动日口温人和的撒录事空计中假达着但难韩一当人均人那间熟悉的愿字时号号正坐在丘陷让你头专注的白农的人一个基目完偶 含林一生他的名字他他系头<unk>了一下随即录出了大大的笑人放向完具非一奔过来爆助了我哥他了生你尼人熟悉的搬快我摸了模的害头发见他比我剂有展高了不手但的双明样惊亦就充满还特有纯真我们坐在院子的意角聊天高送我他最心喜欢上话话还听奋的跑回房间拿出一本画测给我看开话测里面是期些日嫩却深动的化作或的是孤人怨的生呼认着很来猫弥小火吧他又一父话了一个人穿着知人整个<unk>架真阿着他手只人的夫话碾点说这是安是尼我的心头一震<unk>似有温暖我告送他他话棒应该坚持下去号号用力点了点头人你满是期带你奏他经紧拉使<unk>哥应还为再来吧我<unk>叫来看他阵重的轨到定会到时怀来为带上你的王数数意起来看你  李看故而认认不住回看了号战在门口像我会归手杨光死在他小想生一下了额外温暖这一颗我明拜了陪万与怪爱也许就是我们给与他最重要的意样而号好的纯真和勇敢也让我对生活又了更多的噶物
00081.wav   English
语音转文字结果: IN CALTING WARMTH ITHER WUL A FREEZING WINTER BY THE WIND WAS BLOWI BLOWLING LIKE A KNIFE ACROSS MY FACE I HAD JUST EXPERIENCE THE MAJOR SET BACK AT WARK AND MY MOOD MOOND WAS THE EXTREMELY LOW WORKING ON THE DESERTED STREET I DIDN'T KNOW WHERE I SHOULD GO WHEN I PASSED BY A COFFEE SHOP THE WARM LIGHT THEYARE SIMILAR TO HAVE A E II HAVE AN E IS THE EX THEPLAGU CABOL ATTRACTIN AN I COULDN'T HAL BAT A WORKE THE SHOP WAS FILLED WITH THE RICH AROMA OF COFFEE AND 

In [7]:
# 定义音频二分类模型
class AudioTextClassifier(nn.Module):
    def __init__(self, audio_feature_dim, text_feature_dim):
        super(AudioTextClassifier, self).__init__()
        
        # 音频特征的处理：逐步降维到接近文本特征维度
        self.audio_fc1 = nn.Linear(audio_feature_dim, 4096)  # 从 260000 降到 100000
        self.audio_fc2 = nn.Linear(4096, 1024)   # 从 12000 降到 5000

        # 文本特征的处理
        self.text_fc = nn.Linear(text_feature_dim, 512)
        
        # 拼接后的全连接层
        self.fc1 = nn.Linear(1536, 2048)  # 拼接后的维度是 text_feature_dim + 1000
        self.fc2 = nn.Linear(2048, 512)
        self.fc3 = nn.Linear(512, 1)  # 二分类输出
        
        # 激活函数和 Dropout
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)  # 防止过拟合
        
    def forward(self, audio_features, text_features):
        # 处理音频特征
        audio_x = self.audio_fc1(audio_features)
        audio_x = self.relu(audio_x)
        audio_x = self.dropout(audio_x)

        audio_x = self.audio_fc2(audio_x)
        
        # 处理文本特征
        text_x = self.text_fc(text_features)
        
        # 拼接音频特征和文本特征
        combined_features = torch.cat((audio_x, text_x), dim=1)
        
        # 经过全连接层
        x = self.fc1(combined_features)
        x = self.relu(x)
        x = self.dropout(x)
        
        x = self.fc2(x)
        x = self.relu(x)
        x = self.dropout(x)
        
        x = self.fc3(x)
        # 输出 Sigmoid 激活，用于二分类
        return torch.sigmoid(x).squeeze()  # 返回 [0,1] 之间的概率

In [8]:
# 定义损失函数和优化器
model = AudioTextClassifier(16384, 768).to(device)

criterion = nn.BCELoss()  # 二分类交叉熵损失
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 转换为 PyTorch 的 Tensor
train_audio_features_tensor = torch.tensor(train_audio_features_array, dtype=torch.float32)
train_text_features_tensor = torch.tensor(train_text_features_array, dtype=torch.float32)
train_labels_tensor = torch.tensor(train_labels_array, dtype=torch.float32).view(-1, 1)

test_audio_features_tensor = torch.tensor(test_audio_features_array, dtype=torch.float32)
test_text_features_tensor = torch.tensor(test_text_features_array, dtype=torch.float32)
test_labels_tensor = torch.tensor(test_labels_array, dtype=torch.float32).view(-1, 1)

In [9]:
# 训练模型
num_epochs = 20
batch_size = 8  # 设定 batch_size

train_dataset = TensorDataset(train_audio_features_tensor, train_text_features_tensor, train_labels_tensor)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = TensorDataset(test_audio_features_tensor, test_text_features_tensor, test_labels_tensor)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for audio_features, text_features, labels in train_dataloader:
        audio_features, text_features, labels = audio_features.to(device), text_features.to(device), labels.to(device)
        labels = labels.view(-1)
        
        # 清零梯度
        optimizer.zero_grad()

        # 前向传播
        outputs = model(audio_features, text_features)

        # 计算损失
        loss = criterion(outputs, labels)

        # 反向传播
        loss.backward()
        optimizer.step()

        # 计算训练准确率
        predicted = (outputs > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        running_loss += loss.item()

    train_accuracy = 100 * correct / total
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_dataloader):.4f}, Accuracy: {train_accuracy:.2f}%")

Epoch [1/20], Loss: 0.8706, Accuracy: 46.25%
Epoch [2/20], Loss: 0.7428, Accuracy: 48.75%
Epoch [3/20], Loss: 0.6934, Accuracy: 56.25%
Epoch [4/20], Loss: 0.7259, Accuracy: 60.00%
Epoch [5/20], Loss: 0.7791, Accuracy: 51.25%
Epoch [6/20], Loss: 0.6219, Accuracy: 63.75%
Epoch [7/20], Loss: 0.7628, Accuracy: 51.25%
Epoch [8/20], Loss: 0.6797, Accuracy: 58.75%
Epoch [9/20], Loss: 0.7800, Accuracy: 61.25%
Epoch [10/20], Loss: 0.6484, Accuracy: 65.00%
Epoch [11/20], Loss: 0.6692, Accuracy: 61.25%
Epoch [12/20], Loss: 0.6425, Accuracy: 60.00%
Epoch [13/20], Loss: 0.6014, Accuracy: 60.00%
Epoch [14/20], Loss: 0.5604, Accuracy: 66.25%
Epoch [15/20], Loss: 0.5503, Accuracy: 70.00%
Epoch [16/20], Loss: 0.4936, Accuracy: 71.25%
Epoch [17/20], Loss: 0.5326, Accuracy: 71.25%
Epoch [18/20], Loss: 0.6657, Accuracy: 61.25%
Epoch [19/20], Loss: 0.5885, Accuracy: 68.75%
Epoch [20/20], Loss: 0.4739, Accuracy: 76.25%


In [10]:
# 测试模型
model.eval()  # 进入评估模式
correct = 0
total = 0
test_loss = 0.0
predictions = []
labels_list = []

with torch.no_grad():  # 不计算梯度
    for audio_features, text_features, labels in test_dataloader:
        # 将数据移到设备上
        audio_features, text_features, labels = audio_features.to(device), text_features.to(device), labels.to(device)
        labels = labels.view(-1)  # 确保标签的形状正确
        
        # 前向传播
        outputs = model(audio_features, text_features)
        
        # 计算损失
        loss = criterion(outputs, labels)
        test_loss += loss.item()

        # 计算预测结果
        predicted = (outputs > 0.5).float()
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

        # 将预测结果和标签保存在列表中
        predictions.extend(predicted.cpu().numpy())
        labels_list.extend(labels.cpu().numpy())

# 计算准确率
test_accuracy = 100 * correct / total
average_loss = test_loss / len(test_dataloader)

# 输出测试准确率和损失
print(f"Test Accuracy: {test_accuracy:.2f}%")
print(f"Test Loss: {average_loss:.4f}")

# 打印分类报告
print("Classification Report:")
print(classification_report(labels_list, predictions))


Test Accuracy: 65.00%
Test Loss: 1.4555
Classification Report:
              precision    recall  f1-score   support

         0.0       0.71      0.50      0.59        10
         1.0       0.62      0.80      0.70        10

    accuracy                           0.65        20
   macro avg       0.66      0.65      0.64        20
weighted avg       0.66      0.65      0.64        20

