Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions cmd/engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,26 @@ func run() error {
// Multi-document query dispatcher.
multiDoc := retrieval.NewMultiDoc(strategy, pool.LoadTree)

// Planner: opt-in Phase 2.1. When disabled at boot we still
// instantiate it lazily — the per-request `enable_planning` body
// field overrides the config, so a server with planning.enabled=false
// but a Planner configured can still serve opt-in callers.
var planner *retrieval.Planner
if llmClient != nil {
plannerModel := cfg.Retrieval.Planning.Model
if plannerModel == "" {
plannerModel = modelFor(cfg.LLM)
}
planner = retrieval.NewPlannerWithCacheSize(llmClient, plannerModel, cfg.Retrieval.Planning.CacheSize)
if cfg.Retrieval.Planning.Enabled {
logger.Info("retrieval: planner enabled",
"model", plannerModel,
"cache_size", cfg.Retrieval.Planning.CacheSize,
"decompose", cfg.Retrieval.Planning.Decompose,
)
}
}

pipeline := ingest.NewPipeline(ingest.Pipeline{
DB: pool,
Storage: store,
Expand All @@ -135,6 +155,8 @@ func run() error {
LLMModel: modelFor(cfg.LLM),
AnswerSpan: cfg.Retrieval.AnswerSpan,
Answer: cfg.Retrieval.Answer,
Planner: planner,
Planning: cfg.Retrieval.Planning,
}

srv := &http.Server{
Expand Down
23 changes: 23 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,29 @@ retrieval:
max_sections: 5
max_answer_tokens: 1024

# planning: Phase 2.1 query planning + Phase 2.2 multi-hop decomposition.
# When enabled, every /v1/query and /v1/answer request first issues a
# short LLM call that returns a structured Plan (intent, entities,
# expected document areas, multi-hop flag, sub-questions). Multi-hop
# plans fan retrieval out one selection call per sub-question and
# union the results.
#
# OPT-IN. Default disabled. Per-request `enable_planning` body field
# overrides this block, so callers can experiment without a restart.
# Plans are cached in a per-process LRU keyed on (query, model);
# repeated questions don't burn extra LLM budget.
planning:
enabled: false
# Override the planner's model; empty inherits the engine's
# configured default. Point this at a small/fast model — planning
# is a short prompt that shouldn't run on the flagship model.
model: ""
cache_size: 128
# decompose: when planning runs, multi-hop plans fan retrieval
# out per sub-question. Set false to validate the planner in
# isolation (plan returned, but retrieval uses the original query).
decompose: true

ingest:
# The summarize and HyDE stages run concurrently. This caps the total
# number of LLM calls in flight across both stages combined, so the
Expand Down
191 changes: 165 additions & 26 deletions internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ type Deps struct {
// values (AnswerSpan disabled, Answer.MaxSections=5) are safe.
AnswerSpan config.AnswerSpanBlock
Answer config.AnswerBlock

// Planner runs one LLM call before retrieval to build a structured
// Plan (intent + entities + multi-hop sub-questions). Nil disables
// planning even when a request opts in via `enable_planning`.
Planner *retrieval.Planner

// Planning carries the server-side planning config. The body-level
// `enable_planning` field on /v1/query and /v1/answer overrides
// Planning.Enabled.
Planning config.PlanningBlock
}

// Router builds and returns the chi router wired with v1 routes.
Expand Down Expand Up @@ -368,8 +378,15 @@ func (d Deps) handleGetSection(w http.ResponseWriter, r *http.Request) {
// --- query ---

// handleQuery accepts { document_id, query, model?, max_tokens?,
// reserved_for_prompt?, max_parallel_calls?, max_sections? } and runs the
// configured retrieval.Strategy against the document's tree.
// reserved_for_prompt?, max_parallel_calls?, max_sections?,
// enable_planning? } and runs the configured retrieval.Strategy against
// the document's tree.
//
// When `enable_planning` is true (or `retrieval.planning.enabled` is on
// at config level) the request first issues a planning LLM call. The
// resulting Plan is surfaced in the response under "plan". If the plan
// is multi-hop and decomposition is enabled, retrieval fans out one
// strategy call per sub-question and unions the results.
func (d Deps) handleQuery(w http.ResponseWriter, r *http.Request) {
var body struct {
DocumentID tree.DocumentID `json:"document_id"`
Expand All @@ -379,6 +396,10 @@ func (d Deps) handleQuery(w http.ResponseWriter, r *http.Request) {
ReservedForPrompt int `json:"reserved_for_prompt"`
MaxParallelCalls int `json:"max_parallel_calls"`
MaxSections int `json:"max_sections"`
// EnablePlanning opts this request into the Phase 2.1 query
// planner. A pointer so we can distinguish "absent" from
// "explicit false" — absent falls back to the server config.
EnablePlanning *bool `json:"enable_planning"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
writeErr(w, http.StatusBadRequest, "invalid json: "+err.Error())
Expand Down Expand Up @@ -420,7 +441,9 @@ func (d Deps) handleQuery(w http.ResponseWriter, r *http.Request) {
}

started := time.Now()
ids, err := d.Strategy.Select(r.Context(), t, body.Query, budget)

plan, _ := d.runPlanner(r.Context(), body.Query, body.EnablePlanning)
ids, err := d.runSelection(r.Context(), t, plan, body.Query, budget)
if err != nil {
d.Logger.Error("query: strategy failed", "err", err, "document_id", body.DocumentID)
writeErr(w, http.StatusInternalServerError, "retrieval failed: "+err.Error())
Expand Down Expand Up @@ -461,14 +484,18 @@ func (d Deps) handleQuery(w http.ResponseWriter, r *http.Request) {
sections = append(sections, sectionWithContentToMap(e))
}

writeJSON(w, http.StatusOK, map[string]any{
resp := map[string]any{
"document_id": body.DocumentID,
"query": body.Query,
"strategy": d.Strategy.Name(),
"model": body.Model,
"sections": sections,
"elapsed_ms": time.Since(started).Milliseconds(),
})
}
if plan != nil {
resp["plan"] = plan
}
writeJSON(w, http.StatusOK, resp)
}

// sectionWithContent bundles a tree section with its loaded content
Expand Down Expand Up @@ -590,6 +617,9 @@ func (d Deps) handleAnswer(w http.ResponseWriter, r *http.Request) {
MaxParallelCalls int `json:"max_parallel_calls"`
MaxSections int `json:"max_sections"`
MaxAnswerTokens int `json:"max_answer_tokens"`
// EnablePlanning opts this request into the Phase 2.1 query
// planner. See handleQuery for the same field's semantics.
EnablePlanning *bool `json:"enable_planning"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
writeErr(w, http.StatusBadRequest, "invalid json: "+err.Error())
Expand Down Expand Up @@ -629,22 +659,13 @@ func (d Deps) handleAnswer(w http.ResponseWriter, r *http.Request) {
started := time.Now()
totalUsage := retrieval.Usage{}

var ids []tree.SectionID
var retrievalUsage retrieval.Usage
if cs, ok := d.Strategy.(retrieval.CostStrategy); ok {
res, err := cs.SelectWithCost(r.Context(), t, body.Query, budget)
if err != nil {
writeErr(w, http.StatusInternalServerError, "retrieval failed: "+err.Error())
return
}
ids, retrievalUsage = res.SelectedIDs, res.Usage
} else {
picks, err := d.Strategy.Select(r.Context(), t, body.Query, budget)
if err != nil {
writeErr(w, http.StatusInternalServerError, "retrieval failed: "+err.Error())
return
}
ids = picks
plan, planUsage := d.runPlanner(r.Context(), body.Query, body.EnablePlanning)
totalUsage.Add(planUsage)

ids, retrievalUsage, err := d.runSelectionWithUsage(r.Context(), t, plan, body.Query, budget)
if err != nil {
writeErr(w, http.StatusInternalServerError, "retrieval failed: "+err.Error())
return
}
totalUsage.Add(retrievalUsage)

Expand Down Expand Up @@ -700,7 +721,7 @@ func (d Deps) handleAnswer(w http.ResponseWriter, r *http.Request) {
maxAnswerTokens = 1024
}

answerText, synthUsage, err := synthesiseAnswer(r.Context(), d.LLM, synthModel, body.Query, enriched, maxAnswerTokens)
answerText, synthUsage, err := synthesiseAnswer(r.Context(), d.LLM, synthModel, body.Query, plan, enriched, maxAnswerTokens)
if err != nil {
writeErr(w, http.StatusInternalServerError, "synthesis failed: "+err.Error())
return
Expand Down Expand Up @@ -729,7 +750,7 @@ func (d Deps) handleAnswer(w http.ResponseWriter, r *http.Request) {
citations = append(citations, c)
}

writeJSON(w, http.StatusOK, map[string]any{
resp := map[string]any{
"document_id": body.DocumentID,
"query": body.Query,
"answer": answerText,
Expand All @@ -744,17 +765,27 @@ func (d Deps) handleAnswer(w http.ResponseWriter, r *http.Request) {
"llm_calls": totalUsage.LLMCalls,
},
"elapsed_ms": time.Since(started).Milliseconds(),
})
}
if plan != nil {
resp["plan"] = plan
}
writeJSON(w, http.StatusOK, resp)
}

// synthesiseAnswer runs one LLM call producing the final answer from
// retrieved sections + their extracted spans. The model is told to
// cite by section ID.
func synthesiseAnswer(ctx context.Context, client llmgate.Client, model, query string, secs []sectionWithContent, maxAnswerTokens int) (string, retrieval.Usage, error) {
// cite by section ID. When plan is non-nil its structured hints
// (intent, entities, expected_doc_areas, sub_questions) are folded
// into the prompt as a short "Planner notes" block so the model can
// reason with the same understanding the retrieval pipeline used.
func synthesiseAnswer(ctx context.Context, client llmgate.Client, model, query string, plan *retrieval.Plan, secs []sectionWithContent, maxAnswerTokens int) (string, retrieval.Usage, error) {
var b strings.Builder
b.WriteString("You are answering a user's question using ONLY the evidence below.\n\n")
b.WriteString("User query:\n")
b.WriteString(query)
if plan != nil {
writePlanHints(&b, plan)
}
b.WriteString("\n\nRetrieved evidence (each block is a section of the document):\n")
for i, e := range secs {
fmt.Fprintf(&b, "\n[%d] section_id=%s, title=%q", i+1, e.sec.ID, e.sec.Title)
Expand Down Expand Up @@ -1018,6 +1049,114 @@ func (d Deps) handleQueueWebhook(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

// --- planning helpers ---

// planningEnabled reports whether the request should go through the
// planner. The per-request body field (when present) wins over the
// server-side config; a nil body field falls back to the config.
func (d Deps) planningEnabled(bodyOverride *bool) bool {
if d.Planner == nil {
return false
}
if bodyOverride != nil {
return *bodyOverride
}
return d.Planning.Enabled
}

// runPlanner issues the planning LLM call when planning is enabled.
// Returns (nil, zero usage) when planning is off, the query is empty,
// the planner is missing, or the planner gracefully degraded to a
// no-plan result. Transport errors from the planner are LOGGED but not
// propagated — the engine continues with the original query rather
// than 500ing a working retrieval request because of a planner blip.
func (d Deps) runPlanner(ctx context.Context, query string, bodyOverride *bool) (*retrieval.Plan, retrieval.Usage) {
if !d.planningEnabled(bodyOverride) {
return nil, retrieval.Usage{}
}
plan, usage, err := d.Planner.Plan(ctx, query)
if err != nil {
if d.Logger != nil {
d.Logger.Warn("planner: failed; continuing without plan", "err", err)
}
return nil, usage
}
return plan, usage
}

// runSelection picks section IDs for the query, optionally going
// through the Decomposer when the plan is multi-hop AND planning-level
// decomposition is enabled. Returns the same []SectionID Strategy.Select
// would.
func (d Deps) runSelection(ctx context.Context, t *tree.Tree, plan *retrieval.Plan, query string, budget retrieval.ContextBudget) ([]tree.SectionID, error) {
if d.shouldDecompose(plan) {
ids, _, err := retrieval.NewDecomposer(d.Strategy).DecomposedSelect(ctx, t, plan, query, budget)
return ids, err
}
return d.Strategy.Select(ctx, t, query, budget)
}

// runSelectionWithUsage is the cost-tracking variant used by /v1/answer.
// Returns the selected IDs plus the Usage accumulated during selection
// (across all sub-questions for multi-hop plans).
func (d Deps) runSelectionWithUsage(ctx context.Context, t *tree.Tree, plan *retrieval.Plan, query string, budget retrieval.ContextBudget) ([]tree.SectionID, retrieval.Usage, error) {
if d.shouldDecompose(plan) {
return retrieval.NewDecomposer(d.Strategy).DecomposedSelect(ctx, t, plan, query, budget)
}
if cs, ok := d.Strategy.(retrieval.CostStrategy); ok {
res, err := cs.SelectWithCost(ctx, t, query, budget)
if err != nil {
return nil, retrieval.Usage{}, err
}
if res == nil {
return nil, retrieval.Usage{}, nil
}
return res.SelectedIDs, res.Usage, nil
}
ids, err := d.Strategy.Select(ctx, t, query, budget)
if err != nil {
return nil, retrieval.Usage{}, err
}
return ids, retrieval.Usage{}, nil
}

// shouldDecompose returns true when the plan is multi-hop AND
// decomposition is enabled at the config level. The Decomposer
// itself short-circuits to Strategy.Select when the plan is missing
// or non-multi-hop, but we duplicate that check here so we avoid
// allocating a Decomposer when it would be a no-op.
func (d Deps) shouldDecompose(plan *retrieval.Plan) bool {
if plan == nil || !plan.IsMultiHop || len(plan.SubQuestions) == 0 {
return false
}
return d.Planning.Decompose
}

// writePlanHints appends a short, model-readable "Planner notes" block
// describing the structured plan. Synthesis uses this to orient itself
// before reading the evidence.
func writePlanHints(b *strings.Builder, plan *retrieval.Plan) {
if plan == nil {
return
}
b.WriteString("\n\nPlanner notes (structured understanding of the query):")
if plan.Intent != "" {
fmt.Fprintf(b, "\n- intent: %s", plan.Intent)
}
if len(plan.Entities) > 0 {
fmt.Fprintf(b, "\n- entities: %s", strings.Join(plan.Entities, ", "))
}
if len(plan.ExpectedDocAreas) > 0 {
fmt.Fprintf(b, "\n- expected document areas: %s", strings.Join(plan.ExpectedDocAreas, ", "))
}
if plan.IsMultiHop && len(plan.SubQuestions) > 0 {
b.WriteString("\n- sub-questions:")
for _, q := range plan.SubQuestions {
fmt.Fprintf(b, "\n - %s", q)
}
}
}

// --- helpers ---

func writeJSON(w http.ResponseWriter, status int, v any) {
Expand Down
Loading
Loading