In [1]:
import os
import glob
import subprocess
from tqdm import tqdm
import re
from openai import OpenAI
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time
import shutil
import zipfile

In [None]:
# LLM settings handed to the OpenAI client below. The values are placeholders:
# fill them in before running the translation cells.
llm_api_key = "your llm api key"
base_url = "llm server url"
model = "model name"
client = OpenAI(api_key=llm_api_key, base_url=base_url)
# MinerU API token; it is sent as a Bearer token by the upload/monitor helpers.
mineru_api_key  = "you can get in https://mineru.net/apiManage/docs"
# Root folder that is searched recursively for the PDF files to translate.
directory_path = r"a folder path contains pdf files that you want to translate"

In [None]:
# Local alternative to the MinerU web API: use this function if `magic-pdf` is installed locally.
def process_pdfs_in_dir(dir_path):
    """Parse every not-yet-translated PDF under ``dir_path`` with the ``magic-pdf`` CLI.

    A PDF is skipped when a sibling ``<name>_zh.md`` already exists. Parsed
    output is written below ``./output`` (relative to the working directory).

    Args:
        dir_path: Folder that is searched recursively for ``*.pdf`` files.

    Returns:
        None. Progress and per-file errors are printed.

    Raises:
        FileNotFoundError: If the ``magic-pdf`` executable cannot be found.
    """
    search_pattern = os.path.join(dir_path, '**', '*.pdf')
    pdf_files = [
        pdf_path
        for pdf_path in glob.glob(search_pattern, recursive=True)
        # Swap only the extension: str.replace would also rewrite ".pdf"
        # inside directory names and miss upper-case ".PDF" suffixes.
        if not os.path.exists(os.path.splitext(pdf_path)[0] + "_zh.md")
    ]

    if not pdf_files:
        print("don't find any pdf files")
        return

    output_dir = './output'
    os.makedirs(output_dir, exist_ok=True)

    for pdf_file in tqdm(pdf_files, desc='processing pdf', unit='file'):
        cmd = ['magic-pdf', '-p', pdf_file, '-o', output_dir, '-m', 'txt']
        try:
            subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            print(f"success: {pdf_file}")
        except subprocess.CalledProcessError as e:
            # stderr may not be valid UTF-8 (e.g. localized consoles); never let
            # the error report itself fail.
            message = e.stderr.decode(errors="replace").strip()
            print(f"error: {pdf_file}, message: {message}")

In [28]:
# MinerU endpoint the upload helpers POST to in order to register a batch.
upload_url = 'https://mineru.net/api/v4/file-urls/batch'

# Headers shared by the MinerU API calls. The token is interpolated when this
# cell runs, so re-run the cell after changing mineru_api_key.
header = {
    'Content-Type': 'application/json',
    "Authorization": f"Bearer {mineru_api_key}"
}


def base_filename(str):
    """Return the last path component of ``str`` as a sanitized file name.

    Every character from the set ``\\ / : * ? " < > |`` is replaced by ``_``
    and leading/trailing spaces are removed.
    """
    tail = os.path.basename(str)
    sanitized = re.sub(r'[\\/:*?"<>|]', '_', tail)
    return sanitized.strip(' ')

def upload_batch_urls(url_list):
    """Submit a batch of remote PDF URLs to MinerU and return the batch id.

    Args:
        url_list: URLs of the documents to parse.

    Returns:
        The ``batch_id`` reported by the API.

    Raises:
        Exception: If the HTTP status is not 200 or the API reports a non-zero
            ``code``. Network errors raised by ``requests`` propagate as well.
            Every error is printed before it is re-raised.
    """
    # NOTE(review): this posts to the same `upload_url` as the local-file flow;
    # MinerU appears to document a separate endpoint for URL batches - confirm.
    data = {
        "language": "en",
        "files": [
            {"url": url, 'name': base_filename(url), "data_id": "abcd"}
            for url in url_list
        ]
    }

    try:
        # A timeout keeps the notebook from hanging forever on a stalled connection.
        response = requests.post(upload_url, headers=header, json=data, timeout=30)
        if response.status_code != 200:
            # Report the response body; formatting the Response object itself
            # only yields "<Response [NNN]>".
            raise Exception('response not success. status:{} ,result:{}'.format(response.status_code, response.text))
        result = response.json()
        print('response success. result:{}'.format(result))
        if result.get("code") != 0:
            raise Exception('submit task failed,reason:{}'.format(result))
        batch_id = result["data"]["batch_id"]
        print('batch_id:{}'.format(batch_id))
        return batch_id
    except Exception as err:
        print(err)
        raise

def upload_batch_files(pdf_path_list):
    """Register local PDFs with MinerU, upload them, and return the batch id.

    The function first asks the API for one upload URL per file, then PUTs each
    file to its URL. Individual upload failures are reported but do not abort
    the batch.

    Args:
        pdf_path_list: Paths of the local PDF files to upload.

    Returns:
        The ``batch_id`` reported by the API.

    Raises:
        Exception: If the HTTP status is not 200, the API reports a non-zero
            ``code``, or the number of upload URLs does not match the number of
            files. Errors from ``requests``/``open`` propagate as well. Every
            error is printed before it is re-raised.
    """
    data = {
        "language": "en",
        "files": [
            {"name": base_filename(pdf_path), "data_id": "abcd"}
            for pdf_path in pdf_path_list
        ]
    }
    print(data)

    try:
        response = requests.post(upload_url, headers=header, json=data, timeout=30)
        if response.status_code != 200:
            raise Exception('response not success. status:{} ,result:{}'.format(response.status_code, response.text))
        result = response.json()
        print('response success. result:{}'.format(result))
        if result.get("code") != 0:
            # `result` is a dict: attribute access (result.msg) raised
            # AttributeError and hid the real reason.
            raise Exception('apply upload url failed,reason:{}'.format(result.get("msg")))

        batch_id = result["data"]["batch_id"]
        urls = result["data"]["file_urls"]
        if len(urls) != len(pdf_path_list):
            # zip() would silently drop the surplus files.
            raise Exception('apply upload url failed,reason:expected {} upload urls, got {}'.format(
                len(pdf_path_list), len(urls)))

        failed = []
        for url_item, pdf_path_item in zip(urls, pdf_path_list):
            with open(pdf_path_item, 'rb') as f:
                res_upload = requests.put(url_item, data=f, timeout=600)
            if res_upload.status_code == 200:
                print(f"{pdf_path_item} upload success")
            else:
                failed.append(pdf_path_item)
                print(f"{pdf_path_item} upload failed")

        # Only claim overall success when every upload actually succeeded.
        if failed:
            print(f"{len(failed)} of {len(pdf_path_list)} pdf uploads failed")
        else:
            print("all pdf upload successfully")
        return batch_id

    except Exception as err:
        print(err)
        raise
    
def download_unzip(zip_url, file_name):
    """Download a result archive and extract it below ``./output``.

    The archive is saved as ``./zip/<stem>`` and extracted into
    ``./output/<stem>``, where ``<stem>`` is ``file_name`` without a trailing
    ``.pdf``.

    Args:
        zip_url: URL of the zip archive to download.
        file_name: Name of the source document the archive belongs to.

    Returns:
        ``file_name`` exactly as it was passed in, so callers can tell which
        document has been fetched.

    Raises:
        Exception: Any download, file-system or zip error; it is printed
            before being re-raised.
    """
    stem = file_name[:-4] if file_name.endswith(".pdf") else file_name
    zip_dir = "./zip"
    zip_save_path = os.path.join(zip_dir, stem)
    extract_folder = os.path.join("./output", stem)

    try:
        # The download folder is not created anywhere else; without this the
        # open() below fails on a fresh working directory.
        os.makedirs(zip_dir, exist_ok=True)

        # Context manager releases the streamed connection even on errors.
        with requests.get(zip_url, stream=True, timeout=60) as response:
            response.raise_for_status()
            with open(zip_save_path, "wb") as file:
                for chunk in response.iter_content(chunk_size=8192):
                    file.write(chunk)

        os.makedirs(extract_folder, exist_ok=True)
        with zipfile.ZipFile(zip_save_path, "r") as zip_ref:
            zip_ref.extractall(extract_folder)

    except Exception as e:
        print(e)
        raise

    return file_name
    
def monitor_batch(batch_id):
    """Poll a MinerU batch until every file is finished and fetch the results.

    The batch status is requested once a minute. As soon as a file reaches the
    ``done`` state its result archive is handed to ``download_unzip`` on a
    small thread pool; each file is downloaded at most once. Files in the
    ``failed`` state are reported and treated as finished so the loop ends.

    Args:
        batch_id: Batch identifier returned by one of the upload helpers.

    Returns:
        None. Download failures are printed after polling ends.
    """
    monitor_url = f'https://mineru.net/api/v4/extract-results/batch/{batch_id}'

    downloads = {}        # file_name -> Future of its download_unzip call
    reported_failed = set()
    with ThreadPoolExecutor(max_workers=4) as executor:
        while True:
            response = requests.get(monitor_url, headers=header, timeout=30)
            response.raise_for_status()
            extract_result = response.json()['data']['extract_result']

            all_finished = True
            for result in extract_result:
                file_name = result['file_name']
                state = result['state']
                if state == 'done':
                    # Track submissions by name; comparing against the futures'
                    # return values re-queued finished files on every poll.
                    if file_name not in downloads:
                        print(result['full_zip_url'])
                        downloads[file_name] = executor.submit(
                            download_unzip, result['full_zip_url'], file_name)
                elif state == 'failed':
                    # NOTE(review): assumes 'failed' is a terminal state with an
                    # optional 'err_msg' field - confirm against the MinerU API docs.
                    if file_name not in reported_failed:
                        reported_failed.add(file_name)
                        print(f"file_name:{file_name} state: failed {result.get('err_msg', '')}")
                else:
                    all_finished = False

            if all_finished:
                break
            time.sleep(60)

    # The executor has drained here; surface download errors that would
    # otherwise stay hidden inside the futures.
    for file_name, future in downloads.items():
        error = future.exception()
        if error is not None:
            print(f"file_name:{file_name} download failed: {error}")

def mineru_parser(pdf_path_list=None, url_list=None):
    """Send local PDFs and/or remote URLs through MinerU in batches of 200.

    Local files are handled first, then URLs. Each batch is uploaded and then
    monitored to completion before the next batch starts.
    """
    def _run_in_batches(items, upload):
        # Upload one slice at a time and wait for it before moving on.
        for offset in range(0, len(items), 200):
            monitor_batch(upload(items[offset:offset + 200]))

    if pdf_path_list:
        _run_in_batches(pdf_path_list, upload_batch_files)

    if url_list:
        _run_in_batches(url_list, upload_batch_urls)

def mineru_parser_directory(directory):
    """Parse every not-yet-translated PDF below ``directory`` with MinerU.

    Args:
        directory: Folder that is searched recursively for ``*.pdf`` files. A
            PDF is skipped when a sibling ``<name>_zh.md`` already exists.
    """
    search_pattern = os.path.join(directory, '**', '*.pdf')
    pdf_files = [
        pdf_path
        for pdf_path in glob.glob(search_pattern, recursive=True)
        # Swap only the extension: str.replace would also rewrite ".pdf"
        # inside directory names and miss upper-case ".PDF" suffixes.
        if not os.path.exists(os.path.splitext(pdf_path)[0] + "_zh.md")
    ]
    mineru_parser(pdf_path_list=pdf_files)

In [23]:
# Step 1: send the PDFs under directory_path to MinerU and unpack the results below ./output.
mineru_parser_directory(directory_path)

{'language': 'en', 'files': [{'name': '2409.13731v3.pdf', 'data_id': 'abcd'}]}
response success. result:{'code': 0, 'msg': 'ok', 'trace_id': '8b0103db8349a82c70e40885f3330fc7', 'data': {'batch_id': '44c8000a-ae8d-449e-be70-51188e8c5dc5', 'file_urls': ['https://mineru.oss-cn-shanghai.aliyuncs.com/api-upload/44c8000a-ae8d-449e-be70-51188e8c5dc5/747b3538-0f1c-47ff-84ac-c65605c3b7e3.pdf?Expires=1740662330&OSSAccessKeyId=LTAI5t9nGwatk85zetzojXbn&Signature=rW91YsJqsKvJ9wi%2BzsYs2elRV9E%3D']}}
D:\Postgraduatev3\Papers\my_idea_pdf\2409.13731v3.pdf upload success
all pdf upload successfully
https://cdn-mineru.openxlab.org.cn/pdf/33394786-d0c2-4681-87fc-d7ed8f993ba7.zip


In [None]:
def enable_translate(line_str):
    """Decide whether a Markdown section should be sent to the translator.

    Returns False for blank sections, headings (``#``), image lines (``![]``),
    display math (``$$``), HTML blocks (``<html>``) and sections that begin
    with a bracketed citation such as ``[Smith et al., 2020]``; True otherwise.
    """
    stripped = line_str.strip(" \n")
    if not stripped:
        return False

    # str.startswith accepts a tuple and matches any of the prefixes.
    untranslated_prefixes = ("#", "![]", "$$", "<html>")
    if stripped.startswith(untranslated_prefixes):
        return False

    citation = re.match(r'^\[[A-Za-z\s]+[.]?,\s\d+[A-Za-z]?\]', stripped)
    return citation is None

# System prompt (in Chinese): translate the English input into Chinese, output
# only the translation, and leave inline $...$ LaTeX expressions untranslated.
tranlate_prompt = '''将以下输入的英文文本翻译为中文，请直接输出翻译结果，不附带任何其他的解释和说明格式。。其中，内嵌$和$包围的内嵌latex表达式，不翻译。'''
def translate_step(text):
    """Translate ``text`` with a single, non-streaming chat completion.

    Uses the module-level ``client``, ``model`` and ``tranlate_prompt`` and
    returns the content of the first choice.
    """
    conversation = [
        {"role": "system", "content": tranlate_prompt},
        {"role": "user", "content": f"输入的英文文本：\n{text}"},
    ]
    completion = client.chat.completions.create(
        model=model,
        messages=conversation,
        stream=False,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content

def translate(text, retries=1):
    """Translate ``text``, retrying when the translation call fails.

    Args:
        text: Text passed to ``translate_step``.
        retries: Number of additional attempts after the first failure.
            Negative values are treated as 0, so one attempt is always made.

    Returns:
        The translated text.

    Raises:
        Exception: The error of the last attempt once all attempts failed.
    """
    total_attempts = max(retries, 0) + 1
    for attempt in range(1, total_attempts + 1):
        try:
            return translate_step(text)
        except Exception as e:
            if attempt >= total_attempts:
                # Do not announce a retry that will never happen.
                print(f"Translation attempt {attempt} failed with error: {e}. Giving up.")
                raise
            print(f"Translation attempt {attempt} failed with error: {e}. Retrying...")

def process_section(section):
    """Return the translation of ``section``, or ``section`` itself when
    ``enable_translate`` says it must stay untouched."""
    return translate(section) if enable_translate(section) else section
    
def format_figure(str):
    """Append a blank line after every Markdown image (``![alt](target)``)
    found in ``str`` and return the resulting text."""
    image_link = re.compile(r'!\[(.*?)\]\((.*?)\)')
    # The whole match is exactly "![alt](target)", so re-emit it followed by
    # the paragraph break.
    return image_link.sub(lambda found: found.group(0) + "\n\n", str)

def translate_md_files(dir_path):
    """Translate every Markdown file under ``dir_path`` into ``<name>_zh.md``.

    Files that end in ``_zh.md`` or already have a translated sibling are
    skipped. Each file is split into sections on blank lines; the sections are
    processed concurrently by ``process_section`` and re-joined in their
    original order.

    Args:
        dir_path: Folder that is searched recursively for ``*.md`` files.

    Returns:
        True once all pending files have been written.
    """
    search_pattern = os.path.join(dir_path, '**', '*.md')
    pending = []
    for md_path in glob.glob(search_pattern, recursive=True):
        if md_path.endswith("_zh.md"):
            continue
        # Swap only the extension: str.replace would also rewrite ".md" inside
        # directory names.
        output_path = os.path.splitext(md_path)[0] + "_zh.md"
        if not os.path.exists(output_path):
            pending.append((md_path, output_path))

    # cpu_count() may be None, and "cpu_count() - 4" is <= 0 on small machines,
    # which ThreadPoolExecutor rejects with ValueError.
    max_workers = max(1, (os.cpu_count() or 1) - 4)

    for md_path, output_path in tqdm(pending, desc='Processing files', unit='file'):
        if os.path.exists(output_path):
            continue
        with open(md_path, encoding='utf-8') as f:
            sections = format_figure(f.read()).split("\n\n")

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # map() yields results in input order, so the layout is preserved.
            translated_sections = list(executor.map(process_section, sections))

        output_content = "\n\n".join(translated_sections)

        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(output_content)
    return True

In [26]:
# Step 2: translate the parsed Markdown under ./output; each result is written next to its source as <name>_zh.md.
translate_md_files("./output")

Processing files: 100%|██████████| 1/1 [00:59<00:00, 59.23s/file]


True

In [None]:
def copy_files(src_dir, dst_dir):
    """Copy the regular files directly inside ``src_dir`` into ``dst_dir``.

    ``dst_dir`` is created when it does not exist. Sub-directories of
    ``src_dir`` are ignored; metadata is preserved via ``shutil.copy2``.
    """
    if not os.path.exists(dst_dir):
        os.makedirs(dst_dir)

    for entry in os.listdir(src_dir):
        origin = os.path.join(src_dir, entry)
        if not os.path.isfile(origin):
            continue
        shutil.copy2(origin, os.path.join(dst_dir, entry))

def post_return(directory_path):
    """Copy translated Markdown (and images) from ``./output`` next to each PDF.

    For every PDF under ``directory_path`` without a sibling ``<name>_zh.md``,
    the first ``*_zh.md`` in ``./output/<name>`` (or its ``txt`` sub-folder, if
    present) is copied to ``<name>_zh.md`` beside the PDF. When the source has
    an ``images`` folder, its files are copied into an ``images`` folder in the
    PDF's directory.

    Args:
        directory_path: Folder that is searched recursively for ``*.pdf`` files.

    Returns:
        None. Per-file failures are printed and do not stop the other copies.
    """
    search_pattern = os.path.join(directory_path, '**', '*.pdf')
    pdf_files = [
        pdf_path
        for pdf_path in glob.glob(search_pattern, recursive=True)
        # Swap only the extension: str.replace would also rewrite ".pdf"
        # inside directory names.
        if not os.path.exists(os.path.splitext(pdf_path)[0] + "_zh.md")
    ]

    def copy_file_with_progress(pdf_path):
        stem = os.path.splitext(os.path.basename(pdf_path))[0]
        target_dir = os.path.dirname(pdf_path)
        source_dir = os.path.join("./output", stem)
        txt_dir = os.path.join(source_dir, "txt")
        if os.path.exists(txt_dir):
            source_dir = txt_dir

        # Escape the folder part so names containing [, ] or * are matched
        # literally; sort for a deterministic pick.
        candidates = sorted(glob.glob(os.path.join(glob.escape(source_dir), '*_zh.md')))
        if not candidates:
            raise FileNotFoundError(f"no translated markdown (*_zh.md) found in {source_dir}")
        shutil.copy2(candidates[0], os.path.splitext(pdf_path)[0] + "_zh.md")

        source_images_dir = os.path.join(source_dir, "images")
        target_images_dir = os.path.join(target_dir, "images")
        # Documents without figures have no images folder; that is not an error.
        if os.path.isdir(source_images_dir):
            os.makedirs(target_images_dir, exist_ok=True)
            copy_files(source_images_dir, target_images_dir)
        return pdf_path

    # cpu_count() may be None, and "cpu_count() - 2" is <= 0 on small machines,
    # which ThreadPoolExecutor rejects with ValueError.
    max_workers = max(1, (os.cpu_count() or 1) - 2)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(copy_file_with_progress, pdf_path): pdf_path for pdf_path in pdf_files}
        for future in tqdm(as_completed(futures), total=len(futures), desc='Copying files'):
            try:
                future.result()
            except Exception as e:
                print(f"Generated an exception: {e}")

In [30]:
# Step 3: copy each translated Markdown file (and its images) from ./output next to its source PDF.
post_return(directory_path)

Copying files: 100%|██████████| 1/1 [00:00<00:00, 31.36it/s]


In [None]:
def md_filter(text):
    """Remove the ``<html><body>`` / ``</body></html>`` wrappers from ``text``
    and turn every ``•`` into a Markdown list item on its own line."""
    substitutions = (
        ("<html><body>", ""),
        ("</body></html>", ""),
        ("•", "\n- "),
    )
    for needle, replacement in substitutions:
        text = text.replace(needle, replacement)
    return text

def post_process_md(directory_path):
    """Rewrite every ``*_zh.md`` file under ``directory_path`` in place with
    its content passed through ``md_filter``."""
    pattern = os.path.join(directory_path, '**', '*_zh.md')
    for md_path in glob.glob(pattern, recursive=True):
        with open(md_path, 'r', encoding='utf-8') as reader:
            raw_text = reader.read()

        # Filter before reopening for writing so a failure cannot truncate the file.
        cleaned_text = md_filter(raw_text)
        with open(md_path, 'w', encoding='utf-8') as writer:
            writer.write(cleaned_text)

In [32]:
# Step 4: strip the leftover <html><body> wrappers and convert bullet characters in the copied *_zh.md files.
post_process_md(directory_path)

In [None]:
def clean_filename(filename):
    """Return ``filename`` with the characters ``< > : " / \\ | ? *`` removed."""
    forbidden = re.compile(r'[<>:"/\\|?*]')
    return forbidden.sub('', filename)

def build_name_dict(directory_path):
    """Map each translated file's stem to the title in its first line.

    Every ``*_zh.md`` file below ``directory_path`` whose first line is a
    Markdown heading contributes one entry: the key is the file name without
    the ``_zh.md`` suffix, the value is the heading text passed through
    ``clean_filename``. Files whose heading text is empty are skipped.

    Args:
        directory_path: Folder that is walked recursively.

    Returns:
        dict mapping file stem to cleaned title.
    """
    suffix = '_zh.md'
    result_dict = {}

    for root, _dirs, files in os.walk(directory_path):
        for file in files:
            if not file.endswith(suffix):
                continue
            # utf-8-sig also accepts plain UTF-8 and drops a leading BOM that
            # would otherwise hide the '#'.
            with open(os.path.join(root, file), 'r', encoding='utf-8-sig') as f:
                first_line = f.readline().strip()
            if not first_line.startswith('#'):
                continue
            # Drop all heading markers, whatever the level or spacing
            # ("# T", "## T", "#T"), instead of slicing a fixed 2 characters.
            title = clean_filename(first_line.lstrip('#').strip()).strip()
            if title:
                # Slice the suffix off the end only; replace() could also hit
                # an earlier "_zh.md" inside the name.
                result_dict[file[:-len(suffix)]] = title

    return result_dict


def rename_dir(directory_path):
    """Rename each PDF and its ``_zh.md`` translation after the document title.

    Titles come from ``build_name_dict``. A pair is renamed only when the
    translation sits next to the PDF and neither target name exists yet;
    otherwise the pair is left untouched and a message is printed.

    Args:
        directory_path: Folder that is walked recursively.
    """
    result = build_name_dict(directory_path)
    for root, _dirs, files in os.walk(directory_path):
        for file in files:
            if not file.endswith(".pdf"):
                continue
            stem = file[:-4]
            new_stem = result.get(stem)
            if not new_stem or new_stem == stem:
                continue

            # Build the targets from the directory plus the new file name.
            # Replacing the stem inside the full path would also rewrite
            # matching directory components.
            old_pdf_path = os.path.join(root, file)
            new_pdf_path = os.path.join(root, new_stem + ".pdf")
            old_md_path = os.path.join(root, stem + "_zh.md")
            new_md_path = os.path.join(root, new_stem + "_zh.md")

            if not os.path.exists(old_md_path):
                # The title may belong to a same-named file in another folder.
                print(f"skip {old_pdf_path}: no translation next to it")
                continue
            if os.path.exists(new_pdf_path) or os.path.exists(new_md_path):
                # os.rename overwrites silently on POSIX; never clobber files.
                print(f"skip {old_pdf_path}: target name already exists")
                continue

            os.rename(old_pdf_path, new_pdf_path)
            os.rename(old_md_path, new_md_path)

In [None]:
# Step 5: rename each PDF and its translation after the first heading of the translated Markdown.
rename_dir(directory_path)