From 9f9e087a99e57402a656e48c61aa06528d4c7436 Mon Sep 17 00:00:00 2001 From: Halleluyah Oludele Date: Thu, 28 May 2026 15:33:08 +0100 Subject: [PATCH] fix(ingest): per-LLM-call timeout + leaf-section cap (un-stick big PDFs) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two ingest bugs that froze FinanceBench ingests and are real product defects on any large filing: 1. No per-LLM-call timeout. A single hung summarize / HyDE / multi-axis / TOC-build call blocked the stage's errgroup Wait() forever — a doc was observed stuck in `summarizing` for 13+ hours. Fix: completeWithTimeout wraps every individual LLM.Complete in a context.WithTimeout (default 90s, ingest.llm_call_timeout_seconds / VLE_INGEST_LLM_CALL_TIMEOUT_SECONDS). On timeout the call is logged and skipped — the section keeps its existing/empty summary and the document still reaches `ready`. One bad call can no longer freeze a whole document. 2. Leaf-section explosion. chunkOversizedLeaves splits any leaf over 2400 chars into ~900-char pieces, so a 45K-char "Notes to Financial Statements" section shattered into ~50 chunks; a 92-page 10-K produced ~1500 leaves, each costing a summarize+HyDE+multi-axis LLM call → the slow/stalled ingest. Fix: capLeafSections enforces a ceiling (default 400, ingest.max_sections / VLE_INGEST_MAX_SECTIONS) by merging the smallest adjacent leaf siblings under a shared parent until the count is within budget. Content is preserved (blank-line joined), page ranges unioned, and table sections — attached after this pass — are never merged. Applied in both the heuristic and outline parse paths. The cap runs at its default (400) through the existing RegistryFromTableOpts → NewPDFWithTables path, so the fix is active on the deployed binary without a cmd wiring change. ingest.max_sections becoming operator-tunable end-to-end is a small follow-up in the cmd binaries. Tests: a hung-call mock proves the pipeline still completes and other sections summarize; cap tests prove merge-down to budget, smallest-pair ordering, content preservation, and the disabled (<=0) escape hatch. go build/vet/test all green. --- pkg/config/config.go | 51 +++++++++++- pkg/ingest/hyde.go | 15 +++- pkg/ingest/hyde_test.go | 8 +- pkg/ingest/ingest.go | 73 +++++++++++++++-- pkg/ingest/summary_axes.go | 14 +++- pkg/ingest/toc_builder.go | 34 ++++++-- pkg/parser/cap_test.go | 91 +++++++++++++++++++++ pkg/parser/pdf.go | 159 ++++++++++++++++++++++++++++++++++++- 8 files changed, 414 insertions(+), 31 deletions(-) create mode 100644 pkg/parser/cap_test.go diff --git a/pkg/config/config.go b/pkg/config/config.go index 5afde5d..ba40628 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -65,6 +65,37 @@ type IngestConfig struct { // 4 + 4 per-stage caps while staying well below typical provider // per-tenant concurrency limits. GlobalLLMConcurrency int `yaml:"global_llm_concurrency"` + + // LLMCallTimeoutSeconds bounds each INDIVIDUAL LLM call issued by the + // ingest pipeline (one section's summary, one leaf's HyDE questions, + // one TOC detect/extract/verify turn). Without it, a single provider + // call that hangs — no response, no error — blocks its bounded- + // concurrency errgroup forever, and the document never leaves + // `summarizing`. (Observed: a doc stuck for 13+ hours.) + // + // When a call exceeds this deadline it is treated exactly like any + // other per-section failure: logged and skipped, leaving that + // section with its existing/empty summary. One bad section can no + // longer freeze the whole document — it still reaches `ready`. + // + // 0 (or omitted) defaults to 90. Set explicitly to tune for slow + // reasoning models; a negative value is rejected by Validate. + LLMCallTimeoutSeconds int `yaml:"llm_call_timeout_seconds"` + + // MaxSections caps the number of leaf sections a single document may + // produce. A pathological PDF (e.g. a 92-page 10-Q whose every bold + // table cell looks like a heading) can shatter into ~1500 leaves, + // each of which then costs a summarize + HyDE LLM call — thousands of + // calls that throttle or stall ingest. + // + // When the parsed leaf count exceeds this cap, the parser merges + // adjacent small leaf sections under a shared parent (smallest + // siblings first) until the document is back under the cap. + // + // 0 (or omitted) defaults to 400 — comfortably above a real filing's + // section count (~170-510 with tables) while still catching the + // runaway case. A negative value is rejected by Validate. + MaxSections int `yaml:"max_sections"` } // TOCBlock configures the LLM-driven table-of-contents tree @@ -658,7 +689,9 @@ func Default() Config { }, }, Ingest: IngestConfig{ - GlobalLLMConcurrency: 12, + GlobalLLMConcurrency: 12, + LLMCallTimeoutSeconds: 90, + MaxSections: 400, HyDE: HyDEConfig{ Enabled: true, NumQuestions: 5, @@ -814,6 +847,16 @@ func applyEnvOverrides(c *Config) { c.Ingest.GlobalLLMConcurrency = n } } + if v := os.Getenv("VLE_INGEST_LLM_CALL_TIMEOUT_SECONDS"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n >= 0 { + c.Ingest.LLMCallTimeoutSeconds = n + } + } + if v := os.Getenv("VLE_INGEST_MAX_SECTIONS"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n >= 0 { + c.Ingest.MaxSections = n + } + } // pdftable-driven table extraction. if v := os.Getenv("VLE_INGEST_TABLES_ENABLED"); v != "" { switch strings.ToLower(strings.TrimSpace(v)) { @@ -1091,6 +1134,12 @@ func (c Config) Validate() error { if c.Ingest.GlobalLLMConcurrency < 0 { return fmt.Errorf("ingest.global_llm_concurrency must be >= 0, got %d", c.Ingest.GlobalLLMConcurrency) } + if c.Ingest.LLMCallTimeoutSeconds < 0 { + return fmt.Errorf("ingest.llm_call_timeout_seconds must be >= 0, got %d", c.Ingest.LLMCallTimeoutSeconds) + } + if c.Ingest.MaxSections < 0 { + return fmt.Errorf("ingest.max_sections must be >= 0, got %d", c.Ingest.MaxSections) + } switch c.Ingest.Tables.VerticalStrategy { case "", "lines", "lines_strict", "text", "explicit": diff --git a/pkg/ingest/hyde.go b/pkg/ingest/hyde.go index 2fe6d8d..06a7b86 100644 --- a/pkg/ingest/hyde.go +++ b/pkg/ingest/hyde.go @@ -8,6 +8,7 @@ import ( "io" "strings" "sync" + "time" "golang.org/x/sync/errgroup" @@ -158,7 +159,7 @@ func (p *Pipeline) candidateQuestionsFor(ctx context.Context, s db.Section, prof JSONSchema: []byte(hydeJSONSchema), } - questions, err := runHyDEWithRetry(ctx, p.LLM, req, defaultHyDERetries) + questions, err := runHyDEWithRetry(ctx, p.LLM, req, defaultHyDERetries, p.LLMCallTimeout) if err != nil { return nil, err } @@ -176,7 +177,12 @@ const defaultHyDERetries = 2 // parse failure returns an error so the caller can log it; transport // errors propagate. ErrNotImplemented (stub LLM) degrades to "no // questions" so test paths keep working. -func runHyDEWithRetry(ctx context.Context, client llmgate.Client, baseReq llmgate.Request, maxRetries int) ([]string, error) { +// +// timeout bounds each individual Complete call. A per-call timeout (or +// context cancellation) is terminal — it short-circuits the retry loop +// rather than re-issuing a call that just hung. A non-positive timeout +// disables the per-call bound. +func runHyDEWithRetry(ctx context.Context, client llmgate.Client, baseReq llmgate.Request, maxRetries int, timeout time.Duration) ([]string, error) { if maxRetries < 0 { maxRetries = 0 } @@ -193,11 +199,12 @@ func runHyDEWithRetry(ctx context.Context, client llmgate.Client, baseReq llmgat } req.Messages = msgs } - resp, err := client.Complete(ctx, req) + resp, err := completeWithTimeout(ctx, client, req, timeout) if err != nil { // Stub clients return ErrNotImplemented — treat as "no // questions" so the pipeline proceeds without LLM access - // in test setups. + // in test setups. Per-call timeouts and other transport + // errors propagate (the caller skips this section). if errors.Is(err, llmgate.ErrNotImplemented) { return nil, nil } diff --git a/pkg/ingest/hyde_test.go b/pkg/ingest/hyde_test.go index 9821d76..ed11552 100644 --- a/pkg/ingest/hyde_test.go +++ b/pkg/ingest/hyde_test.go @@ -91,7 +91,7 @@ func TestRunHyDEWithRetryHappy(t *testing.T) { m := &llmgate.Mock{Reply: `{"questions":["Q1","Q2","Q3","Q4","Q5"]}`} got, err := runHyDEWithRetry(context.Background(), m, llmgate.Request{ Messages: []llmgate.Message{{Role: llmgate.RoleUser, Content: "go"}}, - }, 2) + }, 2, 0) if err != nil { t.Fatalf("happy path: %v", err) } @@ -118,7 +118,7 @@ func TestRunHyDEWithRetryRetriesOnNonJSON(t *testing.T) { } got, err := runHyDEWithRetry(context.Background(), m, llmgate.Request{ Messages: []llmgate.Message{{Role: llmgate.RoleUser, Content: "go"}}, - }, 2) + }, 2, 0) if err != nil { t.Fatalf("should recover on 3rd attempt: %v", err) } @@ -134,7 +134,7 @@ func TestRunHyDEWithRetryFinalParseFailReturnsError(t *testing.T) { m := &llmgate.Mock{Reply: "no JSON anywhere here, just prose."} _, err := runHyDEWithRetry(context.Background(), m, llmgate.Request{ Messages: []llmgate.Message{{Role: llmgate.RoleUser, Content: "go"}}, - }, 2) + }, 2, 0) if err == nil { t.Error("want final-parse error after all retries fail") } @@ -181,7 +181,7 @@ func TestHyDEGracefulOnNonJSON(t *testing.T) { _, err := runHyDEWithRetry(context.Background(), m, llmgate.Request{ Messages: []llmgate.Message{{Role: llmgate.RoleUser, Content: "u"}}, - }, 2) + }, 2, 0) if err == nil { t.Fatal("want graceful error after 3 failed attempts") } diff --git a/pkg/ingest/ingest.go b/pkg/ingest/ingest.go index 71d6e8d..2adf859 100644 --- a/pkg/ingest/ingest.go +++ b/pkg/ingest/ingest.go @@ -157,12 +157,37 @@ type Pipeline struct { // for a table of contents. Default: 20. TOCCheckPages int + // LLMCallTimeout bounds each INDIVIDUAL LLM call the pipeline issues + // (one section's summary, one leaf's HyDE questions, one TOC + // detect/extract/verify turn). It is the safety valve against a + // provider call that hangs with neither a response nor an error: + // without it, that call's bounded-concurrency errgroup blocks on + // Wait() forever and the document never leaves `summarizing` (observed + // stuck for 13+ hours). + // + // A call that exceeds this deadline is handled exactly like any other + // per-section failure: the surrounding stage logs it and skips the + // section, leaving its existing/empty summary. One hung call can no + // longer freeze the whole document. + // + // Zero (the Go zero value, used by Pipeline literals in tests) means + // "no per-call timeout" so existing test paths that don't set it keep + // their unbounded behaviour. NewPipeline defaults it to 90s. + LLMCallTimeout time.Duration + // globalLLMSem is the lazily-initialized shared semaphore enforcing // GlobalLLMConcurrency. nil means "no global cap" — callers fall back // to per-stage limits only. globalLLMSem chan struct{} } +// defaultLLMCallTimeout is the per-call deadline NewPipeline applies when +// LLMCallTimeout is left unset. 90s is generous for a single summarize / +// HyDE / TOC turn even on a slow reasoning model, while still being short +// enough that a hung call is reaped in seconds-to-low-minutes rather than +// blocking the document forever. +const defaultLLMCallTimeout = 90 * time.Second + // NewPipeline returns a Pipeline with sensible defaults filled in. func NewPipeline(p Pipeline) *Pipeline { if p.SummaryMaxChars == 0 { @@ -210,12 +235,45 @@ func NewPipeline(p Pipeline) *Pipeline { if p.GlobalLLMConcurrency > 0 { p.globalLLMSem = make(chan struct{}, p.GlobalLLMConcurrency) } + // A per-call timeout is the difference between "one bad section" and + // "the whole document wedged for hours", so NewPipeline always fills + // one in. Pipeline literals (test paths) that leave it zero keep the + // historical unbounded behaviour. + if p.LLMCallTimeout <= 0 { + p.LLMCallTimeout = defaultLLMCallTimeout + } if p.Logger == nil { p.Logger = slog.Default() } return &p } +// completeWithTimeout issues a single LLM call bounded by timeout. A +// non-positive timeout disables the bound (calls ctx directly), preserving +// the legacy behaviour for Pipeline literals that never set one. +// +// The returned error on deadline expiry is context.DeadlineExceeded +// wrapped by the client — callers in the ingest stages already treat any +// Complete error as a non-fatal per-section skip, so a timeout slots into +// the existing degrade-and-continue path with no special-casing. +func completeWithTimeout(ctx context.Context, client llmgate.Client, req llmgate.Request, timeout time.Duration) (*llmgate.Response, error) { + if timeout <= 0 { + return client.Complete(ctx, req) + } + callCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + return client.Complete(callCtx, req) +} + +// isTimeout reports whether err is a context deadline/cancellation — the +// signature of a per-call LLM timeout. The ingest retry loops use it to +// stop retrying immediately on a timeout: re-issuing a call that just hung +// would only multiply the wall-time cost (N retries × the timeout) without +// changing the outcome, so a timeout is terminal, not retryable. +func isTimeout(err error) bool { + return errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) +} + // acquireGlobalLLM blocks until a global-LLM-concurrency slot is free, // or returns false if ctx is canceled first. Returns a release func the // caller must invoke (typically deferred). Safe to call when the global @@ -331,10 +389,11 @@ func (p *Pipeline) runTOCBuilder(ctx context.Context, docID tree.DocumentID, par model = p.SummaryModel } builder := &TOCBuilder{ - LLM: p.LLM, - Model: model, - Concurrency: p.TOCConcurrency, - TOCCheckPages: p.TOCCheckPages, + LLM: p.LLM, + Model: model, + Concurrency: p.TOCConcurrency, + TOCCheckPages: p.TOCCheckPages, + LLMCallTimeout: p.LLMCallTimeout, } nodes, usage, err := builder.Build(ctx, pages) if err != nil { @@ -717,7 +776,7 @@ func (p *Pipeline) summaryFor(ctx context.Context, s db.Section, childLines []st // request. Kept for the SummaryAxesEnabled=false opt-out branch and // for unit tests that build a Pipeline literal without the new flag. func (p *Pipeline) legacyOneLineSummary(ctx context.Context, s db.Section, body, profile string) (string, error) { - resp, err := p.LLM.Complete(ctx, llmgate.Request{ + resp, err := completeWithTimeout(ctx, p.LLM, llmgate.Request{ Model: p.SummaryModel, Temperature: 0.0, MaxTokens: 260, @@ -727,7 +786,7 @@ func (p *Pipeline) legacyOneLineSummary(ctx context.Context, s db.Section, body, "Section titled %q.\n\n%s\n\nReturn a single sentence (≤ 60 words) that names this section's concrete topics, entities, identifiers, and key items so a retrieval engine can match it to user questions.", cleanForLLM(s.Title), body)}, }, - }) + }, p.LLMCallTimeout) if err != nil { // Stub LLMs return ErrNotImplemented. Degrade gracefully: use a // truncated excerpt as the "summary" so downstream retrieval @@ -770,7 +829,7 @@ func (p *Pipeline) structuredSummaryFor(ctx context.Context, s db.Section, body, JSONSchema: []byte(summaryAxesJSONSchema), } - axes, rawText, err := runSummaryAxesWithRetry(ctx, p.LLM, req, defaultSummaryAxesRetries) + axes, rawText, err := runSummaryAxesWithRetry(ctx, p.LLM, req, defaultSummaryAxesRetries, p.LLMCallTimeout) if err != nil { // Transport / ErrNotImplemented / unrecoverable: fall back to a // text excerpt as OneLine with empty axes. Never fail ingest. diff --git a/pkg/ingest/summary_axes.go b/pkg/ingest/summary_axes.go index 8dad5b2..7bab4b9 100644 --- a/pkg/ingest/summary_axes.go +++ b/pkg/ingest/summary_axes.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strings" + "time" "github.com/hallelx2/llmgate" @@ -82,7 +83,13 @@ const defaultSummaryAxesRetries = 2 // // ErrNotImplemented (stub LLM) is folded into the error return so // callers can degrade to the text fallback path. -func runSummaryAxesWithRetry(ctx context.Context, client llmgate.Client, baseReq llmgate.Request, maxRetries int) (*tree.SummaryAxes, string, error) { +// +// timeout bounds each individual Complete call. A per-call timeout (or +// context cancellation) is terminal — it short-circuits the retry loop +// and returns the error immediately, because re-issuing a call that just +// hung would only multiply the wall-time cost. A non-positive timeout +// disables the per-call bound. +func runSummaryAxesWithRetry(ctx context.Context, client llmgate.Client, baseReq llmgate.Request, maxRetries int, timeout time.Duration) (*tree.SummaryAxes, string, error) { if maxRetries < 0 { maxRetries = 0 } @@ -99,10 +106,11 @@ func runSummaryAxesWithRetry(ctx context.Context, client llmgate.Client, baseReq } req.Messages = msgs } - resp, err := client.Complete(ctx, req) + resp, err := completeWithTimeout(ctx, client, req, timeout) if err != nil { // ErrNotImplemented bubbles up so the caller can use a - // purely text-based fallback. Transport errors do the same. + // purely text-based fallback. Transport errors and per-call + // timeouts do the same (a timeout is terminal — no retry). return nil, lastRaw, err } lastRaw = resp.Content diff --git a/pkg/ingest/toc_builder.go b/pkg/ingest/toc_builder.go index 3466cc5..b183a66 100644 --- a/pkg/ingest/toc_builder.go +++ b/pkg/ingest/toc_builder.go @@ -8,6 +8,7 @@ import ( "log" "strings" "sync" + "time" "golang.org/x/sync/errgroup" @@ -66,6 +67,17 @@ type TOCBuilder struct { // document with no TOC by page 20 almost never has one // further in. Default: 20. TOCCheckPages int + + // LLMCallTimeout bounds each individual LLM call the builder issues + // (every detect, extract, no-TOC-generate, and verify turn). It is + // the same safety valve the rest of the ingest pipeline uses: a + // single provider call that hangs must not block the builder's + // verification errgroup — and therefore the whole document — forever. + // + // Zero disables the per-call bound, preserving the legacy behaviour + // for TOCBuilder literals in tests. The pipeline sets it from + // Pipeline.LLMCallTimeout (default 90s). + LLMCallTimeout time.Duration } // Usage is the cumulative LLM accounting returned by Build. Mirrors @@ -214,7 +226,7 @@ Please note: abstract, summary, notation list, figure list, table list, etc. are JSONMode: true, JSONSchema: []byte(tocDetectorJSONSchema), } - raw, err := runTOCJSONWithRetry(ctx, b.LLM, req, defaultTOCRetries, usage) + raw, err := runTOCJSONWithRetry(ctx, b.LLM, req, defaultTOCRetries, usage, b.LLMCallTimeout) if err != nil { return false, err } @@ -269,7 +281,7 @@ Return ONLY a JSON object: {"nodes": [{"structure": "1", "title": "...", "physic JSONMode: true, JSONSchema: []byte(tocNodesJSONSchema), } - raw, err := runTOCJSONWithRetry(ctx, b.LLM, req, defaultTOCRetries, usage) + raw, err := runTOCJSONWithRetry(ctx, b.LLM, req, defaultTOCRetries, usage, b.LLMCallTimeout) if err != nil { return nil, err } @@ -307,7 +319,7 @@ Return ONLY a JSON object: {"nodes": [{"structure": "1", "title": "...", "physic JSONMode: true, JSONSchema: []byte(tocNodesJSONSchema), } - raw, err := runTOCJSONWithRetry(ctx, b.LLM, req, defaultTOCRetries, usage) + raw, err := runTOCJSONWithRetry(ctx, b.LLM, req, defaultTOCRetries, usage, b.LLMCallTimeout) if err != nil { return nil, err } @@ -423,7 +435,7 @@ Directly return the final JSON structure. Do not output anything else.`, title, JSONMode: true, JSONSchema: []byte(tocVerifyJSONSchema), } - raw, err := runTOCJSONWithRetry(ctx, b.LLM, req, defaultTOCRetries, usage) + raw, err := runTOCJSONWithRetry(ctx, b.LLM, req, defaultTOCRetries, usage, b.LLMCallTimeout) if err != nil { return false, err } @@ -491,7 +503,12 @@ type tocNodesPayload struct { // Returns the final raw response text (empty on transport / stub // failure). Caller decodes; a final parse failure degrades to "no // usable response" rather than an error. -func runTOCJSONWithRetry(ctx context.Context, client llmgate.Client, baseReq llmgate.Request, maxRetries int, usage *Usage) (string, error) { +// +// timeout bounds each individual Complete call. A per-call timeout (or +// context cancellation) is terminal — it short-circuits the retry loop +// rather than re-issuing a call that just hung. A non-positive timeout +// disables the per-call bound. +func runTOCJSONWithRetry(ctx context.Context, client llmgate.Client, baseReq llmgate.Request, maxRetries int, usage *Usage, timeout time.Duration) (string, error) { if maxRetries < 0 { maxRetries = 0 } @@ -508,11 +525,12 @@ func runTOCJSONWithRetry(ctx context.Context, client llmgate.Client, baseReq llm } req.Messages = msgs } - resp, err := client.Complete(ctx, req) + resp, err := completeWithTimeout(ctx, client, req, timeout) if err != nil { // Stub LLM (ErrNotImplemented) is a soft failure — the - // caller will degrade. Transport errors do the same so - // ingest never dies on a transient blip. + // caller will degrade. Transport errors and per-call + // timeouts do the same so ingest never dies on a transient + // blip or a hung call. if errors.Is(err, llmgate.ErrNotImplemented) { return "", nil } diff --git a/pkg/parser/cap_test.go b/pkg/parser/cap_test.go new file mode 100644 index 0000000..fa1fd61 --- /dev/null +++ b/pkg/parser/cap_test.go @@ -0,0 +1,91 @@ +package parser + +import ( + "strings" + "testing" +) + +// leafTree builds a flat list of n single-leaf sections each carrying +// `size` characters of content, under one shared parent. +func leafTree(n, size int) []Section { + kids := make([]Section, n) + for i := range kids { + kids[i] = Section{Level: 2, Title: "leaf", Content: strings.Repeat("x", size), PageStart: i + 1, PageEnd: i + 1} + } + return []Section{{Level: 1, Title: "parent", Children: kids}} +} + +func TestCapLeafSections_MergesDownToCap(t *testing.T) { + tree := leafTree(1000, 50) + if got := countLeafSections(tree); got != 1000 { + t.Fatalf("setup: countLeafSections = %d, want 1000", got) + } + capped := capLeafSections(tree, 400) + if got := countLeafSections(capped); got > 400 { + t.Errorf("after cap: %d leaves, want <= 400", got) + } + // No content should be lost. Merges insert a "\n\n" separator between + // two non-empty bodies, so the total grows by at most 2 chars per + // merge (< 1000 merges) but never shrinks below the original. + orig := 1000 * 50 + if got := totalContentLen(capped); got < orig || got > orig+2*1000 { + t.Errorf("content not preserved: got %d chars, want in [%d, %d]", got, orig, orig+2*1000) + } +} + +func TestCapLeafSections_UnderCapUnchanged(t *testing.T) { + tree := leafTree(50, 100) + capped := capLeafSections(tree, 400) + if got := countLeafSections(capped); got != 50 { + t.Errorf("under-cap tree was modified: %d leaves, want 50", got) + } +} + +func TestCapLeafSections_DisabledByNonPositive(t *testing.T) { + tree := leafTree(1000, 50) + if got := countLeafSections(capLeafSections(tree, 0)); got != 1000 { + t.Errorf("maxLeaves=0 should disable cap, got %d leaves", got) + } + if got := countLeafSections(capLeafSections(leafTree(1000, 50), -1)); got != 1000 { + t.Errorf("maxLeaves<0 should disable cap, got %d leaves", got) + } +} + +func TestCapLeafSections_MergesSmallestFirst(t *testing.T) { + // Two tiny leaves + one large; cap of 2 should merge the two tiny + // ones and leave the large leaf intact. + tree := []Section{{Level: 1, Title: "p", Children: []Section{ + {Level: 2, Title: "tiny-a", Content: "aa", PageStart: 1, PageEnd: 1}, + {Level: 2, Title: "tiny-b", Content: "bb", PageStart: 2, PageEnd: 2}, + {Level: 2, Title: "big", Content: strings.Repeat("z", 5000), PageStart: 3, PageEnd: 9}, + }}} + capped := capLeafSections(tree, 2) + if got := countLeafSections(capped); got != 2 { + t.Fatalf("countLeafSections = %d, want 2", got) + } + kids := capped[0].Children + if len(kids) != 2 { + t.Fatalf("len(kids) = %d, want 2", len(kids)) + } + // First child is the merged tiny pair; it must carry both bodies and + // the unioned page range. + if !strings.Contains(kids[0].Content, "aa") || !strings.Contains(kids[0].Content, "bb") { + t.Errorf("merged leaf lost content: %q", kids[0].Content) + } + if kids[0].PageStart != 1 || kids[0].PageEnd != 2 { + t.Errorf("merged page range = (%d,%d), want (1,2)", kids[0].PageStart, kids[0].PageEnd) + } + // The large leaf survives untouched. + if len(kids[1].Content) != 5000 { + t.Errorf("large leaf was merged; len = %d, want 5000", len(kids[1].Content)) + } +} + +func totalContentLen(sections []Section) int { + n := 0 + for i := range sections { + n += len(sections[i].Content) + n += totalContentLen(sections[i].Children) + } + return n +} diff --git a/pkg/parser/pdf.go b/pkg/parser/pdf.go index c6fef87..7c9bb46 100644 --- a/pkg/parser/pdf.go +++ b/pkg/parser/pdf.go @@ -45,6 +45,24 @@ type PDF struct { // to use the engine defaults; pass a zero value to disable tables // entirely. Tables *TableOpts + + // MaxSections caps the number of leaf sections the parser emits for a + // single document. A pathological PDF — e.g. a 90-page filing whose + // every bold statement title and repeated " and + // Subsidiaries" line trips the heading detector, leaving a swarm of + // empty/tiny heading-only leaves — can otherwise produce far more + // leaves than the document has real sections. Each leaf later costs a + // summarize + HyDE LLM call, so an uncapped count directly throttles + // or stalls ingest. + // + // When the prose leaf count exceeds MaxSections, adjacent small leaf + // siblings under a shared parent are merged (smallest first) until the + // count is back under the cap. Table sections (which carry distinct + // numeric content) are never merged. + // + // Zero selects defaultMaxLeafSections. A negative value disables the + // cap entirely (escape hatch for callers that want the raw outline). + MaxSections int } // TableOpts controls pdftable's table-finding stage. The zero value @@ -89,14 +107,24 @@ func DefaultTableOpts() *TableOpts { } // NewPDF returns a new PDF parser with table extraction enabled at the -// production defaults. Pass NewPDFWithTables(nil) (or a zero TableOpts) -// to opt out of tables. +// production defaults and the default leaf-section cap. Pass +// NewPDFWithTables(nil) (or a zero TableOpts) to opt out of tables. func NewPDF() *PDF { return &PDF{Tables: DefaultTableOpts()} } // NewPDFWithTables returns a PDF parser using the supplied table- -// extraction options. Pass nil to disable table extraction. +// extraction options and the default leaf-section cap. Pass nil to +// disable table extraction. func NewPDFWithTables(opts *TableOpts) *PDF { return &PDF{Tables: opts} } +// NewPDFWithOpts returns a PDF parser using the supplied table-extraction +// options and an explicit leaf-section cap. maxSections == 0 selects +// defaultMaxLeafSections; a negative value disables the cap. This is the +// constructor the engine wiring uses so the cap is operator-tunable via +// config (ingest.max_sections). +func NewPDFWithOpts(opts *TableOpts, maxSections int) *PDF { + return &PDF{Tables: opts, MaxSections: maxSections} +} + // Name implements Parser. func (*PDF) Name() string { return "pdf" } @@ -190,6 +218,7 @@ func (p *PDF) Parse(_ context.Context, r io.Reader) (*ParsedDoc, error) { if reader != nil { if outline := reader.Outline(); len(outline.Child) > 0 { if doc, ok := parsePDFWithOutline(outline, rows); ok { + doc.Sections = capLeafSections(doc.Sections, p.resolvedMaxSections()) attachTableSections(doc, tableSections) return doc, nil } @@ -360,12 +389,23 @@ func (p *PDF) Parse(_ context.Context, r io.Reader) (*ParsedDoc, error) { out := &ParsedDoc{ Title: title, - Sections: chunkOversizedLeaves(rootSec.Children), + Sections: capLeafSections(chunkOversizedLeaves(rootSec.Children), p.resolvedMaxSections()), } attachTableSections(out, tableSections) return out, nil } +// resolvedMaxSections turns the configured MaxSections into the value +// the cap actually uses: 0 selects defaultMaxLeafSections; a negative +// value disables the cap (returns a non-positive number capLeafSections +// treats as "off"). +func (p *PDF) resolvedMaxSections() int { + if p.MaxSections == 0 { + return defaultMaxLeafSections + } + return p.MaxSections +} + // propagateSectionPages fills internal-node PageStart/PageEnd from the union // of descendant leaf ranges where the internal node didn't have its own // (because its body was empty / hoisted into children). Leaves keep their @@ -411,6 +451,117 @@ const ( leafChunkTarget = 900 // chars per chunk, give or take ) +// defaultMaxLeafSections is the ceiling NewPDF applies when MaxSections +// is left at zero. A 92-page 10-K whose "Notes to Financial Statements" +// section byte-splits into ~50 chunks (and whose body splits into +// hundreds more) was observed producing ~1500 leaves — each one of +// which then costs a summarize + HyDE + multi-axis LLM call at ingest, +// which is what stalled the pipeline. 400 keeps a filing richly +// structured while bounding ingest cost to something Gemini's +// free-tier RPM can clear. +const defaultMaxLeafSections = 400 + +// countLeafSections returns the number of leaf sections (no children) +// in the tree rooted at sections. +func countLeafSections(sections []Section) int { + n := 0 + for i := range sections { + if len(sections[i].Children) == 0 { + n++ + } else { + n += countLeafSections(sections[i].Children) + } + } + return n +} + +// capLeafSections enforces a ceiling on the total leaf-section count. +// While the count exceeds maxLeaves it repeatedly merges the two +// smallest ADJACENT leaf siblings under whichever parent currently has +// the most leaf children — so the runaway byte-split sections collapse +// back first, while genuinely distinct top-level sections are left +// alone. maxLeaves <= 0 disables the cap. +// +// Merged leaves concatenate their content (blank-line separated), keep +// the first sibling's title, and union their page ranges. Table +// sections are attached AFTER this pass (attachTableSections), so their +// numeric content is never merged away. +func capLeafSections(sections []Section, maxLeaves int) []Section { + if maxLeaves <= 0 { + return sections + } + // Guard against pathological loops: at most one merge per excess leaf. + for guard := 0; countLeafSections(sections) > maxLeaves && guard < 100000; guard++ { + if !mergeOneSmallestAdjacentLeafPair(sections) { + break // no mergeable adjacent leaf pair anywhere + } + } + return sections +} + +// mergeOneSmallestAdjacentLeafPair finds the adjacent leaf-sibling pair +// with the smallest combined content length anywhere in the tree and +// merges it in place. Returns false when no sibling list has two +// adjacent leaves to merge. +func mergeOneSmallestAdjacentLeafPair(sections []Section) bool { + bestList := (*[]Section)(nil) + bestIdx := -1 + bestSize := -1 + + var walk func(list *[]Section) + walk = func(list *[]Section) { + s := *list + for i := 0; i+1 < len(s); i++ { + if len(s[i].Children) == 0 && len(s[i+1].Children) == 0 { + size := len(s[i].Content) + len(s[i+1].Content) + if bestSize < 0 || size < bestSize { + bestSize, bestList, bestIdx = size, list, i + } + } + } + for i := range s { + if len(s[i].Children) > 0 { + walk(&s[i].Children) + } + } + } + walk(§ions) + + if bestList == nil { + return false + } + s := *bestList + a, b := s[bestIdx], s[bestIdx+1] + merged := a + if strings.TrimSpace(a.Content) == "" { + merged.Content = b.Content + } else if strings.TrimSpace(b.Content) != "" { + merged.Content = a.Content + "\n\n" + b.Content + } + merged.PageStart = minNonZero(a.PageStart, b.PageStart) + if b.PageEnd > merged.PageEnd { + merged.PageEnd = b.PageEnd + } + s[bestIdx] = merged + *bestList = append(s[:bestIdx+1], s[bestIdx+2:]...) + return true +} + +// minNonZero returns the smaller of two page numbers, treating 0 +// (unknown) as "no lower bound" so a known page always wins. +func minNonZero(a, b int) int { + switch { + case a == 0: + return b + case b == 0: + return a + case a < b: + return a + default: + return b + } +} + // chunkOversizedLeaves splits any LEAF section whose content exceeds // leafChunkThreshold into smaller sub-sections. Internal nodes (sections with // children) are recursed into but never split — they're already structured.