
fix(ingest): 26.05.1 critical hardening — 5 criticals + review-pass cleanup#413

Merged
xe-nvdk merged 9 commits into main from fix/ingest-26.05.1-criticals
Apr 28, 2026

Conversation


xe-nvdk (Member) commented Apr 27, 2026

Summary

Five critical bugs surfaced in a 4-agent post-implementation review of the ingest path. This PR fixes all five, plus the consolidated findings from a second 4-agent review of the fix branch (real bugs introduced by the first round of fixes, plus refactor/test/comment cleanups). All five are 26.05.1 GA blockers, and fixing them does not regress the headline number: a sustained 60s MessagePack columnar benchmark on this branch shows 19.04M rec/s @ p99 3.13ms vs baseline 18.6M rec/s @ p99 3.68ms (~17% p99 improvement, ~2% throughput uplift, 0 errors).

Critical fixes

  • C1 — graceful-shutdown panic: Close() closed flushQueue after cancel(), racing concurrent writers past shard.mu.Unlock(). Fixed: closing atomic.Bool flag set before cancel, channel never closed (workers exit on b.ctx.Done()).
  • C2 — schema-evolution corruption under concurrent writes: race window inside flushBufferLocked could install a third schema mid-flight; original caller appended into the racer's buffer. Fixed: bounded loop (8 iterations) re-checks schema after I/O + per-iteration ctx.Err() check.
  • C3 — WAL silent backpressure drop: AppendRaw/AppendRawWithMeta returned nil on a full channel, making downstream "data preserved in WAL" log lines untrue. Fixed: typed wal.ErrWALDropped sentinel; the ingest caller differentiates it via errors.Is, emits a sampled Warn, and increments a dedicated total_wal_dropped counter; cluster receivers treat it as non-fatal and no longer advance lastSeq past the dropped entry, which would have caused silent primary/follower divergence.
  • C4 — gzip decompression bomb in LP/TLE handlers: Fiber's c.Body() transparently gunzipped without a size cap. Fixed: c.Request().Body() (raw) + magic-byte detection + pooled bounded decompressor (mirrors msgpack pattern).
  • C5 — ingest endpoints missing write-tier auth: read-only token could write data via /api/v1/write/* when RBAC was disabled (OSS default). Fixed: explicit auth.RequireWrite/RequireAdmin middleware on all 9 endpoints via new internal/api/auth_middleware.go helpers; SetAuthAndRBAC takes concrete *auth.AuthManager directly to eliminate silent type-assert misses.

Review-pass refactors and cleanups

  • Extract Writer.tryEnqueue (kills 2 copies of the WAL drop pattern).
  • Extract ArrowBuffer.tryEnqueueFlush (kills 4 copies of the closing-flag + select-with-ctx-arm + queue-full pattern). Returns a flushSendOutcome enum so callers know queued / closing / ctx-canceled / queue-full distinctly.
  • Extract ArrowBuffer.flushOnSchemaChangeLocked (kills 2 copies of the schema-evolution loop).
  • Extract withWriteAuth / withAdminAuth + passthroughMiddleware (collapses if/else registration in 4 ingest handlers).

Test plan

  • go build ./cmd/... ./internal/... clean
  • go vet ./cmd/... ./internal/... clean
  • go test ./internal/ingest/... ./internal/wal/... ./internal/api/... ./internal/cluster/... (all green, ~95s)
  • Race-detector pass on new tests: go test -race -run "CloseRace|WriteAfterClose|SchemaEvolutionConcurrent|ErrWALDropped" all green
  • Sustained 60s MessagePack columnar benchmark: 19.04M rec/s @ p99 3.13ms, 0 errors
  • New regression tests are deterministic (no t.Skip on fast machines, no fixed time.Sleep on slow ones)
  • Manual: hit /api/v1/write/msgpack with a read-only token, verify 403 (was 200)
  • Manual: post a 1MB gzip-bomb to /write with Content-Encoding: gzip, verify rejection (was OOM)

Process notes

  • Two rounds of 4-agent internal review (correctness / security / code-quality / perf) ran on the branch before opening this PR; all blocking findings from both rounds are addressed in this single commit.
  • Three of the four reviewers in round 2 independently flagged the cluster-receiver fallout from the C3 fix — that's the kind of cross-cutting find the new CLAUDE.md review framework is designed to catch before shipping to gemini.

🤖 Generated with Claude Code

…leanup

Five critical bugs surfaced by a 4-agent post-implementation review of
the ingest path; this commit fixes all five PLUS the consolidated
findings from a second 4-agent review of the fix branch.

CRITICAL — bugs that block 26.05.1 GA:

- C1: ArrowBuffer.Close() raced concurrent Write() goroutines via
  close(flushQueue) after cancel(). A writer past shard.mu.Unlock()
  but not yet at the channel send would panic with "send on closed
  channel" during graceful shutdown. Fixed: closing atomic.Bool flag
  set BEFORE cancel(); flushQueue is no longer closed (workers exit
  via b.ctx.Done()); senders short-circuit on the flag with a
  ctx.Done() defense-in-depth select arm. Regression test:
  TestArrowBuffer_CloseRaceWithConcurrentWrites uses wait-for-evidence
  on totalRecordsBuffered (not a fixed Sleep) for stability on slow CI.
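The C1 shutdown ordering can be sketched in isolation. This is a minimal illustration, not the real ArrowBuffer API: the type, field, and method names below are invented for the example; only the ordering (flag before cancel, channel never closed) follows the fix described above.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// buffer sketches the C1 protocol: a closing flag, a cancelable context,
// and a flush channel that is deliberately never closed.
type buffer struct {
	closing    atomic.Bool
	ctx        context.Context
	cancel     context.CancelFunc
	flushQueue chan int
}

func newBuffer() *buffer {
	ctx, cancel := context.WithCancel(context.Background())
	return &buffer{ctx: ctx, cancel: cancel, flushQueue: make(chan int, 8)}
}

// enqueue short-circuits on the closing flag; the ctx.Done() arm is
// defense-in-depth so a sender caught mid-select cannot block forever.
func (b *buffer) enqueue(v int) bool {
	if b.closing.Load() {
		return false
	}
	select {
	case b.flushQueue <- v:
		return true
	case <-b.ctx.Done():
		return false
	}
}

// close sets the flag BEFORE cancel, so no writer can slip past the flag
// check and then hit a closed channel; workers exit via ctx.Done().
func (b *buffer) close() {
	b.closing.Store(true)
	b.cancel()
}

func main() {
	b := newBuffer()
	fmt.Println(b.enqueue(1)) // true
	b.close()
	fmt.Println(b.enqueue(2)) // false: rejected, never "send on closed channel"
}
```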

- C2: Schema-evolution flush released shard.mu for I/O; a concurrent
  writer could install a third schema in the I/O window and the
  original caller would append records into the racer's schema buffer,
  mixing schemas and producing column mismatches at flush. Fixed:
  bounded loop (8 iterations) re-checks bufferSchemas after the I/O
  returns under re-acquired lock + ctx.Err() check on every iteration
  so a cancelled request isn't starved. Extracted into
  flushOnSchemaChangeLocked helper (was duplicated across both write
  paths). Regression test:
  TestArrowBuffer_SchemaEvolutionConcurrentNoCorruption.

- C3: WAL.AppendRaw / AppendRawWithMeta silently incremented
  DroppedEntries and returned nil on channel-full backpressure,
  making downstream "data preserved in WAL" log lines untrue.
  Operators couldn't distinguish backpressure drops from real I/O
  failures. Fixed: wal.ErrWALDropped sentinel returned on drop. Both
  drop sites consolidated into Writer.tryEnqueue helper.

  Caller-side (ingest): differentiates ErrWALDropped from I/O
  failure via errors.Is. Drops increment a NEW totalWALDropped
  counter (separate from totalWALErrors) and emit a SAMPLED Warn
  (max 1 line/sec via walDropLastLogNano) — the previous code path
  emitted an unsampled Error per record, which was log-spam at ~250
  bytes/event under sustained backpressure.

  Cluster receivers (replication/receiver + sharding/shard_receiver):
  ErrWALDropped is now NON-FATAL — receiver continues to apply the
  entry to the in-memory ingest buffer, increments
  totalLocalWALDropped, emits a warn. The previous behavior would
  have advanced lastSeq past the dropped entry, causing silent
  primary/follower divergence. Durability is preserved by the
  primary's WAL + Phase 2 peer Parquet replication.

- C4: Fiber's c.Body() transparently gunzipped Content-Encoding:
  gzip with NO size cap — a 1 MB gzip payload that decompressed to
  50 GB would OOM the process before any handler-level check fired.
  Fixed in lineprotocol and tle handlers (msgpack was already
  correct): use c.Request().Body() (raw bytes) and our pooled
  decompressGzip helper that enforces a hard size cap before
  allocation. Mirrors the msgpack pattern.

- C5: Ingest endpoints (msgpack, lineprotocol×3, tle, import×4)
  lacked auth.RequireWrite middleware — a read-only token could
  write data via /api/v1/write/* when RBAC was disabled (OSS
  default). Same vuln class CLAUDE.md flagged on CQ/delete. Fixed:
  SetAuthAndRBAC on the four ingest handlers now takes the concrete
  *auth.AuthManager directly (no silent type-assert miss); routes
  use the new withWriteAuth / withAdminAuth helpers in
  internal/api/auth_middleware.go that fall back to a no-op
  middleware when auth is disabled. Bulk import endpoints use
  RequireAdmin (they rewrite history). LP /flush also uses RequireAdmin
  (global flush is operationally heavy + spammable).

Refactors / cleanups (review-pass findings):

- Extract Writer.tryEnqueue (eliminates 2 copies of the WAL drop pattern).
- Extract ArrowBuffer.tryEnqueueFlush (eliminates 4 copies of the
  closing-flag + select-with-ctx-arm + queue-full pattern). Returns a
  flushSendOutcome enum so the caller knows queued / closing /
  ctx-canceled / queue-full distinctly.
- Extract ArrowBuffer.flushOnSchemaChangeLocked (eliminates 2 copies
  of the schema-evolution loop).
- Extract withWriteAuth / withAdminAuth helpers + passthroughMiddleware
  in internal/api/auth_middleware.go. Collapses the if/else
  registration pattern in 4 ingest handlers.
- New regression tests with deterministic semantics: WAL drop test
  halts the drainer + manually fills the channel (no Skip on no-drops).
  Schema-evolution test uses iteration count + per-goroutine recover.
  Close-race test waits for evidence of writer activity.

Performance: 60s sustained MessagePack columnar benchmark on this
branch produced 19.04M rec/s @ p99 3.13ms vs baseline 18.6M rec/s @
p99 3.68ms — ~17% p99 improvement, ~2% throughput uplift, 0 errors.
The hot record path is unchanged; all new logic is per-flush or
per-shutdown.

Release notes: RELEASE_NOTES_2026.05.1.md updated with a new
"Ingestion Critical-Path Hardening (26.05.1 Pre-GA)" subsection
under Hardening, covering the five critical fixes and the
performance result.

xe-nvdk commented Apr 27, 2026

/gemini review

@gemini-code-assist this is the 26.05.1 GA-blocker hardening PR. Five critical fixes plus the cleanup pass. Two rounds of 4-agent internal review on the branch caught real bugs in the first round of fixes (cluster-receiver fallout from C3, log-spam under WAL backpressure, type-assert silent miss in auth wiring) — all addressed before opening this PR. Looking for any remaining findings before merge.

Sustained-load benchmark on this branch:

  • 19.04M rec/s, p99 3.13ms, 0 errors over 60s (vs baseline 18.6M @ p99 3.68ms)
  • Hot record path is unchanged; all new logic is per-flush or per-shutdown

Reflects the 60s sustained MessagePack columnar benchmark on the
fix/ingest-26.05.1-criticals branch:

  19.04M rec/s @ p99 3.13ms over 1.14B records, 0 errors.

Previous claim was 18.6M rec/s @ p99 3.68ms (still true on baseline).
The other rows in the table (Zstd / GZIP / Line Protocol) were not
re-measured on this branch and are left as-is — only the path the
benchmark actually exercised gets updated.

Marketing site (basekick.net) and docs site (docs.basekick.net) carry
the same claims and will be updated separately.

gemini-code-assist (Bot) left a comment


Code Review

This pull request hardens the ingestion pipeline by addressing concurrency races, durability gaps, and security vulnerabilities. It eliminates a shutdown panic in ArrowBuffer, fixes schema-evolution corruption using a bounded re-check loop, and prevents OOMs from gzip bombs by implementing capped decompression on raw request bodies. Additionally, it introduces explicit WAL backpressure handling via an ErrWALDropped sentinel and enforces write-tier authentication across all ingestion endpoints. Review feedback recommends restoring detailed error messages in decompression failure responses and suggests implementing zstd support in Line Protocol and TLE handlers for consistency.

Comment thread internal/api/lineprotocol.go Outdated
Comment thread internal/api/tle.go Outdated
Comment thread internal/api/lineprotocol.go Outdated

gemini-code-assist (Bot) left a comment


Code Review

This pull request hardens the ingestion path by addressing several critical concurrency, durability, and security issues. Key improvements include eliminating a graceful-shutdown panic in ArrowBuffer, preventing schema-evolution corruption during concurrent writes via a bounded retry loop, and protecting against gzip decompression bombs by implementing manual decompression with a hard size cap. Additionally, WAL backpressure is now explicitly handled using a new ErrWALDropped sentinel error, and write-tier authentication has been enforced across all ingestion and import endpoints. Feedback was provided to improve the clarity of the release notes by expanding glob patterns in the endpoint lists.

Comment thread RELEASE_NOTES_2026.05.1.md Outdated
…on LP/TLE

Addresses all four gemini-code-assist findings on PR #413:

- LP and TLE gzip-decompress responses now include the underlying
  err.Error() so clients can diagnose oversized payloads without
  checking server logs (mirrors msgpack and import handlers).
- Line Protocol and TLE endpoints now accept Content-Encoding: zstd
  via magic-byte detection (0x28 0xB5 0x2F 0xFD), decompressed through
  the package-level zstdDecoderPool shared with msgpack. Same 100MB
  decompressed-size cap as the gzip path. Zstd is 3-5x faster than
  gzip on typical LP and TLE payloads (matters most for satellite-fleet
  TLE senders pushing large catalog refreshes per tick).
- Release notes: expanded the shell-glob endpoint list to explicit
  paths (gemini's prose-clarity suggestion) and added a new
  "Line Protocol and TLE: zstd Decompression Support" subsection.

New helper: decompressZstdPooled in tle.go (returns []byte; LP/TLE
keep their flat-byte handler shape rather than adopting msgpack's
PooledBuffer.Release contract).

xe-nvdk commented Apr 27, 2026

/gemini review

@gemini-code-assist round 2 fixes pushed in commit 713bbd3. All four findings addressed:

MEDIUM (×2) — error-detail restore:

  • ✅ LP gzip-decompress response now includes err.Error() — line 229.
  • ✅ TLE gzip-decompress response now includes err.Error() — line 131.
  • Same change applied symmetrically to the new zstd branches in both handlers.

MEDIUM — zstd on LP/TLE (consistency with msgpack):

  • ✅ Both handlers now detect zstd via magic bytes (0x28 0xB5 0x2F 0xFD) after the gzip check, with the same 100 MB decompressed-size cap as gzip.
  • New helper decompressZstdPooled(data []byte, maxSize int) ([]byte, error) in tle.go reuses the package-level zstdDecoderPool and decompressBufferPool already declared in msgpack.go. Returns a freshly-copied []byte rather than a PooledBuffer — keeps the LP/TLE flat-byte handler shape unchanged. The pooled buffer itself is returned to its pool inside the helper, with the same oversized-buffer discard policy as msgpack (cap > maxPooledBufferSize → fresh small buffer for the pool, decompBufferDiscards counter incremented).
  • New release-notes subsection "Line Protocol and TLE: zstd Decompression Support" describes the addition, the magic-byte detection, the 100 MB cap, and the 3–5× speedup over gzip relevant for satellite-fleet TLE senders.

MEDIUM — release-notes prose clarity:

  • ✅ Replaced the shell-glob /api/v1/import/{csv,parquet,lp,tle} with the four explicit paths and rephrased the auth bullet for readability per your suggestion.

Build clean (go build ./internal/api/... ./cmd/...) and the full api test suite passes (go test ./internal/api/... → ok 5.848s). No new tests added for the zstd helper because it shares the contract and pool with msgpack's already-tested path; if you'd like a dedicated decompressZstdPooled unit test (oversized-payload cap, malformed-magic-bytes-but-valid-stream, etc.), I'm happy to add it.

Looking for any final findings before merge.


gemini-code-assist (Bot) left a comment


Code Review

This pull request implements critical-path hardening for the ingestion pipeline, addressing concurrency races, durability gaps during WAL backpressure, and security vulnerabilities related to decompression bombs and missing auth middleware. Key improvements include a graceful shutdown mechanism for ArrowBuffer, a bounded loop to prevent schema-evolution corruption, and the introduction of wal.ErrWALDropped to accurately report backpressure. Review feedback identified a critical data corruption risk in the Line Protocol and TLE handlers where raw request bodies were used without copying, potentially leading to use-after-free issues due to fasthttp's buffer reuse. Additionally, the safety limit for schema-evolution flushes was noted as a potential point for returning errors rather than allowing schema mixing.

Comment thread internal/api/lineprotocol.go Outdated
Comment thread internal/api/tle.go Outdated
Comment thread internal/ingest/arrow_writer.go
Addresses both gemini-code-assist findings on PR #413 commit 713bbd3:

CRITICAL (security-critical): Use-after-free risk in LP and TLE
handlers. c.Request().Body() returns a fasthttp-owned slice that is
reused after the handler returns. The current parser path produces
fresh strings (no aliasing today), but the contract was undocumented
and brittle. Both handlers now make a one-shot defensive copy at the
localProcessing entry point — append([]byte(nil), c.Request().Body()...).
Cost: one memcpy per request, dwarfed by per-record allocations. This
closes the silent-aliasing footgun if anyone ever adds a fast-path
that retains a sub-slice of the body.

MEDIUM: Schema-evolution iteration cap previously returned nil and
proceeded with a wide schema-mixed buffer. Now returns a typed
ingest.ErrSchemaChurnExceeded sentinel; LP and TLE handlers map it
to HTTP 503 (retryable backpressure). New totalSchemaChurnExceeded
atomic counter surfaced in ArrowBuffer.Stats() so operators can
alert on a non-zero rate. The per-iteration flushes inside the loop
already wrote the older schemas' rows to durable Parquet, so there
is no data loss — only this single request fails under sustained
schema-rotation churn.

New regression tests:
- TestArrowBuffer_SchemaChurnExceededErrorContract: pins the error
  type (errors.Is(err, ErrSchemaChurnExceeded)) and counter/error
  parity under a rotating-schema workload.
- TestArrowBuffer_ErrSchemaChurnExceeded_IsExported: sentinel must
  be exported and unwrappable via errors.Is.

Build clean, full ingest + api test suite passes.

xe-nvdk commented Apr 27, 2026

/gemini review

@gemini-code-assist round 3 fixes pushed in commit 3c79ea1. Both findings addressed:

CRITICAL — body-aliasing footgun (LP and TLE):

  • ✅ Both handlers now copy the raw fasthttp body at localProcessing entry: body := append([]byte(nil), c.Request().Body()...).
  • Verified: the LP and TLE parsers already produce fresh strings (string(...) copies in unescape / parseFieldValue), so today's behavior was actually safe — but the no-aliasing contract was undocumented, brittle, and the silent-aliasing footgun would surface as silent data corruption if anyone ever added a fast-path that retained a sub-slice. The defensive copy is one memcpy per request, dwarfed by per-record allocations.
  • The decompression branches already returned fresh slices, so they're symmetric. msgpack handler is unchanged: its decoder.Decode(payload) is synchronous and consumes payload before returning, so fasthttp body reuse is fine there.

MEDIUM — schema-evolution cap behavior:

  • flushOnSchemaChangeLocked now returns a typed ingest.ErrSchemaChurnExceeded sentinel when schemaEvolutionMaxIters is reached (was: log + return nil, allowing wide schema-mixed buffers to land on disk).
  • ✅ LP and TLE write handlers map errors.Is(err, ingest.ErrSchemaChurnExceeded) → HTTP 503 so upstream senders treat it as retryable backpressure rather than permanent server error.
  • ✅ New totalSchemaChurnExceeded atomic counter surfaced in ArrowBuffer.Stats() as total_schema_churn_exceeded so operators can alert on a non-zero rate.
  • Important: the per-iteration flushes inside the loop already wrote older schemas' rows to durable Parquet — only the current request fails. No data loss.

New regression tests:

  • TestArrowBuffer_SchemaChurnExceededErrorContract — runs 16 writers × 200 iterations with 12 rotating distinct schemas against a single shard, asserts that (a) every non-nil write error must be errors.Is(err, ErrSchemaChurnExceeded) (catches a regression to "return nil") and (b) totalSchemaChurnExceeded counter delta == churn-error count (catches counter/error decoupling). Test passes deterministically; under steady-state runs the cap doesn't fire (0 errors), the contract still holds.
  • TestArrowBuffer_ErrSchemaChurnExceeded_IsExported — pins the sentinel as exported and unwrappable via errors.Is.

Full ingest + api suites pass (go test ./internal/ingest/... ./internal/api/... → ok 66s + 5s); build clean.

Looking for any final findings before merge.


gemini-code-assist (Bot) left a comment


Code Review

This pull request implements critical-path hardening for the ingestion pipeline, addressing concurrency races, schema-evolution corruption, and WAL backpressure handling. Key changes include a new shutdown protocol for ArrowBuffer to prevent panics, a bounded loop for schema changes to ensure data integrity, and the introduction of the wal.ErrWALDropped sentinel to accurately report durability status. Additionally, the PR adds zstd decompression support for Line Protocol and TLE, enforces stricter write-tier authentication across all ingestion endpoints, and mitigates decompression bomb vulnerabilities. Feedback focuses on correcting a docstring mismatch in the schema-evolution logic, optimizing memory usage by deferring defensive body copies until after compression checks, and ensuring consistent 503 error reporting for schema churn in the MessagePack handler.

Comment thread internal/ingest/arrow_writer.go Outdated
Comment thread internal/api/lineprotocol.go Outdated
Comment thread internal/api/tle.go Outdated
Comment thread internal/api/msgpack.go
Addresses all four NEW gemini-code-assist findings on PR #413 commit
3c79ea1. Two earlier findings (zstd consistency + cap-error return)
were stale duplicates already addressed in 713bbd3 / 3c79ea1.

MEDIUM: Defensive body copy in LP and TLE was performed unconditionally
before compression detection, doubling memory for large compressed
payloads (decompression returns a fresh slice, making the prior copy
redundant). Both handlers now use a switch over compression magic
bytes — gzip/zstd return fresh slices directly, only the uncompressed
default branch makes the defensive copy.

MEDIUM: writeMsgPack handler did not specifically handle
ingest.ErrSchemaChurnExceeded — it returned a generic 500 instead of
the retryable 503 used by the LP and TLE handlers. The error message
also dropped the underlying err.Error() detail. Both fixed: msgpack
now mirrors LP/TLE — errors.Is(err, ingest.ErrSchemaChurnExceeded)
maps to 503, and the generic-error response includes err.Error() for
client-side diagnosis (consistent with the gzip-decompress responses).

MEDIUM: flushOnSchemaChangeLocked docstring still listed
"schemaEvolutionMaxIters reached — return nil (best-effort)" but the
implementation in 3c79ea1 changed it to return ErrSchemaChurnExceeded.
Comment updated to match: documents the 503 retryable contract and
that per-iteration flushes already wrote older schemas to durable
Parquet so there is no data loss.

Build clean, full ingest + api test suite passes.

xe-nvdk commented Apr 28, 2026

/gemini review

@gemini-code-assist round 4 fixes pushed in commit 0822793. All four NEW findings from round 3 addressed (the 2 earlier-rounds re-flags at lineprotocol.go:231 / arrow_writer.go:1440 were stale — already addressed by 713bbd3 / 3c79ea1 respectively, just gemini re-flagging them on shifted line numbers):

MEDIUM — defensive body copy now deferred (LP and TLE):

  • ✅ Both handlers switched from "unconditional copy + compression detection" to a single switch over magic bytes. Gzip and zstd branches return fresh slices directly (decompression already produces a fresh allocation); only the uncompressed default branch makes the defensive append([]byte(nil), rawBody...) copy.
  • This eliminates the doubled-memory case for large compressed payloads while preserving the silent-aliasing footgun fix from round 2.

MEDIUM — ErrSchemaChurnExceeded → 503 in msgpack handler:

  • writeMsgPack now mirrors LP/TLE — errors.Is(err, ingest.ErrSchemaChurnExceeded) maps to HTTP 503 with retryable-backpressure semantics.
  • ✅ Generic-error path also restored err.Error() in the response body for client-side diagnosis (was previously a hard-coded "Failed to write records" with no detail). Now consistent with the gzip-decompress responses.

MEDIUM — flushOnSchemaChangeLocked docstring stale:

  • ✅ Updated comment block to match implementation: "schemaEvolutionMaxIters reached — return ErrSchemaChurnExceeded so the caller can reject the write with a retryable status (HTTP 503). The per-iteration flushes inside the loop already wrote older schemas' rows to durable Parquet, so there is no data loss — only the current request fails under sustained schema-rotation churn."

Build clean, full ingest + api suites pass (go test ./internal/ingest/... ./internal/api/... → ok 67s + 5s).

Looking for any final findings before merge.


gemini-code-assist (Bot) left a comment


Code Review

This pull request hardens the ingestion pipeline by addressing concurrency races during shutdown and schema evolution, improving WAL backpressure visibility, and enforcing stricter authentication. It also adds zstd support for Line Protocol and TLE endpoints and implements defensive body copies to prevent memory aliasing. Review feedback identifies a potential OOM vulnerability in the zstd decompression path and suggests consolidating redundant decompression logic and magic-byte detection across handlers to improve maintainability.

Comment thread internal/api/tle.go Outdated
bufPtr := decompressBufferPool.Get().(*[]byte)
buf := (*bufPtr)[:0]

buf, err = decoder.DecodeAll(data, buf)


security-high high

The use of decoder.DecodeAll is vulnerable to decompression bombs. While zstd.WithDecoderMaxMemory limits the decoder's internal window memory, DecodeAll will continue to grow the output buffer as long as the input payload provides valid compressed data, potentially leading to an Out-Of-Memory (OOM) condition before the length check at line 326 is reached. To properly harden this path, consider using the io.Reader interface with an io.LimitReader to bound the decompressed output size during the decoding process.

Comment thread internal/api/lineprotocol.go Outdated
Comment on lines 230 to 265
switch {
case len(rawBody) >= 2 && rawBody[0] == 0x1f && rawBody[1] == 0x8b:
// gzip — decompressGzip applies a hard 100MB decompressed cap so
// an adversarial gzip bomb is rejected before allocation.
h.totalBytesGzipped.Add(int64(originalSize))

-decompressed, err := h.decompressGzip(body)
+decompressed, err := h.decompressGzip(rawBody)
if err != nil {
h.totalErrors.Add(1)
h.logger.Error().Err(err).Msg("Failed to decompress gzip data")
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": "Failed to decompress gzip data: " + err.Error(),
})
}
body = decompressed

case len(rawBody) >= 4 && rawBody[0] == 0x28 && rawBody[1] == 0xB5 && rawBody[2] == 0x2F && rawBody[3] == 0xFD:
// zstd magic number 0x28 0xB5 0x2F 0xFD — same 100MB cap as gzip,
// pooled decoder reused via the package-level zstdDecoderPool.
h.totalBytesGzipped.Add(int64(originalSize))
decompressed, err := decompressZstdPooled(rawBody, 0)
if err != nil {
h.totalErrors.Add(1)
h.logger.Error().Err(err).Msg("Failed to decompress zstd data")
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": "Failed to decompress zstd data: " + err.Error(),
})
}
body = decompressed

default:
// Uncompressed payload — defensive copy out of the fasthttp-owned
// buffer so anything downstream that retains a sub-slice (today
// nothing does, but the contract is undocumented) stays valid
// after the handler returns.
body = append([]byte(nil), rawBody...)
}


medium

The logic for detecting compression magic bytes and performing decompression is identical between the Line Protocol and TLE handlers. Consider extracting this switch block into a shared helper function within the api package (e.g., func decompressRequest(rawBody []byte, maxSize int) ([]byte, string, error)) to ensure consistent behavior and simplify the handler implementations.

Comment thread internal/api/lineprotocol.go Outdated
h.totalBytesGzipped.Add(int64(originalSize))

-decompressed, err := h.decompressGzip(body)
+decompressed, err := h.decompressGzip(rawBody)


medium

This call uses the LineProtocolHandler.decompressGzip method, which duplicates logic found in the package-level decompressGzipPooled function (defined in tle.go). Both use the same underlying pool. To reduce code duplication and improve maintainability, consider using the shared function here.

Suggested change
-decompressed, err := h.decompressGzip(rawBody)
+decompressed, err := decompressGzipPooled(rawBody, 0)

Addresses three NEW gemini-code-assist findings on PR #413 commit
0822793. Two earlier-round comments on arrow_writer.go:1440 and
msgpack.go:168 were stale duplicates already addressed in 3c79ea1
and 0822793 respectively (gemini's grep landed on shifted line
numbers; the relevant code now shows the fixed shape).

HIGH/security-high — zstd decompression bomb:
The msgpack zstd path (and the new LP/TLE zstd path added in 713bbd3)
used zstd.Decoder.DecodeAll, which grows the output buffer to fit
the entire decompressed stream regardless of WithDecoderMaxMemory
(that option only bounds the decoder's per-frame window, not output
buffer growth). A high-ratio bomb (e.g. 28KB compressed → 256MB
decompressed, ratio 9361x) would have OOM'd the process before any
post-hoc length check could fire — symmetric to the gzip-bomb fix
earlier in 26.05.1. All three handlers now use streaming
decoder.Reset + io.LimitReader + io.ReadAll with the same hard
100MB output cap as gzip; the bound is enforced *during* decoding.
New regression tests in internal/api/decompress_bomb_test.go
construct a 256MB bomb at a 100MB cap and assert clean rejection
without OOM.

MEDIUM — duplicated dispatch logic:
Extracted shared decompressRequest(rawBody, maxSize) ([]byte, codec, error)
helper in tle.go that handles gzip/zstd/uncompressed dispatch with
the hard output cap and the defensive copy of fasthttp-owned bytes
on the uncompressed branch. LP and TLE handlers now delegate to it
in three lines instead of a 30-line inline switch each.

MEDIUM — duplicated gzip helper:
LineProtocolHandler.decompressGzip method was a near-clone of the
package-level decompressGzipPooled (in tle.go), both pulling from
the same lpGzipReaderPool. The method is now removed; all callers
go through decompressRequest → decompressGzipPooled. Removed unused
bytes/io/gzip imports from lineprotocol.go.

Build clean, full ingest + api test suite passes (incl. 3 new
bomb-regression tests).

xe-nvdk commented Apr 28, 2026

/gemini review

@gemini-code-assist round 5 fixes pushed in commit 1aca1e8. All three NEW findings from round 4 addressed (the 2 earlier-rounds re-flags at arrow_writer.go:1440 / msgpack.go:168 were stale — already fixed in 3c79ea1 and 0822793 respectively, gemini's grep just landed on shifted lines):

🚨 HIGH/security-high — zstd decompression bomb in all three handlers:

  • ✅ Confirmed real vulnerability in BOTH the new LP/TLE zstd path (added 713bbd3) AND the pre-existing msgpack zstd path. zstd.Decoder.DecodeAll grows its output buffer to fit the entire decompressed stream regardless of WithDecoderMaxMemory (which bounds only the per-frame window, not output growth).
  • ✅ All three switched to streaming decoder.Reset(bytes.NewReader(data)) + io.LimitReader(decoder, max+1) + io.ReadAll. The bound is enforced during decoding so the bomb is rejected with a bounded maxSize+1-byte allocation. Same shape as the existing gzip path.
  • ✅ Pool semantics preserved: decoder is still pooled (the per-frame state is the expensive part); output is now a fresh slice on the streaming path because io.ReadAll's growth pattern fights decompressBufferPool's fixed-capacity contract. msgpack's PooledBuffer returned with bufPtr=nil so Release() is a safe no-op for the new path.
  • ✅ New regression tests in internal/api/decompress_bomb_test.go:
    • TestDecompressZstdPooled_BombRejected_WithoutOOM — 256 MB plaintext compresses to 28 KB (ratio 9361×); decompressZstdPooled correctly rejects with "exceeds 100MB limit".
    • TestDecompressZstdPooled_RejectsBenignOversizedPayload — non-adversarial cap-exceeded path.
    • TestDecompressZstdPooled_HappyPath_RoundTrip — pin against regression in normal-sized requests.

MEDIUM — extract shared decompressRequest helper:

  • ✅ New decompressRequest(rawBody []byte, maxSize int) (body []byte, codec string, err error) in tle.go. Returns the fresh-decoded slice plus a codec label ("gzip", "zstd", or "") for structured logging.
  • ✅ LP and TLE handlers reduced from a 30-line inline switch each to a 3-line body, codec, err := decompressRequest(rawBody, 0) call. All three CRITICAL invariants (no fasthttp gunzip, streaming-bounded zstd, defensive copy on uncompressed) are preserved inside the helper.

MEDIUM — remove duplicated gzip helper:

  • LineProtocolHandler.decompressGzip method (lineprotocol.go:373) was a near-clone of decompressGzipPooled (tle.go) and pulled from the same lpGzipReaderPool. Method removed; all routes go through decompressRequest → decompressGzipPooled. Removed now-unused bytes/io/gzip imports from lineprotocol.go.

Build clean, full suites pass (go test ./internal/api/... → 2.886s, ./internal/ingest/... → 66.912s); 3 new bomb-regression tests included.

Looking for any final findings before merge.

@gemini-code-assist Bot left a comment

Code Review

This pull request hardens the ingestion path by fixing concurrency races during shutdown and schema evolution, mitigating decompression bomb vulnerabilities with streaming decoders, and enforcing explicit authentication across all write and import endpoints. It also improves durability tracking by surfacing WAL backpressure as a distinct error state. Feedback focuses on standardizing error messages with formatBytes and implementing log sampling for WAL drop warnings in the replication and sharding receivers to prevent log spam under load.

Comment thread internal/api/tle.go Outdated
Comment thread internal/cluster/replication/receiver.go
Comment thread internal/cluster/replication/receiver.go
Comment thread internal/cluster/sharding/shard_receiver.go
Comment thread internal/cluster/sharding/shard_receiver.go
…rmatBytes

Addresses 5 NEW gemini-code-assist findings on PR #413 commit 1aca1e8.
Two earlier-round comments at arrow_writer.go:1440 and msgpack.go:168
remain stale duplicates (already addressed in 3c79ea1 and 0822793 —
gemini's grep keeps landing on shifted lines).

MEDIUM (x4) — sampled Warn for follower-side WAL backpressure:
The replication.Receiver and sharding.ShardReceiver fix from 26.05.1
(cluster-receivers tolerate ErrWALDropped) emitted Warn unsampled.
Under sustained follower-side backpressure that produces a Warn per
replicated record, drowning operator dashboards. Both receivers now
mirror the ArrowBuffer.recordWALError pattern: walDropLastLogNano
atomic timestamp, walDropLogIntervalNano = 1s const, log line
suppressed if (now - last) < interval. Operators continue to read
the rate from totalLocalWALDropped; the Warn is just the degraded-
state signal.
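The sampling guard follows this shape (a minimal sketch; the field and constant names mirror the commit message, but the receiver type and surrounding code are hypothetical):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// walDropLogIntervalNano: at most one log line per second per receiver.
const walDropLogIntervalNano = int64(time.Second)

type receiver struct {
	walDropLastLogNano atomic.Int64
	totalDropped       atomic.Int64
}

// warnDropped increments the counter on every call but emits a log line
// only if the previous emission was at least one interval ago. The CAS
// ensures a single winner when multiple goroutines race past the check.
func (r *receiver) warnDropped() {
	total := r.totalDropped.Add(1)
	now := time.Now().UnixNano()
	last := r.walDropLastLogNano.Load()
	if now-last < walDropLogIntervalNano {
		return // suppressed: within the sampling window
	}
	if r.walDropLastLogNano.CompareAndSwap(last, now) {
		fmt.Printf("WAL drop (backpressure), total_dropped=%d\n", total)
	}
}

func main() {
	var r receiver
	for i := 0; i < 1000; i++ {
		r.warnDropped() // 1000 drops, but only one Warn in a 1s window
	}
	fmt.Println("total:", r.totalDropped.Load())
}
```

The counter carries the true rate; the sampled line is only the degraded-state signal, matching the split described above.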

MEDIUM — formatBytes consistency:
decompressZstdPooled and decompressGzipPooled previously formatted
cap errors as "exceeds %dMB limit". Switched to formatBytes(int64(
maxSize)) for consistency with msgpack's decompressZstd and
decompressGzip.

Build clean, full api + replication + sharding suites pass.

xe-nvdk commented Apr 28, 2026

/gemini review

@gemini-code-assist round 6 fixes pushed in commit 4a05cfb. All 5 NEW round-5 findings addressed (the recurring arrow_writer.go:1440 and msgpack.go:168 re-flags remain stale — already fixed in 3c79ea1 and 0822793; gemini's grep just keeps landing on shifted lines):

MEDIUM × 4 — sampled Warn for follower-side WAL backpressure:

  • ✅ Added walDropLastLogNano atomic.Int64 field to both replication.Receiver and sharding.ShardReceiver.
  • ✅ Added walDropLogIntervalNano = int64(time.Second) const next to each receiver type.
  • ✅ Wrapped both Warn emissions in the same atomic-CAS sampling guard used by ArrowBuffer.recordWALError — at most one log line per second per receiver, suppression keyed on (now - last) < interval && CompareAndSwap(last, now).
  • ✅ Both Warn payloads now also emit the running total_dropped counter so operators can read the rate from a single line if they grep for it.
  • ✅ The non-fatal/applied semantics from the 26.05.1 fix are unchanged — only the log shape changed.

MEDIUM — formatBytes consistency:

  • decompressZstdPooled and decompressGzipPooled cap-error messages now use formatBytes(int64(maxSize)) instead of %dMB. Matches msgpack's decompressZstd / decompressGzip exactly.

Build clean, full suites pass: go test ./internal/api/... ./internal/cluster/replication/... ./internal/cluster/sharding/... → ok 4.7s + 0.8s + 3.0s.

Looking for any final findings before merge.

@gemini-code-assist Bot left a comment

Code Review

This pull request implements critical hardening of the ingestion path, addressing concurrency races, durability gaps, and security vulnerabilities. Key improvements include a safe shutdown mechanism for the ArrowBuffer, a bounded loop for schema evolution to prevent corruption, and protection against decompression bombs. It also introduces explicit write-tier authentication and improves WAL backpressure observability. Feedback highlights a missing size check for uncompressed payloads in the shared decompression utility and a potential data corruption risk in the MessagePack handler due to the lack of a defensive body copy.

Comment thread internal/api/tle.go
Comment on lines +247 to +248
out := append([]byte(nil), rawBody...)
return out, "", nil

security-high high

The uncompressed branch in decompressRequest is missing a size check against maxSize. While the gzip and zstd paths are capped at 100MB (by default), an uncompressed request is currently only limited by the server's global body limit. This creates an inconsistency and a potential OOM vector if a large uncompressed payload is defensively copied. It should enforce the same maxSize limit as the compressed paths.

if maxSize <= 0 {
			maxSize = 100 * 1024 * 1024 // 100MB default
		}
		if len(rawBody) > maxSize {
			return nil, "", fmt.Errorf("payload exceeds %s limit", formatBytes(int64(maxSize)))
		}
		out := append([]byte(nil), rawBody...)
		return out, "", nil

Comment thread internal/api/msgpack.go
@@ -314,8 +330,16 @@ localProcessing:
if err := h.arrowBuffer.Write(ctx, database, records); err != nil {

high

The MessagePack handler is missing the defensive copy for uncompressed payloads that was added to the Line Protocol and TLE handlers in this PR. Since c.Request().Body() returns a buffer that fasthttp reuses immediately after the handler returns, and arrowBuffer.Write processes data asynchronously, any sub-slices (like strings) retained by the decoder will point to invalid memory once the next request arrives. This is a significant data corruption risk. Consider refactoring writeMsgPack to use the decompressRequest helper or adding a manual copy for the uncompressed path.

…elper

Addresses 2 NEW HIGH findings on PR #413 commit 4a05cfb. Six earlier-
round comments remain stale duplicates (already fixed in 3c79ea1,
0822793, and 4a05cfb — gemini's grep keeps re-flagging on shifted
lines).

HIGH/security-high — uncompressed maxSize cap missing:
decompressRequest enforced the 100MB cap on gzip and zstd via the
streaming LimitReader but had no cap on the uncompressed branch.
A multi-GB raw body would have been both accepted AND defensively
copied, doubling the allocation in the uncompressed-OOM vector.
The helper now defaults maxSize<=0 to 100MB up front and applies
the same ceiling to all three branches; the uncompressed path
rejects over-cap input before the copy. New regression tests:
TestDecompressRequest_UncompressedRejectsOverCap (over-cap rejected),
TestDecompressRequest_UncompressedAcceptsAtCap (at-boundary accepted
+ defensive-copy no-aliasing invariant via mutate-and-check).
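A minimal sketch of the helper's final shape under these assumptions (magic-byte dispatch, default cap, cap enforced symmetrically on every branch; the zstd arm is elided since it mirrors gzip, and none of this is Arc's exact code):

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// decompressRequest dispatches on magic bytes and applies the same
// output ceiling to all branches. The uncompressed branch enforces the
// cap BEFORE the defensive copy, closing the copy-doubles-the-OOM vector.
func decompressRequest(rawBody []byte, maxSize int) ([]byte, string, error) {
	if maxSize <= 0 {
		maxSize = 100 * 1024 * 1024 // 100MB default
	}
	switch {
	case len(rawBody) >= 2 && rawBody[0] == 0x1f && rawBody[1] == 0x8b: // gzip magic
		zr, err := gzip.NewReader(bytes.NewReader(rawBody))
		if err != nil {
			return nil, "gzip", err
		}
		defer zr.Close()
		out, err := io.ReadAll(io.LimitReader(zr, int64(maxSize)+1))
		if err != nil {
			return nil, "gzip", err
		}
		if len(out) > maxSize {
			return nil, "gzip", fmt.Errorf("payload exceeds %d byte limit", maxSize)
		}
		return out, "gzip", nil
	case len(rawBody) >= 4 && bytes.Equal(rawBody[:4], []byte{0x28, 0xb5, 0x2f, 0xfd}): // zstd magic
		return nil, "zstd", fmt.Errorf("zstd branch elided in this sketch (same shape as gzip)")
	default:
		// Uncompressed: same ceiling, checked before the copy.
		if len(rawBody) > maxSize {
			return nil, "", fmt.Errorf("payload exceeds %d byte limit", maxSize)
		}
		out := append([]byte(nil), rawBody...) // fasthttp reuses rawBody after return
		return out, "", nil
	}
}

func main() {
	body, codec, err := decompressRequest([]byte("hello"), 16)
	fmt.Println(string(body), codec, err)
	_, _, err = decompressRequest(make([]byte, 32), 16)
	fmt.Println("over-cap rejected:", err != nil)
}
```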

HIGH — msgpack missing defensive body copy:
The msgpack handler was the only ingest path still operating on
c.Request().Body() directly without the round-2 defensive copy. The
fork's msgpack decoder does string(b) copies on string fields and
io.ReadFull copies on []byte fields, so today's behavior is
defensible — but documenting that "no future fork update may
introduce zero-copy aliasing" is brittle, and the inconsistency
with LP/TLE was a real maintenance hazard. msgpack now routes
through the shared decompressRequest helper, which also gives it
the new uncompressed-size cap above for free.

Cleanup:
Deleted MsgPackHandler.decompressGzip and decompressZstd (240 lines
of per-handler pooled-buffer codecs replaced by the package-level
streaming helpers in tle.go). PooledBuffer remains in the package
for TestPooledBufferRelease. Removed unused bytes/io/gzip/zstd
imports from msgpack.go and updated handler error-message shape
across LP/TLE so codec-empty rejections produce a clean
"Failed to read request body: ..." (vs the awkward "Failed to
decompress  data: ...").

Build clean, full api + replication + sharding suites pass; 5
regression tests cover bomb / over-cap / boundary / aliasing.

xe-nvdk commented Apr 28, 2026

/gemini review

@gemini-code-assist round 7 fixes pushed in commit 6561c86. Both NEW HIGH findings addressed; the 6 recurring re-flags (arrow_writer.go:1440, msgpack.go:168, replication/receiver.go:91+386, sharding/shard_receiver.go:81+494) remain stale — those code shapes were already fixed in 3c79ea1 / 0822793 / 4a05cfb and your grep just keeps landing on shifted lines.

🚨 HIGH/security-high — decompressRequest uncompressed branch missing maxSize cap:

  • ✅ Helper now defaults maxSize <= 0 to 100MB up front and applies the cap to all three branches symmetrically. The uncompressed path rejects over-cap input before the defensive copy, closing the OOM vector you flagged.
  • ✅ New regression tests:
    • TestDecompressRequest_UncompressedRejectsOverCap — cap+1 raw bytes → "exceeds 100MB limit" error, codec="".
    • TestDecompressRequest_UncompressedAcceptsAtCap — exactly cap bytes accepted; mutates body[0] and verifies rawBody[0] is unchanged, locking in the defensive-copy no-aliasing invariant.

🚨 HIGH — msgpack handler missing defensive body copy:

  • ✅ writeMsgPack now routes through the shared decompressRequest(rawBody, int(h.maxPayloadSize)) helper, just like LP and TLE. The defensive copy + new uncompressed-cap apply automatically.
  • ✅ Deleted MsgPackHandler.decompressGzip and decompressZstd (240 lines of per-handler pooled-buffer codecs replaced by the package-level streaming helpers in tle.go). PooledBuffer itself remains for TestPooledBufferRelease.
  • ✅ Removed unused bytes/io/gzip/zstd imports from msgpack.go.
  • ✅ Error message shape unified: when codec is empty (uncompressed-rejection), the response is now "Failed to read request body: <err>" instead of the awkward "Failed to decompress data: <err>". When codec is gzip/zstd, the existing decoder-error message format is preserved.

Verification of the round-2 conclusion (no current data corruption):

  • The Basekick-Labs/msgpack/v6 fork was inspected: string fields go through string(b) (always allocates), and []byte fields go through io.ReadFull into a fresh slice (also always allocates). So today's msgpack handler was technically safe even without the defensive copy. But the contract was undocumented and brittle, and the inconsistency with LP/TLE was a real maintenance hazard — your finding to defensively copy is the right call regardless.

Build clean, full suites pass: go test ./internal/api/... ./internal/cluster/replication/... ./internal/cluster/sharding/... → ok 4.8s + 0.9s + 2.5s. 5 decompress regression tests (3 round-4 bomb + 2 round-6 cap/aliasing) pass deterministically.

Looking for any final findings before merge.

@gemini-code-assist Bot left a comment

Code Review

This pull request hardens the ingestion pipeline by addressing critical concurrency, durability, and security issues. Key improvements include fixing a graceful-shutdown panic in ArrowBuffer, preventing schema-evolution corruption through a bounded retry loop, and introducing explicit wal.ErrWALDropped handling to distinguish backpressure from I/O errors. The PR also mitigates decompression bomb vulnerabilities across all ingestion handlers and enforces stricter authentication for write and import endpoints. Feedback highlights a potential risk in reusing decoders after a failed decompression, suggesting they should be dropped from the pool to avoid state corruption.

Comment thread internal/api/tle.go Outdated
Comment on lines +358 to +359
_ = decoder.Reset(nil)
zstdDecoderPool.Put(decoder)

medium

Returning a decoder to the pool after a failed io.ReadAll might be risky if the decoder's internal state is corrupted. While Reset(nil) is called, it may be safer to drop the instance and let the pool allocate a fresh one if the error is not io.EOF or a simple limit exceeded error.

Three changes in this commit:

1) MEDIUM (gemini round 7) — drop poisoned zstd decoder on partial-
decode error. decompressZstdPooled previously called Reset(nil) and
returned the decoder to the pool after an io.ReadAll error, on the
assumption that Reset would clean any residual frame state. After a
mid-stream corruption error that's not a safe assumption — Close()
the decoder entirely; the next request's pool.Get() will allocate a
fresh one.

2) Plan B — msgpack uncompressed fast-path bypasses the defensive
body copy. After round-6 unified all three handlers behind
decompressRequest (which copies on the uncompressed branch), the
sustained MessagePack-columnar bench dropped from 19.04M rec/s to
18.04M rec/s — a 5% regression on the throughput-critical path
where 18M rec/s × ~50KB batches translates to a real allocation
storm. msgpack now inlines the dispatch: gzip/zstd still go through
the shared decompressGzipPooled / decompressZstdPooled (those need
the bomb cap and produce fresh slices anyway), but the uncompressed
branch hands rawBody directly to h.decoder.Decode. The decoder is
synchronous and the Basekick-Labs/msgpack/v6 fork copies every
string via `string(b)` and every []byte via io.ReadFull, so by the
time Decode returns no decoded value still aliases rawBody — fasthttp
can safely reuse the buffer. LP and TLE keep the defensive copy
because their throughput targets don't justify diverging from the
shared helper.

A new regression test (TestMessagePackDecoder_CopiesStringFields_NoBodyAliasing)
decodes a row-format payload, mutates the source
bytes to 0xFF, and asserts the decoded Measurement and Tags values
are unchanged. Locks in the no-aliasing invariant — if a future
fork update introduces zero-copy aliasing the test fails loudly
before production sees corruption.
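The mutate-and-check shape reduces to this (a sketch — decodeMeasurement is a hypothetical stand-in for the fork's decoder, which copies string fields via string(b)):

```go
package main

import "fmt"

// decodeMeasurement stands in for the decoder: string(b) always
// allocates a fresh copy, so the decoded value must not alias the
// source buffer.
func decodeMeasurement(raw []byte) string {
	return string(raw[:3]) // hypothetical field extraction
}

func main() {
	raw := []byte("cpu,host=a value=1")
	m := decodeMeasurement(raw)
	// Clobber the source, as fasthttp's buffer reuse would after the
	// handler returns. If the decoded value aliased raw, it changes too.
	for i := range raw {
		raw[i] = 0xFF
	}
	if m != "cpu" {
		panic("decoded value aliased the request body")
	}
	fmt.Println("no-aliasing invariant holds:", m)
}
```

If a future fork release switched to zero-copy string views, the decoded value would mutate with the source and a test of this shape fails immediately.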

Bench result with this commit: 19.13M rec/s @ p99 3.06ms over 60s,
slightly above the post-round-1 baseline of 19.04M / 3.13ms.

3) Bumped Basekick-Labs/msgpack/v6 from v6.0.0 to v6.1.0 (user-
released; pre-existing fork upgrade unrelated to gemini findings).
The no-aliasing regression test passes against v6.1.0.

The 7 other comments in gemini round 7 are recurring re-flag noise
on code shapes already fixed in 3c79ea1, 0822793, 4a05cfb, and 6561c86.

Build clean, all suites pass, no-aliasing invariant locked in test.

xe-nvdk commented Apr 28, 2026

/gemini review

@gemini-code-assist round 8 fixes pushed in commit 04e64cc. One NEW round-7 finding addressed plus a perf restoration:

MEDIUM — drop poisoned zstd decoder on partial-decode error:

  • decompressZstdPooled now calls decoder.Close() on the io.ReadAll error path instead of Reset(nil) + returning to the pool. After a mid-stream corruption a poisoned internal frame state is not safely recoverable; closing the decoder lets the next request's pool.Get() allocate a fresh one. Cheap and defensive.

Perf restoration — msgpack uncompressed fast-path:

  • After round-6 unified all three handlers behind the shared decompressRequest helper (which copies on the uncompressed branch), the 60-second sustained MessagePack-columnar bench dropped from 19.04M rec/s (baseline) → 18.04M rec/s (~5% regression). At 18M rec/s × ~50KB batches, the new per-request defensive memcpy becomes a real allocation storm.
  • msgpack now inlines its codec dispatch: gzip/zstd still go through the shared helpers (those need the bomb cap and produce fresh slices anyway), but the uncompressed branch hands rawBody directly to h.decoder.Decode. The decoder is synchronous and the Basekick-Labs/msgpack/v6 fork copies every string via string(b) and every []byte via io.ReadFull — so by the time Decode returns, no decoded value still aliases rawBody.
  • LP and TLE keep the defensive copy in the shared helper — their throughput targets don't justify diverging.

New regression test TestMessagePackDecoder_CopiesStringFields_NoBodyAliasing decodes a row-format payload, mutates the source bytes to 0xFF, and asserts the decoded Measurement and Tags["region"] values are unchanged. Locks in the no-aliasing invariant: if a future fork update introduces zero-copy aliasing, the test fails loudly before production sees corruption. This is the load-bearing safety net for the bypass-the-copy decision.

Perf result with this commit: 19.13M rec/s @ p99 3.06ms over 60s — slightly above the post-round-1 baseline (19.04M / p99 3.13ms).

Bumped Basekick-Labs/msgpack/v6 from v6.0.0 to v6.1.0 (user-released; verified the no-aliasing regression test still passes).

The 7 other comments in round 7 are stale re-flag noise on code shapes already fixed in 3c79ea1 / 0822793 / 4a05cfb / 6561c86.

Build clean, full suites pass, no-aliasing invariant locked in test.

Looking for any final findings before merge.

@xe-nvdk xe-nvdk merged commit a70abf2 into main Apr 28, 2026
5 checks passed
@xe-nvdk xe-nvdk deleted the fix/ingest-26.05.1-criticals branch April 28, 2026 15:39
@gemini-code-assist Bot left a comment

Code Review

This pull request significantly hardens the ingestion path by addressing critical concurrency, durability, and security vulnerabilities. Key improvements include eliminating graceful-shutdown panics in the ArrowBuffer, preventing data corruption during concurrent schema evolution via a bounded retry loop, and improving WAL durability transparency by introducing an explicit ErrWALDropped sentinel. Security is bolstered by mitigating decompression-bomb vectors through streaming decoders with hard caps and enforcing write-tier authentication across all ingestion endpoints. Additionally, Line Protocol and TLE now support zstd decompression. Feedback is provided regarding an inconsistency in the MessagePack handler where a zero-valued maximum payload size would incorrectly reject all requests instead of falling back to the system default.

Comment thread internal/api/msgpack.go
Comment on lines +216 to 220
if int64(len(rawBody)) > h.maxPayloadSize {
return c.Status(fiber.StatusRequestEntityTooLarge).JSON(fiber.Map{
"error": fmt.Sprintf("Payload too large (max %s). Consider batching into smaller requests.", formatBytes(h.maxPayloadSize)),
})
}

medium

This check will reject all non-empty payloads if h.maxPayloadSize is 0 (e.g., if not explicitly configured). However, the downstream decompression helpers (decompressGzipPooled and decompressZstdPooled) treat a 0 limit as a signal to use the default 100MB cap. This check should be updated to handle 0 consistently with the rest of the pipeline to avoid unexpected rejections when using default settings.

Suggested change
if int64(len(rawBody)) > h.maxPayloadSize {
return c.Status(fiber.StatusRequestEntityTooLarge).JSON(fiber.Map{
"error": fmt.Sprintf("Payload too large (max %s). Consider batching into smaller requests.", formatBytes(h.maxPayloadSize)),
})
}
effectiveLimit := h.maxPayloadSize
if effectiveLimit <= 0 {
effectiveLimit = 100 * 1024 * 1024 // Match default in decompressRequest
}
if int64(len(rawBody)) > effectiveLimit {
return c.Status(fiber.StatusRequestEntityTooLarge).JSON(fiber.Map{
"error": fmt.Sprintf("Payload too large (max %s). Consider batching into smaller requests.", formatBytes(effectiveLimit)),
})
}

xe-nvdk added a commit that referenced this pull request Apr 28, 2026
…414)

* fix(query): six critical-path hardening fixes from query-path review

A 4-agent staff/principal-engineer review of the query execution path
(mirror of the ingest review that produced PR #413) surfaced six
issues we want addressed before 26.05.1 GA. All are fixed here. Bench
delta is within the 5% no-regression budget on a 99.9M-row ClickBench
GROUP BY workload (JSON p95 75.86ms→78.58ms, Arrow p95 76.14ms→78.36ms).

C1 — Expanded SQL denylist + comment-strip + literal-mask normalisation.
The query API guardrail previously gated only DDL/DML keywords. It
now also gates ATTACH/DETACH/COPY/EXPORT/IMPORT DATABASE/PRAGMA/
SET <var>=/LOAD/INSTALL/CALL — session/extension/file-system ops that
don't belong in a read-only query path. ValidateSQLRequest now strips
comments and masks string literals before the regex check, so
`DROP /* */ TABLE x` cannot interleave comments past token boundaries
and `SELECT 'DROP TABLE x'` is not falsely rejected. Removed the dead
(h *QueryHandler).validateSQL method (code-quality finding from the
same review). Test matrix covers every blocked keyword + comment-
injection + quoted-identifier false-positive cases.
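The strip-comments / mask-literals / scan ordering can be sketched as follows (the regex patterns here are illustrative, not Arc's actual denylist):

```go
package main

import (
	"fmt"
	"regexp"
)

var (
	blockComment = regexp.MustCompile(`/\*[\s\S]*?\*/`)
	lineComment  = regexp.MustCompile(`--[^\n]*`)
	stringLit    = regexp.MustCompile(`'(?:[^']|'')*'`)
	denylist     = regexp.MustCompile(`(?i)\b(DROP|ATTACH|DETACH|COPY|PRAGMA|LOAD|INSTALL|CALL|SET|RESET)\b`)
)

// validateSQL normalises before scanning: comments are stripped so they
// cannot interleave past token boundaries, and string literals are masked
// so quoted text cannot trip the keyword scan as a false positive.
func validateSQL(q string) error {
	s := blockComment.ReplaceAllString(q, " ")
	s = lineComment.ReplaceAllString(s, " ")
	s = stringLit.ReplaceAllString(s, "'?'") // mask literals before keyword scan
	if m := denylist.FindString(s); m != "" {
		return fmt.Errorf("blocked keyword: %s", m)
	}
	return nil
}

func main() {
	fmt.Println("masked literal passes:", validateSQL("SELECT 'DROP TABLE x'") == nil)
	fmt.Println("PRAGMA blocked:", validateSQL("PRAGMA database_list") != nil)
	fmt.Println("comment-split blocked:", validateSQL("DROP /* hide */ TABLE x") != nil)
}
```

Ordering matters: masking after the keyword scan would reintroduce the `SELECT 'DROP TABLE x'` false positive the commit describes.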

C2 — x-arc-database header validation + universal read_parquet path
quoting. The header value lands inside read_parquet('<base>/<db>/...')
storage paths Arc generates internally; in some edge cases a header
containing a single quote or shell-active char could break out of the
literal. New validateHeaderDatabase helper validates at every entry
point (executeQuery, executeQueryArrow, estimateQuery). New quotePath()
helper routes every read_parquet('PATH', ...) interpolation site
through sqlutil.EscapeStringLiteral (added to internal/sql/mask.go as
a single source of truth). Eight read_parquet sites in query.go
converted. Tests cover SQLi vectors (quote, NUL, newline, comma,
backslash, path-traversal) and boundary conditions.

C3 — Reject direct read_parquet() in user SQL + extend CTE name regex.
Arc's transformation layer is the only legitimate source of
read_parquet — in some edge cases, a user query containing
read_parquet directly produced zero extracted table references and
bypassed the (database, measurement) RBAC pair-check. ValidateSQLRequest
now rejects user SQL containing read_parquet(. Companion fix: the
CTE-name extraction regex was extended to recognise the parenthesized
column-list form (WITH foo(c1, c2) AS (...)), so a CTE name doesn't
leak into the table-reference list. Tests cover read_parquet in JOIN,
subqueries, CTEs, plus the new CTE column-list parse.

C4 — Parallel-partition partial-failure now fails the whole request.
When the parallel executor fans out a query across N partitions and
one or more partition queries error, NewMergedRowIterator previously
returned the surviving partitions' rows as a 200/success — a fraction
of the result silently dropped. The handler now inspects per-partition
Error after ExecutePartitioned and fails the request with HTTP 500 on
any partition error. Companion fix in parallel_executor.go: the
goroutine fan-out semaphore is acquired in the launch loop instead of
inside each spawned goroutine, so a 10K-path query bounds in-flight
goroutines at MaxConcurrentPartitions (default 4) instead of spawning
10K goroutines parked on the semaphore.

C5 — Streaming-response error semantics. streamTypedJSON and
streamArrowJSON previously returned only the row count; in some edge
cases (Scan failures mid-stream, deferred *sql.Rows.Err(), context
cancellation after the response envelope was already flushed) the
loop silently `continue`'d and the caller marked the query
Complete(rowCount). Both functions now return (int, error), perform a
ctx.Err() check at every row/batch boundary, and check the iterator's
deferred error after the loop. Callers route any error to
IncQueryErrors, registry Fail() (or TimedOut() on
context.DeadlineExceeded), and an Error log line. The HTTP status
cannot be changed retroactively (headers already flushed) but
operator-side observability is now correct. The Arrow IPC stream loop
in executeQueryArrow adopts the same per-batch ctx-check pattern.

C6 — Arrow IPC streaming memory pinned by deferred Release. The
executeQueryArrow IPC loop used `defer batch.Release()` inside the
for-reader.Next() body. Defers accumulate on the closure stack until
the closure exits — for a 10M-row result with 10K-row batches,
1,000 deferred Release calls held all casted Arrow records alive
until the entire stream completed. Releases each casted batch
explicitly after ipcWriter.Write, restoring constant per-batch
memory. The reader-owned input batch needs no Release; reader.Next()
releases the prior record automatically.
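The defer-accumulation pitfall can be demonstrated in isolation (a sketch — batch.Release here stands in for Arrow's record Release; nothing else is the real code):

```go
package main

import "fmt"

type batch struct{ released *int }

func (b batch) Release() { *b.released++ }

// streamWithDefer defers Release inside the loop body: all n defers pile
// up until the enclosing closure exits, so nothing is freed mid-stream
// and every batch stays pinned for the whole stream.
func streamWithDefer(n int) (releasedMidStream int) {
	released := 0
	func() {
		for i := 0; i < n; i++ {
			b := batch{&released}
			defer b.Release() // runs only when the closure returns
		}
		releasedMidStream = released // still 0: nothing freed during the loop
	}()
	return
}

// streamExplicit releases each batch right after it is written,
// restoring constant per-batch memory.
func streamExplicit(n int) int {
	released := 0
	for i := 0; i < n; i++ {
		b := batch{&released}
		// ... write batch to the IPC stream ...
		b.Release() // freed immediately
	}
	return released
}

func main() {
	fmt.Println("deferred, freed during loop:", streamWithDefer(1000))
	fmt.Println("explicit, freed during loop:", streamExplicit(1000))
}
```

With 1,000 batches the deferred variant frees nothing until the stream ends, which is exactly the 10M-row / 10K-row-batch pinning described above.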

Build clean; full ./internal/api/... ./internal/query/...
./internal/queryregistry/... ./internal/database/... test suites
pass. Smoke test against running Arc verified all 5 attack vectors
are rejected (header SQLi, direct read_parquet, ATTACH RCE, PRAGMA
hardening) and legitimate queries still work.

* fix(query): gemini round 1 — tighten SET/CALL regex + idiomatic ctx select

Addresses 5 NEW gemini findings on PR #414 commit 244849b. One MEDIUM
(G4 — simplify error-collection loop) was a no-op suggestion identical
to the existing code; skipped.

CRITICAL/security-critical (gemini G1) — SET regex bypassable:
The previous \bSET\s+(?:GLOBAL\s+|...)?\w+\s*= form required an
equals sign and a word after the optional scope keyword. DuckDB
also accepts:
  - SET enable_external_access TO true (TO instead of =)
  - SET VARIABLE x = 1            (VARIABLE keyword)
  - RESET enable_external_access  (mutates session state)
All three slipped past the regex. Replaced the multi-form pattern
with bare \bSET\b and added \bRESET\b. The query API is read-only
so any session-state mutation is forbidden — bare-keyword match is
correct here. New regression tests for all three bypass shapes.

CRITICAL/security-critical (gemini G2) — CALL regex bypassable:
\bCALL\s+\w+ required whitespace before the procedure name, but
DuckDB accepts CALL(proc_name) with no space. Replaced with bare
\bCALL\b. Regression test for the no-space form.

MEDIUM (gemini G3) — queryMeasurement streamCtx:
Switched from context.Background() to c.UserContext() so client
disconnects propagate to per-row cancellation. Fasthttp keeps
c.Context() alive across the SetBodyStreamWriter boundary so it's
safe inside the async stream callback (consistent with the
executeQuery and executeQueryArrow patterns).

MEDIUM (gemini G5+G6) — idiomatic select for ctx cancellation:
Replaced 'if ctx.Err() != nil' with 'select { case <-ctx.Done() }'
+ default at three call sites — streamTypedJSON, streamArrowJSON,
and the Arrow IPC stream loop in executeQueryArrow. Required
labeled-break (break scanLoop / batchLoop / streamLoop) because
plain break inside select-in-for would only break the select.
Same semantics, more idiomatic Go.

Bench: two runs on warm state show JSON avg 72.64-73.85ms, p95
74.34-81.31ms; Arrow avg 72.30-73.66ms, p95 74.30-75.50ms — well
within run-to-run jitter of baseline (73ms avg / 76ms p95). The
"+3.6% JSON p95" delta from the initial commit appears to have
been first-restart cold-cache noise, not a real regression.

Build clean (default + -tags=duckdb_arrow); full test sweep across
internal/api/... internal/query/... internal/queryregistry/...
internal/database/... passes.

* docs(release): SET/CALL denylist tightening from gemini round 1

The C1 denylist bullet listed SET <var>=/CALL (the original narrower
regex form). After gemini round 1 found two bypass shapes (SET TO,
SET VARIABLE, RESET, CALL with no whitespace before paren), the
regex simplified to bare-keyword anchors plus added RESET. Update
the release notes to match.
xe-nvdk added a commit that referenced this pull request Apr 28, 2026
Re-ran sustained_bench across all four ingestion paths after the
26.05.1 hardening landed. Numbers are 30-second runs, 12 workers,
0% errors:

| Protocol               | Old              | New              |
|------------------------|------------------|------------------|
| MessagePack Columnar   | 19.0M / p99 3.13ms | 19.9M / p99 2.95ms |
| MessagePack + Zstd     | 16.8M / p99 3.23ms | 16.5M / p99 2.70ms |
| MessagePack + GZIP     | 15.4M / p99 3.17ms | 16.5M / p99 2.71ms |
| Line Protocol          | 3.7M  / p99 10.63ms | 4.1M  / p99 8.85ms |

Compressed paths now sit within 13% of uncompressed columnar.
GZIP closed the gap to Zstd entirely (the C4 streaming-LimitReader
fix in #413 landed on the gzip path too). Line Protocol improved
~11% throughput / 17% better p99 — likely from the C5 stream-error
work + the C4 ingest hardening.

Bench harness fix: benchmarks/sustained_bench/main.go was
hardcoding /api/v1/write for the lineprotocol path. That endpoint
doesn't exist (404) — the actual route is /api/v1/write/line-protocol.
Fixed both the URL and the banner-printed endpoint label.