### 背景音人声分离（三种方法选择一种就行）

In [None]:
# 下载UVR模型
from uvr.utils.get_models import download_all_models
import json

# Read the model manifest through a context manager so the file handle is
# closed deterministically, and decode it as UTF-8 instead of relying on the
# platform default encoding.
with open("./uvr_models.json", "r", encoding="utf-8") as manifest_file:
    models_json = json.load(manifest_file)

# By default the models are downloaded into the `uvr` directory of the active
# Conda environment. Avoid overriding the location (e.g. with
# save_path="./uvr_model"): this project's model loading is very finicky
# about where the files live.
download_all_models(models_json)

##### MDX人声提取

In [None]:
# MDX人声提取，速度相对较慢
from uvr import models
import soundfile as sf
import torch
import gc

import os

audio_file = "E:/AI/ASR/kotoba/DA3.wav"

# Inference options handed to the MDX model.
init_other_metadata = {
    'segment_size': 256,
    'overlap': 0.75,
    'mdx_batch_size': 1,
    'semitone_shift': 0,
    'adjust': 1.08,
    'denoise': False,
    'is_invert_spec': False,
    'is_match_frequency_pitch': True,
    'overlap_mdx': None,
}
# Presumably a minimal alternative configuration: {'segment_size': 256}

mdx = models.MDX(name="UVR-MDX-NET-Inst_HQ_4", other_metadata=init_other_metadata, logger=None)
# MDX23C-8KFFT-InstVoc_HQ can be used instead: it is a model dedicated to
# vocal separation, larger and in theory more accurate.

res = mdx(audio_file)

# The MDX result has to be transposed before it is written out
# (presumably from channels-first to the (frames, channels) layout that
# soundfile expects - TODO confirm).
vocals = res["vocals"].T

# MDX models work at a 44100 Hz sampling rate, see
# https://github.com/jhj0517/ultimatevocalremover_api/blob/3543be1349ce601568787b69cca0c1f8acba7c2e/src/models.py#L294
uvr_sample_rate = 44100

# soundfile does not create missing directories, so make sure the output
# folder exists before writing the separated vocals.
uvr_result = "./temp/uvr_result.wav"
os.makedirs(os.path.dirname(uvr_result), exist_ok=True)
sf.write(uvr_result, vocals, uvr_sample_rate, format="WAV")

# Drop the references to the model so it can be reclaimed.
mdx.model_run = None
mdx = None

# Ask Python to run a garbage-collection pass.
gc.collect()

# Release memory that PyTorch has cached on the GPU but no longer uses.
if torch.cuda.is_available():
    torch.cuda.empty_cache()

##### Demucs人声提取

In [None]:
# Demucs人声提取，基于ultimatevocalremover_api，速度快一点
import torch
from uvr import models
import soundfile as sf

import gc
import os

audio_file = "E:/AI/ASR/kotoba/DA3.wav"
device = "cuda" if torch.cuda.is_available() else "cpu"

demucs = models.Demucs(name="hdemucs_mmi", other_metadata={"segment":2, "split":True}, device=device, logger=None)

res = demucs(audio_file)
vocals = res["vocals"]

# The Demucs result is a torch tensor: detach it, move it to the CPU,
# convert it to a NumPy array and transpose it before writing.
vocals_numpy = vocals.detach().cpu().numpy().T

uvr_sample_rate = 44100

# soundfile does not create missing directories, so make sure the output
# folder exists before writing the separated vocals.
uvr_result = "./temp/uvr_result.wav"
os.makedirs(os.path.dirname(uvr_result), exist_ok=True)
sf.write(uvr_result, vocals_numpy, uvr_sample_rate, format="WAV")

# Release the separation model, mirroring the MDX cell, so that its memory
# is available to the VAD model and the LLM loaded by later cells.
demucs = None
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()

##### HT Demucs ft (v4)人声提取

In [None]:
# HT Demucs ft (v4)，基于facebookresearch/demucs，无需额外下载模型
import os

audio_file = f"E:/AI/ASR/kotoba/DA3.wav"
full_filename = os.path.basename(audio_file)
source_file_name, suffix = os.path.splitext(full_filename)

!demucs {audio_file} --two-stems vocals --out "./temp"

uvr_result = f"./temp/htdemucs/{source_file_name}/vocals.wav"
HT_Demucs_ft_v4 = True
# 采样率为44100

-----------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------

### Silero人声活动检测及时间戳提取

In [None]:
# 人声活动检测
import torch, gc

# Sampling rate at which the audio is read for, and analysed by, the VAD.
# The timestamps produced below are sample indices at this rate.
VAD_SAMPLING_RATE = 16000

# Keep the torch.hub cache (and therefore the downloaded VAD model) in a
# project-local directory.
new_hub_dir = './vad_model'
torch.hub.set_dir(new_hub_dir)

vad_model, vad_utils = torch.hub.load(
    repo_or_dir='snakers4/silero-vad',
    model='silero_vad',
    # force_reload=False,
    force_reload=True, # force a fresh download of the latest model on every run
    # onnx=False
)

# Helper functions shipped with the model; only read_audio and
# get_speech_timestamps are used in this cell.
(get_speech_timestamps, save_audio, read_audio, VADIterator, collect_chunks) = vad_utils

with torch.no_grad():
    # 1. Read the separated vocal track at the VAD sampling rate.
    wav = read_audio(uvr_result, sampling_rate=VAD_SAMPLING_RATE)

    # 2. Compute the speech-activity timestamps.
    speech_timestamps = get_speech_timestamps(
        audio=wav,
        model=vad_model,  # reuse the model loaded above
        threshold=0.5,  # speech probability threshold (tune as needed)
        sampling_rate=VAD_SAMPLING_RATE,
        min_speech_duration_ms=250,  # minimum speech duration in ms (previously 250)
        max_speech_duration_s=30,
        min_silence_duration_ms=1*1000,  # previously 100; 1.5 s ~ 2 s was also considered
        speech_pad_ms=200,  # speech padding in ms (previously 30)
        window_size_samples=512,  # window size
    )

# Unload the model: drop the reference, then let Python and the CUDA caching
# allocator release the memory.
vad_model = None

gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()

In [None]:
# 合并连续的短音频块

def merge_short_segments(
    speech_timestamps: list[dict],
    sampling_rate: int,
    max_duration_s: float = 10.0,
    max_gap_s: float = 1.0
) -> list[dict]:
    """
    Greedily fuse neighbouring short speech segments into longer blocks.

    The segments are walked in order. A segment is absorbed into the block
    being built when the block so far is shorter than ``max_duration_s``,
    the segment itself is shorter than ``max_duration_s`` and the silence
    between them is at most ``max_gap_s``. Otherwise the block is closed and
    the segment starts a new one. The length limit is only checked before a
    merge, so a finished block may end up longer than ``max_duration_s``.

    Args:
        speech_timestamps (list[dict]):
            Speech segments with 'start' and 'end' given as sample indices,
            e.g. [{'start': ..., 'end': ...}]. The input dicts are not
            modified; the result is built from shallow copies.
        sampling_rate (int):
            Sampling rate the sample indices refer to (e.g. 16000).
        max_duration_s (float):
            Duration threshold in seconds below which a block/segment is
            eligible for merging. Defaults to 10.0.
        max_gap_s (float):
            Largest silence in seconds between two segments that still
            allows them to be merged. Defaults to 1.0.

    Returns:
        list[dict]: The merged segments (an empty list for empty input).
    """
    if not speech_timestamps:
        return []

    # Express both thresholds in samples so they compare directly with the
    # sample indices stored in the segments.
    duration_limit = max_duration_s * sampling_rate
    gap_limit = max_gap_s * sampling_rate

    # The last element of `blocks` is always the block still being grown.
    first, *remaining = speech_timestamps
    blocks = [first.copy()]

    for segment in remaining:
        open_block = blocks[-1]

        block_is_short = open_block['end'] - open_block['start'] < duration_limit
        segment_is_short = segment['end'] - segment['start'] < duration_limit
        gap_is_small = segment['start'] - open_block['end'] <= gap_limit

        if block_is_short and segment_is_short and gap_is_small:
            # Absorb the segment by stretching the open block up to its end.
            open_block['end'] = segment['end']
        else:
            # Close the open block and start a new one from this segment.
            blocks.append(segment.copy())

    return blocks

# Merge the VAD segments: neighbouring blocks/segments shorter than 5 s are
# fused when the silence between them is at most 10 s. The timestamps stay
# expressed as sample indices at VAD_SAMPLING_RATE.
merged_segments = merge_short_segments(
    speech_timestamps=speech_timestamps,
    sampling_rate=VAD_SAMPLING_RATE,
    max_duration_s=5,
    max_gap_s=10,
)

In [None]:
import os
import soundfile as sf

def split_audio_by_vad(timestamps, audio_path, vad_sr, out_dir):
    """
    Cut an audio file into one WAV file per VAD segment.

    Each segment is written to ``out_dir`` as ``chunk_NNN.wav`` (numbered
    from 001 in list order) at the sampling rate of the source file.
    Failures are reported with ``print`` rather than raised: if the source
    file cannot be inspected the function returns without writing anything,
    and a chunk that fails to read or write is skipped.

    Args:
        timestamps (list): Dicts with 'start' and 'end' sample indices
            expressed at the ``vad_sr`` sampling rate.
        audio_path (str): Path of the audio file to cut.
        vad_sr (int): Sampling rate the timestamps refer to.
        out_dir (str): Directory that receives the chunk files; it is
            created when it does not exist yet.
    """
    # Make sure the destination directory is there.
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)
        print(f"创建输出目录: {out_dir}")

    # The source file's own sampling rate is needed to rescale the indices.
    try:
        original_sr = sf.info(audio_path).samplerate
    except Exception as e:
        print(f"错误: 无法读取音频文件 {audio_path}。 {e}")
        return

    for chunk_number, segment in enumerate(timestamps, start=1):
        # VAD sample index -> seconds -> frame index in the source file.
        first_frame = int(segment['start'] / vad_sr * original_sr)
        last_frame = int(segment['end'] / vad_sr * original_sr)

        try:
            # Read only the requested slice; float32 keeps the precision.
            samples, _ = sf.read(
                audio_path,
                start=first_frame,
                stop=last_frame,
                dtype='float32'
            )

            # Store the slice at the source sampling rate.
            chunk_path = os.path.join(out_dir, f"chunk_{chunk_number:03d}.wav")
            sf.write(chunk_path, samples, original_sr)

        except Exception as e:
            print(f"  处理切片 {chunk_number:03d} 时发生错误: {e}\n")

# Cut the separated vocal track into one chunk_NNN.wav file per merged
# segment; the chunks are written to ./temp.
split_audio_by_vad(
    timestamps=merged_segments,
    audio_path=uvr_result,
    vad_sr=VAD_SAMPLING_RATE,
    out_dir="./temp")

-----------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------

### 多模态LLM语音识别（二选一）

##### Gemma-3n语音识别

In [None]:
# Gemma-3n-E4B-it，需要transformers>=4.53.0

##### Phi-4语音识别

In [None]:
# Phi-4-multimodal-instruct requires transformers==4.51.3; other versions may
# fail with "AttributeError: 'Phi4MMModel' object has no attribute
# 'prepare_inputs_for_generation'", see
# https://huggingface.co/microsoft/Phi-4-multimodal-instruct/discussions/75#684aea27aef16192b9f52104
# !uv pip install transformers==4.51.3

import torch
from transformers import AutoModelForCausalLM, AutoProcessor, GenerationConfig

# Local directory that holds the model files.
model_path = './Phi-4-multimodal-instruct'

# trust_remote_code=True lets transformers run the custom code shipped with
# the model repository.
processor = AutoProcessor.from_pretrained(model_path, trust_remote_code=True)

# Load the model in float16 and move it to the GPU; this cell requires CUDA.
model = AutoModelForCausalLM.from_pretrained(
    model_path,
    trust_remote_code=True,
    torch_dtype=torch.float16,
    # _attn_implementation='flash_attention_2',
    _attn_implementation='sdpa', # 'eager' is another option
).cuda()


In [None]:
import os
import soundfile as sf

generation_config = GenerationConfig.from_pretrained(model_path, 'generation_config.json')
# print(generation_config)

# Chat-template markers used to build the prompt.
user_prompt = '<|user|>'
assistant_prompt = '<|assistant|>'
prompt_suffix = '<|end|>'

# An equivalent instruction written in Chinese can be used instead.
speech_prompt = "Transcribe this Japanese speech into text, ONLY output the text content with punctuation, DO NOT output anything else"
prompt = f'{user_prompt}<|audio_1|>{speech_prompt}{prompt_suffix}{assistant_prompt}'

folder_path = './temp'
transcribe_result = []

# Walk the chunks in segment order instead of listing the directory:
# os.listdir() returns names in arbitrary order and would also pick up stale
# chunk_* files left by an earlier run. The names follow the
# "chunk_NNN.wav" scheme used by split_audio_by_vad, so entry i of
# transcribe_result always belongs to entry i of merged_segments.
for chunk_number in range(1, len(merged_segments) + 1):
    filename = f"chunk_{chunk_number:03d}.wav"
    full_path = os.path.join(folder_path, filename)

    # Stays empty when the chunk is missing or cannot be transcribed, so
    # that the result list keeps one entry per segment.
    response = ""
    try:
        audio_data, sample_rate = sf.read(full_path)
        audio = (audio_data, sample_rate)
        inputs = processor(text=prompt, audios=[audio], return_tensors='pt').to('cuda')

        generate_ids = model.generate(
            **inputs,
            num_beams=5,
            max_new_tokens=128,
            generation_config=generation_config,
            # Works around the NoneType slicing error in `modeling_phi4mm.py`
            # ("bad operand type for unary -"), see
            # https://huggingface.co/microsoft/Phi-4-multimodal-instruct/discussions/46#67deef41a13e68cb45a8c2c2
            num_logits_to_keep=1,
        )

        # Keep only the newly generated tokens (drop the echoed prompt).
        generate_ids = generate_ids[:, inputs['input_ids'].shape[1] :]

        response = processor.batch_decode(
            generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )[0]
        # print(response)

    except Exception as e:
        # Best effort: report the failure and continue with the next chunk.
        print(f"读取文件 {filename} 时出错: {e}")

    transcribe_result.append(response)

    # Release cached GPU memory between chunks.
    torch.cuda.empty_cache()


In [None]:
# Unload the LLM: drop the reference to the model, then let Python and the
# CUDA caching allocator release the memory.
# NOTE(review): `gc` is not imported in this cell; it relies on the
# `import torch, gc` of the VAD cell having been executed.
model = None
gc.collect()
torch.cuda.empty_cache()

-----------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------

### 生成字幕文件

In [None]:
# Attach each transcript to its segment. This is only done when there is
# exactly one transcript per segment; otherwise an error is printed and the
# segments are left untouched.
if len(merged_segments) == len(transcribe_result):
    # Walk both lists in lockstep.
    for segment, text in zip(merged_segments, transcribe_result):
        segment['text'] = text
else:
    print("错误：时间戳片段和转写结果的数量不匹配！")

In [None]:
# 采样点转换为时间

def convert_to_time(sample_index: int, sampling_rate: int) -> str:
    """
    Format an audio sample index as an SRT timestamp ``HH:MM:SS,mmm``.

    Args:
        sample_index (int): Position in the audio, counted in samples.
        sampling_rate (int): Samples per second of that audio (e.g. 16000).

    Returns:
        str: The timestamp, rounded to the nearest millisecond. Hours,
        minutes and seconds are zero-padded to two digits and milliseconds
        to three; hours are not capped and may use more digits.
    """
    # Work in whole milliseconds; round() keeps the result an integer.
    total_ms = round(sample_index / sampling_rate * 1000)

    # Peel off one unit at a time: ms -> s -> min -> h.
    whole_seconds, ms = divmod(total_ms, 1000)
    whole_minutes, secs = divmod(whole_seconds, 60)
    hrs, mins = divmod(whole_minutes, 60)

    # SRT separates the milliseconds with a comma (an ffmpeg-style timestamp
    # would use a dot instead).
    return f"{hrs:02d}:{mins:02d}:{secs:02d},{ms:03d}"


import os

# Name the subtitle file after the input audio. The stem is derived here
# because `source_file_name` is otherwise only assigned by the HT Demucs
# cell and would be undefined when the MDX or Demucs cell was used instead.
source_file_name = os.path.splitext(os.path.basename(audio_file))[0]

# The subtitle file is created in the current working directory.
with open(f'{source_file_name}.srt', 'w', encoding='utf-8') as f:
    block_index = 1

    for segment in merged_segments:
        # Segments without a transcript (texts never attached, or an empty
        # transcription) are skipped instead of raising KeyError or emitting
        # an empty subtitle block.
        text = segment.get('text', '')
        if not text.strip():
            continue

        # Segment boundaries are sample indices at the VAD sampling rate.
        start_time_srt = convert_to_time(segment['start'], VAD_SAMPLING_RATE)
        end_time_srt = convert_to_time(segment['end'], VAD_SAMPLING_RATE)

        # One SRT block: running index, time range, text, blank separator.
        f.write(f"{block_index}"+"\n")
        f.write(f"{start_time_srt} --> {end_time_srt}"+"\n")
        f.write(text+"\n")
        f.write("\n")

        block_index += 1
