Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,37 @@ type IngestConfig struct {
// 4 + 4 per-stage caps while staying well below typical provider
// per-tenant concurrency limits.
GlobalLLMConcurrency int `yaml:"global_llm_concurrency"`

// LLMCallTimeoutSeconds bounds each INDIVIDUAL LLM call issued by the
// ingest pipeline (one section's summary, one leaf's HyDE questions,
// one TOC detect/extract/verify turn). Without it, a single provider
// call that hangs — no response, no error — blocks its bounded-
// concurrency errgroup forever, and the document never leaves
// `summarizing`. (Observed: a doc stuck for 13+ hours.)
//
// When a call exceeds this deadline it is treated exactly like any
// other per-section failure: logged and skipped, leaving that
// section with its existing/empty summary. One bad section can no
// longer freeze the whole document — it still reaches `ready`.
//
// 0 (or omitted) defaults to 90. Set explicitly to tune for slow
// reasoning models; a negative value is rejected by Validate.
LLMCallTimeoutSeconds int `yaml:"llm_call_timeout_seconds"`

// MaxSections caps the number of leaf sections a single document may
// produce. A pathological PDF (e.g. a 92-page 10-Q whose every bold
// table cell looks like a heading) can shatter into ~1500 leaves,
// each of which then costs a summarize + HyDE LLM call — thousands of
// calls that throttle or stall ingest.
//
// When the parsed leaf count exceeds this cap, the parser merges
// adjacent small leaf sections under a shared parent (smallest
// siblings first) until the document is back under the cap.
//
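// 0 (or omitted) defaults to 400. That sits inside the range seen for
// real filings (~170-510 sections with tables), so the largest real
// documents may also have their smallest sibling leaves merged down to
// the cap, while the runaway case is still caught. A negative value is
// rejected by Validate.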
MaxSections int `yaml:"max_sections"`
}

// TOCBlock configures the LLM-driven table-of-contents tree
Expand Down Expand Up @@ -658,7 +689,9 @@ func Default() Config {
},
},
Ingest: IngestConfig{
GlobalLLMConcurrency: 12,
GlobalLLMConcurrency: 12,
LLMCallTimeoutSeconds: 90,
MaxSections: 400,
HyDE: HyDEConfig{
Enabled: true,
NumQuestions: 5,
Expand Down Expand Up @@ -814,6 +847,16 @@ func applyEnvOverrides(c *Config) {
c.Ingest.GlobalLLMConcurrency = n
}
}
if v := os.Getenv("VLE_INGEST_LLM_CALL_TIMEOUT_SECONDS"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n >= 0 {
c.Ingest.LLMCallTimeoutSeconds = n
}
}
if v := os.Getenv("VLE_INGEST_MAX_SECTIONS"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n >= 0 {
c.Ingest.MaxSections = n
}
}
// pdftable-driven table extraction.
if v := os.Getenv("VLE_INGEST_TABLES_ENABLED"); v != "" {
switch strings.ToLower(strings.TrimSpace(v)) {
Expand Down Expand Up @@ -1091,6 +1134,12 @@ func (c Config) Validate() error {
if c.Ingest.GlobalLLMConcurrency < 0 {
return fmt.Errorf("ingest.global_llm_concurrency must be >= 0, got %d", c.Ingest.GlobalLLMConcurrency)
}
if c.Ingest.LLMCallTimeoutSeconds < 0 {
return fmt.Errorf("ingest.llm_call_timeout_seconds must be >= 0, got %d", c.Ingest.LLMCallTimeoutSeconds)
}
if c.Ingest.MaxSections < 0 {
return fmt.Errorf("ingest.max_sections must be >= 0, got %d", c.Ingest.MaxSections)
}

switch c.Ingest.Tables.VerticalStrategy {
case "", "lines", "lines_strict", "text", "explicit":
Expand Down
15 changes: 11 additions & 4 deletions pkg/ingest/hyde.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"strings"
"sync"
"time"

"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -158,7 +159,7 @@ func (p *Pipeline) candidateQuestionsFor(ctx context.Context, s db.Section, prof
JSONSchema: []byte(hydeJSONSchema),
}

questions, err := runHyDEWithRetry(ctx, p.LLM, req, defaultHyDERetries)
questions, err := runHyDEWithRetry(ctx, p.LLM, req, defaultHyDERetries, p.LLMCallTimeout)
if err != nil {
return nil, err
}
Expand All @@ -176,7 +177,12 @@ const defaultHyDERetries = 2
// parse failure returns an error so the caller can log it; transport
// errors propagate. ErrNotImplemented (stub LLM) degrades to "no
// questions" so test paths keep working.
func runHyDEWithRetry(ctx context.Context, client llmgate.Client, baseReq llmgate.Request, maxRetries int) ([]string, error) {
//
// timeout bounds each individual Complete call. A per-call timeout (or
// context cancellation) is terminal — it short-circuits the retry loop
// rather than re-issuing a call that just hung. A non-positive timeout
// disables the per-call bound.
func runHyDEWithRetry(ctx context.Context, client llmgate.Client, baseReq llmgate.Request, maxRetries int, timeout time.Duration) ([]string, error) {
if maxRetries < 0 {
maxRetries = 0
}
Expand All @@ -193,11 +199,12 @@ func runHyDEWithRetry(ctx context.Context, client llmgate.Client, baseReq llmgat
}
req.Messages = msgs
}
resp, err := client.Complete(ctx, req)
resp, err := completeWithTimeout(ctx, client, req, timeout)
if err != nil {
// Stub clients return ErrNotImplemented — treat as "no
// questions" so the pipeline proceeds without LLM access
// in test setups.
// in test setups. Per-call timeouts and other transport
// errors propagate (the caller skips this section).
if errors.Is(err, llmgate.ErrNotImplemented) {
return nil, nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/ingest/hyde_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestRunHyDEWithRetryHappy(t *testing.T) {
m := &llmgate.Mock{Reply: `{"questions":["Q1","Q2","Q3","Q4","Q5"]}`}
got, err := runHyDEWithRetry(context.Background(), m, llmgate.Request{
Messages: []llmgate.Message{{Role: llmgate.RoleUser, Content: "go"}},
}, 2)
}, 2, 0)
if err != nil {
t.Fatalf("happy path: %v", err)
}
Expand All @@ -118,7 +118,7 @@ func TestRunHyDEWithRetryRetriesOnNonJSON(t *testing.T) {
}
got, err := runHyDEWithRetry(context.Background(), m, llmgate.Request{
Messages: []llmgate.Message{{Role: llmgate.RoleUser, Content: "go"}},
}, 2)
}, 2, 0)
if err != nil {
t.Fatalf("should recover on 3rd attempt: %v", err)
}
Expand All @@ -134,7 +134,7 @@ func TestRunHyDEWithRetryFinalParseFailReturnsError(t *testing.T) {
m := &llmgate.Mock{Reply: "no JSON anywhere here, just prose."}
_, err := runHyDEWithRetry(context.Background(), m, llmgate.Request{
Messages: []llmgate.Message{{Role: llmgate.RoleUser, Content: "go"}},
}, 2)
}, 2, 0)
if err == nil {
t.Error("want final-parse error after all retries fail")
}
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestHyDEGracefulOnNonJSON(t *testing.T) {

_, err := runHyDEWithRetry(context.Background(), m, llmgate.Request{
Messages: []llmgate.Message{{Role: llmgate.RoleUser, Content: "u"}},
}, 2)
}, 2, 0)
if err == nil {
t.Fatal("want graceful error after 3 failed attempts")
}
Expand Down
73 changes: 66 additions & 7 deletions pkg/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,37 @@
// for a table of contents. Default: 20.
TOCCheckPages int

// LLMCallTimeout bounds each INDIVIDUAL LLM call the pipeline issues
// (one section's summary, one leaf's HyDE questions, one TOC
// detect/extract/verify turn). It is the safety valve against a
// provider call that hangs with neither a response nor an error:
// without it, that call's bounded-concurrency errgroup blocks on
// Wait() forever and the document never leaves `summarizing` (observed
// stuck for 13+ hours).
//
// A call that exceeds this deadline is handled exactly like any other
// per-section failure: the surrounding stage logs it and skips the
// section, leaving its existing/empty summary. One hung call can no
// longer freeze the whole document.
//
// Zero (the Go zero value, used by Pipeline literals in tests) means
// "no per-call timeout" so existing test paths that don't set it keep
// their unbounded behaviour. NewPipeline defaults it to 90s.
LLMCallTimeout time.Duration

// globalLLMSem is the lazily-initialized shared semaphore enforcing
// GlobalLLMConcurrency. nil means "no global cap" — callers fall back
// to per-stage limits only.
globalLLMSem chan struct{}
}

// defaultLLMCallTimeout is the per-call deadline NewPipeline applies when
// LLMCallTimeout is left unset (zero) or is otherwise non-positive. 90s is
// generous for a single summarize / HyDE / TOC turn even on a slow
// reasoning model, while still being short enough that a hung call is
// reaped after a minute and a half rather than blocking the document
// forever.
const defaultLLMCallTimeout = 90 * time.Second

// NewPipeline returns a Pipeline with sensible defaults filled in.
func NewPipeline(p Pipeline) *Pipeline {
if p.SummaryMaxChars == 0 {
Expand Down Expand Up @@ -210,12 +235,45 @@
if p.GlobalLLMConcurrency > 0 {
p.globalLLMSem = make(chan struct{}, p.GlobalLLMConcurrency)
}
// A per-call timeout is the difference between "one bad section" and
// "the whole document wedged for hours", so NewPipeline always fills
// one in. Pipeline literals (test paths) that leave it zero keep the
// historical unbounded behaviour.
if p.LLMCallTimeout <= 0 {
p.LLMCallTimeout = defaultLLMCallTimeout
}
if p.Logger == nil {
p.Logger = slog.Default()
}
return &p
}

// completeWithTimeout performs one client.Complete call, optionally bounded
// by a per-call deadline.
//
// When timeout is positive the call runs under a child context derived from
// ctx with context.WithTimeout; the child is always released before this
// function returns. When timeout is zero or negative, ctx is handed to the
// client unchanged, which preserves the unbounded behaviour of Pipeline
// literals that never set a timeout.
//
// The client's response and error are returned untouched. Callers in the
// ingest stages treat any Complete error as a non-fatal per-section skip,
// so an expired deadline needs no special handling here.
//
// NOTE(review): the bound is only effective if client.Complete honours
// context cancellation, and the error on expiry is presumably one that
// wraps context.DeadlineExceeded — confirm against the llmgate client.
func completeWithTimeout(ctx context.Context, client llmgate.Client, req llmgate.Request, timeout time.Duration) (*llmgate.Response, error) {
	if timeout > 0 {
		bounded, release := context.WithTimeout(ctx, timeout)
		defer release()
		ctx = bounded
	}
	return client.Complete(ctx, req)
}

// isTimeout reports whether err is, or wraps, context.DeadlineExceeded or
// context.Canceled. Despite the name, a plain cancellation counts as well
// as an expired deadline. A nil error yields false.
//
// It is intended for retry loops that want to treat such an error as
// terminal: re-issuing a call that just hung would only multiply the
// wall-time cost (N retries × the timeout) without changing the outcome.
//
// NOTE(review): the lint job reports this helper as unused (staticcheck
// U1000). Either call it from the retry loops or delete it before merging.
func isTimeout(err error) bool {
	return errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)
}

// acquireGlobalLLM blocks until a global-LLM-concurrency slot is free,
// or returns false if ctx is canceled first. Returns a release func the
// caller must invoke (typically deferred). Safe to call when the global
Expand Down Expand Up @@ -331,10 +389,11 @@
model = p.SummaryModel
}
builder := &TOCBuilder{
LLM: p.LLM,
Model: model,
Concurrency: p.TOCConcurrency,
TOCCheckPages: p.TOCCheckPages,
LLM: p.LLM,
Model: model,
Concurrency: p.TOCConcurrency,
TOCCheckPages: p.TOCCheckPages,
LLMCallTimeout: p.LLMCallTimeout,
}
nodes, usage, err := builder.Build(ctx, pages)
if err != nil {
Expand Down Expand Up @@ -717,7 +776,7 @@
// request. Kept for the SummaryAxesEnabled=false opt-out branch and
// for unit tests that build a Pipeline literal without the new flag.
func (p *Pipeline) legacyOneLineSummary(ctx context.Context, s db.Section, body, profile string) (string, error) {
resp, err := p.LLM.Complete(ctx, llmgate.Request{
resp, err := completeWithTimeout(ctx, p.LLM, llmgate.Request{
Model: p.SummaryModel,
Temperature: 0.0,
MaxTokens: 260,
Expand All @@ -727,7 +786,7 @@
"Section titled %q.\n\n%s\n\nReturn a single sentence (≤ 60 words) that names this section's concrete topics, entities, identifiers, and key items so a retrieval engine can match it to user questions.",
cleanForLLM(s.Title), body)},
},
})
}, p.LLMCallTimeout)
if err != nil {
// Stub LLMs return ErrNotImplemented. Degrade gracefully: use a
// truncated excerpt as the "summary" so downstream retrieval
Expand Down Expand Up @@ -770,7 +829,7 @@
JSONSchema: []byte(summaryAxesJSONSchema),
}

axes, rawText, err := runSummaryAxesWithRetry(ctx, p.LLM, req, defaultSummaryAxesRetries)
axes, rawText, err := runSummaryAxesWithRetry(ctx, p.LLM, req, defaultSummaryAxesRetries, p.LLMCallTimeout)
if err != nil {
// Transport / ErrNotImplemented / unrecoverable: fall back to a
// text excerpt as OneLine with empty axes. Never fail ingest.
Expand Down
14 changes: 11 additions & 3 deletions pkg/ingest/summary_axes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"strings"
"time"

"github.com/hallelx2/llmgate"

Expand Down Expand Up @@ -82,7 +83,13 @@ const defaultSummaryAxesRetries = 2
//
// ErrNotImplemented (stub LLM) is folded into the error return so
// callers can degrade to the text fallback path.
func runSummaryAxesWithRetry(ctx context.Context, client llmgate.Client, baseReq llmgate.Request, maxRetries int) (*tree.SummaryAxes, string, error) {
//
// timeout bounds each individual Complete call. A per-call timeout (or
// context cancellation) is terminal — it short-circuits the retry loop
// and returns the error immediately, because re-issuing a call that just
// hung would only multiply the wall-time cost. A non-positive timeout
// disables the per-call bound.
func runSummaryAxesWithRetry(ctx context.Context, client llmgate.Client, baseReq llmgate.Request, maxRetries int, timeout time.Duration) (*tree.SummaryAxes, string, error) {
if maxRetries < 0 {
maxRetries = 0
}
Expand All @@ -99,10 +106,11 @@ func runSummaryAxesWithRetry(ctx context.Context, client llmgate.Client, baseReq
}
req.Messages = msgs
}
resp, err := client.Complete(ctx, req)
resp, err := completeWithTimeout(ctx, client, req, timeout)
if err != nil {
// ErrNotImplemented bubbles up so the caller can use a
// purely text-based fallback. Transport errors do the same.
// purely text-based fallback. Transport errors and per-call
// timeouts do the same (a timeout is terminal — no retry).
return nil, lastRaw, err
}
lastRaw = resp.Content
Expand Down
Loading
Loading