From 3a79992fadc67a9106bf38cc23585ee3d9817e75 Mon Sep 17 00:00:00 2001 From: Halleluyah Oludele Date: Wed, 27 May 2026 01:52:51 +0100 Subject: [PATCH 1/3] feat(retrieval): query planner with LRU cache + JSON-retry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds pkg/retrieval/plan.go: one LLM call before retrieval that returns a structured Plan (intent, entities, expected_doc_areas, is_multi_hop, sub_questions). Cached on a per-(query, model) basis in an in-process LRU (default 128 entries) so repeat questions don't burn budget. Reuses the runSelectionWithRetry pattern from single_pass.go: persistent JSON-parse failures degrade gracefully to a nil plan + nil error so the caller continues with the original query. Transport errors still bubble. The planning prompt biases conservatively on is_multi_hop — only flags queries that genuinely need decomposition into distinct sub-retrieval passes. The decomposer further self-corrects an is_multi_hop=true with empty sub_questions back to false at parse time. --- pkg/retrieval/plan.go | 379 +++++++++++++++++++++++++++++++++++++ pkg/retrieval/plan_test.go | 370 ++++++++++++++++++++++++++++++++++++ 2 files changed, 749 insertions(+) create mode 100644 pkg/retrieval/plan.go create mode 100644 pkg/retrieval/plan_test.go diff --git a/pkg/retrieval/plan.go b/pkg/retrieval/plan.go new file mode 100644 index 0000000..d1c7953 --- /dev/null +++ b/pkg/retrieval/plan.go @@ -0,0 +1,379 @@ +package retrieval + +import ( + "context" + "encoding/json" + "fmt" + "log" + "strings" + "sync" + "time" + + "github.com/hallelx2/llmgate" + + "github.com/hallelx2/vectorless-engine/pkg/cache" +) + +// Plan is the structured understanding the planner extracts from a user +// query before retrieval runs. It is a deliberately small, model-agnostic +// shape: enough to inform the downstream retrieval + synthesis steps +// without trying to encode the full query semantics. +// +// Fields: +// +// - Intent labels the query so the synthesis prompt can adapt tone +// (factual lookup vs. comparison vs. summary, etc.). Values are +// free-form strings rather than an enum because the set is open and +// downstream consumers are tolerant. +// +// - Entities are the salient proper nouns, dates, or numbers the model +// judged load-bearing. Synthesis surfaces these to the answer model; +// they are not (yet) fed into the selection prompt. +// +// - ExpectedDocAreas are coarse hints about where in a document the +// answer is likely to live (e.g. "balance sheet", "risk factors"). +// The planner is allowed to leave this empty when no strong prior +// exists. +// +// - IsMultiHop signals that the query benefits from decomposition. +// When true SubQuestions holds the individual focused questions the +// decomposer dispatches one-at-a-time. When false SubQuestions is +// empty. +type Plan struct { + Intent string `json:"intent"` + Entities []string `json:"entities"` + ExpectedDocAreas []string `json:"expected_doc_areas"` + IsMultiHop bool `json:"is_multi_hop"` + SubQuestions []string `json:"sub_questions"` +} + +// Planner runs one LLM call before retrieval to produce a Plan. The result +// is cached in an LRU keyed on the raw query text so repeated questions +// don't burn extra LLM budget. +// +// The planner is deliberately defensive about both LLM blips (parse +// failures fall back to a no-plan result so the engine continues with the +// original query) and cache pressure (cache misses always fall through to +// the LLM call; a cache eviction loop in the background must never block +// query latency). +type Planner struct { + // LLM is the client used for the planning call. + LLM llmgate.Client + + // Model is the model name passed to the LLM. The caller is expected + // to point this at a small/fast model — planning is a short prompt + // that should not run on the same flagship model used for synthesis. + Model string + + // MaxRetries bounds the JSON-parse retries on a single planning call. + // Zero defaults to defaultPlanningRetries. + MaxRetries int + + cache cache.Cache + + // mu serialises cache writes for the same key so that two + // concurrent queries don't race to populate the same entry. The + // underlying cache.LRU is already mutex-guarded for Get/Set + // atomicity; this is purely about avoiding redundant LLM calls. + mu sync.Mutex +} + +// defaultPlanningCacheSize is the planner cache capacity when the caller +// does not specify one. 128 distinct queries is comfortable for typical +// per-tenant repeat-question workloads while keeping resident memory +// trivial. +const defaultPlanningCacheSize = 128 + +// defaultPlanningRetries is the number of additional attempts the +// planning LLM call gets when the response fails to parse as JSON. The +// same Gemini JSON-mode blip that single_pass.go guards against can hit +// here too. +const defaultPlanningRetries = 2 + +// planningCacheTTL is the cache lifetime for planning results. Plans are +// a property of the query text, not the document, so a long TTL is safe; +// the only reason to expire is to bound stale-prompt drift across model +// upgrades. +const planningCacheTTL = 6 * time.Hour + +// NewPlanner constructs a Planner backed by client. Pass cacheSize=0 to +// accept the default (128). +func NewPlanner(client llmgate.Client, model string) *Planner { + return NewPlannerWithCacheSize(client, model, defaultPlanningCacheSize) +} + +// NewPlannerWithCacheSize is the explicit-capacity constructor. Mostly +// useful in tests; production callers should prefer NewPlanner. +func NewPlannerWithCacheSize(client llmgate.Client, model string, cacheSize int) *Planner { + if cacheSize <= 0 { + cacheSize = defaultPlanningCacheSize + } + return &Planner{ + LLM: client, + Model: model, + MaxRetries: defaultPlanningRetries, + cache: cache.NewLRU(cacheSize), + } +} + +// Plan returns the planner's understanding of query. On cache hit Usage +// is the zero value (no LLM call was made). Errors are returned only on +// transport failures from the LLM client; persistent JSON-parse failures +// fall back to a nil Plan with a non-nil error so the caller can decide +// whether to ignore the planner for this request — handleQuery treats +// that as "no plan, no decomposition". +// +// A nil *Planner is treated as "planning disabled": Plan returns +// (nil, Usage{}, nil) so callers can wire a planner unconditionally +// without nil checks. +func (p *Planner) Plan(ctx context.Context, query string) (*Plan, Usage, error) { + if p == nil || p.LLM == nil { + return nil, Usage{}, nil + } + query = strings.TrimSpace(query) + if query == "" { + return nil, Usage{}, nil + } + + key := planningCacheKey(query, p.Model) + + if v, ok := p.cache.Get(key); ok { + if pl, ok := v.(*Plan); ok && pl != nil { + // Return a defensive copy so callers can't mutate the + // cached entry. + return clonePlan(pl), Usage{}, nil + } + } + + // Serialise cache writes for this key. The cache itself is + // thread-safe; this lock only prevents two concurrent identical + // queries from each issuing the same LLM call. + p.mu.Lock() + defer p.mu.Unlock() + // Re-check after acquiring the lock — another goroutine may have + // just populated the entry while we were waiting. + if v, ok := p.cache.Get(key); ok { + if pl, ok := v.(*Plan); ok && pl != nil { + return clonePlan(pl), Usage{}, nil + } + } + + maxRetries := p.MaxRetries + if maxRetries < 0 { + maxRetries = 0 + } + if maxRetries == 0 { + maxRetries = defaultPlanningRetries + } + + plan, usage, err := runPlanningWithRetry(ctx, p.LLM, p.Model, query, maxRetries) + if err != nil { + return nil, usage, err + } + if plan == nil { + return nil, usage, nil + } + + // Cache write — failure to store (e.g. zero-capacity LRU) must + // never propagate; the next call simply re-issues the LLM call. + p.cache.Set(key, plan, planningCacheTTL) + + return clonePlan(plan), usage, nil +} + +// CacheStats exposes the underlying cache metrics. Useful in tests + ops. +func (p *Planner) CacheStats() cache.Stats { + if p == nil || p.cache == nil { + return cache.Stats{} + } + return p.cache.Stats() +} + +// planningCacheKey hashes the query + model into a stable cache key. +// Two callers pinning the same query at the same model share the cache +// entry; switching the model invalidates it. +func planningCacheKey(query, model string) string { + return cache.Key("planner", query, "plan", model) +} + +// clonePlan returns a defensive copy of p. The planner caches plans by +// pointer; without copying, a caller could mutate Entities/SubQuestions +// and corrupt the cache. +func clonePlan(p *Plan) *Plan { + if p == nil { + return nil + } + out := &Plan{ + Intent: p.Intent, + IsMultiHop: p.IsMultiHop, + } + if len(p.Entities) > 0 { + out.Entities = append([]string(nil), p.Entities...) + } + if len(p.ExpectedDocAreas) > 0 { + out.ExpectedDocAreas = append([]string(nil), p.ExpectedDocAreas...) + } + if len(p.SubQuestions) > 0 { + out.SubQuestions = append([]string(nil), p.SubQuestions...) + } + return out +} + +// --- prompt + parse --- + +// planningSystemPrompt is intentionally conservative about IsMultiHop: +// the wording asks the model to mark a query multi-hop ONLY when distinct +// sub-questions are necessary, not whenever a query mentions two things. +// Over-firing here forces extra LLM calls in the decomposer without +// quality wins, so the prompt biases toward false rather than true. +const planningSystemPrompt = `You are a query planner for a document-retrieval engine. Given a user's query you return a small JSON object describing the query. + +Rules: +- "intent": one short snake_case label. Examples: "factual_lookup", "comparison", "summary", "definition", "list", "calculation". Pick the closest fit. +- "entities": proper nouns, dates, numbers, or specific terms the query hinges on. Skip filler words. +- "expected_doc_areas": short hints about WHERE in a document the answer is likely (e.g. "balance sheet", "risk factors", "methodology", "conclusion"). Leave empty if unsure. +- "is_multi_hop": true ONLY when the query requires answering distinct sub-questions whose answers must be combined. A single question that mentions two things is NOT multi-hop. A compound question that genuinely needs two retrieval passes (e.g. "compare X's revenue with Y's revenue") IS multi-hop. When in doubt, return false. +- "sub_questions": when is_multi_hop is true, list the focused sub-questions (each one a standalone retrieval target). Empty when is_multi_hop is false. + +Return only the JSON object. No prose, no markdown.` + +const planningJSONSchema = `{ + "type": "object", + "properties": { + "intent": {"type": "string"}, + "entities": {"type": "array", "items": {"type": "string"}}, + "expected_doc_areas": {"type": "array", "items": {"type": "string"}}, + "is_multi_hop": {"type": "boolean"}, + "sub_questions": {"type": "array", "items": {"type": "string"}} + }, + "required": ["intent", "is_multi_hop"] +}` + +// buildPlanningPrompt returns the user message for the planning call. +func buildPlanningPrompt(query string) string { + var b strings.Builder + b.WriteString("User query:\n") + b.WriteString(query) + b.WriteString("\n\nReturn a JSON object with fields: intent (string), entities (array of strings), expected_doc_areas (array of strings), is_multi_hop (boolean), sub_questions (array of strings).") + return b.String() +} + +// runPlanningWithRetry issues the planning LLM call, retrying up to +// maxRetries additional times when the response does not parse. Mirrors +// the shape of runSelectionWithRetry but specialised to the Plan payload. +// Returns (nil, usage, nil) when the planner exhausts retries; the +// caller treats that as a no-plan request. +func runPlanningWithRetry(ctx context.Context, client llmgate.Client, model, query string, maxRetries int) (*Plan, Usage, error) { + user := buildPlanningPrompt(query) + baseReq := llmgate.Request{ + Model: model, + Messages: []llmgate.Message{ + {Role: llmgate.RoleSystem, Content: planningSystemPrompt}, + {Role: llmgate.RoleUser, Content: user}, + }, + MaxTokens: 512, + Temperature: 0, + JSONMode: true, + JSONSchema: []byte(planningJSONSchema), + } + + var totalUsage Usage + var lastParseErr error + for attempt := 0; attempt <= maxRetries; attempt++ { + req := baseReq + if attempt > 0 { + msgs := make([]llmgate.Message, len(baseReq.Messages)) + copy(msgs, baseReq.Messages) + tail := len(msgs) - 1 + msgs[tail] = llmgate.Message{ + Role: msgs[tail].Role, + Content: msgs[tail].Content + "\n\nIMPORTANT: respond with ONLY a JSON object matching the schema. Do not include prose, explanation, or markdown fences.", + } + req.Messages = msgs + } + resp, err := client.Complete(ctx, req) + if err != nil { + return nil, totalUsage, fmt.Errorf("planner llm call: %w", err) + } + totalUsage.Add(Usage{ + InputTokens: resp.Usage.InputTokens, + OutputTokens: resp.Usage.OutputTokens, + TotalTokens: resp.Usage.TotalTokens, + CostUSD: resp.Usage.CostUSD, + LLMCalls: 1, + }) + plan, parseErr := ParsePlan(resp.Content) + if parseErr == nil { + return plan, totalUsage, nil + } + lastParseErr = parseErr + } + log.Printf("retrieval: planner parse failed after %d attempts (%v); continuing without plan", maxRetries+1, lastParseErr) + return nil, totalUsage, nil +} + +// ParsePlan extracts a Plan from an LLM JSON response. Tolerates code +// fences and leading/trailing prose, the same as ParseSelection. Returns +// a sanitised Plan: trimmed strings, empty slices instead of nil for +// stable JSON output, and IsMultiHop forced to false when SubQuestions +// is empty (a multi-hop flag with no decomposition is a model glitch we +// can correct locally rather than letting bad data flow downstream). +func ParsePlan(raw string) (*Plan, error) { + raw = strings.TrimSpace(raw) + if raw == "" { + return nil, fmt.Errorf("empty planner response") + } + if strings.HasPrefix(raw, "```") { + if i := strings.Index(raw, "\n"); i >= 0 { + raw = raw[i+1:] + } + raw = strings.TrimSuffix(raw, "```") + raw = strings.TrimSpace(raw) + } + if i := strings.Index(raw, "{"); i > 0 { + raw = raw[i:] + } + if j := strings.LastIndex(raw, "}"); j >= 0 && j < len(raw)-1 { + raw = raw[:j+1] + } + + var p Plan + if err := json.Unmarshal([]byte(raw), &p); err != nil { + return nil, fmt.Errorf("unmarshal plan: %w", err) + } + + p.Intent = strings.TrimSpace(p.Intent) + p.Entities = trimStrings(p.Entities) + p.ExpectedDocAreas = trimStrings(p.ExpectedDocAreas) + p.SubQuestions = trimStrings(p.SubQuestions) + + // Self-correct: a multi-hop flag without sub-questions is useless to + // the decomposer. Clear the flag rather than raise an error so the + // pipeline keeps making progress. + if p.IsMultiHop && len(p.SubQuestions) == 0 { + p.IsMultiHop = false + } + // And vice versa: if sub-questions came back but the flag wasn't + // set, leave both as the model returned them. Decomposer's + // fall-through path treats !IsMultiHop as "ignore sub-questions", + // which is the safer default. + + return &p, nil +} + +// trimStrings returns a new slice with each element trimmed and empty +// entries removed. +func trimStrings(in []string) []string { + if len(in) == 0 { + return nil + } + out := make([]string, 0, len(in)) + for _, s := range in { + s = strings.TrimSpace(s) + if s == "" { + continue + } + out = append(out, s) + } + return out +} diff --git a/pkg/retrieval/plan_test.go b/pkg/retrieval/plan_test.go new file mode 100644 index 0000000..7cd9f71 --- /dev/null +++ b/pkg/retrieval/plan_test.go @@ -0,0 +1,370 @@ +package retrieval_test + +import ( + "context" + "encoding/json" + "errors" + "strings" + "sync" + "sync/atomic" + "testing" + + "github.com/hallelx2/llmgate" + + "github.com/hallelx2/vectorless-engine/pkg/retrieval" +) + +// plannerMock is a minimal llmgate client that returns scripted replies +// in order. Each Complete() call advances the index; if calls exceed the +// scripted replies the last reply is returned (so a single-element script +// turns into a fixed response). +// +// Distinct from the retrieval_test.go mockLLM because the planner doesn't +// care about prompt-content-driven picks — it speaks to a different +// schema. Keeping the mocks separate avoids muddying mockLLM with +// planner-specific behaviour. +type plannerMock struct { + mu sync.Mutex + replies []string + err error + + calls int32 + prompts []string +} + +func (m *plannerMock) Complete(ctx context.Context, req llmgate.Request) (*llmgate.Response, error) { + m.mu.Lock() + defer m.mu.Unlock() + atomic.AddInt32(&m.calls, 1) + for _, msg := range req.Messages { + if msg.Role == llmgate.RoleUser { + m.prompts = append(m.prompts, msg.Content) + } + } + if m.err != nil { + return nil, m.err + } + if len(m.replies) == 0 { + return &llmgate.Response{}, nil + } + idx := int(atomic.LoadInt32(&m.calls)) - 1 + if idx >= len(m.replies) { + idx = len(m.replies) - 1 + } + return &llmgate.Response{ + Content: m.replies[idx], + Usage: llmgate.Usage{ + InputTokens: 10, + OutputTokens: 5, + TotalTokens: 15, + CostUSD: 0.0001, + }, + }, nil +} + +func (m *plannerMock) CountTokens(ctx context.Context, s string) (int, error) { + return len(s) / 4, nil +} + +// jsonPlan marshals a Plan for use as a mock LLM reply. +func jsonPlan(p retrieval.Plan) string { + raw, _ := json.Marshal(p) + return string(raw) +} + +func TestPlannerHappyPath(t *testing.T) { + t.Parallel() + m := &plannerMock{ + replies: []string{ + jsonPlan(retrieval.Plan{ + Intent: "factual_lookup", + Entities: []string{"Acme Corp", "Q4 2024"}, + ExpectedDocAreas: []string{"income statement"}, + IsMultiHop: false, + }), + }, + } + p := retrieval.NewPlanner(m, "test-model") + + plan, usage, err := p.Plan(context.Background(), "What was Acme Corp's Q4 2024 revenue?") + if err != nil { + t.Fatalf("plan: %v", err) + } + if plan == nil { + t.Fatal("expected a plan, got nil") + } + if plan.Intent != "factual_lookup" { + t.Errorf("intent = %q, want factual_lookup", plan.Intent) + } + if plan.IsMultiHop { + t.Error("IsMultiHop should be false") + } + if len(plan.Entities) != 2 { + t.Errorf("entities len = %d, want 2 (%v)", len(plan.Entities), plan.Entities) + } + if usage.LLMCalls != 1 { + t.Errorf("LLMCalls = %d, want 1", usage.LLMCalls) + } +} + +func TestPlannerCacheHitSkipsLLM(t *testing.T) { + t.Parallel() + m := &plannerMock{ + replies: []string{ + jsonPlan(retrieval.Plan{Intent: "summary", IsMultiHop: false}), + }, + } + p := retrieval.NewPlanner(m, "test-model") + + // First call → LLM hit. + if _, _, err := p.Plan(context.Background(), "Summarise the document"); err != nil { + t.Fatal(err) + } + if c := atomic.LoadInt32(&m.calls); c != 1 { + t.Fatalf("want 1 LLM call on first invocation, got %d", c) + } + // Second call same query → cache hit, no LLM call. + plan, usage, err := p.Plan(context.Background(), "Summarise the document") + if err != nil { + t.Fatal(err) + } + if c := atomic.LoadInt32(&m.calls); c != 1 { + t.Fatalf("want still 1 LLM call (cache hit), got %d", c) + } + if plan == nil || plan.Intent != "summary" { + t.Errorf("cached plan = %+v, want intent=summary", plan) + } + if usage.LLMCalls != 0 { + t.Errorf("cached call LLMCalls = %d, want 0", usage.LLMCalls) + } + + stats := p.CacheStats() + if stats.Hits != 1 { + t.Errorf("cache hits = %d, want 1", stats.Hits) + } +} + +// A different query bypasses the cache and issues another LLM call. +func TestPlannerCacheMissOnDifferentQuery(t *testing.T) { + t.Parallel() + m := &plannerMock{ + replies: []string{ + jsonPlan(retrieval.Plan{Intent: "a", IsMultiHop: false}), + jsonPlan(retrieval.Plan{Intent: "b", IsMultiHop: false}), + }, + } + p := retrieval.NewPlanner(m, "test-model") + _, _, _ = p.Plan(context.Background(), "query A") + _, _, _ = p.Plan(context.Background(), "query B") + if c := atomic.LoadInt32(&m.calls); c != 2 { + t.Errorf("two distinct queries should hit LLM twice, got %d", c) + } +} + +// Two concurrent identical queries — the cache must dedup so we don't +// double-spend on the same prompt. +func TestPlannerConcurrentSameQuery(t *testing.T) { + t.Parallel() + m := &plannerMock{ + replies: []string{ + jsonPlan(retrieval.Plan{Intent: "concurrent", IsMultiHop: false}), + }, + } + p := retrieval.NewPlanner(m, "test-model") + + const N = 16 + var wg sync.WaitGroup + wg.Add(N) + for i := 0; i < N; i++ { + go func() { + defer wg.Done() + _, _, _ = p.Plan(context.Background(), "race query") + }() + } + wg.Wait() + if c := atomic.LoadInt32(&m.calls); c != 1 { + t.Errorf("concurrent identical queries should fold to 1 LLM call, got %d", c) + } +} + +// On a non-JSON reply the planner must retry, then degrade gracefully to +// a nil plan + nil error so the caller can continue without planning. +func TestPlannerRetryOnBadJSON(t *testing.T) { + t.Parallel() + m := &plannerMock{ + replies: []string{ + "the user wants a summary", + "still talking instead of returning json", + "and yet again", + }, + } + p := retrieval.NewPlanner(m, "test-model") + p.MaxRetries = 2 // 1 initial + 2 retries = 3 attempts + + plan, usage, err := p.Plan(context.Background(), "Why does this fail to parse?") + if err != nil { + t.Fatalf("expected graceful nil error, got %v", err) + } + if plan != nil { + t.Errorf("expected nil plan on parse failure, got %+v", plan) + } + if c := atomic.LoadInt32(&m.calls); c != 3 { + t.Errorf("expected 3 LLM attempts (1 + 2 retries), got %d", c) + } + if usage.LLMCalls != 3 { + t.Errorf("usage LLMCalls = %d, want 3 (all attempts counted)", usage.LLMCalls) + } +} + +// A successful retry after one bad JSON reply must return the parsed plan. +func TestPlannerSucceedsAfterRetry(t *testing.T) { + t.Parallel() + m := &plannerMock{ + replies: []string{ + "sorry, not json", + jsonPlan(retrieval.Plan{Intent: "factual_lookup", IsMultiHop: false}), + }, + } + p := retrieval.NewPlanner(m, "test-model") + plan, usage, err := p.Plan(context.Background(), "What is the recovery rate?") + if err != nil { + t.Fatalf("plan: %v", err) + } + if plan == nil || plan.Intent != "factual_lookup" { + t.Errorf("plan after retry = %+v, want intent=factual_lookup", plan) + } + if usage.LLMCalls != 2 { + t.Errorf("LLMCalls = %d, want 2 (1 failed + 1 succeeded)", usage.LLMCalls) + } +} + +// LLM transport failures should bubble up to the caller — these are +// distinct from parse failures (which we hide). +func TestPlannerTransportErrorBubbles(t *testing.T) { + t.Parallel() + m := &plannerMock{err: errors.New("provider 500")} + p := retrieval.NewPlanner(m, "test-model") + _, _, err := p.Plan(context.Background(), "any query") + if err == nil { + t.Fatal("expected transport error, got nil") + } + if !strings.Contains(err.Error(), "provider 500") { + t.Errorf("error %q should wrap the provider message", err) + } +} + +// Empty / whitespace queries are a no-op. +func TestPlannerEmptyQueryNoOp(t *testing.T) { + t.Parallel() + m := &plannerMock{} + p := retrieval.NewPlanner(m, "test-model") + + plan, _, err := p.Plan(context.Background(), " ") + if err != nil { + t.Fatal(err) + } + if plan != nil { + t.Errorf("empty query should return nil plan, got %+v", plan) + } + if c := atomic.LoadInt32(&m.calls); c != 0 { + t.Errorf("empty query should issue no LLM calls, got %d", c) + } +} + +// A nil planner is safe to call — production wiring can pass nil when +// planning is disabled at the config level. +func TestNilPlannerSafe(t *testing.T) { + t.Parallel() + var p *retrieval.Planner + plan, usage, err := p.Plan(context.Background(), "any query") + if err != nil || plan != nil || usage.LLMCalls != 0 { + t.Errorf("nil planner should be a no-op, got plan=%+v usage=%+v err=%v", plan, usage, err) + } +} + +func TestParsePlanCleansData(t *testing.T) { + t.Parallel() + raw := "```json\n" + jsonPlan(retrieval.Plan{ + Intent: " comparison ", + Entities: []string{" A ", "", "B"}, + ExpectedDocAreas: []string{"results"}, + IsMultiHop: true, + SubQuestions: []string{"What is A?", " ", "What is B?"}, + }) + "\n```" + + plan, err := retrieval.ParsePlan(raw) + if err != nil { + t.Fatal(err) + } + if plan.Intent != "comparison" { + t.Errorf("intent = %q, want comparison (trimmed)", plan.Intent) + } + if len(plan.Entities) != 2 { + t.Errorf("entities = %v, want [A B] (empty stripped)", plan.Entities) + } + if len(plan.SubQuestions) != 2 { + t.Errorf("sub_questions = %v, want 2 items (empty stripped)", plan.SubQuestions) + } + if !plan.IsMultiHop { + t.Error("IsMultiHop should remain true (has sub-questions)") + } +} + +// Self-correction: IsMultiHop=true with no SubQuestions is meaningless; +// the parser clears the flag so the decomposer's fall-through fires. +func TestParsePlanSelfCorrectsMultiHopWithoutSubs(t *testing.T) { + t.Parallel() + raw := jsonPlan(retrieval.Plan{ + Intent: "comparison", + IsMultiHop: true, + SubQuestions: nil, + }) + plan, err := retrieval.ParsePlan(raw) + if err != nil { + t.Fatal(err) + } + if plan.IsMultiHop { + t.Error("IsMultiHop with no sub-questions should be coerced to false") + } +} + +// The cache returns defensive copies — mutating a returned plan must +// not corrupt the cached entry. +func TestPlannerCacheImmutability(t *testing.T) { + t.Parallel() + m := &plannerMock{ + replies: []string{ + jsonPlan(retrieval.Plan{ + Intent: "list", + Entities: []string{"foo", "bar"}, + IsMultiHop: true, + SubQuestions: []string{"what is foo?", "what is bar?"}, + }), + }, + } + p := retrieval.NewPlanner(m, "test-model") + + first, _, err := p.Plan(context.Background(), "list the things") + if err != nil { + t.Fatal(err) + } + // Corrupt the returned plan. + first.Intent = "MUTATED" + first.Entities = nil + first.SubQuestions[0] = "HIJACKED" + + // Second lookup should return a clean copy of the original. + second, _, err := p.Plan(context.Background(), "list the things") + if err != nil { + t.Fatal(err) + } + if second.Intent != "list" { + t.Errorf("cached intent corrupted: %q", second.Intent) + } + if len(second.Entities) != 2 || second.Entities[0] != "foo" { + t.Errorf("cached entities corrupted: %v", second.Entities) + } + if second.SubQuestions[0] != "what is foo?" { + t.Errorf("cached sub_questions corrupted: %v", second.SubQuestions) + } +} From 71c4aac0c7b2a4ceb7cb041383c62ba0e27a6155 Mon Sep 17 00:00:00 2001 From: Halleluyah Oludele Date: Wed, 27 May 2026 01:53:00 +0100 Subject: [PATCH 2/3] feat(retrieval): multi-hop decomposer over any Strategy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds pkg/retrieval/decompose.go: when a Plan has IsMultiHop=true and non-empty SubQuestions, runs the wrapped Strategy once per sub-question and returns the union of selected IDs in stable first-seen order. Each sub-question is a tighter prompt than the compound original — the selection LLM gets one thing to reason about instead of a multi-part question. Fall-through is transparent: nil plan, IsMultiHop=false, or empty SubQuestions → delegate to Strategy.Select with the original query unchanged. Callers can wire the decomposer unconditionally. Aggregates Usage across sub-questions when the wrapped Strategy implements CostStrategy. Non-CostStrategy fall-back works too (Usage is zero in that case; selection behaviour is identical). Error on any sub-question short-circuits and returns the partial Usage so retrieval bugs aren't silently swallowed by the multi-hop loop. --- pkg/retrieval/decompose.go | 103 +++++++++ pkg/retrieval/decompose_test.go | 386 ++++++++++++++++++++++++++++++++ 2 files changed, 489 insertions(+) create mode 100644 pkg/retrieval/decompose.go create mode 100644 pkg/retrieval/decompose_test.go diff --git a/pkg/retrieval/decompose.go b/pkg/retrieval/decompose.go new file mode 100644 index 0000000..8c5e83b --- /dev/null +++ b/pkg/retrieval/decompose.go @@ -0,0 +1,103 @@ +package retrieval + +import ( + "context" + "fmt" + + "github.com/hallelx2/vectorless-engine/pkg/tree" +) + +// Decomposer runs the wrapped Strategy once per sub-question in a +// multi-hop plan and unions the per-sub-question selections. Each +// sub-question gets its own focused retrieval call — the strategy sees +// one tight question instead of the compound original. +// +// Decomposer is deliberately strategy-agnostic: it composes on top of +// any Strategy implementation (single-pass, chunked-tree, agentic, even +// the cached wrapper). When the wrapped strategy is a CostStrategy the +// per-sub-question Usage is aggregated so /v1/answer's accounting stays +// honest. +// +// Fall-through: +// +// - nil Plan, IsMultiHop false, or empty SubQuestions → delegate to +// Strategy.Select with the original query unchanged. The decomposer +// is transparent in this case; callers don't need to branch. +type Decomposer struct { + Strategy Strategy +} + +// NewDecomposer wraps s with the decomposition dispatcher. +func NewDecomposer(s Strategy) *Decomposer { + return &Decomposer{Strategy: s} +} + +// DecomposedSelect runs the strategy according to plan. When plan is +// nil, missing IsMultiHop, or has no SubQuestions, it falls through to +// Strategy.Select on the original query. Otherwise it runs Strategy once +// per sub-question and returns the union of selected IDs in stable +// first-seen order, plus aggregated Usage across all underlying calls. +// +// Errors short-circuit: the first sub-question failure aborts and +// returns the partial Usage gathered up to that point. This is the same +// failure contract Strategy.Select has — a multi-hop loop shouldn't +// silently mask retrieval errors. +func (d *Decomposer) DecomposedSelect(ctx context.Context, t *tree.Tree, plan *Plan, query string, budget ContextBudget) ([]tree.SectionID, Usage, error) { + if d == nil || d.Strategy == nil { + return nil, Usage{}, fmt.Errorf("decomposer: no strategy configured") + } + + // Fall-through: no plan or not multi-hop. Single retrieval call on + // the original query, with usage extracted from CostStrategy when + // available. + if plan == nil || !plan.IsMultiHop || len(plan.SubQuestions) == 0 { + return d.runOnce(ctx, t, query, budget) + } + + // Multi-hop path. Issue one retrieval call per sub-question, in + // order. Stable order preserves the planner's intent — the first + // sub-question is usually the most important — and gives a + // deterministic union ordering callers can rely on. + var ( + totalUsage Usage + out = make([]tree.SectionID, 0) + seen = make(map[tree.SectionID]struct{}) + ) + for _, sub := range plan.SubQuestions { + ids, usage, err := d.runOnce(ctx, t, sub, budget) + totalUsage.Add(usage) + if err != nil { + return out, totalUsage, fmt.Errorf("decompose %q: %w", sub, err) + } + for _, id := range ids { + if _, dup := seen[id]; dup { + continue + } + seen[id] = struct{}{} + out = append(out, id) + } + } + return out, totalUsage, nil +} + +// runOnce delegates one retrieval call. Uses CostStrategy when the +// wrapped strategy implements it so per-sub-question usage flows into +// the aggregated total; otherwise falls back to plain Select with a +// zero Usage value. +func (d *Decomposer) runOnce(ctx context.Context, t *tree.Tree, query string, budget ContextBudget) ([]tree.SectionID, Usage, error) { + if cs, ok := d.Strategy.(CostStrategy); ok { + res, err := cs.SelectWithCost(ctx, t, query, budget) + if err != nil { + return nil, Usage{}, err + } + if res == nil { + return nil, Usage{}, nil + } + return res.SelectedIDs, res.Usage, nil + } + ids, err := d.Strategy.Select(ctx, t, query, budget) + if err != nil { + return nil, Usage{}, err + } + return ids, Usage{}, nil +} diff --git a/pkg/retrieval/decompose_test.go b/pkg/retrieval/decompose_test.go new file mode 100644 index 0000000..b1ff623 --- /dev/null +++ b/pkg/retrieval/decompose_test.go @@ -0,0 +1,386 @@ +package retrieval_test + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + + "github.com/hallelx2/vectorless-engine/pkg/retrieval" + "github.com/hallelx2/vectorless-engine/pkg/tree" +) + +// scriptedStrategy is a Strategy that returns canned per-query selections. +// Each call captures the query it received so tests can assert that the +// decomposer routed sub-questions to the right strategy invocation. +// +// Implements CostStrategy so we can verify Usage aggregation flows back +// out of the decomposer end-to-end. +type scriptedStrategy struct { + mu sync.Mutex + calls []string + picks map[string][]tree.SectionID + usage retrieval.Usage // returned (and added per-call) when CostStrategy is used + errFor map[string]error + noCost bool // when true, the strategy hides its CostStrategy implementation + counter int32 +} + +func (s *scriptedStrategy) Name() string { return "scripted" } + +func (s *scriptedStrategy) Select(ctx context.Context, t *tree.Tree, query string, budget retrieval.ContextBudget) ([]tree.SectionID, error) { + atomic.AddInt32(&s.counter, 1) + s.mu.Lock() + s.calls = append(s.calls, query) + s.mu.Unlock() + if err, ok := s.errFor[query]; ok { + return nil, err + } + if ids, ok := s.picks[query]; ok { + out := make([]tree.SectionID, len(ids)) + copy(out, ids) + return out, nil + } + return nil, nil +} + +// costStrategyAdapter exposes Select + SelectWithCost. We wrap the +// scriptedStrategy in this adapter when we want to exercise the +// CostStrategy code path; the `noCost` field on scriptedStrategy is +// used to suppress this when we want the fall-through to plain Select. +type costStrategyAdapter struct { + *scriptedStrategy +} + +func (c *costStrategyAdapter) SelectWithCost(ctx context.Context, t *tree.Tree, query string, budget retrieval.ContextBudget) (*retrieval.Result, error) { + ids, err := c.scriptedStrategy.Select(ctx, t, query, budget) + if err != nil { + return nil, err + } + return &retrieval.Result{ + SelectedIDs: ids, + Usage: c.scriptedStrategy.usage, + }, nil +} + +// asStrategy returns either the cost-aware adapter or the bare strategy +// depending on whether we want to test the CostStrategy branch. +func (s *scriptedStrategy) asStrategy() retrieval.Strategy { + if s.noCost { + return s + } + return &costStrategyAdapter{scriptedStrategy: s} +} + +// --- tests --- + +func TestDecomposerFallthroughOnNilPlan(t *testing.T) { + t.Parallel() + tr := buildTree() + s := &scriptedStrategy{ + picks: map[string][]tree.SectionID{ + "original query": {"sec_a"}, + }, + usage: retrieval.Usage{InputTokens: 7, OutputTokens: 3, TotalTokens: 10, LLMCalls: 1}, + } + d := retrieval.NewDecomposer(s.asStrategy()) + + ids, usage, err := d.DecomposedSelect(context.Background(), tr, nil, "original query", retrieval.ContextBudget{}) + if err != nil { + t.Fatal(err) + } + if len(ids) != 1 || ids[0] != "sec_a" { + t.Errorf("want [sec_a], got %v", ids) + } + if usage.LLMCalls != 1 { + t.Errorf("LLMCalls = %d, want 1 (single fall-through call)", usage.LLMCalls) + } + if got := atomic.LoadInt32(&s.counter); got != 1 { + t.Errorf("strategy called %d times, want 1", got) + } + if len(s.calls) != 1 || s.calls[0] != "original query" { + t.Errorf("strategy got %v, want [original query]", s.calls) + } +} + +func TestDecomposerFallthroughOnNonMultiHopPlan(t *testing.T) { + t.Parallel() + tr := buildTree() + s := &scriptedStrategy{ + picks: map[string][]tree.SectionID{ + "q": {"sec_b"}, + }, + } + d := retrieval.NewDecomposer(s.asStrategy()) + + plan := &retrieval.Plan{ + Intent: "factual_lookup", + IsMultiHop: false, + } + ids, _, err := d.DecomposedSelect(context.Background(), tr, plan, "q", retrieval.ContextBudget{}) + if err != nil { + t.Fatal(err) + } + if len(ids) != 1 || ids[0] != "sec_b" { + t.Errorf("want [sec_b], got %v", ids) + } + if got := atomic.LoadInt32(&s.counter); got != 1 { + t.Errorf("strategy called %d times, want 1 (fall-through to original query)", got) + } +} + +func TestDecomposerFallthroughOnEmptySubQuestions(t *testing.T) { + t.Parallel() + tr := buildTree() + s := &scriptedStrategy{ + picks: map[string][]tree.SectionID{ + "compound query": {"sec_c"}, + }, + } + d := retrieval.NewDecomposer(s.asStrategy()) + + // Pathological plan: IsMultiHop=true but no sub-questions. The + // decomposer's fall-through guards against this directly even + // though ParsePlan would have corrected it. + plan := &retrieval.Plan{ + IsMultiHop: true, + SubQuestions: nil, + } + ids, _, err := d.DecomposedSelect(context.Background(), tr, plan, "compound query", retrieval.ContextBudget{}) + if err != nil { + t.Fatal(err) + } + if len(ids) != 1 || ids[0] != "sec_c" { + t.Errorf("want [sec_c], got %v", ids) + } +} + +func TestDecomposerPerSubQuestionDispatch(t *testing.T) { + t.Parallel() + tr := buildTree() + s := &scriptedStrategy{ + picks: map[string][]tree.SectionID{ + "What is the setup?": {"sec_a"}, + "What is the usage?": {"sec_b"}, + "What's in the FAQ?": {"sec_c"}, + }, + usage: retrieval.Usage{InputTokens: 10, OutputTokens: 4, TotalTokens: 14, LLMCalls: 1, CostUSD: 0.001}, + } + d := retrieval.NewDecomposer(s.asStrategy()) + + plan := &retrieval.Plan{ + Intent: "comparison", + IsMultiHop: true, + SubQuestions: []string{ + "What is the setup?", + "What is the usage?", + "What's in the FAQ?", + }, + } + ids, usage, err := d.DecomposedSelect(context.Background(), tr, plan, "compound original", retrieval.ContextBudget{}) + if err != nil { + t.Fatal(err) + } + + // Per-sub-question dispatch: 3 strategy calls. + if got := atomic.LoadInt32(&s.counter); got != 3 { + t.Errorf("strategy called %d times, want 3 (one per sub-question)", got) + } + s.mu.Lock() + calls := append([]string(nil), s.calls...) + s.mu.Unlock() + wantCalls := []string{ + "What is the setup?", + "What is the usage?", + "What's in the FAQ?", + } + for i, c := range wantCalls { + if i >= len(calls) || calls[i] != c { + t.Errorf("call[%d] = %q, want %q", i, calls[i], c) + } + } + + // Union ordering: first-seen across sub-questions. + if len(ids) != 3 { + t.Fatalf("want 3 unioned ids, got %v", ids) + } + if ids[0] != "sec_a" || ids[1] != "sec_b" || ids[2] != "sec_c" { + t.Errorf("union order = %v, want [sec_a sec_b sec_c]", ids) + } + + // Usage aggregation: 3 sub-questions × per-call usage. + if usage.LLMCalls != 3 { + t.Errorf("aggregated LLMCalls = %d, want 3", usage.LLMCalls) + } + if usage.TotalTokens != 42 { + t.Errorf("aggregated TotalTokens = %d, want 42 (3 × 14)", usage.TotalTokens) + } + if usage.CostUSD < 0.003-1e-9 || usage.CostUSD > 0.003+1e-9 { + t.Errorf("aggregated CostUSD = %v, want ~0.003", usage.CostUSD) + } +} + +// When sub-questions select overlapping section IDs the union must +// preserve first-seen order and drop duplicates. +func TestDecomposerUnionDedup(t *testing.T) { + t.Parallel() + tr := buildTree() + s := &scriptedStrategy{ + picks: map[string][]tree.SectionID{ + "sub1": {"sec_a", "sec_b"}, + "sub2": {"sec_b", "sec_c"}, // sec_b overlaps with sub1 + "sub3": {"sec_a"}, // sec_a overlaps with sub1 + }, + } + d := retrieval.NewDecomposer(s.asStrategy()) + + plan := &retrieval.Plan{ + IsMultiHop: true, + SubQuestions: []string{"sub1", "sub2", "sub3"}, + } + ids, _, err := d.DecomposedSelect(context.Background(), tr, plan, "q", retrieval.ContextBudget{}) + if err != nil { + t.Fatal(err) + } + if len(ids) != 3 { + t.Fatalf("want 3 unique ids, got %v", ids) + } + // Expected order: sec_a (from sub1), sec_b (from sub1), sec_c (from sub2). + // sub2's sec_b and sub3's sec_a are skipped as duplicates. + wantOrder := []tree.SectionID{"sec_a", "sec_b", "sec_c"} + for i, want := range wantOrder { + if ids[i] != want { + t.Errorf("ids[%d] = %q, want %q (stable first-seen order)", i, ids[i], want) + } + } +} + +// On a sub-question failure, the decomposer aborts and returns the +// partial Usage. The caller surfaces the error to its own /v1/query +// 500 — silently swallowing it would hide retrieval-side bugs. +func TestDecomposerErrorShortCircuits(t *testing.T) { + t.Parallel() + tr := buildTree() + boom := errors.New("strategy boom") + s := &scriptedStrategy{ + picks: map[string][]tree.SectionID{ + "sub1": {"sec_a"}, + }, + errFor: map[string]error{ + "sub2": boom, + }, + usage: retrieval.Usage{LLMCalls: 1, TotalTokens: 5}, + } + d := retrieval.NewDecomposer(s.asStrategy()) + + plan := &retrieval.Plan{ + IsMultiHop: true, + SubQuestions: []string{"sub1", "sub2", "sub3"}, + } + _, usage, err := d.DecomposedSelect(context.Background(), tr, plan, "q", retrieval.ContextBudget{}) + if err == nil { + t.Fatal("expected error from sub2 failure, got nil") + } + if !errors.Is(err, boom) { + t.Errorf("error %v should wrap the strategy boom", err) + } + if usage.LLMCalls != 1 { + t.Errorf("partial usage LLMCalls = %d, want 1 (sub1 only counted)", usage.LLMCalls) + } + // Strategy should NOT have been called on sub3 — we short-circuit. + if got := atomic.LoadInt32(&s.counter); got != 2 { + t.Errorf("strategy called %d times, want 2 (sub1 ok + sub2 err)", got) + } +} + +// When the wrapped strategy is NOT a CostStrategy, the decomposer still +// works — Usage is zero (we have nothing to aggregate) but selection +// behaviour is identical. +func TestDecomposerNonCostStrategy(t *testing.T) { + t.Parallel() + tr := buildTree() + s := &scriptedStrategy{ + picks: map[string][]tree.SectionID{ + "sub1": {"sec_a"}, + "sub2": {"sec_b"}, + }, + noCost: true, + } + d := retrieval.NewDecomposer(s.asStrategy()) + + plan := &retrieval.Plan{ + IsMultiHop: true, + SubQuestions: []string{"sub1", "sub2"}, + } + ids, usage, err := d.DecomposedSelect(context.Background(), tr, plan, "q", retrieval.ContextBudget{}) + if err != nil { + t.Fatal(err) + } + if len(ids) != 2 || ids[0] != "sec_a" || ids[1] != "sec_b" { + t.Errorf("want [sec_a sec_b], got %v", ids) + } + if usage.LLMCalls != 0 { + t.Errorf("non-CostStrategy should report zero Usage, got %+v", usage) + } +} + +// Defensive: a nil decomposer is a programming bug. The error should be +// clear, not a nil panic. +func TestDecomposerNilStrategyErrors(t *testing.T) { + t.Parallel() + d := &retrieval.Decomposer{} // Strategy is nil + _, _, err := d.DecomposedSelect(context.Background(), buildTree(), nil, "q", retrieval.ContextBudget{}) + if err == nil { + t.Fatal("expected error when Strategy is nil") + } +} + +// End-to-end: a Planner + Decomposer chain with the mockLLM that drives +// the real SinglePass strategy. Verifies the wiring works as advertised +// in the task spec. +func TestPlannerPlusDecomposerEndToEnd(t *testing.T) { + t.Parallel() + tr := buildTree() + + // 1. Planner returns a multi-hop plan. + pm := &plannerMock{ + replies: []string{ + jsonPlan(retrieval.Plan{ + Intent: "comparison", + IsMultiHop: true, + SubQuestions: []string{"sec_a please", "sec_c please"}, + }), + }, + } + planner := retrieval.NewPlanner(pm, "planner-model") + plan, _, err := planner.Plan(context.Background(), "compare setup and faq") + if err != nil { + t.Fatal(err) + } + if plan == nil || !plan.IsMultiHop { + t.Fatalf("expected multi-hop plan, got %+v", plan) + } + + // 2. Decomposer wraps a real single-pass strategy with a mock LLM + // that picks by string presence — each sub-question contains the + // target section ID literal so picks are deterministic. + m := &mockLLM{pickIfPresent: []tree.SectionID{"sec_a", "sec_c"}} + s := retrieval.NewSinglePass(m) + d := retrieval.NewDecomposer(s) + + ids, usage, err := d.DecomposedSelect(context.Background(), tr, plan, "compare setup and faq", retrieval.ContextBudget{MaxTokens: 1000}) + if err != nil { + t.Fatal(err) + } + if len(ids) != 2 { + t.Fatalf("end-to-end union = %v, want 2 ids", ids) + } + // First sub-question fires first → sec_a comes before sec_c. + if ids[0] != "sec_a" || ids[1] != "sec_c" { + t.Errorf("union order = %v, want [sec_a sec_c]", ids) + } + if usage.LLMCalls != 2 { + t.Errorf("usage LLMCalls = %d, want 2 (one per sub-question)", usage.LLMCalls) + } +} From bb32d4e3f42809a133ff09415e573ee6aa6a7646 Mon Sep 17 00:00:00 2001 From: Halleluyah Oludele Date: Wed, 27 May 2026 02:00:20 +0100 Subject: [PATCH 3/3] feat(api,config): wire planner + decomposer into /v1/query + /v1/answer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Server-side opt-in for Phase 2.1 + 2.2. New PlanningBlock under retrieval (enabled, model, cache_size, decompose; env: VLE_RETRIEVAL_PLANNING_*). Default disabled at both config and per-request levels, so existing callers see no behaviour change. Wiring: - api.Deps gains Planner + Planning fields. The body-level enable_planning field (pointer-bool to disambiguate absent from explicit-false) overrides the config block. - handleQuery / handleAnswer route through a small set of helpers (runPlanner, runSelection, runSelectionWithUsage) that fold the Planner output into selection. Multi-hop plans go through a Decomposer wrapping the active Strategy when planning.decompose is true. - /v1/answer's synthesis prompt grows a short "Planner notes" block (intent, entities, expected doc areas, sub-questions) when a plan is present, so the model reasons with the same understanding the retrieval pipeline used. - Both endpoints surface the sanitised Plan in the response under "plan" (omitempty) when planning ran. - cmd/engine instantiates a Planner whenever LLM is configured, so per-request opt-in still works even with planning.enabled=false. - Planner transport errors are LOGGED but not propagated — a planner blip cannot 500 an otherwise-working retrieval request. OpenAPI: Plan schema added; QueryRequest/AnswerRequest get enable_planning; QueryResponse/AnswerResponse get plan ($ref Plan, omitempty). config.example.yaml gets a documented retrieval.planning block. Tests: planning defaults + env-override coverage added to pkg/config/config_test.go. All existing tests still pass. --- cmd/engine/main.go | 22 +++++ config.example.yaml | 23 +++++ internal/api/server.go | 191 ++++++++++++++++++++++++++++++++------ openapi.yaml | 71 +++++++++++++- pkg/config/config.go | 68 ++++++++++++++ pkg/config/config_test.go | 44 +++++++++ 6 files changed, 392 insertions(+), 27 deletions(-) diff --git a/cmd/engine/main.go b/cmd/engine/main.go index ccaab06..e3dd112 100644 --- a/cmd/engine/main.go +++ b/cmd/engine/main.go @@ -109,6 +109,26 @@ func run() error { // Multi-document query dispatcher. multiDoc := retrieval.NewMultiDoc(strategy, pool.LoadTree) + // Planner: opt-in Phase 2.1. When disabled at boot we still + // instantiate it lazily — the per-request `enable_planning` body + // field overrides the config, so a server with planning.enabled=false + // but a Planner configured can still serve opt-in callers. + var planner *retrieval.Planner + if llmClient != nil { + plannerModel := cfg.Retrieval.Planning.Model + if plannerModel == "" { + plannerModel = modelFor(cfg.LLM) + } + planner = retrieval.NewPlannerWithCacheSize(llmClient, plannerModel, cfg.Retrieval.Planning.CacheSize) + if cfg.Retrieval.Planning.Enabled { + logger.Info("retrieval: planner enabled", + "model", plannerModel, + "cache_size", cfg.Retrieval.Planning.CacheSize, + "decompose", cfg.Retrieval.Planning.Decompose, + ) + } + } + pipeline := ingest.NewPipeline(ingest.Pipeline{ DB: pool, Storage: store, @@ -135,6 +155,8 @@ func run() error { LLMModel: modelFor(cfg.LLM), AnswerSpan: cfg.Retrieval.AnswerSpan, Answer: cfg.Retrieval.Answer, + Planner: planner, + Planning: cfg.Retrieval.Planning, } srv := &http.Server{ diff --git a/config.example.yaml b/config.example.yaml index 6faf264..ffb43a1 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -129,6 +129,29 @@ retrieval: max_sections: 5 max_answer_tokens: 1024 + # planning: Phase 2.1 query planning + Phase 2.2 multi-hop decomposition. + # When enabled, every /v1/query and /v1/answer request first issues a + # short LLM call that returns a structured Plan (intent, entities, + # expected document areas, multi-hop flag, sub-questions). Multi-hop + # plans fan retrieval out one selection call per sub-question and + # union the results. + # + # OPT-IN. Default disabled. Per-request `enable_planning` body field + # overrides this block, so callers can experiment without a restart. + # Plans are cached in a per-process LRU keyed on (query, model); + # repeated questions don't burn extra LLM budget. + planning: + enabled: false + # Override the planner's model; empty inherits the engine's + # configured default. Point this at a small/fast model — planning + # is a short prompt that shouldn't run on the flagship model. + model: "" + cache_size: 128 + # decompose: when planning runs, multi-hop plans fan retrieval + # out per sub-question. Set false to validate the planner in + # isolation (plan returned, but retrieval uses the original query). + decompose: true + ingest: # The summarize and HyDE stages run concurrently. This caps the total # number of LLM calls in flight across both stages combined, so the diff --git a/internal/api/server.go b/internal/api/server.go index 86134ee..38dadce 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -63,6 +63,16 @@ type Deps struct { // values (AnswerSpan disabled, Answer.MaxSections=5) are safe. AnswerSpan config.AnswerSpanBlock Answer config.AnswerBlock + + // Planner runs one LLM call before retrieval to build a structured + // Plan (intent + entities + multi-hop sub-questions). Nil disables + // planning even when a request opts in via `enable_planning`. + Planner *retrieval.Planner + + // Planning carries the server-side planning config. The body-level + // `enable_planning` field on /v1/query and /v1/answer overrides + // Planning.Enabled. + Planning config.PlanningBlock } // Router builds and returns the chi router wired with v1 routes. @@ -368,8 +378,15 @@ func (d Deps) handleGetSection(w http.ResponseWriter, r *http.Request) { // --- query --- // handleQuery accepts { document_id, query, model?, max_tokens?, -// reserved_for_prompt?, max_parallel_calls?, max_sections? } and runs the -// configured retrieval.Strategy against the document's tree. +// reserved_for_prompt?, max_parallel_calls?, max_sections?, +// enable_planning? } and runs the configured retrieval.Strategy against +// the document's tree. +// +// When `enable_planning` is true (or `retrieval.planning.enabled` is on +// at config level) the request first issues a planning LLM call. The +// resulting Plan is surfaced in the response under "plan". If the plan +// is multi-hop and decomposition is enabled, retrieval fans out one +// strategy call per sub-question and unions the results. func (d Deps) handleQuery(w http.ResponseWriter, r *http.Request) { var body struct { DocumentID tree.DocumentID `json:"document_id"` @@ -379,6 +396,10 @@ func (d Deps) handleQuery(w http.ResponseWriter, r *http.Request) { ReservedForPrompt int `json:"reserved_for_prompt"` MaxParallelCalls int `json:"max_parallel_calls"` MaxSections int `json:"max_sections"` + // EnablePlanning opts this request into the Phase 2.1 query + // planner. A pointer so we can distinguish "absent" from + // "explicit false" — absent falls back to the server config. + EnablePlanning *bool `json:"enable_planning"` } if err := json.NewDecoder(r.Body).Decode(&body); err != nil { writeErr(w, http.StatusBadRequest, "invalid json: "+err.Error()) @@ -420,7 +441,9 @@ func (d Deps) handleQuery(w http.ResponseWriter, r *http.Request) { } started := time.Now() - ids, err := d.Strategy.Select(r.Context(), t, body.Query, budget) + + plan, _ := d.runPlanner(r.Context(), body.Query, body.EnablePlanning) + ids, err := d.runSelection(r.Context(), t, plan, body.Query, budget) if err != nil { d.Logger.Error("query: strategy failed", "err", err, "document_id", body.DocumentID) writeErr(w, http.StatusInternalServerError, "retrieval failed: "+err.Error()) @@ -461,14 +484,18 @@ func (d Deps) handleQuery(w http.ResponseWriter, r *http.Request) { sections = append(sections, sectionWithContentToMap(e)) } - writeJSON(w, http.StatusOK, map[string]any{ + resp := map[string]any{ "document_id": body.DocumentID, "query": body.Query, "strategy": d.Strategy.Name(), "model": body.Model, "sections": sections, "elapsed_ms": time.Since(started).Milliseconds(), - }) + } + if plan != nil { + resp["plan"] = plan + } + writeJSON(w, http.StatusOK, resp) } // sectionWithContent bundles a tree section with its loaded content @@ -590,6 +617,9 @@ func (d Deps) handleAnswer(w http.ResponseWriter, r *http.Request) { MaxParallelCalls int `json:"max_parallel_calls"` MaxSections int `json:"max_sections"` MaxAnswerTokens int `json:"max_answer_tokens"` + // EnablePlanning opts this request into the Phase 2.1 query + // planner. See handleQuery for the same field's semantics. + EnablePlanning *bool `json:"enable_planning"` } if err := json.NewDecoder(r.Body).Decode(&body); err != nil { writeErr(w, http.StatusBadRequest, "invalid json: "+err.Error()) @@ -629,22 +659,13 @@ func (d Deps) handleAnswer(w http.ResponseWriter, r *http.Request) { started := time.Now() totalUsage := retrieval.Usage{} - var ids []tree.SectionID - var retrievalUsage retrieval.Usage - if cs, ok := d.Strategy.(retrieval.CostStrategy); ok { - res, err := cs.SelectWithCost(r.Context(), t, body.Query, budget) - if err != nil { - writeErr(w, http.StatusInternalServerError, "retrieval failed: "+err.Error()) - return - } - ids, retrievalUsage = res.SelectedIDs, res.Usage - } else { - picks, err := d.Strategy.Select(r.Context(), t, body.Query, budget) - if err != nil { - writeErr(w, http.StatusInternalServerError, "retrieval failed: "+err.Error()) - return - } - ids = picks + plan, planUsage := d.runPlanner(r.Context(), body.Query, body.EnablePlanning) + totalUsage.Add(planUsage) + + ids, retrievalUsage, err := d.runSelectionWithUsage(r.Context(), t, plan, body.Query, budget) + if err != nil { + writeErr(w, http.StatusInternalServerError, "retrieval failed: "+err.Error()) + return } totalUsage.Add(retrievalUsage) @@ -700,7 +721,7 @@ func (d Deps) handleAnswer(w http.ResponseWriter, r *http.Request) { maxAnswerTokens = 1024 } - answerText, synthUsage, err := synthesiseAnswer(r.Context(), d.LLM, synthModel, body.Query, enriched, maxAnswerTokens) + answerText, synthUsage, err := synthesiseAnswer(r.Context(), d.LLM, synthModel, body.Query, plan, enriched, maxAnswerTokens) if err != nil { writeErr(w, http.StatusInternalServerError, "synthesis failed: "+err.Error()) return @@ -729,7 +750,7 @@ func (d Deps) handleAnswer(w http.ResponseWriter, r *http.Request) { citations = append(citations, c) } - writeJSON(w, http.StatusOK, map[string]any{ + resp := map[string]any{ "document_id": body.DocumentID, "query": body.Query, "answer": answerText, @@ -744,17 +765,27 @@ func (d Deps) handleAnswer(w http.ResponseWriter, r *http.Request) { "llm_calls": totalUsage.LLMCalls, }, "elapsed_ms": time.Since(started).Milliseconds(), - }) + } + if plan != nil { + resp["plan"] = plan + } + writeJSON(w, http.StatusOK, resp) } // synthesiseAnswer runs one LLM call producing the final answer from // retrieved sections + their extracted spans. The model is told to -// cite by section ID. -func synthesiseAnswer(ctx context.Context, client llmgate.Client, model, query string, secs []sectionWithContent, maxAnswerTokens int) (string, retrieval.Usage, error) { +// cite by section ID. When plan is non-nil its structured hints +// (intent, entities, expected_doc_areas, sub_questions) are folded +// into the prompt as a short "Planner notes" block so the model can +// reason with the same understanding the retrieval pipeline used. +func synthesiseAnswer(ctx context.Context, client llmgate.Client, model, query string, plan *retrieval.Plan, secs []sectionWithContent, maxAnswerTokens int) (string, retrieval.Usage, error) { var b strings.Builder b.WriteString("You are answering a user's question using ONLY the evidence below.\n\n") b.WriteString("User query:\n") b.WriteString(query) + if plan != nil { + writePlanHints(&b, plan) + } b.WriteString("\n\nRetrieved evidence (each block is a section of the document):\n") for i, e := range secs { fmt.Fprintf(&b, "\n[%d] section_id=%s, title=%q", i+1, e.sec.ID, e.sec.Title) @@ -1018,6 +1049,114 @@ func (d Deps) handleQueueWebhook(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } +// --- planning helpers --- + +// planningEnabled reports whether the request should go through the +// planner. The per-request body field (when present) wins over the +// server-side config; a nil body field falls back to the config. +func (d Deps) planningEnabled(bodyOverride *bool) bool { + if d.Planner == nil { + return false + } + if bodyOverride != nil { + return *bodyOverride + } + return d.Planning.Enabled +} + +// runPlanner issues the planning LLM call when planning is enabled. +// Returns (nil, zero usage) when planning is off, the query is empty, +// the planner is missing, or the planner gracefully degraded to a +// no-plan result. Transport errors from the planner are LOGGED but not +// propagated — the engine continues with the original query rather +// than 500ing a working retrieval request because of a planner blip. +func (d Deps) runPlanner(ctx context.Context, query string, bodyOverride *bool) (*retrieval.Plan, retrieval.Usage) { + if !d.planningEnabled(bodyOverride) { + return nil, retrieval.Usage{} + } + plan, usage, err := d.Planner.Plan(ctx, query) + if err != nil { + if d.Logger != nil { + d.Logger.Warn("planner: failed; continuing without plan", "err", err) + } + return nil, usage + } + return plan, usage +} + +// runSelection picks section IDs for the query, optionally going +// through the Decomposer when the plan is multi-hop AND planning-level +// decomposition is enabled. Returns the same []SectionID Strategy.Select +// would. +func (d Deps) runSelection(ctx context.Context, t *tree.Tree, plan *retrieval.Plan, query string, budget retrieval.ContextBudget) ([]tree.SectionID, error) { + if d.shouldDecompose(plan) { + ids, _, err := retrieval.NewDecomposer(d.Strategy).DecomposedSelect(ctx, t, plan, query, budget) + return ids, err + } + return d.Strategy.Select(ctx, t, query, budget) +} + +// runSelectionWithUsage is the cost-tracking variant used by /v1/answer. +// Returns the selected IDs plus the Usage accumulated during selection +// (across all sub-questions for multi-hop plans). +func (d Deps) runSelectionWithUsage(ctx context.Context, t *tree.Tree, plan *retrieval.Plan, query string, budget retrieval.ContextBudget) ([]tree.SectionID, retrieval.Usage, error) { + if d.shouldDecompose(plan) { + return retrieval.NewDecomposer(d.Strategy).DecomposedSelect(ctx, t, plan, query, budget) + } + if cs, ok := d.Strategy.(retrieval.CostStrategy); ok { + res, err := cs.SelectWithCost(ctx, t, query, budget) + if err != nil { + return nil, retrieval.Usage{}, err + } + if res == nil { + return nil, retrieval.Usage{}, nil + } + return res.SelectedIDs, res.Usage, nil + } + ids, err := d.Strategy.Select(ctx, t, query, budget) + if err != nil { + return nil, retrieval.Usage{}, err + } + return ids, retrieval.Usage{}, nil +} + +// shouldDecompose returns true when the plan is multi-hop AND +// decomposition is enabled at the config level. The Decomposer +// itself short-circuits to Strategy.Select when the plan is missing +// or non-multi-hop, but we duplicate that check here so we avoid +// allocating a Decomposer when it would be a no-op. +func (d Deps) shouldDecompose(plan *retrieval.Plan) bool { + if plan == nil || !plan.IsMultiHop || len(plan.SubQuestions) == 0 { + return false + } + return d.Planning.Decompose +} + +// writePlanHints appends a short, model-readable "Planner notes" block +// describing the structured plan. Synthesis uses this to orient itself +// before reading the evidence. +func writePlanHints(b *strings.Builder, plan *retrieval.Plan) { + if plan == nil { + return + } + b.WriteString("\n\nPlanner notes (structured understanding of the query):") + if plan.Intent != "" { + fmt.Fprintf(b, "\n- intent: %s", plan.Intent) + } + if len(plan.Entities) > 0 { + fmt.Fprintf(b, "\n- entities: %s", strings.Join(plan.Entities, ", ")) + } + if len(plan.ExpectedDocAreas) > 0 { + fmt.Fprintf(b, "\n- expected document areas: %s", strings.Join(plan.ExpectedDocAreas, ", ")) + } + if plan.IsMultiHop && len(plan.SubQuestions) > 0 { + b.WriteString("\n- sub-questions:") + for _, q := range plan.SubQuestions { + fmt.Fprintf(b, "\n - %s", q) + } + } +} + // --- helpers --- func writeJSON(w http.ResponseWriter, status int, v any) { diff --git a/openapi.yaml b/openapi.yaml index c561693..f215556 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -451,6 +451,17 @@ components: max_sections: type: integer description: Cap on returned sections. 0 = no cap. + enable_planning: + type: boolean + description: | + Opt this request into the query planner. One short LLM call + runs before retrieval and returns a structured Plan + (intent, entities, expected document areas, multi-hop + flag, sub-questions). When the plan is multi-hop and + `retrieval.planning.decompose` is enabled on the server, + retrieval fans out one selection call per sub-question + and unions the results. Overrides the server's + `retrieval.planning.enabled` setting for this request only. QueryResponse: type: object @@ -461,13 +472,15 @@ components: type: string strategy: type: string - enum: [single-pass, chunked-tree] + enum: [single-pass, chunked-tree, agentic] model: type: string sections: type: array items: $ref: "#/components/schemas/QuerySection" + plan: + $ref: "#/components/schemas/Plan" elapsed_ms: type: integer @@ -540,6 +553,13 @@ components: max_answer_tokens: type: integer description: Bound the synthesised answer length. Defaults to retrieval.answer.max_answer_tokens (1024). + enable_planning: + type: boolean + description: | + Opt this request into the query planner. See QueryRequest for + full semantics. When enabled, the synthesis prompt also sees + the planner's structured intent and entity hints, and the + response carries a top-level `plan` field. AnswerResponse: type: object @@ -567,9 +587,58 @@ components: total_tokens: {type: integer} cost_usd: {type: number} llm_calls: {type: integer} + plan: + $ref: "#/components/schemas/Plan" elapsed_ms: type: integer + Plan: + type: object + description: | + Structured understanding of the query produced by the Phase 2.1 + planner. Returned only when the request opts in (per-request + `enable_planning` or server-side `retrieval.planning.enabled`). + Synthesis (in /v1/answer) reads `intent`, `entities`, and + `expected_doc_areas` as orientation hints. When `is_multi_hop` + is true and `retrieval.planning.decompose` is on, retrieval + fans out one selection call per `sub_questions[i]` and unions + the per-sub-question selections. + properties: + intent: + type: string + description: | + Short snake_case label for the query intent. Open-ended: + common values are `factual_lookup`, `comparison`, `summary`, + `definition`, `list`, `calculation`. + entities: + type: array + items: + type: string + description: | + Salient proper nouns, dates, numbers, or specific terms the + query hinges on. Surfaced to the synthesis model for + orientation. + expected_doc_areas: + type: array + items: + type: string + description: | + Hints about which parts of a document are likely to hold the + answer (e.g. "balance sheet", "risk factors"). Empty when + no strong prior exists. + is_multi_hop: + type: boolean + description: | + True only when the query requires answering distinct + sub-questions whose answers must be combined. + sub_questions: + type: array + items: + type: string + description: | + Focused sub-questions the decomposer dispatches one-at-a-time. + Populated only when `is_multi_hop` is true. + AnswerCitation: type: object description: | diff --git a/pkg/config/config.go b/pkg/config/config.go index 525f83d..28ad5ac 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -203,6 +203,41 @@ type RetrievalConfig struct { Cache CacheBlock `yaml:"cache"` AnswerSpan AnswerSpanBlock `yaml:"answer_span"` Answer AnswerBlock `yaml:"answer"` + Planning PlanningBlock `yaml:"planning"` +} + +// PlanningBlock configures Phase 2.1 query planning + Phase 2.2 multi-hop +// decomposition. +// +// When enabled, every /v1/query and /v1/answer request issues one short +// LLM call before retrieval to build a Plan (intent + entities + expected +// document areas + multi-hop flag + sub-questions). On multi-hop plans, +// retrieval fans out one selection call per sub-question and unions the +// results. +// +// Planning is opt-in. The per-request `enable_planning` body field +// overrides this config block; the body field winning lets callers +// experiment without a server restart. +type PlanningBlock struct { + // Enabled toggles planning at the server level. Default: false. + Enabled bool `yaml:"enabled"` + + // Model overrides the planner's LLM model. Empty means use the + // request's model (which itself falls back to the engine default). + // Point this at a small/fast model — planning is a short prompt + // that should not run on the same flagship used for synthesis. + Model string `yaml:"model"` + + // CacheSize bounds the per-process LRU of (query, model) → Plan + // entries. 0 means use the planner's default (128). + CacheSize int `yaml:"cache_size"` + + // Decompose toggles Phase 2.2 multi-hop decomposition. When false, + // planning still runs (and the plan is surfaced in the response) + // but retrieval always sees the original query — useful for + // validating the planner in isolation before turning decomposition + // on. Default: true (when Enabled). + Decompose bool `yaml:"decompose"` } // AnswerSpanBlock configures the answer-span extractor. @@ -328,6 +363,11 @@ func Default() Config { MaxSections: 5, MaxAnswerTokens: 1024, }, + Planning: PlanningBlock{ + Enabled: false, + CacheSize: 128, + Decompose: true, + }, }, Ingest: IngestConfig{ GlobalLLMConcurrency: 12, @@ -492,6 +532,30 @@ func applyEnvOverrides(c *Config) { c.Retrieval.Answer.MaxSections = n } } + if v := os.Getenv("VLE_RETRIEVAL_PLANNING_ENABLED"); v != "" { + switch strings.ToLower(strings.TrimSpace(v)) { + case "1", "true", "yes", "on": + c.Retrieval.Planning.Enabled = true + case "0", "false", "no", "off": + c.Retrieval.Planning.Enabled = false + } + } + if v := os.Getenv("VLE_RETRIEVAL_PLANNING_MODEL"); v != "" { + c.Retrieval.Planning.Model = v + } + if v := os.Getenv("VLE_RETRIEVAL_PLANNING_CACHE_SIZE"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + c.Retrieval.Planning.CacheSize = n + } + } + if v := os.Getenv("VLE_RETRIEVAL_PLANNING_DECOMPOSE"); v != "" { + switch strings.ToLower(strings.TrimSpace(v)) { + case "1", "true", "yes", "on": + c.Retrieval.Planning.Decompose = true + case "0", "false", "no", "off": + c.Retrieval.Planning.Decompose = false + } + } } // Validate checks that required fields for the selected drivers are set. @@ -574,5 +638,9 @@ func (c Config) Validate() error { return fmt.Errorf("ingest.global_llm_concurrency must be >= 0, got %d", c.Ingest.GlobalLLMConcurrency) } + if c.Retrieval.Planning.CacheSize < 0 { + return fmt.Errorf("retrieval.planning.cache_size must be >= 0, got %d", c.Retrieval.Planning.CacheSize) + } + return nil } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 1f29c76..fb60973 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -37,11 +37,55 @@ func TestDefaultValues(t *testing.T) { if cfg.Retrieval.Cache.TTLSeconds != 600 { t.Errorf("cache.ttl_seconds = %d, want 600", cfg.Retrieval.Cache.TTLSeconds) } + if cfg.Retrieval.Planning.Enabled { + t.Error("retrieval.planning.enabled should default to false") + } + if cfg.Retrieval.Planning.CacheSize != 128 { + t.Errorf("retrieval.planning.cache_size = %d, want 128", cfg.Retrieval.Planning.CacheSize) + } + if !cfg.Retrieval.Planning.Decompose { + t.Error("retrieval.planning.decompose should default to true (when planning is enabled)") + } if cfg.Log.Level != "info" { t.Errorf("log.level = %q, want info", cfg.Log.Level) } } +func TestPlanningEnvOverride(t *testing.T) { + // Not parallel — mutates env. Restore on exit. + prevEnabled := os.Getenv("VLE_RETRIEVAL_PLANNING_ENABLED") + prevModel := os.Getenv("VLE_RETRIEVAL_PLANNING_MODEL") + prevCache := os.Getenv("VLE_RETRIEVAL_PLANNING_CACHE_SIZE") + prevDecompose := os.Getenv("VLE_RETRIEVAL_PLANNING_DECOMPOSE") + defer func() { + os.Setenv("VLE_RETRIEVAL_PLANNING_ENABLED", prevEnabled) + os.Setenv("VLE_RETRIEVAL_PLANNING_MODEL", prevModel) + os.Setenv("VLE_RETRIEVAL_PLANNING_CACHE_SIZE", prevCache) + os.Setenv("VLE_RETRIEVAL_PLANNING_DECOMPOSE", prevDecompose) + }() + + os.Setenv("VLE_RETRIEVAL_PLANNING_ENABLED", "true") + os.Setenv("VLE_RETRIEVAL_PLANNING_MODEL", "gemini-2.0-flash") + os.Setenv("VLE_RETRIEVAL_PLANNING_CACHE_SIZE", "256") + os.Setenv("VLE_RETRIEVAL_PLANNING_DECOMPOSE", "false") + + cfg := Default() + applyEnvOverrides(&cfg) + + if !cfg.Retrieval.Planning.Enabled { + t.Error("VLE_RETRIEVAL_PLANNING_ENABLED=true should enable planning") + } + if cfg.Retrieval.Planning.Model != "gemini-2.0-flash" { + t.Errorf("planning model = %q, want gemini-2.0-flash", cfg.Retrieval.Planning.Model) + } + if cfg.Retrieval.Planning.CacheSize != 256 { + t.Errorf("planning cache_size = %d, want 256", cfg.Retrieval.Planning.CacheSize) + } + if cfg.Retrieval.Planning.Decompose { + t.Error("VLE_RETRIEVAL_PLANNING_DECOMPOSE=false should disable decompose") + } +} + func TestValidateStorageLocal(t *testing.T) { t.Parallel() cfg := Default()