catalog: unified object arrangement across clusters and replicas #36214
leedqin merged 1 commit into MaterializeInc:main from
Conversation
Force-pushed 1b5d737 to 05915ff
def-
left a comment
After restarting Materialize, mz_object_arrangement_size_history can contain partial samples (collection_timestamps with rows for only a subset of the expected (object_id, replica_id) pairs) during the rehydration window. I'm not sure how bad that is?
With this test:
diff --git a/test/restart/mzcompose.py b/test/restart/mzcompose.py
index c8e4d4ab17..872619992d 100644
--- a/test/restart/mzcompose.py
+++ b/test/restart/mzcompose.py
@@ -976,6 +976,107 @@ def workflow_user_id_no_reuse_after_restart(c: Composition) -> None:
c.sql("DROP TABLE idreuse_t1")
+def workflow_arrangement_sizes_hydration_gate(c: Composition) -> None:
+ """Regression test: arrangement-sizes hydration gate must hold across restart.
+
+ The race between the introspection-subscribe install and the
+ arrangement-size collection timer is narrow, so we repeat the
+ kill/up/observe cycle several times to keep the test reliable.
+ """
+
+ num_replicas = 2
+ arranged_names = tuple(f"mv{i}" for i in range(1, 11)) + tuple(
+ f"idx{i}" for i in range(1, 11)
+ )
+ expected_count = len(arranged_names) * num_replicas
+ name_filter = "(" + ", ".join(f"'{n}'" for n in arranged_names) + ")"
+ restart_rounds = 5
+
+ c.down(destroy_volumes=True)
+
+ with c.override(
+ Materialized(
+ additional_system_parameter_defaults={
+ "arrangement_size_collection_interval": "500ms",
+ },
+ sanity_restart=False,
+ )
+ ):
+ c.up("materialized")
+
+ mv_ddls = "\n".join(
+ f"CREATE MATERIALIZED VIEW mv{i} IN CLUSTER test AS "
+ f"SELECT a % {i + 1} AS k, SUM(b) AS s FROM t GROUP BY a % {i + 1};"
+ f"CREATE INDEX idx{i} IN CLUSTER test ON mv{i} (k);"
+ for i in range(1, 11)
+ )
+ c.sql(
+ dedent(
+ f"""
+ CREATE CLUSTER test SIZE 'scale=1,workers=1', REPLICATION FACTOR {num_replicas};
+ CREATE TABLE t (a int, b int);
+ INSERT INTO t SELECT g, g % 100 FROM generate_series(1, 100000) g;
+ {mv_ddls}
+ """
+ )
+ )
+
+ def wait_for_full_sample() -> str:
+ deadline = time.time() + 120
+ while time.time() < deadline:
+ rows = c.sql_query(
+ f"""
+ SELECT max(collection_timestamp)::text FROM (
+ SELECT collection_timestamp
+ FROM mz_internal.mz_object_arrangement_size_history h
+ JOIN mz_objects o ON o.id = h.object_id
+ WHERE o.name IN {name_filter}
+ GROUP BY collection_timestamp
+ HAVING count(*) = {expected_count}
+ ) AS s
+ """
+ )
+ if rows and rows[0][0] is not None:
+ return rows[0][0]
+ time.sleep(0.5)
+ raise UIError(
+ f"timed out waiting for a sample with {expected_count} rows"
+ )
+
+ all_partials: list[tuple[int, str, int]] = []
+ for round_idx in range(restart_rounds):
+ pre_restart_ts = wait_for_full_sample()
+
+ c.kill("materialized")
+ c.up("materialized")
+
+ time.sleep(8)
+
+ samples = c.sql_query(
+ f"""
+ SELECT collection_timestamp::text, count(*)
+ FROM mz_internal.mz_object_arrangement_size_history h
+ JOIN mz_objects o ON o.id = h.object_id
+ WHERE o.name IN {name_filter}
+ AND collection_timestamp > '{pre_restart_ts}'::timestamptz
+ GROUP BY collection_timestamp
+ ORDER BY collection_timestamp
+ """
+ )
+ assert (
+ samples
+ ), f"round {round_idx}: no post-restart collection_timestamps found"
+
+ for ts, cnt in samples:
+ if cnt != expected_count:
+ all_partials.append((round_idx, ts, cnt))
+
+ assert not all_partials, (
+ f"{len(all_partials)} post-restart samples (across {restart_rounds} "
+ f"rounds) had row counts != {expected_count}: {all_partials}"
+ )
+
+
def workflow_default(c: Composition) -> None:
def process(name: str) -> None:
if name == "default":

We can run bin/mzcompose --find restart down && bin/mzcompose --find restart run arrangement-sizes-hydration-gate:
==> mzcompose: test case workflow-arrangement-sizes-hydration-gate failed: builtins.AssertionError: 5 post-restart samples (across 5 rounds) had row counts != 40: [(1, '2026-04-23 04:24:37.167+00', 20), (2, '2026-04-23 04:24:55.667+00', 19), (2, '2026-04-23 04:24:56.408+00', 38), (3, '2026-04-23 04:25:11.167+00', 3), (3, '2026-04-23 04:25:11.877+00', 7)]
Traceback (most recent call last):
File "/home/deen/git/materialize2/misc/python/materialize/mzcompose/composition.py", line 696, in test_case
yield
File "/home/deen/git/materialize2/misc/python/materialize/cli/mzcompose.py", line 857, in handle_composition
composition.workflow(
~~~~~~~~~~~~~~~~~~~~^
workflow_name, *args.unknown_subargs[1:], *extra_args
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
)
^
File "/home/deen/git/materialize2/misc/python/materialize/mzcompose/composition.py", line 584, in workflow
func(self)
~~~~^^^^^^
File "/home/deen/git/materialize2/test/restart/mzcompose.py", line 1074, in workflow_arrangement_sizes_hydration_gate
assert not all_partials, (
^^^^^^^^^^^^^^^^
AssertionError: 5 post-restart samples (across 5 rounds) had row counts != 40: [(1, '2026-04-23 04:24:37.167+00', 20), (2, '2026-04-23 04:24:55.667+00', 19), (2, '2026-04-23 04:24:56.408+00', 38), (3, '2026-04-23 04:25:11.167+00', 3), (3, '2026-04-23 04:25:11.877+00', 7)]
mzcompose: error: at least one test case failed
antiguru
left a comment
Left some comments inline. I wonder about the < 10 MiB / > 10 MiB split. Small objects can have just as much churn as larger ones, so why treat them differently? What is the problem we're trying to solve for?
    SELECT operator_id FROM mz_introspection.mz_arrangement_heap_size_raw
    UNION ALL
    SELECT operator_id FROM mz_introspection.mz_arrangement_batcher_size_raw
It's a bit unfortunate you'll need to go through the raw collections, but right now this seems to be the best approach. The views we define on top of the raw collections do more than what you need here, and certainly will be more expensive to maintain.
    SELECT operator_id FROM mz_introspection.mz_arrangement_batcher_size_raw
    ) AS rs ON rs.operator_id = od.operator_id
    GROUP BY ce.export_id
    OPTIONS (AGGREGATE INPUT GROUP SIZE = 1000)
Counts don't need the aggregate input group size annotation (it's only for hierarchical reductions, not simple ones).
    introspection_type: IntrospectionType::ComputeObjectArrangementSizes,
    sql: "SUBSCRIBE (
        SELECT
            ce.export_id AS object_id,
Note this bakes in an assumption that's not guaranteed to be true in the future, which is that one dataflow has one export. If it'd change, we'd double-count. Just calling it out. Once it happens, we'd need to re-think what the granularity for this information is instead.
    // Objects smaller than 10 MiB report exact bytes; larger ones are rounded
    // to the nearest 10 MiB to suppress byte-level churn in the collection.
This will have a lot of noise independently of the 10 MiB gate. Running the subscribe on an otherwise empty cluster shows an update for at least one export every second. You could filter temporary exports t*, as they won't be reflected anywhere, but that doesn't solve the problem.
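The rounding that the quoted code comment describes can be modeled in a few lines. This is a hypothetical Python sketch of the hybrid quantization (exact bytes below the 10 MiB gate, nearest-10-MiB rounding at or above it), not the actual Rust implementation:

```python
MIB = 1024 * 1024
QUANTUM = 10 * MIB  # the 10 MiB gate discussed in the review

def quantize_size(size_bytes: int) -> int:
    """Model of the hybrid quantization: exact bytes for small objects,
    nearest-10-MiB rounding for large ones to damp byte-level churn."""
    if size_bytes < QUANTUM:
        # Below the gate: report exact bytes.
        return size_bytes
    # At or above the gate: round to the nearest 10 MiB.
    return round(size_bytes / QUANTUM) * QUANTUM
```

Under this model, a 14 MiB arrangement reports 10 MiB, so a few hundred KiB of batch churn produces no differential update at all.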
    let interval_duration = mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_COLLECTION_INTERVAL
        .get(self.catalog().system_config().dyncfgs());
Make sure the collection task reacts to changes to the interval (without going through an envd restart).
    // the full collection interval, so the first snapshot can lag
    // hydration by up to one interval.
    if !self.arrangement_sizes_hydration_observed {
        if !self.check_arrangement_sizes_hydration().await {
In the limit this'll mean we never write any data as there can always be an object somewhere that isn't hydrated. This checks for all objects, correct? Alternatively, just write down the size for objects that are hydrated.
Why gate on hydration status?
Yes! I think this was just overly defensive, and I assumed it would make sense to care about hydration across all replicas for that object. But I removed it, so now I am writing down the size for objects that are hydrated.
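The per-row approach described in this reply amounts to a simple filter over the snapshot. A hypothetical Python sketch (names are illustrative, not the real Rust implementation):

```python
def hydrated_sizes(samples, hydrated_pairs):
    """Keep only size rows whose (object_id, replica_id) pair has
    finished hydrating; un-hydrated pairs are simply skipped for
    this snapshot and picked up by a later one."""
    return [
        (object_id, replica_id, size)
        for object_id, replica_id, size in samples
        if (object_id, replica_id) in hydrated_pairs
    ]
```

This avoids the failure mode antiguru points out: one perpetually un-hydrated object no longer blocks writing sizes for everything else.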
    if *diff != 1 {
        continue;
    }
    let datums = row.unpack();
Use a DatumVecBorrow here too.
    /// Interval at which to collect per-object arrangement size snapshots for the history table.
    pub const ARRANGEMENT_SIZE_COLLECTION_INTERVAL: Config<Duration> = Config::new(
        "arrangement_size_collection_interval",
        Duration::from_secs(60 * 60),
Suggested change:

-        Duration::from_secs(60 * 60),
+        Duration::from_hours(1),
    /// How long to retain per-object arrangement size history.
    pub const ARRANGEMENT_SIZE_RETENTION_PERIOD: Config<Duration> = Config::new(
        "arrangement_size_retention_period",
        Duration::from_secs(7 * 24 * 60 * 60),
Suggested change:

-        Duration::from_secs(7 * 24 * 60 * 60),
+        Duration::from_hours(7 * 24),
        Duration::from_secs(7 * 24 * 60 * 60),
        "How long to retain per-object arrangement size history.",
    );
Add a sentinel value (0s?) to disable the collection.
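One way such a sentinel can interact with the interval dyncfg, sketched in Python (hypothetical helper names; the real collection task is Rust). Capping the sleep also lets the task notice interval changes without an envd restart, per the earlier review comment:

```python
from datetime import timedelta

DISABLED = timedelta(0)            # 0s sentinel: collection is off
MAX_SLEEP = timedelta(seconds=60)  # re-read the config at least this often

def next_sleep(interval: timedelta):
    """Return how long the collection timer should sleep before
    re-checking the configured interval, or None when disabled."""
    if interval <= DISABLED:
        # Sentinel hit: skip collection; caller re-checks on the next
        # config change notification.
        return None
    # Never sleep the full (possibly hour-long) interval in one go, so
    # a shortened interval takes effect within MAX_SLEEP.
    return min(interval, MAX_SLEEP)
```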
@antiguru That makes sense! I was reasoning about absolute churn rather than relative, and the < 10 MiB exact-bytes branch was preserving precision for smaller objects. I see that was the wrong abstraction. I am going to drop sub-10 MiB objects from mz_object_arrangement_sizes to help reduce the churn.
Force-pushed 05915ff to edb42b2
@def- The test failure was happening due to a flag on hydration that is removed now. Added this regression test: 'edb42b2#diff-f65a1b69f0440a372762d0aeb6b80dcc2decd2ff75287c4961449abffa15964'
    # Shorten arrangement-sizes cadence so `arrangement-sizes.td` can
    # verify snapshots land within the testdrive timeout rather than
    # the 1h default. Harmless for every other testdrive file.
    "arrangement_size_collection_interval": "2s",
}
You can override the flag in the testdrive file, too, to avoid scope creep.
    pub const ARRANGEMENT_SIZE_COLLECTION_INTERVAL: Config<Duration> = Config::new(
        "arrangement_size_collection_interval",
        Duration::from_hours(1),
        "Interval at which to collect and snapshot per-object arrangement sizes.",
    );

    /// How long to retain per-object arrangement size history.
    pub const ARRANGEMENT_SIZE_RETENTION_PERIOD: Config<Duration> = Config::new(
        "arrangement_size_retention_period",
        Duration::from_hours(7 * 24),
        "How long to retain per-object arrangement size history.",
    );
Unfortunately we're not great at namespacing our feature flags. Do you think you could find a prefix for the two flags you're introducing to indicate it's arrangement size logging in adapter? There's also the mz_introspection arrangement size infrastructure, and I'd like to avoid any confusion between the two.
    // `mz_compute_hydration_times` reports `time_ns IS NULL` for objects
    // still building, so we collect only the pairs with a non-null time.
It might be interesting to report their size during hydration, right?
Made this change in this latest commit: a5f46ff
Force-pushed 42c5ff2 to a5f46ff
Force-pushed a05c3fd to 678a454
Implements mz_internal.mz_object_arrangement_sizes as a unified introspection source that continuously reflects arrangement memory per compute object across all replicas. Queryable from mz_catalog_server without session variables.

The subscribe query uses mz_dataflow_addresses with address[1] extraction to map operators to dataflows, then joins with mz_compute_exports to get export IDs. Aggregates per-object memory with hybrid quantization: real precision for objects < 10 MiB to enable precise monitoring of small objects, rounded to the nearest 10 MiB for larger objects to suppress byte-level differential churn.

Includes:
- IntrospectionType::ComputeObjectArrangementSizes variant
- BuiltinSource with columns (replica_id, object_id, size)
- BuiltinIndex on replica_id for fast replica-scoped lookups
- Storage controller wiring for differential collection management
- SubscribeSpec with optimized query and OPTIONS hint

This is the live table (differential collection) portion of the unified per-object arrangement sizes feature. History table and periodic collection tasks follow in subsequent commits.

Refs: CNS-42
Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>

Add mz_object_arrangement_size_history with periodic collection

Introduces a builtin history table that records periodic snapshots of mz_object_arrangement_sizes so users can see arrangement memory usage over time.

What's added:
* The history table and two indexes (on object_id and collection_timestamp).
* Dyncfgs for collection interval (1h) and retention (7d).
* A coordinator task that snapshots the live table every interval and appends rows, plus a startup pruner that drops rows past retention.
* Metrics for collection latency and rows written.

Notes for review:
* Collection timing is offset by organization_id so environments don't all collect at the same instant.
* A schema-migration guard prevents the history table from being truncated, which would break the pruner.

Add hydration gate and tests for arrangement-size collection

* Gate the first snapshot on every compute object being hydrated, so early history rows don't record partial sizes. Sticky flag.
* Extract pruner cutoff and hydration check into pure helpers, each covered by new unit tests.
* New sqllogictest for the 10 MiB quantization formula.
* New testdrive for live + history behavior, plus a 2s cadence default in the testdrive harness so it fits the timeout.

Lint fixes

Address review on per-object arrangement sizes

* Replace the sticky arrangement_sizes_hydration_observed flag with a per-row filter that joins mz_compute_hydration_times per snapshot.
* Drop sub-10 MiB objects from the subscribe (HAVING COUNT(*) >= 10485760) and filter transient t* exports to suppress idle-cluster churn.
* Cap collection-timer sleeps at 60s and treat 0s as a disable sentinel so dyncfg changes take effect without an envd restart.
* Use DatumVec::borrow_with instead of Row::unpack in the snapshot path.
* Add a regression test (test/restart) and limits-test scenario (test/limits) covering the snapshot path under restart and at scale.

Address PR comments and fix tests

* Namespace dyncfgs as arrangement_size_history_* to disambiguate from existing mz_introspection arrangement-size logging.
* Move the testdrive collection-interval override out of the global defaults into arrangement-sizes.td.
* Bump test data so each index clears the 10 MiB floor; rewrite SLT expectations and catalog.td for the new builtins/OIDs/counts.

Tag hydration_complete on each arrangement size history row

* Replace the per-snapshot drop of un-hydrated (replica_id, object_id) pairs with a per-row flag derived from mz_compute_hydration_times, so mid-build pairs stay observable.
* Consumers wanting only stable sizes can filter WHERE hydration_complete.
* Update the catalog column comments, the test/restart regression test, and the testdrive/SLT expectations for the new column.
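Two of the pieces these commit messages describe, the per-environment collection offset and the pruner's retention cutoff, are simple enough to model directly. A hypothetical Python sketch (illustrative names; the real helpers are Rust):

```python
import hashlib
from datetime import datetime, timedelta, timezone

def collection_offset_secs(organization_id: str, interval_secs: int) -> int:
    """Deterministic per-environment phase offset in [0, interval_secs),
    so a fleet of environments doesn't snapshot in lockstep."""
    digest = hashlib.sha256(organization_id.encode()).digest()
    return int.from_bytes(digest[:8], "big") % interval_secs

def pruner_cutoff(now: datetime, retention: timedelta) -> datetime:
    """History rows with collection_timestamp older than the cutoff
    are dropped by the startup pruner."""
    return now - retention
```

Keeping both as pure functions of their inputs is what makes them unit-testable, as the "Extract pruner cutoff and hydration check into pure helpers" commit describes.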
Force-pushed 678a454 to a1febfc
I just noticed Nightly was not run for this, but should have been! https://ci.dev.materialize.com/trigger |
Mostly follow-ups to MaterializeInc#36214, except the pg-cdc flake which is old
Motivation
We need to query per-object memory across clusters and replicas from the catalog for the Maintained Objects UI. There is no performant way of doing that without setting up session variables for each query against introspection. This allows the console to get all the object arrangements at the current moment, as well as historical data. It would also simplify other introspection queries over object arrangements in other parts of the console.
Design doc: doc/developer/design/20260331_unified_object_arrangement_sizes.md.
Linear issue: https://linear.app/materializeinc/issue/CNS-42/per-object-memory-arrangement-for-maintained-objects-in-the-console
Description

Adds per-object arrangement size tracking, following the introspection subscribe pattern (same as mz_compute_hydration_times and mz_compute_error_counts_raw_unified): a live mz_internal.mz_object_arrangement_sizes source populated by a per-replica subscribe, and a mz_internal.mz_object_arrangement_size_history table that accumulates hourly snapshots (7-day retention). Both are queryable from mz_catalog_server with no session variables or cluster targeting.
Catalog surface

- Builtin source mz_internal.mz_object_arrangement_sizes (live) and builtin table mz_internal.mz_object_arrangement_size_history (with indexes on object_id and collection_timestamp).
- Dyncfgs arrangement_size_collection_interval (1h default) and arrangement_size_retention_period (7d default).

Runtime

- The live source is populated per replica by an introspection subscribe, wired through the storage controller via a new IntrospectionType variant.
- A coordinator task snapshots the live source and appends rows to the history table. Fires at the configured interval with a per-environment offset so fleets of envs don't collect in lockstep.
- A startup pruner drops history rows older than the retention period. A migration guard prevents the history table from being truncated by schema migration, which would confuse the pruner.
- The first snapshot waits until all compute objects have hydrated at least once, so early rows can't capture half-built arrangements.
- Metrics: mz_arrangement_sizes_collection_time_seconds (histogram) and mz_arrangement_sizes_rows_written_total (counter).

Tests
Verification
Verified with 10 Rust unit tests (pruner cutoff + hydration check), a sqllogictest covering the 10 MiB quantization boundary, and a testdrive end-to-end (live + history + drop retraction). Also ran locally against a Postgres source with two replicas and confirmed ~300 history rows accumulate with stable sizes and advancing collection_timestamp.

Tips for reviewing:

I would recommend reviewing it per commit.