In [None]:
!pip install torch==2.1.0 torchvision==0.16.0 torchaudio==2.1.0

In [1]:
import sys
sys.path.append('..')
import torch
from infer.lib.rmvpe import RMVPE
from fairseq import checkpoint_utils
from scipy.signal import butter
import torchinterp1d
import torch.nn.functional as F
from typing import *
import librosa
from configs.config import Config
from infer.modules.vc.modules import VC
import numpy as np
import torchaudio

config = Config()

vc = VC(config)

vc.get_vc("Youdao.pth")

2025-02-11 14:10:58 | INFO | faiss.loader | Loading faiss with AVX2 support.
2025-02-11 14:10:58 | INFO | faiss.loader | Successfully loaded faiss with AVX2 support.
2025-02-11 14:10:58 | INFO | configs.config | Found GPU NVIDIA TITAN Xp
2025-02-11 14:10:58 | INFO | configs.config | Half-precision floating-point: True, device: cuda:0
2025-02-11 14:10:58 | INFO | infer.modules.vc.modules | Get sid: Youdao.pth
2025-02-11 14:10:58 | INFO | infer.modules.vc.modules | Loading: assets/weights/Youdao.pth
2025-02-11 14:10:59 | INFO | infer.modules.vc.modules | Select index: 


{'visible': True, 'maximum': 109, '__type__': 'update'}

In [2]:
hubert,_,_ = checkpoint_utils.load_model_ensemble_and_task(
    ["./assets/hubert/hubert_base.pt"],
    suffix="",
)
hubert_model = hubert[0]
hubert_model = hubert_model.half()

2025-02-11 14:11:00 | INFO | fairseq.tasks.hubert_pretraining | current directory is /root/Retrieval-based-Voice-Conversion-WebUI-main
2025-02-11 14:11:00 | INFO | fairseq.tasks.hubert_pretraining | HubertPretrainingTask Config {'_name': 'hubert_pretraining', 'data': 'metadata', 'fine_tuning': False, 'labels': ['km'], 'label_dir': 'label', 'label_rate': 50.0, 'sample_rate': 16000, 'normalize': False, 'enable_padding': False, 'max_keep_size': None, 'max_sample_size': 250000, 'min_sample_size': 32000, 'single_target': False, 'random_crop': True, 'pad_audio': False}
2025-02-11 14:11:00 | INFO | fairseq.models.hubert.hubert | HubertModel Config: {'_name': 'hubert', 'label_rate': 50.0, 'extractor_mode': default, 'encoder_layers': 12, 'encoder_embed_dim': 768, 'encoder_ffn_embed_dim': 3072, 'encoder_attention_heads': 12, 'activation_fn': gelu, 'layer_type': transformer, 'dropout': 0.1, 'attention_dropout': 0.1, 'activation_dropout': 0.0, 'encoder_layerdrop': 0.05, 'dropout_input': 0.1, 'drop

In [3]:
class RVCPipelineTorch(torch.nn.Module):
    def __init__(
        self, 
        hubert_model:torch.nn.Module,
        config: Config,
        vc:VC,
        rmvpe:RMVPE,
        bh:torch.Tensor,
        ah:torch.Tensor,
    ):
        super(RVCPipelineTorch, self).__init__()
        # 处理输入
        self.hubert_model:torch.nn.Module = hubert_model
        self.config:Config = config
        self.vc:VC = vc
        self.rmvpe:RMVPE = rmvpe

        ## Graio输入配置
        self.f0_up_key:int = 0 # 升调Key
        self.index_rate:float = 0.75 # 检索特征占比
        self.filter_radius:int = 3 # 中值滤波半径
        self.resample_sr:int = 40000 # 重采样率
        self.rms_mix_rate:float = 0.25 # 输出源音量包络替换输出音量暴包络的融合比
        self.protect:float = 0.33 # 保护清辅音和呼吸声的比例

        ## Pipeline 配置
        self.pipeline_sr:int = 16000 # Hubert输入采样率
        self.pipeline_window:int = 160 # 每帧点数
        self.pipeline_tpad:int = self.pipeline_sr # 每条前后pad时间
        self.pipeline_tpad_tgt:int = self.vc.tgt_sr
        self.pipeline_tpad2:int = self.pipeline_tpad * 2
        self.pipeline_tquery:int = self.pipeline_sr * 5 # 查询切点前后查询时间
        self.pipeline_tcenter:int = self.pipeline_sr * 30 # 查询切点位置
        self.pipeline_t_max:int = self.pipeline_sr * 32 # 最大查询时间
        self.tf0:float = self.pipeline_sr / self.pipeline_window # 每秒点数
        self.f0_min:int = 50 # 最小F0
        self.f0_max:int = 1100 # 最大F0
        self.f0_mel_min:int = 1127 * np.log(1 + self.f0_min / 700) # 最小梅尔频率
        self.f0_mel_max:int = 1127 * np.log(1 + self.f0_max / 700)
        self.bh:torch.Tensor = bh
        self.ah:torch.Tensor = ah
        self.f0_min:int = 50
        self.f0_max:int = 1100
        self.f0_mel_min:int = 1127 * np.log(1 + self.f0_min / 700)
        self.f0_mel_max:int = 1127 * np.log(1 + self.f0_max / 700)
        self.is_half:bool = False
        self.max_int16:int = np.iinfo(np.int16).max + 1
    def get_f0(
        self,
        x: torch.Tensor,
        inp_f0:Optional[torch.Tensor] = None,
    ) -> Tuple[torch.Tensor,torch.Tensor]:
        f0:torch.Tensor = torch.tensor(
            self.rmvpe.infer_from_audio(x.squeeze(), thred=0.03), 
        ).cuda()
        f0 *= pow(2, self.f0_up_key / 12)
        # with open("test.txt","w")as f:f.write("\n".join([str(i)for i in f0.tolist()]))
        tf0:int = self.pipeline_sr // self.pipeline_window  # 每秒f0点数
        if inp_f0 is not None:
            delta_t:torch.Tensor = torch.round(
                (inp_f0[:, 0].max() - inp_f0[:, 0].min()) * tf0 + 1
            ).astype(torch.int16)
            replace_f0:torch.Tensor = torchinterp1d.interp1d(
                list(range(delta_t)), inp_f0[:, 0] * 100, inp_f0[:, 1]
            )
            shape:int = f0[self.config.x_pad * tf0 : self.config.x_pad * tf0 + len(replace_f0)].shape[0]
            f0[self.config.x_pad * tf0 : self.config.x_pad * tf0 + len(replace_f0)] = replace_f0[
                :shape
            ]
        # with open("test_opt.txt","w")as f:f.write("\n".join([str(i)for i in f0.tolist()]))
        f0bak:torch.Tensor = f0.clone()
        f0_mel:torch.Tensor = 1127 * torch.log(1 + f0 / 700)
        # f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - self.f0_mel_min) * 254 / (
        #     self.f0_mel_max - self.f0_mel_min
        # ) + 1
        f0_mel:torch.Tensor = torch.where(
            f0_mel > 0,
            (f0_mel - self.f0_mel_min) * 254 / (
                self.f0_mel_max - self.f0_mel_min
            ) + 1,f0_mel
        )
        f0_mel:torch.Tensor = torch.clamp(f0_mel,1,255)
        f0_mel[f0_mel <= 1] = 1
        f0_mel[f0_mel > 255] = 255
        # f0_coarse = np.rint(f0_mel).astype(np.int32)
        f0_coarse:torch.Tensor = torch.round(f0_mel).to(torch.int32)
        return f0_coarse, f0bak  # 1-0
    def rms(
        self,
        y:torch.Tensor = None,
        frame_length:int = 2048,
        hop_length:int = 512,
        center:bool = True,
        pad_mode:str = "constant"
    ) -> torch.Tensor:
        # 确保输入张量是二维的
        if y.dim() == 1:
            y:torch.Tensor = y.unsqueeze(0)
        # 获取音频长度
        num_samples:int = y.shape[-1]
        
        if center:
            # 计算需要填充的长度
            padding:Tuple[int] = (frame_length // 2, frame_length // 2)
            # 对音频信号进行填充
            y:torch.Tensor = F.pad(
                y,padding,mode=pad_mode
            )
            # 更新音频长度
            num_samples:int = y.shape[-1]
        # 计算帧数
        frames:int = 1 + (num_samples - frame_length) // hop_length

        # 创建滑动窗口索引
        indices:torch.Tensor = torch.arange(frame_length).unsqueeze(0) + hop_length * torch.arange(frames).unsqueeze(-1)
        indices:torch.Tensor = indices.to(y.device)

        # 使用索引获取帧
        framed_signal:torch.Tensor = y[:, indices]

        # 计算RMS
        squared_signal:torch.Tensor = framed_signal ** 2
        mean_squared:torch.Tensor = torch.mean(squared_signal, dim=-1)
        rms:torch.Tensor = torch.sqrt(mean_squared)
        
        return rms.squeeze(0)
    def change_rms(
        self,
        data1:torch.Tensor,
        sr1:int,
        data2:torch.Tensor,
        sr2:int,
        rate:float
    ) -> torch.Tensor:  # 1是输入音频，2是输出音频,rate是2的占比
        rms1:torch.Tensor = self.rms(
            y = data1,
            frame_length = sr1 // 2 * 2,
            hop_length = sr1 // 2
        )
        rms2:torch.Tensor = self.rms(
            y = data2,
            frame_length = sr2 // 2 * 2,
            hop_length = sr2 // 2
        )
        rms1:torch.Tensor = F.interpolate(
            rms1.unsqueeze(0).unsqueeze(0), size=data2.shape[0], mode="linear"
        ).squeeze()
        rms2:torch.Tensor = F.interpolate(
            rms2.unsqueeze(0).unsqueeze(0), size=data2.shape[0], mode="linear"
        ).squeeze()
        rms2:torch.Tensor = torch.max(rms2, torch.zeros_like(rms2) + 1e-6)
        data2 *= (
            torch.pow(rms1, torch.tensor(1 - rate))
            * torch.pow(rms2, torch.tensor(rate - 1))
        )
        return data2
    @torch.autocast(
        device_type = "cuda",
        dtype = torch.float16
    )
    def vc_func(
        self,
        model:torch.nn.Module,
        net_g:torch.nn.Module,
        sid:int,
        audio0:torch.Tensor,
        pitch:Optional[torch.Tensor],
        pitchf:Optional[torch.Tensor],
        protect:float,
    ) -> torch.Tensor:
        feats:torch.Tensor = audio0.half()
        # 处理双通道音频
        if feats.dim() == 2:
            feats:torch.Tensor = feats.mean(-1)
        feats:torch.Tensor = feats.view(1, -1)
        padding_mask:torch.BoolTensor = torch.BoolTensor(feats.shape).to(audio0.device).fill_(False)
        inputs:Dict[str,Any] = {
            "source": feats.to(audio0.device),
            "padding_mask": padding_mask.to(audio0.device),
            "output_layer": 12,
        }
        with torch.no_grad():
            logits:torch.Tensor = model.extract_features(**inputs)
            feats:torch.Tensor = logits[0]
        if protect < 0.5 and pitch is not None and pitchf is not None:
            feats0:torch.Tensor = feats.clone()
        feats:torch.Tensor = F.interpolate(feats.permute(0, 2, 1), scale_factor=2).permute(0, 2, 1)
        if protect < 0.5 and pitch is not None and pitchf is not None:
            feats0:int = F.interpolate(feats0.permute(0, 2, 1), scale_factor=2).permute(
                0, 2, 1
            )
        p_len:int = audio0.shape[0] // self.pipeline_window
        if feats.shape[1] < p_len:
            p_len:int = feats.shape[1]
            if pitch is not None and pitchf is not None:
                pitch:torch.Tensor = pitch[:, :p_len]
                pitchf:torch.Tensor = pitchf[:, :p_len]

        if protect < 0.5 and pitch is not None and pitchf is not None:
            pitchff:torch.Tensor = pitchf.clone()
            pitchff[pitchf > 0] = 1
            pitchff[pitchf < 1] = protect
            pitchff:torch.Tnesor = pitchff.unsqueeze(-1)
            feats:torch.Tensor = feats * pitchff + feats0 * (1 - pitchff)
            feats:torch.Tensor = feats.to(feats0.dtype)
        p_len:torch.Tensor = torch.tensor([p_len], device=audio0.device).long()
        with torch.no_grad():
            # hasp = pitch is not None and pitchf is not None
            arg:Tuple = (feats.half(), p_len, pitch, pitchf, sid)
            audio1:torch.Tensor = net_g.infer(*arg)[0][0, 0]
        return audio1
    def pipeline(
        self,
        hubert_model:torch.nn.Module,
        net_g:torch.nn.Module,
        sid:int,
        audio:torch.Tensor,
        target_sr:int,
        resample_sr:int,
        rms_mix_rate:float,
        protect:float
    ) -> torch.Tensor:  
        audio:torch.Tensor = torchaudio.functional.filtfilt(
            waveform=audio.to(torch.float64),
            a_coeffs=self.ah,
            b_coeffs=self.bh
        )
        audio_pad:torch.Tensor = torch.nn.functional.pad(
            audio.unsqueeze(0),
            (
                self.pipeline_window // 2,
                self.pipeline_window // 2
            ),
            mode = "reflect"
        )
        audio_pad:torch.Tensor = audio_pad.squeeze()
        opt_ts:List[torch.Tensor] = []
        if audio_pad.shape[0] > self.pipeline_t_max:
            audio_sum:torch.Tensor = torch.zeros_like(audio)
            for i in range(self.pipeline_window):
                audio_sum += torch.abs(
                    audio_pad[i : i - self.pipeline_window]
                )
            for t in range(self.pipeline_tcenter, audio.shape[0], self.pipeline_tquery):
                opt_ts.append(
                    t
                    - self.pipeline_tquery
                    + torch.where(
                        audio_sum[t - self.pipeline_tquery : t + self.pipeline_tquery]
                        == audio_sum[t - self.pipeline_tquery : t + self.pipeline_tquery].min()
                    )[0][0]
                )
        s:int = 0
        audio_opt:List[torch.Tensor] = []
        audio_pad:torch.Tensor = torch.nn.functional.pad(
            audio.unsqueeze(0),
            (
                self.pipeline_tpad,
                self.pipeline_tpad
            ),
            mode = "reflect"
        ).squeeze()
        p_len:int = audio_pad.shape[0] // self.pipeline_window
        sid:torch.Tensor = torch.tensor(
            sid,device = audio.device
        ).unsqueeze(0).long()
        pitch, pitchf = self.get_f0(
            audio_pad,
            None
        )
        pitch = pitch[:p_len]
        pitchf = pitchf[:p_len]
        pitch = torch.tensor(pitch, device=audio.device).unsqueeze(0).long()
        pitchf = torch.tensor(pitchf, device=audio.device).unsqueeze(0).float()
        for t in opt_ts:
            t:int = t // self.pipeline_window * self.pipeline_window
            audio_opt.append(
                self.vc_func(
                    model = hubert_model,
                    net_g = net_g,
                    sid = sid,
                    audio0 = audio_pad[s : t + self.pipeline_tpad2 + self.pipeline_window],
                    pitch = pitch[:, s // self.pipeline_window : (t + self.pipeline_tpad2) // self.pipeline_window],
                    pitchf = pitchf[:, s // self.pipeline_window : (t + self.pipeline_tpad2) // self.pipeline_window],
                    protect = protect
                )[self.pipeline_tpad_tgt : -self.pipeline_tpad_tgt]
            )
            s = t
        audio_opt.append(
            self.vc_func(
                model = hubert_model,
                net_g = net_g,
                sid = sid,
                audio0 = audio_pad[t:],
                pitch = pitch[:, t // self.pipeline_window :] if t is not None else pitch,
                pitchf = pitchf[:, t // self.pipeline_window :] if t is not None else pitchf,
                protect = protect
            )[self.pipeline_tpad_tgt : -self.pipeline_tpad_tgt]
        )
        audio_opt:torch.Tensor = torch.concatenate(audio_opt)
        if rms_mix_rate != 1:
            audio_opt:torch.Tensor = self.change_rms(audio, 16000, audio_opt, target_sr, rms_mix_rate)
        if target_sr != resample_sr >= 16000:
            audio_opt:torch.Tensor = torchaudio.functional.resample(
                audio_opt,
                target_sr,
                resample_sr
            )
        audio_max:float = torch.abs(audio_opt).max().item() / 0.99
        if audio_max > 1:
            self.max_int16 /= audio_max
        audio_opt = (audio_opt * self.max_int16).to(torch.int16)
        return audio_opt
    def forward(
        self,
        wav:torch.Tensor,
    ) -> torch.Tensor:
        # 计算audio_max
        audio_max:float = torch.abs(wav).max().item() / 0.95
        if audio_max > 1.0:
            wav:torch.Tensor = wav / audio_max

        audio_opt:torch.Tensor = self.pipeline(
            hubert_model = self.hubert_model,
            net_g = self.vc.net_g,
            sid = 0,
            audio = wav,
            target_sr = self.vc.tgt_sr,
            resample_sr = self.resample_sr,
            rms_mix_rate = self.rms_mix_rate,
            protect = self.protect,
        )
        return audio_opt

In [4]:
# 初始化模型
rmvpe = rmvpe = RMVPE(
    is_half = True,
    device = torch.device("cuda"),
    model_path = "./assets/rmvpe/rmvpe.pt"
)

In [5]:
# 初始化模型
pipeline = RVCPipelineTorch(
    hubert_model = hubert_model.cuda(),
    config = config,
    vc = vc,
    rmvpe = rmvpe,
    bh = torch.tensor(butter(N=5, Wn=48, btype="high", fs=16000)[0]).to(torch.float64).cuda(),
    ah = torch.tensor(butter(N=5, Wn=48, btype="high", fs=16000)[1]).to(torch.float64).cuda(),
)

In [6]:
# 验证filtfilt是否会为空
bh = torch.tensor(butter(N=5, Wn=48, btype="high", fs=16000)[0]).to(torch.float64)
ah = torch.tensor(butter(N=5, Wn=48, btype="high", fs=16000)[1]).to(torch.float64)
audio = librosa.load("./tts.wav", sr=16000)[0]
audio = torch.tensor(audio).to(torch.float64)

In [7]:
filtfilt = torchaudio.functional.filtfilt(
    waveform=audio,
    a_coeffs=ah,
    b_coeffs=bh
)

In [8]:
filtfilt

tensor([-2.4828e-07, -2.6656e-07, -2.8495e-07,  ...,  6.6636e-15,
         7.1287e-15,  7.6059e-15], dtype=torch.float64)

In [9]:
# 执行pipeline
output = pipeline(audio.cuda())

  pitch = torch.tensor(pitch, device=audio.device).unsqueeze(0).long()
  pitchf = torch.tensor(pitchf, device=audio.device).unsqueeze(0).float()


In [10]:
output

tensor([-30, -30, -31,  ..., -29, -29, -29], device='cuda:0',
       dtype=torch.int16)

In [11]:
from IPython.display import Audio
Audio(output.cpu().numpy(), rate=40000)