#1挂载云盘

In [None]:

from google.colab import drive
# Mount Google Drive under /content/gdrive so later cells can read the
# config file and audio stored there. force_remount=True asks Colab to
# remount even when the drive is already mounted, so re-running this cell
# is safe.
drive.mount('/content/gdrive', force_remount=True)

#2安装依赖

In [None]:
# Remove the torch stack already present in the runtime before installing
# the pinned versions below.
!pip uninstall torch torchvision torchaudio -y

# Workaround from: https://github.com/m-bain/whisperX/issues/1027#issuecomment-2627525081
# Pin torch/torchvision/torchaudio to builds from the cu121 wheel index.
!pip install torch==2.5.1 torchvision==0.20.1 torchaudio==2.5.1 --index-url https://download.pytorch.org/whl/cu121

# WhisperX-related packages:
# NOTE(review): later cells also invoke `demucs`, load the spaCy model
# "ja_ginza_electra" and import `google.generativeai`; none of these is
# installed in this cell - presumably they come from the runtime or another
# setup step; verify.
!pip install ctranslate2==4.4.0
!pip install faster-whisper==1.1.0
# !pip install git+https://github.com/m-bain/whisperx.git
!pip install whisperx==3.3.1

# Install the cuDNN 8 runtime and development packages built for CUDA 12.1.
# NOTE(review): the install commands have no `-y`; confirm they do not stop
# at a confirmation prompt in this environment.
!apt-get update
!apt-get install libcudnn8=8.9.2.26-1+cuda12.1
!apt-get install libcudnn8-dev=8.9.2.26-1+cuda12.1

# NOTE(review): this runs in a separate `python` process, so the TF32 flags
# it sets end with that process; they do not carry over to this notebook's
# kernel or to the `whisperx` command launched later.
!python -c "import torch; torch.backends.cuda.matmul.allow_tf32 = True; torch.backends.cudnn.allow_tf32 = True"

#3导入配置

In [None]:
# `os` is imported here because this cell uses os.path.join; previously the
# first `import os` lived in a later cell, so running the cells in order
# failed with NameError.
import os

import yaml

# Project root on the mounted drive. The config file sits directly in it and
# every directory entry of the config is resolved against it.
path = '/content/gdrive/MyDrive/ASMRASR'

with open(os.path.join(path, '0config.yaml'), 'r', encoding='utf-8') as file:
    config = yaml.safe_load(file)

# Rewrite each directory entry as a path under `path`. A key missing from
# the YAML file raises KeyError here rather than in a later cell.
for key in (
    "pre_path",
    "work_path",
    "asr_path",
    "slice_path",
    "zh_path",
    "result_path",
    "model_path",
):
    config[key] = os.path.join(path, config[key])

#4提取人声

In [None]:
import os
import subprocess

# Run Demucs in two-stem mode ("vocals" vs. everything else) on every
# .wav/.mp3/.flac file found under pre_path, writing results below work_path.
for root, dirs, files in os.walk(config["pre_path"]):
    for filename in files:
        if not filename.endswith((".wav", ".mp3", ".flac")):
            continue

        input_audio = os.path.join(root, filename)
        command = [
            "demucs", "--two-stems", "vocals",
            "-o", config["work_path"],
            input_audio,
        ]
        result = subprocess.run(command)

        # subprocess.run does not raise on a non-zero exit status, so check
        # it explicitly: a failed separation must not be reported as saved,
        # and one bad file should not stop the remaining ones.
        if result.returncode != 0:
            print(f"提取人声失败（退出码 {result.returncode}）：{input_audio}")
            continue

        # Report the output directory (not the input file) as the save
        # location, and name the source file for reference.
        print(f"提取人声保存至：{config['work_path']}（来源：{input_audio}）")

#5转写音频

In [None]:
import os
import subprocess
import torch

audio_paths = []

# Use the GPU with half precision when CUDA is available, otherwise fall
# back to the CPU with int8.
if torch.cuda.is_available():
    device, compute_type = "cuda", "float16"
else:
    device, compute_type = "cpu", "int8"
print('设备：', device, '类型：', compute_type)

# Collect every .wav/.mp3 file below work_path.
# NOTE(review): if the separation step leaves more than one stem per track
# in work_path, all of them are collected here - confirm that is intended.
for folder, _subdirs, names in os.walk(config["work_path"]):
    for name in names:
        if not name.endswith((".wav", ".mp3")):
            continue
        found = os.path.join(folder, name)
        print(f"找到: {found}")
        audio_paths.append(found)

if audio_paths:
    # Options that are always passed, in command-line order. Values wrapped
    # in str() may be non-string YAML scalars; the others are handed over
    # exactly as they appear in the config.
    always_passed = [
        ("--model", config["model"]),
        ("--model_cache_only", str(config["model_cache_only"])),
        ("--device_index", str(config["device_index"])),
        ("--device", device),
        ("--batch_size", str(config["batch_size"])),
        ("--compute_type", compute_type),
        ("--output_dir", config["asr_path"]),
        ("--output_format", config["output_format"]),
        ("--verbose", str(config["verbose"])),
        ("--task", config["task"]),
        ("--language", config["language"]),
        ("--align_model", config["align_model"]),
        ("--interpolate_method", config["interpolate_method"]),
        ("--vad_method", config["vad_method"]),
        ("--vad_onset", str(config["vad_onset"])),
        ("--vad_offset", str(config["vad_offset"])),
        ("--chunk_size", str(config["chunk_size"])),
        ("--temperature", str(config["temperature"])),
        ("--best_of", str(config["best_of"])),
        ("--beam_size", str(config["beam_size"])),
        ("--patience", str(config["patience"])),
        ("--length_penalty", str(config["length_penalty"])),
        ("--suppress_tokens", str(config["suppress_tokens"])),
        ("--condition_on_previous_text", str(config["condition_on_previous_text"])),
        ("--fp16", str(config["fp16"])),
        ("--temperature_increment_on_fallback", str(config["temperature_increment_on_fallback"])),
        ("--compression_ratio_threshold", str(config["compression_ratio_threshold"])),
        ("--logprob_threshold", str(config["logprob_threshold"])),
        ("--no_speech_threshold", str(config["no_speech_threshold"])),
        ("--highlight_words", str(config["highlight_words"])),
        ("--segment_resolution", config["segment_resolution"]),
        ("--threads", str(config["threads"])),
        ("--print_progress", str(config["print_progress"])),
    ]

    # All audio files go into a single whisperx invocation.
    command = ["whisperx", *audio_paths]
    for flag, value in always_passed:
        command.extend((flag, value))

    # Bare switches: appended only when the config value is truthy.
    for switch in ("no_align", "return_char_alignments", "diarize", "suppress_numerals"):
        if config.get(switch):
            command.append(f"--{switch}")

    # Optional valued options: skipped when absent or falsy. The boolean
    # says whether the value is converted with str() before being passed.
    optional_values = (
        ("min_speakers", True),
        ("max_speakers", True),
        ("initial_prompt", False),
        ("max_line_width", True),
        ("max_line_count", True),
        ("hf_token", False),
    )
    for key, stringify in optional_values:
        if config.get(key):
            value = config[key]
            command.extend((f"--{key}", str(value) if stringify else value))

    subprocess.run(command)


#6字幕分句

In [None]:
import json
import os
import spacy


def _build_char_times(words):
    """Return one {"char", "start", "end"} record per aligned word entry.

    "start"/"end" are None when the entry carries no timestamp.
    """
    return [
        {"char": word["word"], "start": word.get("start"), "end": word.get("end")}
        for word in words
    ]


def _match_sentence(sentence, char_times, cursor):
    """Match the characters of *sentence* against *char_times* in order.

    Scanning starts at index *cursor*. Returns ``(matched_indices,
    new_cursor)``, where ``new_cursor`` is the position just after the last
    matched entry.

    A sentence character that does not occur in the remaining timeline
    (for example whitespace that is present in the text but absent from the
    aligned entries) is skipped without moving the cursor. Scanning past it
    instead would consume the rest of the timeline, cutting this sentence
    short and leaving nothing for the following sentences.
    """
    matched = []
    total = len(char_times)
    for target_char in sentence:
        probe = cursor
        while probe < total and char_times[probe]["char"] != target_char:
            probe += 1
        if probe == total:
            continue  # character has no counterpart; keep the cursor
        matched.append(probe)
        cursor = probe + 1
    return matched, cursor


def _time_span(char_times, indices):
    """Return (first non-None start, last non-None end) over *indices*."""
    start = next(
        (char_times[i]["start"] for i in indices
         if char_times[i]["start"] is not None),
        None,
    )
    end = next(
        (char_times[i]["end"] for i in reversed(indices)
         if char_times[i]["end"] is not None),
        None,
    )
    return start, end


nlp = spacy.load("ja_ginza_electra")

# The results are written with open(), which does not create directories.
os.makedirs(config["slice_path"], exist_ok=True)

# Re-segment every transcription JSON under asr_path into sentences and
# write the result under the same file name into slice_path.
for root, dirs, files in os.walk(config["asr_path"]):
    for filename in files:
        if not filename.endswith(".json"):
            continue

        print(f"切分中: {filename}")
        json_path = os.path.join(root, filename)  # full path of the JSON file
        with open(json_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        output = {
            "segments": [],
            "language": "ja"
        }

        # Process each original speech segment independently.
        for segment in data["segments"]:
            # Split the whole segment text into sentences.
            doc = nlp(segment["text"])

            # Timeline of aligned entries, in order of appearance.
            char_times = _build_char_times(segment["words"])
            current_char_index = 0  # scan position within char_times

            for sent in doc.sents:
                sentence = sent.text.strip()
                if not sentence:
                    continue

                matched_indices, current_char_index = _match_sentence(
                    sentence, char_times, current_char_index
                )
                start_time, end_time = _time_span(char_times, matched_indices)

                # Sentences without any timestamped match are left out.
                if start_time is not None and end_time is not None:
                    output["segments"].append({
                        "text": sentence,
                        "start": start_time,
                        "end": end_time
                    })

        print(f"切分完毕: {filename}")
        basename = os.path.splitext(filename)[0]
        output_path = os.path.join(config["slice_path"], f"{basename}.json")
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(output, f, ensure_ascii=False, indent=4)



In [None]:
#7在线翻译

In [None]:
import json
import os
import time
import google.generativeai

# Marker stored in place of any line for which no translation was obtained,
# so such lines are easy to find in the output.
UNTRANSLATED_MARK = '11111111111'
# Number of subtitle lines sent to the model per request.
chunk_size = 30

# The results are written with open(), which does not create directories.
os.makedirs(config["zh_path"], exist_ok=True)

# Translate every sentence-split JSON under slice_path and write the result,
# with the original timestamps, under the same file name into zh_path.
for root, dirs, files in os.walk(config["slice_path"]):
    for filename in files:
        if not filename.endswith(".json"):
            continue

        print(f"正在翻译: {filename}")
        json_path = os.path.join(root, filename)
        with open(json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        before = [segment["text"] for segment in data["segments"]]
        after = []
        session_ok = True

        try:
            print(len(before))
            # NOTE(review): the key is spelled "genimi_token"; it has to
            # match the key used in the YAML config, so it is left as is.
            google.generativeai.configure(api_key=config["genimi_token"])
            model = google.generativeai.GenerativeModel('gemini-2.0-flash')
            # One chat per file; the first message carries the instructions.
            chat = model.start_chat()
            response = chat.send_message(config["first_prompt"])
            print(f"初始回复：", response.text)

            for i in range(0, len(before), chunk_size):
                chunk = before[i:i + chunk_size]
                joined_text = "\n".join(chunk)

                try:
                    response = chat.send_message(f'{config["common_prompt"]}{len(chunk)}\n{joined_text}')
                    # Blank lines in the reply are separators, not content.
                    lines = [line for line in response.text.splitlines() if line.strip()]
                    if len(lines) != len(chunk):
                        print(f"第{i // chunk_size + 1}组行数不符：期望 {len(chunk)} 行，得到 {len(lines)} 行")
                except Exception as e:
                    print(f"第{i // chunk_size + 1}组出错：{e}")
                    lines = []

                # Always contribute exactly one output line per input line.
                # Otherwise a failed or mis-sized reply shifts every later
                # translation onto the wrong timestamp and the assignment
                # below runs out of lines.
                lines = lines[:len(chunk)] + [UNTRANSLATED_MARK] * (len(chunk) - len(lines))
                after.extend(lines)
                time.sleep(5)  # pause between requests

        except Exception as e:
            print("出错了：", e)
            session_ok = False
        finally:
            # Keep a plain-text log of whatever was translated, even after
            # a failure. The file is opened in append mode.
            basename = os.path.splitext(filename)[0]
            output_path = os.path.join(root, f"{basename}.txt")
            with open(output_path, "a", encoding="utf-8") as f:
                f.write("\n".join(after) + "\n")

        print(len(after))

        # The chat could not be set up, so there is nothing to merge. Skip
        # this file instead of crashing and continue with the next one.
        if not session_ok:
            print(f"翻译失败，已跳过: {filename}")
            continue

        for segment, translated in zip(data["segments"], after):
            segment["text"] = translated
        print(f"翻译完成: {filename}")
        basename = os.path.splitext(filename)[0]
        output_path = os.path.join(config["zh_path"], f"{basename}.json")
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=4)

#8导出字幕

In [None]:
import json
import os

# NOTE(review): `get_config` is not referenced in the visible code of this
# file, and no cell shown here installs or creates a `getconfig` module.
# Presumably a leftover from a script version - confirm the module is
# importable in this runtime, otherwise this line stops the cell.
from getconfig import get_config


def seconds_to_lrc_timestamp(seconds: float) -> str:
    """Format a time offset in seconds as an LRC ``mm:ss.xx`` timestamp.

    The value is rounded to whole hundredths of a second first and only then
    split into fields, so a rounding carry moves into the minutes
    (59.999 -> ``01:00.00``) instead of producing an invalid ``00:60.00``.
    Minutes are not wrapped into hours; they simply grow past two digits.
    """
    total_centis = round(seconds * 100)
    minutes, centis = divmod(total_centis, 6000)
    secs, centis = divmod(centis, 100)
    return f"{minutes:02}:{secs:02}.{centis:02}"


def seconds_to_srt_timestamp(seconds: float) -> str:
    """Format a time offset in seconds as an SRT ``HH:MM:SS,mmm`` timestamp.

    The value is rounded to whole milliseconds first and then split with
    integer arithmetic. Truncating the fractional part separately loses a
    millisecond to floating-point error (1.001 came out as ``00:00:01,000``).
    """
    total_millis = round(seconds * 1000)
    hours, rest = divmod(total_millis, 3_600_000)
    minutes, rest = divmod(rest, 60_000)
    secs, millis = divmod(rest, 1000)
    return f"{hours:02}:{minutes:02}:{secs:02},{millis:03}"


# Export an LRC file beside every JSON file found under zh_path.
for folder, _subdirs, names in os.walk(config["zh_path"]):
    for name in names:
        if not name.endswith(".json"):
            continue

        with open(os.path.join(folder, name), "r", encoding="utf-8") as src:
            payload = json.load(src)

        # Each segment yields two lines: the text tagged with its start
        # time, then an empty line tagged with its end time.
        lrc_lines = []
        for entry in payload.get("segments", []):
            begin = seconds_to_lrc_timestamp(entry["start"])
            finish = seconds_to_lrc_timestamp(entry["end"])
            caption = entry["text"].strip()
            lrc_lines.extend((f"[{begin}]{caption}", f"[{finish}]"))

        stem, _ext = os.path.splitext(name)
        output_path = os.path.join(folder, stem + ".lrc")
        with open(output_path, "w", encoding="utf-8") as dst:
            dst.write("\n".join(lrc_lines))

        print(f"已生成 LRC 文件：{output_path}")


# Export an SRT file beside every JSON file found under zh_path.
for root, dirs, files in os.walk(config["zh_path"]):
    for filename in files:
        # Accept every .json file, as the LRC export loop does. The former
        # extra requirement of a "slice-" or "zh-" name prefix does not match
        # the "<basename>.json" names written into zh_path, so files without
        # such a prefix never got an SRT.
        if not filename.endswith(".json"):
            continue

        json_path = os.path.join(root, filename)
        with open(json_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        segments = data.get("segments", [])

        # One SRT cue per segment: 1-based index, time range, text, and a
        # blank line that separates it from the next cue.
        srt_lines = []
        for idx, seg in enumerate(segments, 1):
            start = seconds_to_srt_timestamp(seg["start"])
            end = seconds_to_srt_timestamp(seg["end"])
            text = seg["text"].strip()

            srt_lines.append(f"{idx}")
            srt_lines.append(f"{start} --> {end}")
            srt_lines.append(text)
            srt_lines.append("")

        basename = os.path.splitext(filename)[0]
        output_path = os.path.join(root, f"{basename}.srt")
        with open(output_path, "w", encoding="utf-8") as f:
            f.write("\n".join(srt_lines))

        print(f"已生成 SRT 文件：{output_path}")


