In [24]:
import math
import os
import sys
import random
import traceback
import numpy as np
import torch
import sentencepiece as spm
import librosa
current_dir = os.getcwd()
parent_dir = os.path.dirname(os.getcwd())
sys.path.append(parent_dir)
from GPT_SoVITS.module.mel_processing import spectrogram_torch

from tools.my_utils import load_audio

class TextAudioSpeakerLoader(torch.utils.data.Dataset):
    def tokenize_text(self, text):
        token = [0] + [x + 1 for x in self.sp.encode(text)] + [2]
        return token

    def __init__(self, hparams, val=False):
        self.sp = spm.SentencePieceProcessor()
        self.sp.load("../pretrained_models/sentencepiece.bpe.model")
        exp_dir = hparams.exp_dir
        todo = []
        self.audiopaths_text = []
        self.lengths = []
        for root, dirs, files in os.walk(exp_dir):
            for file in files:
                if file.endswith(".txt"):
                    index_folder = os.path.relpath(root, exp_dir)
                    file_path = os.path.join(root, file)

                    # 尝试不同的编码
                    encodings = ["utf-8", "gbk", "gb2312", "utf-16"]
                    for encoding in encodings:
                        try:
                            with open(file_path, "r", encoding=encoding) as f:
                                lines = f.readlines()
                            break  # 如果成功读取，跳出循环
                        except UnicodeDecodeError:
                            continue  # 如果解码失败，尝试下一个编码
                    else:
                        print(f"无法解码文件 {file_path}，跳过此文件")
                        continue  # 如果所有编码都失败，跳过此文件

                    for line in lines:
                        try:
                            spk_name, wav_name, text = line.split("|")
                            todo.append([spk_name, wav_name, text, index_folder])
                        except Exception:
                            print(line)
        for data in todo:
            _, wav_name, text, index_folder = data
            audio_path = os.path.join(exp_dir, index_folder, wav_name)
            speech_token_path = audio_path + ".npy"
            bert_path = audio_path + ".pt"
            wav_path = audio_path + ".wav"
            if (
                os.path.exists(speech_token_path)
                and os.path.exists(bert_path)
                and os.path.exists(wav_path)
            ):
                try:
                    duration = librosa.get_duration(filename=wav_path)  # noqa: F821
                    self.lengths.append(math.ceil(duration * 50))
                except Exception as e:
                    print(f"无法处理文件 {wav_path}：{str(e)}")
                    continue
                self.audiopaths_text.append([audio_path, text])

        self.max_wav_value = hparams.max_wav_value
        self.sampling_rate = hparams.sampling_rate
        self.filter_length = hparams.filter_length
        self.hop_length = hparams.hop_length
        self.win_length = hparams.win_length
        self.sampling_rate = hparams.sampling_rate
        self.val = val

        """
        @misc{picard2023torchmanualseed3407needinfluencerandom,
        title={Torch.manual_seed(3407) is all you need: On the influence of random seeds in deep learning architectures for computer vision}, 
        author={David Picard},
        year={2023},
        eprint={2109.08203},
        archivePrefix={arXiv},
        primaryClass={cs.CV},
        url={https://arxiv.org/abs/2109.08203}, 
        }
        """
        random.seed(3407)  # 3407 is all you need

        random.shuffle(self.audiopaths_text)
        print("wav_data_len:", len(self.audiopaths_text))

    def get_audio_text_speaker_pair(self, audiopath_text):
        audiopath, text = audiopath_text
        text_token = self.tokenize_text(text)
        try:
            spec, wav = self.get_audio(audiopath + ".wav")
            speech_token = np.load(audiopath + ".npy")
            speech_token = torch.from_numpy(speech_token)
            min_length = min(speech_token.shape[-1], spec.shape[-1])
            speech_token = speech_token[..., :min_length]
            spec = spec[..., :min_length]
        except Exception:
            traceback.print_exc()
            spec = torch.zeros(1025, 100)
            wav = torch.zeros(1, 100 * self.hop_length)
            speech_token = torch.zeros(1, 100)
            text_token = text_token[-1:]
            print("load error!!!!!!", audiopath)
        return (speech_token, spec, wav, text_token)

    def get_audio(self, filename):
        audio_array = load_audio(filename, self.sampling_rate)
        audio = torch.FloatTensor(audio_array)
        audio = audio.unsqueeze(0)
        spec = spectrogram_torch(
            audio,
            self.filter_length,
            self.sampling_rate,
            self.hop_length,
            self.win_length,
            center=False,
        )
        spec = torch.squeeze(spec, 0)
        return spec, audio

    def get_sid(self, sid):
        sid = torch.LongTensor([int(sid)])
        return sid

    def __getitem__(self, index):
        # with torch.no_grad():
        return self.get_audio_text_speaker_pair(self.audiopaths_text[index])

    def __len__(self):
        return len(self.audiopaths_text)

In [25]:
class SimpleHparams:
    def __init__(self):
        self.exp_dir = "../dataset"
        self.max_wav_value = 32768.0
        self.sampling_rate = 32000
        self.filter_length = 2048
        self.hop_length = 640
        self.win_length = 2048

# 创建 hparams 实例
hparams = SimpleHparams()

# 创建 TextAudioSpeakerLoader 实例
dataset = TextAudioSpeakerLoader(hparams)

# 打印数据集的大小
print(f"数据集大小: {len(dataset)}")

# 获取第一个样本
if len(dataset) > 0:
    sample = dataset[1]
    speech_token, spec, wav, text_token = sample
    
    print(f"语音标记形状: {speech_token.shape}")
    print(f"频谱图形状: {spec.shape}")
    print(f"波形形状: {wav.shape}")
    print(f"文本标记: {text_token}")
    
    # 保存wav到文件
    import soundfile as sf
    import numpy as np
    
    # 确保wav是一个numpy数组，并且是float32类型
    if isinstance(wav, torch.Tensor):
        wav = wav.numpy()
    wav = wav.astype(np.float32)
    
    # 确保音频数据在-1到1之间
    wav = np.clip(wav, -1, 1)
    
    # 导出为WAV文件
    try:
        sf.write('output_audio.wav', wav.squeeze(), hparams.sampling_rate, subtype='FLOAT')
        print("音频已导出为 output_audio.wav")
    except Exception as e:
        print(f"导出音频时出错: {str(e)}")
else:
    print("数据集为空")

wav_data_len: 16006
数据集大小: 16006
语音标记形状: torch.Size([324])
频谱图形状: torch.Size([1025, 324])
波形形状: torch.Size([1, 207920])
文本标记: [0, 6, 10330, 216765, 886, 130668, 4, 35922, 274, 8856, 887, 7402, 83855, 4011, 25840, 4, 63211, 184120, 274, 2058, 4011, 2391, 4695, 6042, 4, 1036, 4502, 32, 2]
音频已导出为 output_audio.wav


In [26]:
class TextAudioCollate:
    def __init__(self, return_ids=False):
        self.return_ids = return_ids

    def __call__(self, batch):
        # 按照频谱图长度排序
        _, ids_sorted_decreasing = torch.sort(
            torch.LongTensor([x[1].size(1) for x in batch]),
            dim=0, descending=True
        )

        max_speech_len = max([x[0].size(0) for x in batch])
        max_spec_len = max([x[1].size(1) for x in batch])
        max_wav_len = max([x[2].size(1) for x in batch])
        max_text_len = max([len(x[3]) for x in batch])

        speech_lengths = torch.LongTensor(len(batch))
        spec_lengths = torch.LongTensor(len(batch))
        wav_lengths = torch.LongTensor(len(batch))
        text_lengths = torch.LongTensor(len(batch))

        speech_padded = torch.LongTensor(len(batch), max_speech_len)
        spec_padded = torch.FloatTensor(len(batch), 1025, max_spec_len)
        wav_padded = torch.FloatTensor(len(batch), 1, max_wav_len)
        text_padded = torch.LongTensor(len(batch), max_text_len)

        speech_padded.zero_()
        spec_padded.zero_()
        wav_padded.zero_()
        text_padded.zero_()

        for i in range(len(ids_sorted_decreasing)):
            row = batch[ids_sorted_decreasing[i]]

            speech = row[0]
            speech_padded[i, :speech.size(0)] = speech
            speech_lengths[i] = speech.size(0)

            spec = row[1]
            spec_padded[i, :, :spec.size(1)] = spec
            spec_lengths[i] = spec.size(1)

            wav = row[2]
            wav_padded[i, :, :wav.size(1)] = wav
            wav_lengths[i] = wav.size(1)

            text = torch.LongTensor(row[3])
            text_padded[i, :text.size(0)] = text
            text_lengths[i] = text.size(0)

        if self.return_ids:
            return (
                speech_padded, speech_lengths, spec_padded, spec_lengths,
                wav_padded, wav_lengths, text_padded, text_lengths, ids_sorted_decreasing
            )
        return (
            speech_padded, speech_lengths, spec_padded, spec_lengths,
            wav_padded, wav_lengths, text_padded, text_lengths
        )

In [27]:
from torch.utils.data import DataLoader
# 创建 collate 函数实例
collate_fn = TextAudioCollate()

# 创建 DataLoader
batch_size = 4  # 使用小批量以便于观察
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

# 获取一个批次的数据
batch = next(iter(dataloader))

# 解包批次数据
speech_padded, speech_lengths, spec_padded, spec_lengths, wav_padded, wav_lengths, text_padded, text_lengths = batch

# 打印每个张量的形状和类型
print(f"speech_padded shape: {speech_padded.shape}, type: {speech_padded.dtype}")
print(f"speech_lengths shape: {speech_lengths.shape}, type: {speech_lengths.dtype}")
print(f"spec_padded shape: {spec_padded.shape}, type: {spec_padded.dtype}")
print(f"spec_lengths shape: {spec_lengths.shape}, type: {spec_lengths.dtype}")
print(f"wav_padded shape: {wav_padded.shape}, type: {wav_padded.dtype}")
print(f"wav_lengths shape: {wav_lengths.shape}, type: {wav_lengths.dtype}")
print(f"text_padded shape: {text_padded.shape}, type: {text_padded.dtype}")
print(f"text_lengths shape: {text_lengths.shape}, type: {text_lengths.dtype}")

# 验证填充是否正确
for i in range(batch_size):
    print(f"\nSample {i}:")
    print(f"  Speech length: {speech_lengths[i]}, Padded speech shape: {speech_padded[i].shape}")
    print(f"  Spec length: {spec_lengths[i]}, Padded spec shape: {spec_padded[i].shape}")
    print(f"  Wav length: {wav_lengths[i]}, Padded wav shape: {wav_padded[i].shape}")
    print(f"  Text length: {text_lengths[i]}, Padded text shape: {text_padded[i].shape}")

speech_padded shape: torch.Size([4, 237]), type: torch.int64
speech_lengths shape: torch.Size([4]), type: torch.int64
spec_padded shape: torch.Size([4, 1025, 237]), type: torch.float32
spec_lengths shape: torch.Size([4]), type: torch.int64
wav_padded shape: torch.Size([4, 1, 151820]), type: torch.float32
wav_lengths shape: torch.Size([4]), type: torch.int64
text_padded shape: torch.Size([4, 21]), type: torch.int64
text_lengths shape: torch.Size([4]), type: torch.int64

Sample 0:
  Speech length: 237, Padded speech shape: torch.Size([237])
  Spec length: 237, Padded spec shape: torch.Size([1025, 237])
  Wav length: 151820, Padded wav shape: torch.Size([1, 151820])
  Text length: 20, Padded text shape: torch.Size([21])

Sample 1:
  Speech length: 226, Padded speech shape: torch.Size([237])
  Spec length: 226, Padded spec shape: torch.Size([1025, 237])
  Wav length: 144780, Padded wav shape: torch.Size([1, 151820])
  Text length: 21, Padded text shape: torch.Size([21])

Sample 2:
  Speech

In [28]:
import torch
import torchaudio
import os

# 创建一个保存音频文件的目录
output_dir = "output_wavs"
os.makedirs(output_dir, exist_ok=True)

for i in range(batch_size):
    # 获取当前样本的音频数据和长度
    wav = wav_padded[i, :, :]  # 只取有效长度的部分
    
    # 将张量转换为CPU上的float32类型
    wav = wav.cpu().float()
    
    # 确保wav是二维的 (通道数 x 采样点数)
    if wav.dim() == 1:
        wav = wav.unsqueeze(0)
    elif wav.dim() == 3:
        wav = wav.squeeze(0)
    
    # 保存音频文件
    file_path = os.path.join(output_dir, f"sample_{i}.wav")
    torchaudio.save(file_path, wav, sample_rate=32000)
    
    print(f"已保存音频文件：{file_path}")

已保存音频文件：output_wavs/sample_0.wav
已保存音频文件：output_wavs/sample_1.wav
已保存音频文件：output_wavs/sample_2.wav
已保存音频文件：output_wavs/sample_3.wav
