In [301]:
#  # 音频数据集处理
# import os
# import subprocess
#
# def split_mp3_with_ffmpeg(input_folder, output_folder, duration=30):
#
#     cnt = 0
#     # 创建输出文件夹
#     if not os.path.exists(output_folder):
#         os.makedirs(output_folder)
#
#     # 遍历输入文件夹中的所有 MP3 文件
#     for filename in os.listdir(input_folder):
#         flag = False
#         if filename.endswith('.mp3'):
#             file_path = os.path.join(input_folder, filename)
#
#             # 获取音频文件的总时长
#             result = subprocess.run(['ffmpeg', '-i', file_path], stderr=subprocess.PIPE, text=True, encoding='utf-8')
#             duration_str = None
#
#             # 提取时长信息
#             for line in result.stderr.splitlines():
#                 if 'Duration' in line:
#                     duration_str = line.split('Duration:')[1].split(',')[0].strip()
#                     break
#
#             if duration_str is None:
#                 print(f"无法获取 {filename} 的时长，跳过此文件。")
#                 continue
#
#             hours, minutes, seconds = map(float, duration_str.split(':'))
#             total_seconds = int(hours * 3600 + minutes * 60 + seconds)
#
#             # 切割文件，每 30 秒为一个段落
#             for start_time in range(0, total_seconds, duration):
#                 end_time = min(start_time + duration, total_seconds)
#                 output_filename = f"{cnt}.mp3"
#                 output_path = os.path.join(output_folder, output_filename)
#
#                 cnt = cnt + 1
#                 # 使用 ffmpeg 切割文件
#                 command = [
#                     'ffmpeg', '-i', file_path, '-ss', str(start_time), '-to', str(end_time), '-acodec', 'libmp3lame', '-ab', '192k', output_path
#                 ]
#                 subprocess.run(command, encoding='utf-8')  # 添加 encoding 参数
#                 print(f"已切割并保存文件: {output_path}")
#
#                 if cnt == 500:
#                     flag = True
#                     break
#
#             if flag:
#                 break
#
#
# # 输入文件夹路径和输出文件夹路径
# for i in [2,4]:
#     input_folder = "E:\大创\data\MP3//" + str(i)  # 替换为你的文件夹路径
#     output_folder = "E:\大创\data\MP3//" + str(i) + "_new"  # 替换为你希望保存新文件的文件夹路径
#
#     # 调用切割函数
#     split_mp3_with_ffmpeg(input_folder, output_folder)
#
#


In [302]:
import torch
import torch.nn.functional as F

def pad_or_truncate_audio_features(audio_features, target_length=2813):
    """
    填充或裁剪音频特征使它们具有相同的目标长度。
    """
    current_length = audio_features.size(1)

    if current_length < target_length:
        # 使用零填充
        padding_size = target_length - current_length
        audio_features = F.pad(audio_features, (0, padding_size), "constant", 0)
    elif current_length > target_length:
        # 裁剪到目标长度
        audio_features = audio_features[:, :target_length]

    return audio_features


In [303]:
from sklearn.model_selection import train_test_split
import librosa
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import CLIPProcessor, CLIPModel

# 这里是音频特征提取的函数
def extract_audio_features(audio_path):
    # 加载音频文件
    y, sr = librosa.load(audio_path, sr=None)
    # 提取 Mel-spectrogram 特征
    mel_spectrogram = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128)
    # 将其转换为 log-mel spectrogram
    log_mel_spectrogram = librosa.power_to_db(mel_spectrogram)
    return log_mel_spectrogram

# 自定义 Dataset
class AudioTextDataset(Dataset):
    def __init__(self, audio_paths, labels, processor, label_map, target_length=2813):
        self.audio_paths = audio_paths
        self.labels = labels
        self.processor = processor
        self.label_map = label_map
        self.target_length = target_length  # 这里初始化 target_length

    def __len__(self):
        return len(self.audio_paths)

    def __getitem__(self, idx):

        # 提取音频特征
        audio_features = extract_audio_features(self.audio_paths[idx])
        audio_features = torch.tensor(audio_features).float()

        # 填充或裁剪音频特征
        audio_features = pad_or_truncate_audio_features(audio_features, self.target_length)


        # 获取标签文本并进行映射
        label = self.labels[idx]
        # label_idx = self.label_map[label]  # 将标签映射为对应的索引

        return torch.tensor(audio_features).float(), torch.tensor(label).int()
    def splitData(self, rate=0.8):
        train_audio_paths, test_audio_paths, train_labels, test_labels = train_test_split(
            self.audio_paths, self.labels, test_size=(1 - rate), random_state=42
        )

        # 返回新的训练集和测试集
        train_dataset = AudioTextDataset(train_audio_paths, train_labels, self.processor, self.label_map)
        test_dataset = AudioTextDataset(test_audio_paths, test_labels, self.processor, self.label_map)

        return train_dataset, test_dataset




In [304]:
import os

audio_path = r'E:\大创\data\MP3'  # 确保这是正确的路径
audio_paths = []
label_texts = []

# 遍历指定的音频文件夹
for folder in os.listdir(audio_path):
    folder_path = os.path.join(audio_path, folder)

    # 确保只处理文件夹，跳过文件
    if os.path.isdir(folder_path) and folder.endswith("new"):
        print(f"Processing folder: {folder}")
        label = int(folder.split("_")[0])  # 假设标签在文件夹名中，以 "_" 分隔

        # 遍历该文件夹中的音频文件
        for audio_file in os.listdir(folder_path):
            audio_file_path = os.path.join(folder_path, audio_file)

            # 只处理音频文件（可以根据实际文件扩展名调整条件）
            if audio_file.endswith(".mp3"):
                print(f"Adding file: {audio_file_path}")
                audio_paths.append(audio_file_path)
                label_texts.append(label)

        # 如果你只想处理文件夹下的第一个音频文件，可以在这里加上 break
        # break

# 输出结果
print(f"Total audio files: {len(audio_paths)}")
print(f"Total labels: {len(label_texts)}")


Processing folder: 1_new
Adding file: E:\大创\data\MP3\1_new\0.mp3
Adding file: E:\大创\data\MP3\1_new\1.mp3
Adding file: E:\大创\data\MP3\1_new\10.mp3
Adding file: E:\大创\data\MP3\1_new\100.mp3
Adding file: E:\大创\data\MP3\1_new\101.mp3
Adding file: E:\大创\data\MP3\1_new\102.mp3
Adding file: E:\大创\data\MP3\1_new\103.mp3
Adding file: E:\大创\data\MP3\1_new\104.mp3
Adding file: E:\大创\data\MP3\1_new\105.mp3
Adding file: E:\大创\data\MP3\1_new\106.mp3
Adding file: E:\大创\data\MP3\1_new\107.mp3
Adding file: E:\大创\data\MP3\1_new\108.mp3
Adding file: E:\大创\data\MP3\1_new\109.mp3
Adding file: E:\大创\data\MP3\1_new\11.mp3
Adding file: E:\大创\data\MP3\1_new\110.mp3
Adding file: E:\大创\data\MP3\1_new\111.mp3
Adding file: E:\大创\data\MP3\1_new\112.mp3
Adding file: E:\大创\data\MP3\1_new\113.mp3
Adding file: E:\大创\data\MP3\1_new\114.mp3
Adding file: E:\大创\data\MP3\1_new\115.mp3
Adding file: E:\大创\data\MP3\1_new\116.mp3
Adding file: E:\大创\data\MP3\1_new\117.mp3
Adding file: E:\大创\data\MP3\1_new\118.mp3
Adding file: E:

In [305]:
# 创建数据集
dataset = AudioTextDataset(audio_paths, label_texts, processor=None, label_map=None,target_length=2813)

# 划分数据集
train_dataset, test_dataset = dataset.splitData(rate=0.8)

# 打印数据集长度
print(f"训练集大小: {len(train_dataset)}")
print(f"测试集大小: {len(test_dataset)}")

训练集大小: 1391
测试集大小: 348


In [306]:
train_dataloader ,test_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True),DataLoader(test_dataset,batch_size=16,shuffle=True)

In [307]:
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# device = torch.device("cpu")
# print(device)

In [308]:
import torch.nn as nn
import torchvision.models as models

# class AudioEncoder(nn.Module):
#     def __init__(self):
#         super(AudioEncoder, self).__init__()
#         # 使用 ResNet18 作为音频编码器的基础模型
#         self.resnet = models.resnet18(pretrained=True)
#         self.resnet.fc = nn.Identity()  # 去掉分类头，只保留特征提取部分
#
#     def forward(self, x):
#         return self.resnet(x)


# class AudioEncoder(nn.Module):
#     def __init__(self, input_channels=128):
#         super(AudioEncoder, self).__init__()
#         # 首先添加一个卷积层，将输入的通道数从 128 降为 16
#         self.conv1 = nn.Conv2d(input_channels, 16, kernel_size=(1, 1))  # 使用 1x1 卷积降通道
#
#         # 使用 ResNet18 作为音频编码器的基础模型
#         self.resnet = models.resnet18(pretrained=True)
#
#         # 修改输入卷积层的通道数，将原来的 3 通道改为 16 通道
#         self.resnet.conv1 = nn.Conv2d(16, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
#
#         # 去掉分类头，只保留特征提取部分
#         self.resnet.fc = nn.Identity()
#
#     def forward(self, x):
#         # 先通过 conv1 将通道数从 128 降到 16
#         x = self.conv1(x)
#         if len(x.shape) == 3:
#             # 假设输入是 [batch_size, channels, height]，我们需要扩展成 [batch_size, channels, height, width]
#             x = x.unsqueeze(-1)  # 添加宽度维度，变为 [batch_size, channels, height, 1]
#
#         return self.resnet(x)


#  torch.Size([16, 128, 2813])
class AudioEncoder(nn.Module):
    def __init__(self, input_channels=128):
        super(AudioEncoder, self).__init__()
        # 使用 ResNet18 作为音频编码器的基础模型
        self.resnet = models.resnet18(pretrained=True)

        # 修改输入卷积层的通道数，将原来的 3 通道改为 16 通道
        self.resnet.conv1 = nn.Conv2d(input_channels, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)

        # 去掉分类头，只保留特征提取部分
        self.resnet.fc = nn.Identity()

    def forward(self, x):
        # 确保输入是 4D 张量，如果是 3D 则添加 batch 维度
        x = x.unsqueeze(-1)

        return self.resnet(x)

# 创建音频编码器模型
audio_encoder = AudioEncoder()
# audio_encoder.to(device)


In [309]:
# 加载 CLIP 模型
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch16")
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch16")

class TextEncoder(nn.Module):
    def __init__(self, clip_model):
        super(TextEncoder, self).__init__()
        self.clip_model = clip_model
        self.clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch16")

    def forward(self, text_input):
        # 如果 text_input 是单个字符串，将其转换为列表
        if isinstance(text_input, str):
            text_input = [text_input]

        # 使用 CLIPProcessor 将文本转换为张量格式
        text_inputs = self.clip_processor(text=text_input, return_tensors="pt", padding=True)

        # 获取文本特征，注意这里提取的是文本的 [CLS] token 特征
        text_features = self.clip_model.get_text_features(**text_inputs)

        # 返回文本的 [CLS] token 表示
        return text_features


In [310]:
class MultiModalClassifier(nn.Module):
    def __init__(self, audio_encoder, text_encoder, num_classes=4):
        super(MultiModalClassifier, self).__init__()
        self.audio_encoder = audio_encoder
        self.text_encoder = text_encoder

        # 假设拼接后的特征维度是 513 (audio_features + text_features)
        # self.classification_head = nn.Linear(513, num_classes)  # 这里的 513 应该和拼接后的维度一致

        self.classification_head = nn.Sequential(
            nn.Linear(513, 1024),  # 假设拼接后的特征是 513，调整为 1024
            nn.ReLU(),
            nn.Linear(1024, num_classes)
        )

    def forward(self, audio_features, text_input):
        # 音频特征
        audio_features = self.audio_encoder(audio_features)

        print(audio_features.shape)
        # 文本特征
        print(text_input)
        # text_features = self.text_encoder(text_input)
        text_features = text_input
        print(text_features.shape)
        # 拼接音频和文本特征
         # 确保音频特征和文本特征的维度一致
        if audio_features.dim() == 2 and text_features.dim() == 1:
            text_features = text_features.unsqueeze(1)  # 将文本特征从 (batch_size,) 转换为 (batch_size, 1)
        combined_features = torch.cat([audio_features, text_features], dim=-1)
        print("拼接后：" + str(combined_features.shape))
        # 通过分类头进行分类
        logits = self.classification_head(combined_features)
        return logits


In [311]:
# 创建模型
model = MultiModalClassifier(audio_encoder=AudioEncoder(), text_encoder=TextEncoder(clip_model), num_classes=4)

# 损失函数和优化器
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
loss_fn = nn.CrossEntropyLoss()
# torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

# 训练
epochs = 10
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for audio_features, label_batch in train_dataloader:
        # audio_features = audio_features.to(device).float()  # 确保音频特征是浮点数
        audio_features = audio_features.float()
        # label_batch = label_batch.to(device)

        label_batch = label_batch - 1
        print(audio_features.shape)

        # audio_features = pad_or_truncate_audio_features(audio_features, target_length=2813)

        # print(audio_features.shape)


        # 前向传播
        logits = model(audio_features, label_batch)
        label_batch = label_batch.long()
        # 计算损失
        loss = loss_fn(logits, label_batch)
        total_loss += loss.item()

        # 反向传播
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_dataloader)}")


  return torch.tensor(audio_features).float(), torch.tensor(label).int()


torch.Size([16, 128, 2813])
torch.Size([16, 512])
tensor([2, 0, 1, 0, 3, 0, 1, 0, 1, 0, 2, 1, 1, 1, 0, 2], dtype=torch.int32)
torch.Size([16])
拼接后：torch.Size([16, 513])


RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [None]:
def test_accuracy(test_dataloader):
    model.eval()  # 设置模型为评估模式
    correct = 0  # 记录正确预测的数量
    total = 0  # 记录总的样本数

    with torch.no_grad():  # 在测试阶段，不需要计算梯度
        for audio_features, label in test_dataloader:
            # 将数据移到相应的设备
            # audio_features = audio_features.to(device)
            # label = label.to(device)

            # 获取模型输出
            logits = model(audio_features)  # 根据你的模型，可能需要传递其他参数
            predicted_label = torch.argmax(logits, dim=-1)

            # 计算预测正确的数量
            correct += (predicted_label == label).sum().item()
            total += label.size(0)

    accuracy = correct / total * 100  # 计算准确率
    return accuracy

# 假设 test_dataloader 已经定义并且包含测试数据
accuracy = test_accuracy(test_dataloader)
print(f"Test Accuracy: {accuracy:.2f}%")

In [None]:
# 推理
def predict(audio_path, label_input):
    model.eval()

    # 提取音频特征
    audio_features = extract_audio_features(audio_path)
    # audio_features = torch.tensor(audio_features).unsqueeze(0).to(device).float()
    audio_features = torch.tensor(audio_features).unsqueeze(0).float()

    # 标签处理
    label_input = [label_input]  # 标签是文本
    text_features = model.text_encoder(label_input)

    # 获取预测结果
    logits = model(audio_features, label_input)

    # 获取最匹配的类别
    predicted_label = torch.argmax(logits, dim=-1)
    return predicted_label.item()

# 使用模型进行推理
audio_path = "data/audio_test.wav"
label_input = "cat"  # 假设标签是 "cat"
predicted_label = predict(audio_path, label_input)
print(f"Predicted label: {predicted_label}")
