From 93b6ac6ecb8f742437ac38140cfe2fe6901e5c02 Mon Sep 17 00:00:00 2001 From: ag9920 Date: Sat, 11 Apr 2026 13:51:25 +0800 Subject: [PATCH] feat: add checkpoint/resume for long document processing --- pageindex/client.py | 10 ++++++--- pageindex/config.yaml | 4 +++- pageindex/page_index.py | 46 +++++++++++++++++++++++++++++++++----- pageindex/page_index_md.py | 41 ++++++++++++++++++++++++++------- run_pageindex.py | 22 +++++++++++++++--- 5 files changed, 102 insertions(+), 21 deletions(-) diff --git a/pageindex/client.py b/pageindex/client.py index 894dab181..218d8f08d 100644 --- a/pageindex/client.py +++ b/pageindex/client.py @@ -52,7 +52,7 @@ def __init__(self, api_key: str = None, model: str = None, retrieve_model: str = if self.workspace: self._load_workspace() - def index(self, file_path: str, mode: str = "auto") -> str: + def index(self, file_path: str, mode: str = "auto", checkpoint_dir: str = None, resume: bool = False) -> str: """Index a document. Returns a document_id.""" # Persist a canonical absolute path so workspace reloads do not # reinterpret caller-relative paths against the workspace directory. @@ -74,7 +74,9 @@ def index(self, file_path: str, mode: str = "auto") -> str: if_add_node_summary='yes', if_add_node_text='yes', if_add_node_id='yes', - if_add_doc_description='yes' + if_add_doc_description='yes', + checkpoint_dir=checkpoint_dir, + resume='yes' if resume else None, ) # Extract per-page text so queries don't need the original PDF pages = [] @@ -104,7 +106,9 @@ def index(self, file_path: str, mode: str = "auto") -> str: model=self.model, if_add_doc_description='yes', if_add_node_text='yes', - if_add_node_id='yes' + if_add_node_id='yes', + checkpoint_dir=checkpoint_dir, + resume=resume, ) try: asyncio.get_running_loop() diff --git a/pageindex/config.yaml b/pageindex/config.yaml index 591fe9331..4960c74c5 100644 --- a/pageindex/config.yaml +++ b/pageindex/config.yaml @@ -7,4 +7,6 @@ max_token_num_each_node: 20000 if_add_node_id: "yes" if_add_node_summary: "yes" if_add_doc_description: "no" -if_add_node_text: "no" \ No newline at end of file +if_add_node_text: "no" +checkpoint_dir: null +resume: "no" \ No newline at end of file diff --git a/pageindex/page_index.py b/pageindex/page_index.py index 9004309fb..9d463d580 100644 --- a/pageindex/page_index.py +++ b/pageindex/page_index.py @@ -1063,6 +1063,17 @@ async def tree_parser(page_list, opt, doc=None, logger=None): return toc_tree +def _save_checkpoint(data, path): + if not path: + return + os.makedirs(os.path.dirname(path), exist_ok=True) + tmp_path = path + '.tmp' + with open(tmp_path, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + os.replace(tmp_path, path) + print(f'Checkpoint saved: {path}') + + def page_index_main(doc, opt=None): logger = JsonLogger(doc) @@ -1080,30 +1091,52 @@ def page_index_main(doc, opt=None): logger.info({'total_token': sum([page[1] for page in page_list])}) async def page_index_builder(): - structure = await tree_parser(page_list, opt, doc=doc, logger=logger) + checkpoint_dir = getattr(opt, 'checkpoint_dir', None) + resume = getattr(opt, 'resume', 'no') == 'yes' + doc_name = get_pdf_name(doc) + + tree_ckpt = os.path.join(checkpoint_dir, doc_name + '_tree.json') if checkpoint_dir else None + summary_ckpt = os.path.join(checkpoint_dir, doc_name + '_summary.json') if checkpoint_dir else None + + if resume and checkpoint_dir and summary_ckpt and os.path.isfile(summary_ckpt): + print(f'Resuming from summary checkpoint: {summary_ckpt}') + with open(summary_ckpt, 'r', encoding='utf-8') as f: + structure = json.load(f) + elif resume and checkpoint_dir and tree_ckpt and os.path.isfile(tree_ckpt): + print(f'Resuming from tree checkpoint: {tree_ckpt}') + with open(tree_ckpt, 'r', encoding='utf-8') as f: + structure = json.load(f) + else: + if resume and checkpoint_dir: + raise FileNotFoundError( + f"No checkpoint found in {checkpoint_dir} for '{doc_name}'. " + f"Expected: {tree_ckpt}") + structure = await tree_parser(page_list, opt, doc=doc, logger=logger) + _save_checkpoint(structure, tree_ckpt) + if opt.if_add_node_id == 'yes': - write_node_id(structure) + write_node_id(structure) if opt.if_add_node_text == 'yes': add_node_text(structure, page_list) if opt.if_add_node_summary == 'yes': if opt.if_add_node_text == 'no': add_node_text(structure, page_list) await generate_summaries_for_structure(structure, model=opt.model) + _save_checkpoint(structure, summary_ckpt) if opt.if_add_node_text == 'no': remove_structure_text(structure) if opt.if_add_doc_description == 'yes': - # Create a clean structure without unnecessary fields for description generation clean_structure = create_clean_structure_for_description(structure) doc_description = generate_doc_description(clean_structure, model=opt.model) structure = format_structure(structure, order=['title', 'node_id', 'start_index', 'end_index', 'summary', 'text', 'nodes']) return { - 'doc_name': get_pdf_name(doc), + 'doc_name': doc_name, 'doc_description': doc_description, 'structure': structure, } structure = format_structure(structure, order=['title', 'node_id', 'start_index', 'end_index', 'summary', 'text', 'nodes']) return { - 'doc_name': get_pdf_name(doc), + 'doc_name': doc_name, 'structure': structure, } @@ -1111,7 +1144,8 @@ async def page_index_builder(): def page_index(doc, model=None, toc_check_page_num=None, max_page_num_each_node=None, max_token_num_each_node=None, - if_add_node_id=None, if_add_node_summary=None, if_add_doc_description=None, if_add_node_text=None): + if_add_node_id=None, if_add_node_summary=None, if_add_doc_description=None, if_add_node_text=None, + checkpoint_dir=None, resume=None): user_opt = { arg: value for arg, value in locals().items() diff --git a/pageindex/page_index_md.py b/pageindex/page_index_md.py index 5a5971690..72cde7c2f 100644 --- a/pageindex/page_index_md.py +++ b/pageindex/page_index_md.py @@ -240,7 +240,26 @@ def clean_tree_for_output(tree_nodes): return cleaned_nodes -async def md_to_tree(md_path, if_thinning=False, min_token_threshold=None, if_add_node_summary='no', summary_token_threshold=None, model=None, if_add_doc_description='no', if_add_node_text='no', if_add_node_id='yes'): +def _save_checkpoint_md(data, path): + if not path: + return + os.makedirs(os.path.dirname(path), exist_ok=True) + tmp_path = path + '.tmp' + with open(tmp_path, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + os.replace(tmp_path, path) + print(f'Checkpoint saved: {path}') + + +async def md_to_tree(md_path, if_thinning=False, min_token_threshold=None, if_add_node_summary='no', summary_token_threshold=None, model=None, if_add_doc_description='no', if_add_node_text='no', if_add_node_id='yes', checkpoint_dir=None, resume=False): + doc_name = os.path.splitext(os.path.basename(md_path))[0] + summary_ckpt = os.path.join(checkpoint_dir, doc_name + '_md_summary.json') if checkpoint_dir else None + + if resume and checkpoint_dir and summary_ckpt and os.path.isfile(summary_ckpt): + print(f'Resuming from summary checkpoint: {summary_ckpt}') + with open(summary_ckpt, 'r', encoding='utf-8') as f: + return json.load(f) + with open(md_path, 'r', encoding='utf-8') as f: markdown_content = f.read() line_count = markdown_content.count('\n') + 1 @@ -265,36 +284,42 @@ async def md_to_tree(md_path, if_thinning=False, min_token_threshold=None, if_ad print(f"Formatting tree structure...") if if_add_node_summary == 'yes': - # Always include text for summary generation tree_structure = format_structure(tree_structure, order = ['title', 'node_id', 'line_num', 'summary', 'prefix_summary', 'text', 'nodes']) print(f"Generating summaries for each node...") tree_structure = await generate_summaries_for_structure_md(tree_structure, summary_token_threshold=summary_token_threshold, model=model) if if_add_node_text == 'no': - # Remove text after summary generation if not requested tree_structure = format_structure(tree_structure, order = ['title', 'node_id', 'line_num', 'summary', 'prefix_summary', 'nodes']) if if_add_doc_description == 'yes': print(f"Generating document description...") - # Create a clean structure without unnecessary fields for description generation clean_structure = create_clean_structure_for_description(tree_structure) doc_description = generate_doc_description(clean_structure, model=model) - return { - 'doc_name': os.path.splitext(os.path.basename(md_path))[0], + result = { + 'doc_name': doc_name, 'doc_description': doc_description, 'line_count': line_count, 'structure': tree_structure, } + _save_checkpoint_md(result, summary_ckpt) + return result + + result = { + 'doc_name': doc_name, + 'line_count': line_count, + 'structure': tree_structure, + } + _save_checkpoint_md(result, summary_ckpt) + return result else: - # No summaries needed, format based on text preference if if_add_node_text == 'yes': tree_structure = format_structure(tree_structure, order = ['title', 'node_id', 'line_num', 'summary', 'prefix_summary', 'text', 'nodes']) else: tree_structure = format_structure(tree_structure, order = ['title', 'node_id', 'line_num', 'summary', 'prefix_summary', 'nodes']) return { - 'doc_name': os.path.splitext(os.path.basename(md_path))[0], + 'doc_name': doc_name, 'line_count': line_count, 'structure': tree_structure, } diff --git a/run_pageindex.py b/run_pageindex.py index 673439d89..e513aff18 100644 --- a/run_pageindex.py +++ b/run_pageindex.py @@ -28,7 +28,16 @@ help='Whether to add doc description to the doc') parser.add_argument('--if-add-node-text', type=str, default=None, help='Whether to add text to the node') - + + parser.add_argument('--checkpoint-dir', type=str, default=None, + help='Directory to save/load tree structure checkpoints. ' + 'When set, intermediate results are saved after expensive LLM calls ' + 'so processing can be resumed later with --resume, ' + 'or the checkpoint can be manually edited for correction.') + parser.add_argument('--resume', action='store_true', default=False, + help='Resume from a previously saved checkpoint instead of re-running ' + 'expensive LLM calls (requires --checkpoint-dir)') + # Markdown specific arguments parser.add_argument('--if-thinning', type=str, default='no', help='Whether to apply tree thinning for markdown (markdown only)') @@ -44,13 +53,16 @@ if args.pdf_path and args.md_path: raise ValueError("Only one of --pdf_path or --md_path can be specified") + if args.resume and not args.checkpoint_dir: + raise ValueError("--resume requires --checkpoint-dir to be set") + if args.pdf_path: # Validate PDF file if not args.pdf_path.lower().endswith('.pdf'): raise ValueError("PDF file must have .pdf extension") if not os.path.isfile(args.pdf_path): raise ValueError(f"PDF file not found: {args.pdf_path}") - + # Process PDF file user_opt = { 'model': args.model, @@ -61,6 +73,8 @@ 'if_add_node_summary': args.if_add_node_summary, 'if_add_doc_description': args.if_add_doc_description, 'if_add_node_text': args.if_add_node_text, + 'checkpoint_dir': args.checkpoint_dir, + 'resume': 'yes' if args.resume else None, } opt = ConfigLoader().load({k: v for k, v in user_opt.items() if v is not None}) @@ -117,7 +131,9 @@ model=opt.model, if_add_doc_description=opt.if_add_doc_description, if_add_node_text=opt.if_add_node_text, - if_add_node_id=opt.if_add_node_id + if_add_node_id=opt.if_add_node_id, + checkpoint_dir=args.checkpoint_dir, + resume=args.resume, )) print('Parsing done, saving to file...')