# 翻译Jupyter Notebook为中文

本笔记使用的提示词（Prompt）针对 JWST 望远镜数据处理场景编写。

In [2]:
import os
import json
from glob import glob
from pathlib import Path
import openai  
import re 
from concurrent.futures import ThreadPoolExecutor, as_completed


In [3]:
!pwd

/root/Dropbox/jupyter-ny/jwst


In [4]:
# OpenAI credentials: read from the environment so the key is not stored in
# the notebook; falls back to an empty string when the variable is unset.
API_KEY = os.environ.get("OPENAI_API_KEY", "")
model = "gpt-4o-mini"
temperature = 0.2
max_tokens = 16384

# Root directory that is searched recursively for notebooks.
ROOT_DIR = "/root/Dropbox/jupyter-ny/jwst/notebooks/"

MAX_WORKERS = 8  # maximum number of notebooks processed in parallel; tunable
SEPARATOR = "\n   \n"  # appended to every request; the reply is cut at this marker
TIMEOUT = 300  # request timeout handed to the OpenAI client

# Build the client exactly once, with the request timeout applied.
client = openai.OpenAI(api_key=API_KEY, timeout=TIMEOUT)


In [5]:
def process_cell(content, cell_type):
    """Translate or annotate one notebook cell through the chat model.

    Markdown cells are translated into Chinese; code cells receive line-level
    Chinese comments. The call is best-effort: whenever a usable reply cannot
    be obtained, the cell is left as it was.

    Args:
        content: Full source text of the cell.
        cell_type: Either "markdown" or "code"; selects the system prompt.

    Returns:
        The processed source as a list of lines with line endings kept. The
        original lines are returned instead when the cell is blank, the
        request raises, the reply was cut off by the token limit, or the
        cleaned reply is empty.

    Raises:
        KeyError: If cell_type is neither "markdown" nor "code".
    """
    system_prompt = {
        "markdown": (
            "作为天文学文档翻译专家，精通JWST望远镜数据处理，请：\n"
            "1. 精准翻译Markdown内容\n"
            "2. 保留所有格式标记\n"
            "3. 维持中英术语对照\n"
            "4. 使用流畅的中文母语\n"
        ),
        "code": (
            "作为天文学Python代码专家，精通JWST望远镜数据处理，请：\n"
            "1. 添加行级中文注释\n"
            "2. 保持代码结构不变\n"
            "3. 保留原始功能"
        )
    }[cell_type]

    original_lines = content.splitlines(keepends=True)

    # A blank cell has nothing to translate; skip the API round trip so the
    # model cannot invent content for it.
    if not content.strip():
        return original_lines

    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"{content}\n{SEPARATOR}"}
            ],
            temperature=temperature,
            max_tokens=max_tokens
        )
        choice = response.choices[0]

        # finish_reason "length" means the reply hit max_tokens and is
        # incomplete; keeping the original is safer than saving half a cell.
        if choice.finish_reason == "length":
            print("处理异常: 输出被 max_tokens 截断，保留原始内容")
            return original_lines

        processed = clean_output(choice.message.content)
        # Drop anything the model produced after echoing the separator.
        translated_lines = processed.split(SEPARATOR)[0].splitlines(keepends=True)

        # An empty reply would wipe the cell; fall back to the original.
        return translated_lines or original_lines
    except Exception as e:
        print(f"处理异常: {e}")
        return original_lines


def clean_output(text):
    """Remove code-fence markers and squeeze blank lines in a model reply.

    Every run of three backticks, together with any word characters directly
    after it (such as a language tag), is deleted wherever it occurs in the
    text. Runs of three or more newlines are then reduced to two, and
    leading/trailing whitespace is trimmed.

    Args:
        text: Raw reply text.

    Returns:
        The cleaned text.
    """
    fence_pattern = re.compile(r"```[\w]*")
    blank_run_pattern = re.compile(r"\n{3,}")

    without_fences = fence_pattern.sub("", text)
    squeezed = blank_run_pattern.sub("\n\n", without_fences)
    return squeezed.strip()

def process_notebook(file_path):
    """Translate one notebook and save it beside the original as "<stem>_cn.ipynb".

    Markdown and code cells are passed to process_cell on a small thread
    pool; cells of any other type are written back unchanged.

    Args:
        file_path: Path of the source .ipynb file.

    Returns:
        True when the translated notebook was written, False when any step
        failed (the error is printed rather than raised).
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            nb_data = json.load(f)

        with ThreadPoolExecutor(max_workers=4) as executor:
            pending = []
            for cell in nb_data["cells"]:
                if cell["cell_type"] not in ("markdown", "code"):
                    continue

                source = cell["source"]
                # nbformat stores source either as a single string or as a
                # list of lines that already carry their own "\n". Joining
                # with "\n" would double every line break (and would split a
                # plain string into single characters), so concatenate as-is.
                text = source if isinstance(source, str) else "".join(source)

                future = executor.submit(process_cell, text, cell["cell_type"])
                pending.append((cell, future))

            # Collect in submission order so each result lands on its own cell.
            for cell, future in pending:
                cell["source"] = future.result(timeout=TIMEOUT + 10)

        # Save next to the source notebook.
        source_path = Path(file_path)
        new_file_path = source_path.parent / f"{source_path.stem}_cn.ipynb"

        with open(new_file_path, "w", encoding="utf-8") as f:
            json.dump(nb_data, f, indent=2, ensure_ascii=False)

        return True
    except Exception as e:
        print(f"文件处理失败: {file_path} - {e}")
        return False

In [6]:
def main():
    """Translate every pending notebook under ROOT_DIR in parallel.

    Notebooks whose stem ends in "_cn" or "_translated" are treated as
    translation outputs and ignored, and a notebook is skipped when its
    "<stem>_cn.ipynb" sibling already exists. The remaining files are handed
    to process_notebook on a thread pool of MAX_WORKERS workers, and a
    success/failure summary is printed at the end.
    """
    # Only the file stem is inspected: matching "_cn" anywhere in the full
    # path would also exclude notebooks that merely live under a directory
    # (or carry a name) containing that substring.
    translated_suffixes = ("_cn", "_translated")
    notebooks = [
        f for f in glob(f"{ROOT_DIR}/**/*.ipynb", recursive=True)
        if not Path(f).stem.endswith(translated_suffixes)
    ]

    # Second filter: drop notebooks whose translation is already on disk.
    notebooks_to_process = []
    for nb in notebooks:
        nb_path = Path(nb)
        translated_file = nb_path.parent / f"{nb_path.stem}_cn.ipynb"
        if translated_file.exists():
            print(f"跳过已存在的翻译: {translated_file}")
        else:
            notebooks_to_process.append(nb)

    print(f"发现 {len(notebooks)} 个文件，需要处理 {len(notebooks_to_process)} 个")

    if not notebooks_to_process:
        print("没有需要翻译的新文件")
        return

    success_count = 0
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(process_notebook, nb): nb for nb in notebooks_to_process}

        for future in as_completed(futures):
            file_name = futures[future]
            try:
                succeeded = future.result()
            except Exception as e:
                # process_notebook reports its own failures by returning
                # False; this branch only covers unexpected errors.
                print(f"系统错误: {file_name} - {e}")
                continue

            if succeeded:
                success_count += 1
                print(f"成功处理: {file_name}")
            else:
                print(f"处理失败: {file_name}")

    print(f"处理完成 | 成功: {success_count} | 失败: {len(notebooks_to_process)-success_count}")

In [7]:
# Run the batch translation only when __name__ is "__main__", i.e. when this
# code is executed directly rather than imported as a module.
if __name__ == "__main__":
    main()

跳过已存在的翻译: /root/Dropbox/jupyter-ny/jwst/notebooks/NIRCam/aperture_photometry/NIRCam_Aperture_Photometry_Example_cn.ipynb
跳过已存在的翻译: /root/Dropbox/jupyter-ny/jwst/notebooks/NIRCam/NIRCam_WFSS_simulating_spectra/Simulating_WFSS_spectra_CRDS_cn.ipynb
跳过已存在的翻译: /root/Dropbox/jupyter-ny/jwst/notebooks/NIRCam/NIRCam_WFSS_Box_extraction/BoxExtraction_using_Grismconf_CRDS_cn.ipynb
跳过已存在的翻译: /root/Dropbox/jupyter-ny/jwst/notebooks/NIRCam/psf_photometry/NIRCam_PSF_Photometry_Example_cn.ipynb
跳过已存在的翻译: /root/Dropbox/jupyter-ny/jwst/notebooks/NIRCam/NIRCam_PSF-matched_photometry/NIRCam_PSF_matched_multiband_photometry_cn.ipynb
跳过已存在的翻译: /root/Dropbox/jupyter-ny/jwst/notebooks/NIRCam/NIRCam_wisp_subtraction/nircam_wisp_subtraction_cn.ipynb
跳过已存在的翻译: /root/Dropbox/jupyter-ny/jwst/notebooks/NIRCam/NIRCam_photometry/NIRCam_multiband_photometry_cn.ipynb
跳过已存在的翻译: /root/Dropbox/jupyter-ny/jwst/notebooks/NIRCam/psf_photometry_with_space_phot/nircam_spacephot_cn.ipynb
跳过已存在的翻译: /root/Dropbox/jupyter-ny/jws

成功处理: /root/Dropbox/jupyter-ny/jwst/notebooks/NIRSpec/cube_fitting/cube_fitting.ipynb
处理完成 | 成功: 1 | 失败: 0
