
feat: concurrent-stream semaphore + rejection metric (Phase 3)#858

Merged
chaholl merged 1 commit into main from feat/stream-retry-phase3 on Apr 5, 2026

Conversation


@chaholl chaholl commented Apr 5, 2026

Summary

Completes the three-layer back-pressure story for streaming requests:

| Phase | Layer | Merged |
|-------|-------|--------|
| 1 | Per-call bounded retry (pre-first-chunk window) | #855 |
| 2 | Cross-call retry budget (thundering-herd control) | #856 |
| 3 | Total-in-flight semaphore (goroutine explosion control) | this PR |

At 1000+ concurrent streams the first two phases don't bound the number of streams a provider can hold open — each active stream costs a goroutine, an idle timer, and channel buffers even when only a handful of HTTP/2 connections are actually needed. The semaphore turns unbounded goroutine growth into back-pressure that surfaces cleanly at the caller's context.

Config (opt-in)

```yaml
stream_max_concurrent: 100 # 0 = unlimited (current default)
```

Top-level provider field; zero or negative disables. Existing providers are unchanged unless they opt in.

Design

  • Context-aware blocking Acquire via `golang.org/x/sync/semaphore`. The caller's context deadline controls fail-fast vs. queueing: a short timeout means "reject me quickly if full", a long one means "queue". Matches the shape of the existing retry budget (both respect caller deadlines rather than inventing their own).
  • Slot acquired BEFORE any HTTP work (retry driver, goroutine spawn, buffer allocation) so saturation rejects cheaply.
  • Slot released in the stream goroutine's defer on the success path, or in a deferred error handler via a `slotReleased` flag — same pattern used for `streams_in_flight` / `provider_calls_in_flight` gauges to guarantee pairing.
  • Nil semaphore is a no-op everywhere so existing providers are unchanged and backwards-compat is preserved.
  • New `streamConcurrencyConfigurable` interface kept independent of `streamRetryConfigurable` — a provider may want concurrency bounds without retry, or vice versa.
  • `BaseProvider.AcquireStreamSlot` / `ReleaseStreamSlot` encapsulate the metric-emitting, nil-safe wrapper so provider code doesn't re-implement the acquire/release/classify logic at every call site.

New metric

`promptkit_stream_concurrency_rejections_total{provider, reason}` — direct-update counter, incremented when Acquire returns a ctx error. Reason label distinguishes:

  • `context_canceled` — caller gave up (explicit cancellation)
  • `deadline_exceeded` — caller's timeout hit while waiting

Sustained spikes in either indicate the semaphore is undersized or upstream is saturated; both are actionable. Per the Phase 3 design-doc section, this is a healthy signal that back-pressure is working.

Coverage

Both OpenAI streaming paths (Chat Completions and Responses API) wired with identical semantics. gpt-5-pro, gpt-4o family, and everything in between share the same code path.

Tests

  • `stream_semaphore_test.go` — constructor nil-safety, limit enforcement under `DeadlineExceeded`, cancellation unblocks waiters, concurrent access under `-race` with an atomic high-water check that verifies the limit is never exceeded (50 workers × 20 ops × burst 10).
  • `stream_semaphore_integration_test.go` — end-to-end metric emission via `BaseProvider.AcquireStreamSlot`: `context_canceled` vs `deadline_exceeded` reason classification, nil-semaphore backwards-compat, release-returns-slot. Tests are serial (not parallel) because they share the `DefaultStreamMetrics` global — documented in the test file.

Coverage on changed files: `stream_semaphore.go` 100%, `stream_metrics.go` 95.1%, `base_provider.go` 93.0%, `openai.go` 87.5%.

Deferred (future work)

  • `promptkit_http_conns_in_use` gauge — needs `httptrace.ClientTrace` plumbing; more involved than the direct-update pattern used elsewhere.
  • Connection pool config exposure (`MaxConnsPerHost`) — transport-construction-time setting that touches every provider factory. Too much churn for this PR; small focused follow-up if/when a deployment actually needs to tune it.

Test plan

  • Unit + integration tests green (`go test ./runtime/... -race`)
  • Pre-commit hook passed (lint, build, coverage ≥80% on all changed files)
  • CI green on this PR
  • Operators can verify in the next capability-matrix run that `promptkit_streams_in_flight` tracks and `promptkit_stream_concurrency_rejections_total` stays zero (no semaphore configured by default in the example)

@chaholl chaholl merged commit 7e833a8 into main Apr 5, 2026
29 checks passed
@chaholl chaholl deleted the feat/stream-retry-phase3 branch April 5, 2026 15:45
@sonarqubecloud

sonarqubecloud Bot commented Apr 5, 2026

chaholl added a commit that referenced this pull request Apr 5, 2026
Partial fix for #860 — wires the streaming retry driver, retry
budget, and concurrent-stream semaphore through Claude and VLLM's
streaming paths. Gemini moves to a follow-up because it uses a
streaming JSON array (parsed via json.Decoder) rather than SSE,
and so cannot share the OpenAI peekFirstSSEEvent path until the
pluggable frame detector in #861 lands.

## What each provider now gets

Both Claude (non-Bedrock path) and VLLM now go through
providers.OpenStreamWithRetryRequest with:

- Pre-first-chunk retry on transient h2 failures (Phase 1)
- Cross-call retry budget for herd-kill containment (Phase 2)
- Per-provider concurrent-stream semaphore (Phase 3)
- Direct-update metrics: streams_in_flight,
  provider_calls_in_flight, stream_first_chunk_latency_seconds,
  stream_retries_total, stream_retry_budget_available,
  stream_concurrency_rejections_total

All existing provider tests still pass under -race.

## Implementation notes

The wiring follows the OpenAI reference pattern established in
PRs #855, #856, #858 byte-for-byte:

1. Build a requestFn closure that re-constructs the HTTP request
   per attempt with fresh auth headers and body reader.
2. Acquire a stream slot via AcquireStreamSlot(ctx). Nil semaphore
   is a no-op; saturation blocks on ctx.
3. Inc in-flight gauges with a released flag so the deferred
   cleanup correctly handles early-error paths without
   double-counting.
4. Delegate to OpenStreamWithRetryRequest with policy, budget,
   host label, and idle timeout.
5. On success: spawn the stream goroutine with deferred Dec +
   ReleaseStreamSlot; flip released=true so the outer defer is a
   no-op.

## Claude Bedrock path deliberately unchanged

runtime/providers/claude/claude_streaming.go has an isBedrock()
branch that uses AWS binary eventstream framing, not SSE. That
path is not covered by this PR and is tracked separately in #865
(AWS Bedrock eventstream retry support), which depends on the
frame detector abstraction from #861.

## Why not Gemini too

Discovered while starting this work: Gemini's PredictStream in
runtime/providers/gemini/gemini_streaming.go does not use SSE at
all. It makes a POST to streamGenerateContent which returns a JSON
array ([{...}, {...}, ...]) parsed incrementally with
json.Decoder. The retry driver's peekFirstSSEEvent expects
SSE data: ...\n\n framing and would hang waiting for a blank
line that never arrives.

Gemini requires a new JSON-array frame detector, which is an
extension of the frame-detector abstraction tracked in #861. Both
#860 and #861 have been updated to reflect this finding. Gemini
will land in a separate PR after #861 grows a JSON-array variant.
chaholl added a commit that referenced this pull request Apr 5, 2026
* feat: extend streaming retry/budget/semaphore to Claude and VLLM


* refactor: extract RunStreamingRequest helper to deduplicate acquire/release scaffolding

SonarCloud quality gate flagged 42.9% duplication on new code in PR #868
because the acquire/release/gauge-bookkeeping scaffolding around
OpenStreamWithRetryRequest was copy-pasted across four streaming
functions (OpenAI Chat Completions, OpenAI Responses API, Claude,
VLLM) — each with ~60 lines of identical defer discipline.

Extract the pattern into BaseProvider.RunStreamingRequest, a helper
that:

  1. Acquires a concurrent-stream slot (nil-safe, ctx-aware)
  2. Inc's streams_in_flight / provider_calls_in_flight gauges
  3. Delegates to OpenStreamWithRetryRequest
  4. Spawns the consumer goroutine with deferred gauge Dec +
     slot release
  5. Handles the 'released' / 'slotReleased' flag pattern internally
     so the error paths correctly return resources without
     double-counting

All four streaming call sites now reduce to a single call:

    return p.RunStreamingRequest(ctx, &providers.StreamRetryRequest{
        Policy: ..., Budget: ..., RequestFn: requestFn, ...
    }, p.streamResponse)

The StreamConsumer callback has the shape
func(ctx, body, outChan) — same as every existing streamResponse
function, so the refactor is a pure signature-matching exercise
for the OpenAI paths and adds an inline closure for Claude (which
wraps the body in IdleTimeoutReader + SSEScanner before delegating
to its streamResponse).

Net diff: ~-200 / +100 lines. SonarCloud duplication on new code
should drop from 42.9% to effectively zero since the only remaining
'new' code is the req struct literal at each call site.

All provider tests green under -race.
