Skip to content

Add vLLM KV-event subscriber → ReportCacheState (C1)#15

Open
EdHasNoLife wants to merge 15 commits into
mainfrom
sunxuedward/cac-21-c1-vllm-kv-event-subscriber
Open

Add vLLM KV-event subscriber → ReportCacheState (C1)#15
EdHasNoLife wants to merge 15 commits into
mainfrom
sunxuedward/cac-21-c1-vllm-kv-event-subscriber

Conversation

@EdHasNoLife
Copy link
Copy Markdown
Collaborator

Summary

Adds the vLLM KV-event subscriber — a sidecar that subscribes to a vLLM engine replica's KV-cache events over ZMQ and reports cache state to the policy server, keeping the B6 CacheIndex live from real engine events. Metadata only — never tokens or prompt text. Lands in pkg/adapters/engine/ + a new cmd/kvevent-subscriber binary.

  • events.go — decodes vLLM's msgpack EventBatch wire format (msgspec array-tagged) into BlockStored / BlockRemoved / AllBlocksCleared. token_ids / parent_block_hash / lora_id are never materialized into the Go types (metadata-only by construction); unknown event tags are skipped (forward-compatible).
  • mapper.go — translates to the merged gRPC contract: BlockStoredCacheStateUpdate (additive, via ReportCacheState), BlockRemovedPREFIX_EVICTED, AllBlocksClearedALL_CLEARED (via PublishEvent). uint64 block hashes encode as 8-byte big-endian prefix_hash; every message is stamped with replica/model/tenant/hash_scheme.
  • forwarder.goReporter debounces adds over a window onto a long-lived ReportCacheState stream; removals go via unary PublishEvent. Fail-soft: reconnect-with-backoff, errors logged not propagated, a clear supersedes buffered adds, graceful-shutdown final flush.
  • subscriber.go — ZMQ SUB loop (pure-Go go-zeromq/zmq4) behind a frameSource interface; reconnects with backoff; malformed batches dropped.
  • cmd/kvevent-subscriber — sidecar entrypoint (engine endpoint, topic, server, replica/model/tenant/hash-scheme, window) + signal-driven drain.

No proto/CRD changes. Vendor-neutral; new deps: vmihailenco/msgpack/v5, go-zeromq/zmq4.

Verification

Unit / integration (CI):

  • decode (incl. >2⁶³ hashes, malformed input, unknown-tag skip), mapping, hash encoding, config validation.
  • forwarder over bufconn to a recording gRPC server (stream adds, PublishEvent removals, clear-supersedes-adds).
  • subscribe loop via a fake frame source (decode + skip-malformed + reconnect).
  • make pre-pr green (naming + fmt + vet + test + build + no drift).

Live end-to-end (run locally, GPU-free): real vLLM 0.21.0 CPU engine → ZMQ → kvevent-subscriber → policy server → B6 index. Two prefix requests populated the index: inferencecache_index_entries{model="qwen"} 17. Confirms the pure-Go ZMQ ↔ vLLM libzmq interop and the full add path against the real server.

Follow-ups (out of scope)

  • ZMQ replay (buffer_steps/ROUTER) for gap recovery on reconnect — soft state tolerates loss for now.
  • A container image + sidecar manifest for the subscriber.

First slice of the vLLM KV-event subscriber (pkg/adapters/engine):
- events.go: decode vLLM's msgpack EventBatch wire format (msgspec array-tagged)
  into BlockStored/BlockRemoved/AllBlocksCleared. token_ids/parent_block_hash/
  lora_id are intentionally never materialized — metadata only. Unknown event
  tags are skipped (forward-compatible).
- mapper.go: translate events to the gRPC contract — BlockStored →
  CacheStateUpdate (ReportCacheState, additive), BlockRemoved → PREFIX_EVICTED,
  AllBlocksCleared → ALL_CLEARED. uint64 block hashes encode as 8-byte big-endian
  prefix_hash bytes; every message stamped with replica/model/tenant/hash_scheme.
- config.go: per-replica identity + validation (non-empty hash_scheme, which
  would otherwise be dropped server-side).
- Unit tests for decode (incl. >2^63 hashes, malformed input, unknown-tag skip)
  and mapping/validation. Adds vmihailenco/msgpack/v5.

ZMQ subscriber + gRPC forwarder + cmd entrypoint + live integration follow.
- forwarder.go: Reporter forwards events to the server — BlockStored adds are
  debounced over a window onto a long-lived ReportCacheState stream; removals go
  via unary PublishEvent (PREFIX_EVICTED / ALL_CLEARED). Fail-soft: stream
  reconnects with backoff, errors are logged not propagated, a clear supersedes
  buffered adds. bufconn integration test against a recording gRPC server.
- subscriber.go: ZMQ SUB loop behind a frameSource interface (pure-Go
  go-zeromq/zmq4), decodes each batch and emits it; reconnects with backoff;
  malformed batches are dropped. Unit-tested via a fake source (real socket
  interop verified separately against a live engine).
- cmd/kvevent-subscriber: sidecar entrypoint wiring config (engine endpoint,
  topic, server, replica/model/tenant/hash-scheme, window) + signal-driven
  graceful drain.
- Build the new binary in `make build`; refresh package doc to the sidecar model.
@github-actions
Copy link
Copy Markdown

Codex review

Blocking

  • pkg/adapters/engine/events.go, pkg/adapters/engine/events.go, pkg/adapters/engine/mapper.go: the adapter hard-codes vLLM block hashes as uint64 and decodes []uint64, then re-encodes to 8-byte big-endian. vLLM’s own subscriber types model block_hashes as list[ExternalBlockHash], and ExternalBlockHash can be bytes or int; the conversion path returns raw bytes unless int hashes are explicitly enabled. This means default byte-hash events are dropped as undecodable, so ReportCacheState never receives live cache state. Decode both msgpack binary/bytes and uint/int, preserve raw bytes for prefix_hash, and add wire tests for byte hashes. (docs.vllm.ai)

Should-fix

  • cmd/kvevent-subscriber/main.go, cmd/kvevent-subscriber/main.go, pkg/adapters/engine/forwarder.go: shutdown cancels the reporter context instead of closing/draining out. Any batches already decoded into the channel may be skipped because Reporter.Run can select ctx.Done() and exit before consuming them; the final flush only sends prefixes already moved into pending. For the claimed signal-driven drain, stop the subscriber, close out, and let the reporter drain with a bounded flush context.

Nit

Vendor-neutral naming and proto/CRD scope look fine. I did not run the test suite in this read-only sandbox; git diff --check reported no whitespace errors.

Verdict: changes-requested.

@github-actions
Copy link
Copy Markdown

Codex review

Blocking
None.

Should-fix

  • pkg/adapters/engine/forwarder.go:69 / pkg/adapters/engine/forwarder.go:119: the shutdown “final flush” does not work once a report stream already exists. On SIGTERM, ctx is cancelled, but flush() keeps the existing stream because ensureStream() returns early when stream != nil; that stream was opened with the cancelled ctx, so Send() fails and the pending prefixes are dropped. This loses any adds buffered since the previous ticker flush. Close/reopen the stream with flushCtx before the final flush, or otherwise ensure the final send uses a non-cancelled stream. Add a test covering shutdown with an already-open stream and pending adds.
  • cmd/kvevent-subscriber/main.go:34 / pkg/adapters/engine/forwarder.go:61: -window=0 or a negative duration panics via time.NewTicker. Validate the flag/options and exit cleanly.

Nit
None.

Vendor-neutral surfaces and proto/CRD contract look clean: no forbidden OCI/Oracle identity leakage, no proto changes, and the adapter stays metadata-only.

I could not run tests in this sandbox because the filesystem is read-only and Go could not create a build cache. git diff --check passed.

Verdict: changes-requested

@github-actions
Copy link
Copy Markdown

Codex review

Blocking
None.

Should-Fix

  • pkg/adapters/engine/forwarder.go:90 / pkg/adapters/engine/forwarder.go:98: BlockStored adds are buffered, but BlockRemoved events are published immediately without removing matching hashes from pending. If a block is stored and evicted within the debounce window, the eviction reaches the server before the add, likely no-ops, then the later flush re-adds an already-evicted prefix until TTL. AllBlocksCleared handles this with pending = pending[:0]; BlockRemoved needs equivalent pending de-dup/removal for the evicted hashes, plus an over-the-wire test for add-then-remove-within-window.
  • pkg/adapters/engine/mapper.go:52: every stored block is reported with TokenCount: ev.BlockSize, but the index treats token_count as “how many tokens that prefix covers” (pkg/index/index.go:41). For multi-block prefixes this under-reports matched_tokens and score. If vLLM block hashes represent cumulative prefix hashes, this should be cumulative per block, not fixed block size. If this is intentionally “block-local” count, the contract/design should clarify that because it conflicts with current index semantics.

Nit

  • cmd/kvevent-subscriber/main.go:34 + pkg/adapters/engine/forwarder.go:61: -window=0 or a negative duration will panic in time.NewTicker. Validate the flag or clamp it, and add a small config/constructor test.

Verdict
changes-requested.

I could not run go test in this environment because the filesystem is read-only and Go could not create a build cache.

… guard

- Block hashes are now opaque bytes ([][]byte). vLLM's ExternalBlockHash is a
  union of bytes and int; the decoder accepts both (binary passes through, int
  normalizes to 8-byte big-endian). Previously only uint64 was decoded, so a
  byte-hash engine would have its batches dropped as undecodable. Adds a
  byte-hash wire test. (Blocking)
- Graceful shutdown now truly drains: the reporter runs on a background context
  and stops by draining a closed `out` channel, so batches already buffered on
  shutdown are flushed instead of lost to a ctx-cancel race; the final flush
  reopens the stream under a detached context. Adds a shutdown-flush test.
- NewReporter clamps a non-positive window so time.NewTicker can't panic
  (-window=0). Adds a clamp test.
@EdHasNoLife
Copy link
Copy Markdown
Collaborator Author

Addressed the Codex review in the latest commit (also rebased onto current main):

  • Blocking — hash type: block hashes are now opaque [][]byte. The decoder accepts both vLLM ExternalBlockHash variants — binary passes through verbatim, integer hashes normalize to 8-byte big-endian. Previously only uint64 was decoded, so a byte-hash engine's batches would have been dropped as undecodable. Added a byte-hash wire test (TestDecodeByteHashes).
  • Should-fix — graceful drain: the reporter now runs on a background context and stops by draining a closed out channel (the subscriber owns the signal context), so batches already buffered at shutdown are flushed instead of lost to a ctx-cancel race. The final flush also drops and reopens the report stream under a detached context (the run-ctx stream is dead after cancel). Added TestReporterFlushesPendingOnShutdown.
  • Nit — -window=0 panic: NewReporter clamps a non-positive window (and backoff). Added TestNewReporterClampsWindow.

Verified locally: make verify-no-internal-refs, race tests, and the coverage gate (78.6% ≥ 65%) all pass.

@github-actions
Copy link
Copy Markdown

Codex review

Blocking
None found.

Should-fix

Nit

Vendor-neutral naming looks fine: no OCI/Oracle identity leakage, and the vLLM-specific code stays under pkg/adapters/.... No proto/CRD changes were introduced.

Verdict: changes-requested.

…ckoff

Codex round 2:
- Reporter now uses a bounded per-flush/per-call context for every gRPC op
  (fresh time-bounded ReportCacheState stream per flush; bounded PublishEvent),
  so a stalled/unreachable server can't block the loop indefinitely — restores
  the fail-soft sidecar contract. Removes the long-lived-stream lifecycle.
- Decoder rejects a known tag with a truncated tuple (e.g. BlockStored without
  block_size) instead of indexing token_count=0. Adds a test.
- pendTs is reset after each flush (so a later ts<=0 batch keeps "0 = server now").
- NewSubscriber clamps a non-positive backoff (no tight reconnect loop).
@EdHasNoLife
Copy link
Copy Markdown
Collaborator Author

Round 2 addressed in the latest commit:

  • Bounded gRPC contexts: every RPC now uses its own time-bounded context — each flush opens a fresh, bounded ReportCacheState stream (send + CloseAndRecv), and PublishEvent is per-call bounded (default 5s, WithRPCTimeout). A stalled/unreachable server can no longer block the loop indefinitely, restoring the fail-soft contract. This also removed the long-lived-stream lifecycle.
  • Malformed known tags rejected: a BlockStored truncated before block_size (or any known tag with too few fields) is now an error and the batch is dropped/logged, instead of being indexed with token_count=0. Added TestDecodeBlockStoredMissingBlockSizeIsError.
  • pendTs reset after each flush, preserving "0 = server now" for a later non-timestamped batch.
  • Subscriber backoff clamp: NewSubscriber clamps a non-positive backoff, matching Reporter.

Verified: race tests clean, coverage 79.0% ≥ 65%, make pre-pr green.

@github-actions
Copy link
Copy Markdown

Codex review

Blocking
None.

Should-fix

  • pkg/adapters/engine/mapper.go:45: StoredPrefixes reports every hash in a BlockStored as TokenCount: ev.BlockSize. That undercounts multi-block prefixes: the contract’s PrefixEntry.token_count feeds matched_tokens/ranking, and vLLM’s BlockStored carries block_hashes, token_ids, and block_size separately, with one event able to cover multiple blocks. The repo’s existing reference subscriber redacts token_ids to a count rather than using block_size alone. This should derive the covered token count per reported prefix without retaining token contents, and add a multi-block test. (docs.vllm.ai)

Nit

  • pkg/adapters/engine/events.go:14: the comment says EventBatch = [ts, events], but current vLLM also carries data-parallel metadata on the batch. The decoder tolerates extra fields, so this is just a comment/doc accuracy issue. (docs.vllm.ai)

Verification
I attempted go test ./pkg/adapters/engine ./pkg/index ./pkg/server, but this environment is read-only and cannot create a Go build cache under /home/runner/.cache or /tmp.

Verdict
changes-requested.

Codex round 3 (should-fix + nit):
- StoredPrefixes now reports a cumulative token_count per block hash —
  (i+1)*block_size — since vLLM block hashes chain their parent, so block i's
  hash covers (i+1) blocks. Constant block_size flattened the matched_tokens
  ranking signal. Counts within-event only (parent length isn't in the event);
  never uses token contents. Updated the mapper test to assert [128, 256].
- events.go: note the EventBatch may carry trailing fields (e.g. a data-parallel
  rank) which the decoder ignores.
@EdHasNoLife
Copy link
Copy Markdown
Collaborator Author

Round 3 addressed in b838695: StoredPrefixes now reports a cumulative token_count per block hash — (i+1)*block_size — since vLLM block hashes chain their parent (block i covers i+1 blocks), so the matched_tokens ranking signal is preserved instead of flattened. It counts within-event tokens only (the parent prefix's length isn't in the event) and never touches token contents; mapper test asserts [128, 256]. Also documented that the EventBatch may carry trailing fields (e.g. a data-parallel rank) which the decoder ignores. Race + coverage (78.8%) + pre-pr green.

@github-actions
Copy link
Copy Markdown

Codex review

Blocking

  • pkg/adapters/engine/forwarder.go:98 / pkg/adapters/engine/forwarder.go:106: BlockStored entries are buffered, but BlockRemoved is published immediately without flushing or removing matching pending adds. If a block is stored and evicted within the debounce window, the server sees the eviction first, then the later flush re-adds the evicted prefix. Because CacheStateUpdate is additive and removals are the pruning path (docs/design/grpc-contract.md:69), this can leave stale entries in the index until TTL. Fix by preserving event order, e.g. flush pending adds before publishing removals, or filter matching pending prefixes before publishing the eviction. Add an over-the-wire test for stored-then-removed within one window.

Should-fix

  • None.

Nit

  • None.

I did not find vendor-neutral naming, proto/CRD, or metadata-contract violations in the PR. I could not run tests in this environment because the filesystem is read-only and Go cannot create its build cache.

Verdict: changes-requested.

Codex round 4 (blocking): adds are debounced but removals published immediately,
so a block stored-then-evicted within one window sent the eviction first and the
later flush re-added the evicted prefix (additive adds vs. removal pruning) —
leaving a stale index entry until TTL. The BlockRemoved path now flushes pending
adds before publishing the eviction, preserving order. Adds an over-the-wire
test asserting the add is received before the evict.
@EdHasNoLife
Copy link
Copy Markdown
Collaborator Author

Round 4 (blocking) addressed in 5126603: the BlockRemoved path now flushes pending adds before publishing the eviction, so a block stored-then-evicted within one debounce window reaches the server in store→evict order — the additive add can no longer overtake the removal and re-create the evicted prefix. Added an over-the-wire test (TestReporterFlushesAddsBeforeRemoval) asserting the add is received before the evict. Race + coverage (79.1%) + pre-pr green.

@github-actions
Copy link
Copy Markdown

Codex review

Blocking
None.

Should-fix

  • pkg/adapters/engine/events.go: BlockStored.block_size is accepted as any int32, including 0 or negative values. pkg/adapters/engine/mapper.go then forwards zero/negative token_count, which can create a PREFIX_MATCH hint with no valid matched tokens instead of dropping a malformed batch fail-open. Validate block_size > 0 during decode or before mapping.

Nit
None.

Verdict
changes-requested

I did not find vendor-neutral naming violations in the new core identity surfaces, and no proto/CRD/generated-code changes were introduced. I could not run go test because the sandbox filesystem is read-only and Go could not create a build cache.

Codex should-fix: a BlockStored with block_size <= 0 would forward a
zero/negative token_count and could create a PREFIX_MATCH hint with no valid
matched tokens. Reject it during decode so the batch fails open instead. Test.
@EdHasNoLife
Copy link
Copy Markdown
Collaborator Author

Addressed the latest should-fix in aac4782: a BlockStored with block_size <= 0 is now rejected during decode (fails open) rather than forwarding a zero/negative token_count that could yield a bogus PREFIX_MATCH hint. Added a test covering 0 and -1. Race + coverage + pre-pr green; no blocking items outstanding.

@github-actions
Copy link
Copy Markdown

Codex review

Blocking
None.

Should-fix
None.

Nit
None.

Verdict: approve.

I reviewed the PR diff and surrounding contract/index/server code. The changes stay in the optional engine adapter/sidecar path, do not touch proto/CRD/core identity, preserve metadata-only forwarding, and include focused tests for decode, mapping, forwarding, and subscriber control flow.

Verification note: I could not run go test here because the filesystem sandbox is read-only, so Go could not create a build cache under /tmp.

…-vllm-kv-event-subscriber

# Conflicts:
#	go.sum
@github-actions
Copy link
Copy Markdown

Codex review

Blocking
None.

Should-fix

  • pkg/adapters/engine/subscriber.go:143: dialZMQ does not close the SUB socket when sub.Dial(endpoint) fails. Since Subscriber.Run retries forever, a bad/unavailable endpoint can leak a socket/resources every backoff cycle. Close sub on the dial-error path, matching the existing cleanup on subscribe failure.

Nit
None.

Verdict
changes-requested

I did not find vendor-neutral naming violations or proto/CRD contract drift in this PR. I could not run tests in this environment because the filesystem is read-only and Go cannot create its build cache, even with GOCACHE=/tmp/go-build.

@EdHasNoLife
Copy link
Copy Markdown
Collaborator Author

Addressed the should-fix in a686dd4: dialZMQ now closes the SUB socket on the Dial failure path too (it only closed on the subscribe-failure path), so a bad/unavailable endpoint no longer leaks a socket every reconnect-backoff cycle. Also synced the branch with main (resolved a go.sum conflict via go mod tidy); CI + pre-pr green.

@github-actions
Copy link
Copy Markdown

Codex review

Blocking
None.

Should-Fix

  • pkg/adapters/engine/events.go: hashToBytes can panic on malformed hash elements. If block_hashes contains msgpack nil, msgpack.Unmarshal into interface{} yields nil; reflect.ValueOf(v).Kind() then panics instead of returning an error. Because subscriber.go calls DecodeEventBatch expecting malformed batches to be logged and skipped, one bad engine frame can crash the sidecar. Add an explicit v == nil guard and a regression test for []interface{}{nil} in block_hashes.

Nit
None.

Verdict
changes-requested

I also checked the vendor-neutral rule against the changed files; no OCI/Oracle core identity issue found. Proto/CRD files are unchanged. I could not run go test because the review sandbox is read-only and Go could not create its build cache; git diff --check passed.

A msgpack nil in block_hashes reached reflect-based normalization; guard it with
an explicit nil check that returns an error so one malformed engine frame is
logged and skipped (fail-soft) rather than risking a decoder panic. Test added.
@EdHasNoLife
Copy link
Copy Markdown
Collaborator Author

Addressed in 0cdfad2: hashToBytes now guards v == nil and returns an error, so a msgpack nil in block_hashes is logged + skipped (fail-soft) rather than reaching the reflect path. Added a regression test ([]interface{}{nil} in block_hashes).

@github-actions
Copy link
Copy Markdown

Codex review

Blocking
None.

Should-Fix
None.

Nit
None.

Verdict
Approve.

I reviewed the PR diff and surrounding contract/index code. The changes stay in the optional engine adapter plus a sidecar command, do not touch proto/CRD surfaces, do not introduce OCI/Oracle naming in the PR diff, preserve metadata-only reporting, and include focused unit/integration coverage for decoding, mapping, forwarding, and subscriber behavior.

I could not run tests in this sandbox: go test failed because the filesystem is read-only and Go cannot create its build cache.

@github-actions
Copy link
Copy Markdown

Codex review

Blocking
None.

Should-fix

  • pkg/adapters/engine/mapper.go:40: StoredPrefixes sets token_count as (i+1) * block_size, explicitly ignoring parent_block_hash. For any BlockStored batch that extends an existing parent prefix, this underreports the actual prefix length. The contract exposes PrefixEntry.token_count and lookup returns it as matched_tokens, and the index uses it for ranking, so this can produce incorrect route ordering and misleading match metadata. See docs/design/grpc-contract.md:50-52 and pkg/index/index.go:318-322. The adapter should track parent hash lengths or otherwise emit the real prefix token count.

Nit

  • The PR body says adds use a “long-lived ReportCacheState stream,” but pkg/adapters/engine/forwarder.go:128-145 opens a fresh client stream per flush. The implementation is internally consistent, but the PR description should match it.

Verdict
Changes requested.

I could not run tests in this environment because the filesystem is read-only and Go could not initialize the build cache: mkdir /home/runner/.cache: read-only file system.

@github-actions
Copy link
Copy Markdown

Codex review

Blocking

  • pkg/adapters/engine/forwarder.go:112: BlockRemoved publishes one unary RPC per hash, each with its own 5s timeout. If the server is down and vLLM emits a large eviction batch, Run can block for len(block_hashes) * rpcTimeout, stop draining out, and effectively wedge the subscriber. That violates the PR’s fail-soft behavior. Bound the whole removal batch, coalesce where possible, or use a non-blocking/bounded failure path, and add an unavailable-server test.

Should-fix

  • pkg/adapters/engine/mapper.go:47: TokenCount is fabricated as (i+1) * block_size, ignoring parent_block_hash / prior prefix length. For appended blocks, LookupRoute will return materially wrong matched_tokens and ranking scores because the index uses this field directly. Track observed parent hash lengths or otherwise send an accurate count; if exact count is impossible after reconnect, handle that explicitly rather than silently under-reporting.

Nit

  • Tests could not be run in this read-only review sandbox because Go could not create its build cache under either the default path or /tmp.

Verdict: changes-requested.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant