cannon: add direct-to-clickhouse output sink#808
Merged
Conversation
Cannon's canonical-table events flow through xatu-server→Kafka→consumoor→CH today, but the server adds nothing the canonical routes consume (network name is populated by cannon itself). Collapse that path with a new `clickhouse` output sink that writes straight to CH, gated purely by config.

Refactor the writer/router/route stack out of pkg/consumoor (zero Kafka coupling) into pkg/clickhouse so both consumoor and the new sink can reuse it. Telemetry subsystem is now a parameter, so cannon's metrics land under xatu_cannon_* and consumoor dashboards stay on xatu_consumoor_*.

The sink intentionally does NOT use processor.BatchItemProcessor: each HandleNewDecoratedEvents call routes the input slice and flushes one INSERT per affected table inline (sketched below). Cannon derivers already deliver one full epoch per callback, so this preserves per-epoch atomicity. Coordinator checkpoint advance only fires after the sink returns nil — i.e., after CH has acked the writes (option-2 at-least-once).

RestrictToTablePrefixes lets cannon-only deployments scope schema validation to canonical_* without provisioning libp2p/mev/execution table schemas.
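A minimal sketch of that inline-flush shape, with stand-in types; the real router/writer live in pkg/clickhouse, FlushTableEvents is the writer method named later in this PR, and everything else here is illustrative:

```go
package sketch

import (
	"context"
	"fmt"
)

// Event stands in for xatu.DecoratedEvent; router and writer stand in
// for the shared pkg/clickhouse types.
type Event struct{}

type router interface {
	// Route buckets a slice of events by target table.
	Route(events []*Event) (map[string][]*Event, error)
}

type writer interface {
	FlushTableEvents(ctx context.Context, table string, rows []*Event) error
}

type Sink struct {
	router router
	writer writer
}

// HandleNewDecoratedEvents routes the whole input slice, then flushes
// one INSERT per affected table inline: no background batch processor.
// Cannon derivers hand over one full epoch per call, so a nil return
// means ClickHouse has acked the entire epoch and the coordinator
// checkpoint may advance (at-least-once).
func (s *Sink) HandleNewDecoratedEvents(ctx context.Context, events []*Event) error {
	buckets, err := s.router.Route(events)
	if err != nil {
		return err // checkpoint must not advance
	}
	for table, rows := range buckets {
		if err := s.writer.FlushTableEvents(ctx, table, rows); err != nil {
			return fmt.Errorf("flush %s: %w", table, err)
		}
	}
	return nil
}
```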
The creasty/defaults library applies default tags AFTER yaml unmarshal, treating bool zero (false) as unset and overwriting it with the default. For these two fields the documented default is true, which means a user setting `failOnMissingTables: false` or `enabled: false` in YAML was silently flipped back to true on startup. Changing both fields to *bool gives the defaults pass a way to tell "unset" (nil) apart from "explicitly false" (pointer to false). Added ShouldFailOnMissingTables() / IsEnabled() accessors so callers don't have to deal with the pointer themselves.
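A minimal reproduction of the pitfall and the *bool fix, assuming gopkg.in/yaml.v3 for the unmarshal step (any YAML decoder behaves the same way here):

```go
package main

import (
	"fmt"

	"github.com/creasty/defaults"
	"gopkg.in/yaml.v3"
)

// Broken: plain bool. After yaml.Unmarshal sets it to false, the field
// is indistinguishable from the zero value, so defaults.Set overwrites
// it with the tag default.
type Broken struct {
	Enabled bool `yaml:"enabled" default:"true"`
}

// Fixed: *bool. nil means "unset"; a non-nil pointer is an explicit
// user choice the defaults pass leaves alone (it only fills nil
// pointers).
type Fixed struct {
	Enabled *bool `yaml:"enabled" default:"true"`
}

func main() {
	in := []byte("enabled: false\n")

	var b Broken
	_ = yaml.Unmarshal(in, &b)
	_ = defaults.Set(&b)
	fmt.Println(b.Enabled) // true: the explicit false was silently flipped

	var f Fixed
	_ = yaml.Unmarshal(in, &f)
	_ = defaults.Set(&f)
	fmt.Println(*f.Enabled) // false: explicit choice preserved
}
```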
Force-pushed from 5215440 to 8c59ed9
… tests)

- Codegen tool's go-build target and output dir referenced the pre-rename pkg/consumoor/route/* paths, which no longer exist. Point them at the current pkg/clickhouse/route/* layout.
- Writer.Start opened the connection pool and spawned the pool-metrics goroutine before validating tables. If validation failed (a likely outcome on a misprovisioned target), both leaked and a retried Start would early-return success because w.pool was still non-nil. Add a cleanupStartFailure helper that closes the pool, drains the metrics goroutine, and clears w.pool so retries actually retry (see the sketch after this list).
- The clickhouse output sink had no unit tests. Add coverage for filterRoutesByTablePrefix (single/multiple/non-matching/exact prefixes), Config validation (nil and missing-DSN), New construction (nil config, non-matching prefix list), and the *bool accessors on Config and AdaptiveLimiterConfig that survived the creasty/defaults pass.
- Add a compile-time interface compliance assertion via a local outputSink mirror so future edits can't silently break the contract, and include the prefix list in the route-not-found error message so ops can distinguish 'no route exists' from 'route was filtered out'.
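A sketch of that Start/cleanup ordering, with stand-in pool and metrics types; only cleanupStartFailure is named in the PR, the rest is illustrative:

```go
package sketch

import (
	"context"
	"errors"
)

type connPool struct{}

func openPool() (*connPool, error) { return &connPool{}, nil }

func (p *connPool) Close() {}

type Writer struct {
	pool        *connPool
	metricsStop chan struct{}
	metricsDone chan struct{}
}

func (w *Writer) validateTables(ctx context.Context) error {
	// Stand-in for the real per-table schema checks.
	return errors.New("17 registered route table(s) missing")
}

func (w *Writer) poolMetricsLoop() {
	defer close(w.metricsDone)
	<-w.metricsStop // the real loop scrapes pool stats on a ticker
}

func (w *Writer) Start(ctx context.Context) error {
	if w.pool != nil {
		return nil // already started
	}
	p, err := openPool()
	if err != nil {
		return err
	}
	w.pool = p
	w.metricsStop = make(chan struct{})
	w.metricsDone = make(chan struct{})
	go w.poolMetricsLoop()

	// Validation happens last; on failure every side effect above must
	// be undone, or a retried Start would see w.pool != nil and
	// early-return success without ever having validated.
	if err := w.validateTables(ctx); err != nil {
		w.cleanupStartFailure()
		return err
	}
	return nil
}

// cleanupStartFailure stops and drains the metrics goroutine, closes
// the pool, and clears w.pool so the next Start attempt actually retries.
func (w *Writer) cleanupStartFailure() {
	close(w.metricsStop)
	<-w.metricsDone
	w.pool.Close()
	w.pool = nil
}
```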
The earlier `*bool → IsEnabled() returns true on nil` semantic was meant
to mirror the YAML `default:"true"` tag, but it broke direct struct
construction (test fixtures and any programmatic users): a zero-value
ChGoConfig{} has AdaptiveLimiter.Enabled = nil, IsEnabled would return
true, and AdaptiveLimiterConfig.Validate() would then complain about the
zero MinLimit.
Switch IsEnabled / ShouldFailOnMissingTables to "explicit-only": nil
means off, only an explicit pointer value flips the switch. YAML callers
are unaffected because creasty/defaults runs during config load and
fills the nil with &true so the documented default still applies.
Programmatic callers now get the safer "off when unset" semantic that
matches Go zero-value intuition.
Also explicitly disable the adaptive limiter in writer_test.go and
writer_benchmark_test.go fixtures so the intent is visible at the call
site rather than relying on the accessor's nil semantics.
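The explicit-only semantic in sketch form; accessor and type names are per the PR, surrounding fields trimmed:

```go
// nil reads as off. creasty/defaults fills nil with &true during
// config load, so YAML callers still get the documented default;
// a zero-value struct built directly in code stays off, matching
// Go zero-value intuition.
func (c *AdaptiveLimiterConfig) IsEnabled() bool {
	return c.Enabled != nil && *c.Enabled
}

func (c *Config) ShouldFailOnMissingTables() bool {
	return c.FailOnMissingTables != nil && *c.FailOnMissingTables
}
```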
mattevans approved these changes on May 5, 2026
- Makefile: fix stale `pkg/consumoor/route/cmd/generate` path; rename target to `clickhouse-routes` to reflect the new package layout.
- pkg/clickhouse/route/correctness_test.go: update stale doc paths to `pkg/clickhouse/route/`.
- pkg/clickhouse/config.go: tighten group-retry validation — when `groupRetryMaxAttempts > 0`, both delays must be > 0 (was `>= 0`). Prevents a 0-delay spin-loop when a user enables retries without setting backoff bounds.
- pkg/clickhouse/telemetry/metrics.go: drop the vestigial `consumoor` fallback in `NewMetrics`; an empty subsystem now falls back to `clickhouse`. All callers already pass an explicit value.
- pkg/output/clickhouse/clickhouse.go: introduce a `chWriter` interface for the runtime methods the sink uses (Start/Stop/FlushTableEvents) so unit tests can substitute a stub (sketched after this list). Production wiring unchanged.
- pkg/output/clickhouse/clickhouse_test.go: add unit tests covering the seven `HandleNewDecoratedEvents` branches — empty slice, all-nil events, filter-drops, StatusErrored with and without restrictPrefixes, flush-failure propagation, InvalidEvents-only, and routing fan-out.
- cannon-smoke-test.yaml: matrix the existing job over `output_type` so the new direct-CH path is exercised in CI alongside the existing xatu→kafka→consumoor path.
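The test seam in sketch form; the method set is per the note above, while the signatures and import paths are assumptions:

```go
// chWriter is the narrow runtime surface the sink needs from the
// shared writer; tests substitute a stub instead of dialing ClickHouse.
type chWriter interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	FlushTableEvents(ctx context.Context, table string, events []*xatu.DecoratedEvent) error
}

// Compile-time compliance assertion, so future edits to the production
// writer can't silently break the contract.
var _ chWriter = (*clickhouse.Writer)(nil)
```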
After the existing per-table assertion against xatu-clickhouse-01 (which cannon dials directly), re-run the same query against xatu-clickhouse-02 with select_sequential_consistency=1. Exercises the full durability chain — Distributed forward queue → shard insert → quorum replication — and surfaces any case where cannon's checkpoint advanced for rows that aren't durable on a non-write replica. Conditioned on the clickhouse matrix variant only; the xatu path goes via consumoor and has its own at-least-once semantics that aren't in scope for this check.
…replica step

The previous run hung silently because `clickhouse-client` was waiting for auth on stdin. Mirror the existing assertion script's pattern: `-u default --password ""` plus `</dev/null` for safety. Also use `--select_sequential_consistency=1` (`=` form) and tighten the retry budget — 15×2s instead of 30×2s — now that the happy-path query returns immediately.
The previous "cross-replica durability" step was structurally wrong for this docker-compose: cluster_2S_1R has no actual replicas (1 replica per shard), so SELECT … select_sequential_consistency=1 returned empty for every table even when data was correctly inserted and forwarded. Replace with a direct test for the actual failure mode the user is worried about in this topology: the Distributed forward queue. Cannon writes to a Distributed table, which acks once rows are appended to the local on-disk forward dir; the actual forward to the target shard happens async. If that queue is stuck (network, schema mismatch on target shard, …) cannon's checkpoint advances for rows still in the queue, and a node crash loses them. The new step waits briefly, then asserts `SELECT sum(data_files), sum(error_count) FROM system.distribution_queue` filtered to `canonical_*` is (0, 0). Also dumps a per-table breakdown on failure for diagnostics.
Move two cannon-knows-this fields out of user-facing YAML:

- metricsSubsystem (always "cannon" for the cannon binary)
- restrictToTablePrefixes (always [canonical_] — cannon's derivers emit canonical_* events by definition)

Cannon's CreateSinks now constructs the clickhouse sink directly when the type matches, mirroring output.NewSink's unmarshal+defaults flow but slotting in cannon's defaults between the YAML decode and the sink constructor. User-supplied values still win (see the sketch below).

Sink config godocs updated to remove the cannon-specific framing — the sink itself stays binary-agnostic; the binary is responsible for its own subsystem name and prefix taxonomy. Smoke test config drops both fields, so CI now exercises the actual minimal user-facing surface (`dsn:` plus the in-CI-only adaptive limiter override).
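Roughly how the overlay slots in; apart from the two config fields and the creasty/defaults call, the names, signatures, and import path below are illustrative:

```go
import (
	"github.com/creasty/defaults"
	"gopkg.in/yaml.v3"

	// Illustrative import; the sink lives under pkg/output/clickhouse.
	chsink "github.com/ethpandaops/xatu/pkg/output/clickhouse"
)

// createClickhouseSink mirrors output.NewSink's unmarshal+defaults
// flow, but slots cannon's binary-owned values in between the YAML
// decode and the sink constructor. User-supplied values win because
// the overlay only touches unset fields.
func createClickhouseSink(rawYAML []byte) (*chsink.ClickHouse, error) {
	cfg := &chsink.Config{}
	if err := yaml.Unmarshal(rawYAML, cfg); err != nil {
		return nil, err
	}
	if cfg.MetricsSubsystem == "" {
		cfg.MetricsSubsystem = "cannon" // always "cannon" for this binary
	}
	if len(cfg.RestrictToTablePrefixes) == 0 {
		// Cannon's derivers emit canonical_* events by definition.
		cfg.RestrictToTablePrefixes = []string{"canonical_"}
	}
	if err := defaults.Set(cfg); err != nil {
		return nil, err
	}
	return chsink.New(cfg)
}
```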
The deriver-callback handler used to call sinks sequentially with fail-fast. For dual-write configs (e.g. cannon → xatu-server AND cannon → clickhouse) this meant total wall time was the sum of all sink latencies, and a slow sibling held up the rest. Replace with a concurrent fan-out using sync.WaitGroup + errors.Join (sketched below):

- All sinks attempted on every call regardless of whether siblings failed. A flaky sink no longer prevents healthy siblings from making progress on an attempt. The deriver still gates checkpoint advance on a nil return, so a single failure forces an epoch retry against every sink — ReplacingMergeTree absorbs the duplicate writes on the previously-succeeded paths.
- Total wall time = max(sink latencies), not sum.
- Joined error surfaces all failures, not just the first one — useful when one sink is consistently failing in dual-write.

Single-sink configs hit a fast path that skips the wg/goroutine machinery entirely (2.4 ns/op, 0 allocs benchmarked). Per-call goroutines are the right shape here over persistent workers: cannon's fan-out fires at most a handful of times per second per deriver in steady state, so per-call goroutines (~200 ns spawn) are cheaper than parking persistent workers on channels (~80 ns per send/recv plus 2 KB resident stack each, even when idle).

Benchmarked on M4 Max:

1 sink (fast path): 2.4 ns/op, 0 allocs
2 sinks: 604 ns/op, 6 allocs
5 sinks: 1,425 ns/op, 12 allocs
10 sinks: 2,741 ns/op, 22 allocs

At cannon's ~10 callbacks/sec peak, a 10-sink deployment burns ~27 µs/sec on fan-out overhead — rounding error.
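A sketch of the fan-out under those constraints, with stand-in event and sink types:

```go
package sketch

import (
	"context"
	"errors"
	"sync"
)

// Event and sink stand in for xatu.DecoratedEvent and the real output
// sink interface.
type Event struct{}

type sink interface {
	HandleNewDecoratedEvents(ctx context.Context, events []*Event) error
}

// fanOut attempts every sink on every call. One sink failing no longer
// blocks siblings from making progress on the attempt; the joined
// error still forces the caller to retry the epoch against all sinks,
// and ReplacingMergeTree absorbs duplicates on paths that had already
// succeeded.
func fanOut(ctx context.Context, sinks []sink, events []*Event) error {
	// Fast path: the common single-sink config skips the WaitGroup
	// and goroutine machinery entirely.
	if len(sinks) == 1 {
		return sinks[0].HandleNewDecoratedEvents(ctx, events)
	}

	var wg sync.WaitGroup
	errs := make([]error, len(sinks))
	for i, s := range sinks {
		wg.Add(1)
		go func(i int, s sink) {
			defer wg.Done()
			errs[i] = s.HandleNewDecoratedEvents(ctx, events)
		}(i, s)
	}
	wg.Wait()

	// errors.Join drops nils and, on failure, surfaces every failing
	// sink rather than just the first. Total wall time is
	// max(sink latencies), not the sum.
	return errors.Join(errs...)
}
```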
Sibling to ValidateTables. For every registered route, query system.columns for the target table and assert every column the route's batch declares actually exists. Mismatches fail Writer.Start with a per-table breakdown of missing columns. Catches the schema-vs-code skew that otherwise surfaces only at first-INSERT time as `DB::Exception: There is no column 'X' in table` and stalls the deriver's checkpoint while it retries forever. Now: deploy refused at the boundary instead of at the next event.

Direction asymmetry is intentional. Only "route writes a column the table is missing" is fatal — the other direction (table has columns the route doesn't write) is fine, since those columns are simply not listed in the INSERT and ClickHouse fills them with DEFAULT. That preserves the supported "cannon writes to legacy schema, leaves geo columns empty" semantic.

Honors the same failOnMissingTables toggle as ValidateTables — set false to downgrade to warnings.
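A sketch of the check against system.columns, assuming the clickhouse-go/v2 driver; the helper name and signature are illustrative:

```go
package sketch

import (
	"context"
	"fmt"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// validateColumns asserts that every column the route's batch declares
// exists on the target table. The reverse direction is deliberately
// not checked: columns the table has but the route never writes are
// left out of the INSERT and filled with DEFAULT by ClickHouse.
func validateColumns(ctx context.Context, conn driver.Conn, table string, routeCols []string) error {
	rows, err := conn.Query(ctx,
		"SELECT name FROM system.columns WHERE database = currentDatabase() AND table = ?",
		table)
	if err != nil {
		return err
	}
	defer rows.Close()

	have := make(map[string]bool)
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return err
		}
		have[name] = true
	}
	if err := rows.Err(); err != nil {
		return err
	}

	var missing []string
	for _, col := range routeCols {
		if !have[col] {
			missing = append(missing, col)
		}
	}
	if len(missing) > 0 {
		return fmt.Errorf("table %s is missing column(s) declared by its route: %v", table, missing)
	}
	return nil
}
```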
The xatu→server→kafka→vector→consumoor→CH path is going away. Stop
smoke-testing it and stop bringing up the legacy stack — kafka,
vector, sentry-logs, tempo, prometheus, grafana, nginx, init-kafka,
kafka-storage-format are all off the path for the cannon→CH
direct sink and were the source of the startup-race CI flakes
("xatu-clickhouse-02 exit 138", vector readiness timeouts) we kept
re-running to dodge.
The job now lists three leaf services for docker compose up:
xatu-postgres-migrator, xatu-server, xatu-clickhouse-migrator. Their
depends_on chains pull in postgres + zookeepers + clickhouse-01/02.
xatu-server runs only as the coordinator gRPC endpoint cannon uses
to manage cannon_location checkpoints in postgres.
Cannon config is unconditionally the clickhouse output. The
Distributed-forward-queue-drained assertion runs unconditionally.
…annon

Slim-stack regression: docker compose up returns as soon as containers are created, but the clickhouse migration is a separate one-shot container that takes ~5-10s to apply ~5000 lines of DDL. In the old full stack, init-kafka and other downstream services pinned via depends_on: condition: service_completed_successfully delayed cannon indirectly; with those services gone, cannon started immediately, ran ValidateTables/ValidateColumns against an empty schema, and exited fatal with "17 registered route table(s) missing".

Add an explicit wait on xatu-clickhouse-migrator's exit status before proceeding past Wait-for-Xatu-stack. Polls every 2s up to 120s. Fails loudly with the migrator's logs if it errored or didn't finish (sketched below).
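The wait, sketched in Go for consistency with the rest of this page; the actual CI step is the same loop in shell, and the container name comes from the compose file:

```go
package main

import (
	"fmt"
	"os/exec"
	"strings"
	"time"
)

// waitForMigrator polls the one-shot container's state every 2s for up
// to 120s, succeeding only on a clean exit.
func waitForMigrator(name string) error {
	deadline := time.Now().Add(120 * time.Second)
	for time.Now().Before(deadline) {
		out, err := exec.Command("docker", "inspect",
			"-f", "{{.State.Status}} {{.State.ExitCode}}", name).Output()
		if err == nil {
			fields := strings.Fields(string(out))
			if len(fields) == 2 && fields[0] == "exited" {
				if fields[1] == "0" {
					return nil
				}
				return fmt.Errorf("%s exited with code %s; check its logs", name, fields[1])
			}
		}
		time.Sleep(2 * time.Second)
	}
	return fmt.Errorf("timed out after 120s waiting for %s", name)
}

func main() {
	if err := waitForMigrator("xatu-clickhouse-migrator"); err != nil {
		fmt.Println(err)
	}
}
```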
mattevans added a commit that referenced this pull request on May 8, 2026
* master: (35 commits)
  feat(discovery): support bootstrapRpcUrl for RPC-backed Status (#815)
  cannon: add direct-to-clickhouse output sink (#808)
  [codex] improve execution mimicry peering (#811)
  fix(kafka): exporter size check should use MaxMessageBytes, not FlushBytes (#810)
  fix(kafka): make Producer.MaxMessageBytes configurable (#809)
  ci(smoke): probe vector-http-kafka via host port (busybox has no /dev/tcp)
  ci(govulncheck): whitelist unfixable docker/docker server-side vulns
  ci: pick latest Go for govulncheck, wait for vector-http-kafka in smoke test
  consumoor: remove kzg_commitments array from data column sidecar table
  consumoor: add clientId config for shared Kafka client-id
  fix attestation order by
  fix: bump grpc to v1.79.3 for GO-2026-4762, fix buf action hash
  consumoor: fix lint issues from CI
  consumoor: fix review findings — config strictness, failure isolation, watcher bug
  yeah (#804)
  consumoor: drop intentionally unsupported events; NAK unknown routes
  consumoor: fix error handling gaps — DLQ invalid events, permanent flatten errors, group retry
  consumoor: log bad events in JSON form for invalid events and flatten errors
  consumoor: add rate-limited logging, unknown event warnings, and invalid event diagnostics
  fix(consumoor): align data transformations with Vector pipeline
  ...
Summary
Why
For cannon's canonical-table events the central server adds nothing the canonical routes consume — `meta.client.ethereum.network.name` is the only field they read, and cannon already populates it. The xatu-server-and-Kafka hop is pure overhead for this slice.
Notes