feat(ingest): run HyDE in parallel with summarize #15
Summarize and HyDE now run as concurrent goroutines instead of strictly sequential stages. HyDE's input is (title, content); the section summary was a weak hint and is now omitted from the prompt, which removes the only ordering dependency between the two stages. A new ingest.global_llm_concurrency knob (default 12) caps total LLM-in-flight across both stages so the provider's per-tenant limit isn't blown.

Option A (fully concurrent stages) was chosen over per-section pipelining because HyDE has no hard dependency on summary text: title + the first 4K of content carry strictly more signal than a 60-word summary derived from that same content.

Test coverage:
- runParallelStages: interleave proved by blocking summarize while HyDE completes
- global semaphore: peak in-flight never exceeds the cap under load
- cancellation: acquire returns ok=false on a canceled ctx
- prompt regression guard: s.Summary text must not appear in the HyDE user prompt
- integration: gated on TEST_DATABASE_URL, ingests the rust markdown fixture end-to-end, asserts every section has a summary and every leaf has candidate_questions, and verifies the first HyDE call's timestamp precedes the last summarize call's
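The global cap and the cancellation case listed above can be sketched with a buffered channel used as a counting semaphore. This is a minimal illustration, not the code from this PR: the `limiter` type, `newLimiter`, and the demo helpers are hypothetical names, and treating a nil channel as "cap disabled" is an assumption based on the review discussion.

```go
package main

import "context"

// limiter is a hypothetical stand-in for the pipeline's global LLM cap.
// A nil sem means the cap is disabled and acquire is a no-op.
type limiter struct {
	sem chan struct{}
}

// newLimiter returns a limiter allowing up to n concurrent holders.
// n <= 0 leaves sem nil, which disables the cap (an assumption).
func newLimiter(n int) *limiter {
	if n <= 0 {
		return &limiter{}
	}
	return &limiter{sem: make(chan struct{}, n)}
}

// acquire blocks until a slot is free or ctx is done. It returns a release
// func and ok=true on success, or a nil release and ok=false if ctx was
// canceled first. Callers must check ok before calling release.
func (l *limiter) acquire(ctx context.Context) (release func(), ok bool) {
	if l.sem == nil {
		return func() {}, true
	}
	// Check cancellation up front so an already-canceled ctx never wins
	// the select below against a free slot.
	if ctx.Err() != nil {
		return nil, false
	}
	select {
	case l.sem <- struct{}{}:
		return func() { <-l.sem }, true
	case <-ctx.Done():
		return nil, false
	}
}

// demoCanceledAcquire reports what acquire returns for a canceled ctx.
func demoCanceledAcquire() bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, ok := newLimiter(1).acquire(ctx)
	return ok
}

// demoHeldSlots acquires one slot and reports how many slots are held
// before and after calling release.
func demoHeldSlots() (held, afterRelease int) {
	l := newLimiter(2)
	release, _ := l.acquire(context.Background())
	held = len(l.sem)
	release()
	return held, len(l.sem)
}
```

Returning a release closure keeps acquire and release paired at the call site, for example `release, ok := l.acquire(ctx); if !ok { return ctx.Err() }; defer release()`.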
Reviewer's Guide
Runs the summarize and HyDE ingest stages concurrently while introducing a shared global LLM concurrency cap, updating HyDE prompts to no longer depend on persisted summaries, wiring configuration/CLI, and adding unit/integration tests to validate concurrency and prompt behavior.

Sequence diagram for parallel summarize and HyDE stages with global LLM concurrency
sequenceDiagram
title Parallel summarize and HyDE with shared global LLM cap
participant Pipeline
participant runParallelStages
participant SummarizeStage as summarize
participant HyDEStage as generateCandidateQuestions
participant GlobalLLMSemaphore as globalLLMSem
Pipeline->>runParallelStages: runParallelStages(ctx, summarizeFn, hydeFn)
par summarize goroutine
runParallelStages->>SummarizeStage: summarize(ctx, docID, profile)
loop per section
SummarizeStage->>Pipeline: acquireGlobalLLM(ctx)
alt globalLLMSem enabled
Pipeline->>GlobalLLMSemaphore: send struct{}
GlobalLLMSemaphore-->>Pipeline: acquired
else globalLLMSem disabled
Pipeline-->>SummarizeStage: no-op
end
SummarizeStage->>SummarizeStage: summaryFor(ctx, section, childLines, profile)
SummarizeStage-->>GlobalLLMSemaphore: release slot
end
and HyDE goroutine
runParallelStages->>HyDEStage: generateCandidateQuestions(ctx, docID, profile)
loop per leaf section
HyDEStage->>Pipeline: acquireGlobalLLM(ctx)
alt globalLLMSem enabled
Pipeline->>GlobalLLMSemaphore: send struct{}
GlobalLLMSemaphore-->>Pipeline: acquired
else globalLLMSem disabled
Pipeline-->>HyDEStage: no-op
end
HyDEStage->>HyDEStage: candidateQuestionsFor(ctx, section, profile)
HyDEStage-->>GlobalLLMSemaphore: release slot
end
end
runParallelStages-->>Pipeline: summarizeErr, hydeErr
Pipeline->>Pipeline: log summarizeErr / hydeErr and SetDocumentStatus(StatusReady)
Hey - I've found 2 issues, and left some high level feedback:
- runParallelStages writes summarizeErr/hydeErr from separate goroutines without synchronization, which is a data race; consider returning errors via channels or capturing them in local variables guarded by a mutex instead of assigning to outer variables from goroutines.
- The GlobalLLMConcurrency semantics are inconsistent: comments/config say 0 disables the global cap, but NewPipeline currently treats 0 as "set default 12" so there is no way to disable the cap; either adjust the constructor logic to leave 0 as 0 (and keep globalLLMSem nil) or update the comments/config description to match the actual behavior.
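One way to follow the first suggestion is to have each goroutine report its error on its own channel, so no error variable is shared between goroutines. The sketch below is an assumption about shape only: the `stageFn` signature is hypothetical (the PR's stages also take a document ID and profile), and the demo helpers exist purely for illustration.

```go
package main

import (
	"context"
	"errors"
)

// stageFn is a hypothetical signature for one ingest stage.
type stageFn func(ctx context.Context) error

// runParallelStages runs both stages concurrently and returns their errors.
// Each goroutine sends its result on its own buffered channel, so no error
// variable is written by one goroutine and read by another.
func runParallelStages(ctx context.Context, summarizeFn, hydeFn stageFn) (summarizeErr, hydeErr error) {
	sumCh := make(chan error, 1)
	hydeCh := make(chan error, 1)
	go func() { sumCh <- summarizeFn(ctx) }()
	go func() { hydeCh <- hydeFn(ctx) }()
	// Receiving from both channels waits for both stages to finish.
	return <-sumCh, <-hydeCh
}

var errSummarize = errors.New("summarize failed")

// demoRun runs a failing summarize stage next to a succeeding HyDE stage.
func demoRun() (error, error) {
	return runParallelStages(context.Background(),
		func(context.Context) error { return errSummarize },
		func(context.Context) error { return nil },
	)
}

// demoInterleave blocks summarize until HyDE has finished. It can only
// return if the two stages really run concurrently.
func demoInterleave() bool {
	hydeDone := make(chan struct{})
	s, h := runParallelStages(context.Background(),
		func(context.Context) error { <-hydeDone; return nil },
		func(context.Context) error { close(hydeDone); return nil },
	)
	return s == nil && h == nil
}
```

The channels are buffered with capacity 1 so that neither goroutine blocks on send, even if the caller stops receiving early.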
## Individual Comments
### Comment 1
<location path="pkg/ingest/ingest.go" line_range="133-142" />
<code_context>
if p.HyDEConcurrency <= 0 {
p.HyDEConcurrency = 4
}
+ // Default the global cap to a value that comfortably exceeds the
+ // sum of the two default per-stage caps (4 + 4 = 8) while leaving
+ // some headroom — but stays well below typical provider per-tenant
+ // concurrency limits.
+ if p.GlobalLLMConcurrency < 0 {
+ p.GlobalLLMConcurrency = 0
+ }
+ if p.GlobalLLMConcurrency == 0 {
+ p.GlobalLLMConcurrency = 12
+ }
+ if p.GlobalLLMConcurrency > 0 {
+ p.globalLLMSem = make(chan struct{}, p.GlobalLLMConcurrency)
+ }
</code_context>
<issue_to_address>
**issue (bug_risk):** GlobalLLMConcurrency handling conflicts with the documented "0 disables the global cap" semantics
Current behavior treats `GlobalLLMConcurrency == 0` as "use default (12)" and always initializes `globalLLMSem` when the value is > 0, so there is no way to disable the global semaphore despite comments stating that `0` disables the cap.
To align with the documented semantics, you could either:
- Distinguish "unspecified" vs. "explicit zero" (e.g., pointer in config or a sentinel like `-1`), or
- Only apply the default of 12 when constructing a fresh `Pipeline` with the field at its zero value, and otherwise respect an explicit `0` as "disabled".
As written, the code in `Pipeline`, `IngestConfig`, and the example configs documents behavior that this initialization does not implement.
</issue_to_address>
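The first option above (distinguishing "unspecified" from "explicit zero") could look roughly like the following. This is a sketch under stated assumptions: `ingestConfig`, `resolveGlobalCap`, `newGlobalSem`, and `intPtr` are hypothetical names, and treating negative values as "disabled" is a guess based on the existing clamp to 0.

```go
package main

// defaultGlobalLLMConcurrency mirrors the default of 12 from the PR description.
const defaultGlobalLLMConcurrency = 12

// ingestConfig is a hypothetical config shape: a nil pointer means the key
// was not set, while a pointer to 0 is an explicit request to disable the cap.
type ingestConfig struct {
	GlobalLLMConcurrency *int
}

// resolveGlobalCap returns the effective cap; 0 means "no global cap".
func resolveGlobalCap(cfg ingestConfig) int {
	switch {
	case cfg.GlobalLLMConcurrency == nil:
		return defaultGlobalLLMConcurrency // unspecified: use the default
	case *cfg.GlobalLLMConcurrency <= 0:
		return 0 // explicit zero (or negative): cap disabled
	default:
		return *cfg.GlobalLLMConcurrency
	}
}

// newGlobalSem builds the semaphore, or returns nil when the cap is disabled.
func newGlobalSem(n int) chan struct{} {
	if n <= 0 {
		return nil
	}
	return make(chan struct{}, n)
}

// intPtr is a small helper for building configs in code and tests.
func intPtr(n int) *int { return &n }
```

With this shape, `resolveGlobalCap(ingestConfig{})` yields the default, while `resolveGlobalCap(ingestConfig{GlobalLLMConcurrency: intPtr(0)})` yields 0 and leaves the semaphore nil.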
### Comment 2
<location path="pkg/ingest/integration_test.go" line_range="173" />
<code_context>
+ }
+
+ hasChildren := map[tree.SectionID]bool{}
+ for _, s := range sections {
+ if s.ParentID != "" {
+ hasChildren[s.ParentID] = true
</code_context>
<issue_to_address>
**suggestion (testing):** Also assert that non-leaf sections do NOT get candidate_questions to fully exercise the HyDE targeting contract
The integration test confirms that all leaves get `candidate_questions`, but it should also assert the inverse: sections listed in `hasChildren` (internal nodes) have an empty `CandidateQuestions` slice. This will better enforce the contract that HyDE only targets leaves and help catch regressions where questions are written to internal sections.
Suggested implementation:
```golang
var missingSummary, missingQuestions, unexpectedQuestions []tree.SectionID
for _, s := range sections {
if strings.TrimSpace(s.Summary) == "" {
missingSummary = append(missingSummary, s.ID)
}
// HyDE only targets leaves (internal nodes are skipped on purpose).
// Assert that all leaves have candidate_questions and all internal nodes do not.
if !hasChildren[s.ID] {
if len(s.CandidateQuestions) == 0 {
missingQuestions = append(missingQuestions, s.ID)
}
} else if len(s.CandidateQuestions) > 0 {
unexpectedQuestions = append(unexpectedQuestions, s.ID)
}
```
To fully enforce the "HyDE only targets leaves" contract, add an assertion after the loop that `len(unexpectedQuestions) == 0`, similar to how `missingQuestions` is currently asserted. For example, fail the test with a helpful message if `unexpectedQuestions` is non-empty, indicating that internal sections (those present in `hasChildren`) incorrectly received `CandidateQuestions`.
</issue_to_address>
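The leaf-only contract from Comment 2, including the final check on `unexpectedQuestions`, can be sketched as a standalone helper. The `section` struct here is a hypothetical, trimmed-down stand-in for the real tree section type, and `checkHyDETargeting` is an illustrative name rather than anything in the repository.

```go
package main

import "strings"

// section is a hypothetical, trimmed-down stand-in for the tree section type.
type section struct {
	ID                 string
	ParentID           string
	Summary            string
	CandidateQuestions []string
}

// targetingReport lists the IDs of sections that break each expectation.
type targetingReport struct {
	MissingSummary      []string
	MissingQuestions    []string
	UnexpectedQuestions []string
}

// checkHyDETargeting verifies that every section has a summary, every leaf
// has candidate questions, and no internal node has any.
func checkHyDETargeting(sections []section) targetingReport {
	hasChildren := map[string]bool{}
	for _, s := range sections {
		if s.ParentID != "" {
			hasChildren[s.ParentID] = true
		}
	}
	var r targetingReport
	for _, s := range sections {
		if strings.TrimSpace(s.Summary) == "" {
			r.MissingSummary = append(r.MissingSummary, s.ID)
		}
		isLeaf := !hasChildren[s.ID]
		switch {
		case isLeaf && len(s.CandidateQuestions) == 0:
			r.MissingQuestions = append(r.MissingQuestions, s.ID)
		case !isLeaf && len(s.CandidateQuestions) > 0:
			r.UnexpectedQuestions = append(r.UnexpectedQuestions, s.ID)
		}
	}
	return r
}

// demoReport checks a tiny tree with one violation of each kind.
func demoReport() targetingReport {
	return checkHyDETargeting([]section{
		// Internal node that wrongly received questions.
		{ID: "root", Summary: "s", CandidateQuestions: []string{"q?"}},
		// Well-formed leaf.
		{ID: "leaf-a", ParentID: "root", Summary: "s", CandidateQuestions: []string{"q?"}},
		// Leaf with a blank summary and no questions.
		{ID: "leaf-b", ParentID: "root", Summary: " "},
	})
}
```

In the integration test, the equivalent final step would be to fail when `len(unexpectedQuestions) > 0`, alongside the existing check on `missingQuestions`.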
Summary
- Summarize and HyDE now run as concurrent goroutines instead of strictly sequential stages. On a 200-section 10-K this roughly halves total ingest wall time.
- The HyDE prompt no longer includes `s.Summary` (which may not be persisted yet when HyDE runs in parallel). Title + first 4K of content carry strictly more signal anyway.
- A new `ingest.global_llm_concurrency` knob (default 12) caps total LLM-in-flight across both stages so the provider's per-tenant rate limit isn't blown. Per-stage caps (`summary_concurrency`, `ingest.hyde.concurrency`) still apply.
- Summarize and HyDE errors are non-fatal; `p.fail` is only called on parse / persist errors.

Rationale (Option A over B)
HyDE only needs `Title + Content` to produce useful questions: the summary was a 60-word hint derived from the same content the prompt already gets in full. Removing it lets the two stages run fully concurrently with no per-section ordering. Per-section pipelining (Option B) would deliver the same wall-time win at significantly higher orchestration cost.
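To make the prompt change concrete, here is a minimal sketch of a HyDE user prompt built from title and truncated content only. Everything in it is an assumption for illustration: the prompt wording, the `section` struct, the helper names, and the reading of "first 4K" as 4096 bytes are not taken from the PR.

```go
package main

import (
	"strings"
	"unicode/utf8"
)

// maxContentBytes assumes "first 4K" means 4096 bytes; the PR does not say.
const maxContentBytes = 4096

// section is a hypothetical stand-in holding only the fields used here.
type section struct {
	Title   string
	Content string
	Summary string
}

// truncateUTF8 returns at most max bytes of s without splitting a rune.
func truncateUTF8(s string, max int) string {
	if len(s) <= max {
		return s
	}
	// Back up until the cut lands on the first byte of a rune.
	for max > 0 && !utf8.RuneStart(s[max]) {
		max--
	}
	return s[:max]
}

// hydeUserPrompt builds the prompt from Title and Content only.
// s.Summary is deliberately never read.
func hydeUserPrompt(s section) string {
	var b strings.Builder
	b.WriteString("Title: ")
	b.WriteString(s.Title)
	b.WriteString("\n\nContent:\n")
	b.WriteString(truncateUTF8(s.Content, maxContentBytes))
	return b.String()
}

// promptLeaksSummary reports whether the summary text appears in the prompt,
// which is the property a regression guard would reject.
func promptLeaksSummary(s section) bool {
	return s.Summary != "" && strings.Contains(hydeUserPrompt(s), s.Summary)
}
```

A regression guard in this style would set `Summary` to a sentinel string that cannot occur in the title or content and fail if `promptLeaksSummary` returns true.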
Test plan
- `go build ./...` clean
- `go vet ./...` clean
- `go test ./...`: all tests pass
- `TestRunParallelStagesInterleaves` proves HyDE goroutine can complete while summarize is still blocked
- `TestGlobalLLMSemaphoreCapsInFlight` proves the shared semaphore never lets peak in-flight exceed the configured cap
- `TestHyDEPromptOmitsSummary` regression guard against reintroducing the prompt dependency on `s.Summary`
- `TestPipelineRunParallelSummarizeAndHyDEIntegration` (gated on `TEST_DATABASE_URL`) runs the full pipeline against the `rust-ownership.md` fixture and asserts (a) doc reaches `ready`, (b) every section has a summary, (c) every leaf has `candidate_questions`, (d) first HyDE call's timestamp precedes the last summarize call's
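The peak-in-flight property in the test plan can be measured with an atomic counter around the semaphore-guarded region. The sketch below is a generic illustration of that technique, not the PR's test: `peakInFlight` is a hypothetical helper, and the sleep stands in for an LLM call.

```go
package main

import (
	"sync"
	"sync/atomic"
	"time"
)

// peakInFlight starts `workers` goroutines that each take a slot from a
// buffered-channel semaphore of size `limit` (which must be > 0), and
// returns the highest number of goroutines seen holding a slot at once.
func peakInFlight(workers, limit int) int64 {
	sem := make(chan struct{}, limit)
	var inFlight, peak int64
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it on exit

			n := atomic.AddInt64(&inFlight, 1)
			// Raise peak to n unless another goroutine already recorded
			// a value at least as high.
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			time.Sleep(time.Millisecond) // stand-in for an LLM call
			atomic.AddInt64(&inFlight, -1)
		}()
	}
	wg.Wait()
	// Safe to read without atomics: wg.Wait orders all writes before this.
	return peak
}
```

Because the counter is incremented only after acquiring a slot and decremented before releasing it, the recorded peak can never exceed `limit`; a test would assert `peakInFlight(workers, limit) <= limit`.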
Summary by Sourcery
Run the summarize and HyDE ingest stages in parallel under a configurable global LLM concurrency cap, and update configuration, prompts, and tests to support and validate the new orchestration.