[catalog] fix quadratic consolidation during catalog sync #36217
mtabebe merged 2 commits into MaterializeInc:main
Conversation
SQL-159 / incident-970

Problem:
There was O(K * N log N) behavior in catalog sync where consolidate() was called on the entire snapshot for each of K timestamps.
This was the root cause of incident-970: a read-only envd restarted due to DDL in the old env, and during the restart more DDL happened across many timestamps, causing the quadratic consolidation to take ~1000s.

Solution:
Now consolidate is called once after all timestamps are processed, making sync O(N log N).
This is correct because the snapshot is append-only during the sync loop, and nothing else reads from it between timestamps. Therefore, deferring consolidation to the end produces the same final state.
Add a snapshot_consolidations metric to track consolidation passes.

Test:
- Writer creates 100 databases across 100 distinct timestamps
- Reader syncs through all timestamps
- Asserts exactly 1 consolidation during sync (was 100 before the fix)
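The correctness argument above (the snapshot is append-only during the loop, so deferring consolidation produces the same final state) can be sketched with a simplified (key, diff) update model. This is an illustrative toy, not the crate's actual snapshot or StateUpdate types:

```rust
// Toy model: a snapshot is a bag of (key, diff) updates; consolidation
// sorts by key, sums diffs, and drops entries whose net diff is zero.
use std::collections::BTreeMap;

type Update = (String, i64);

fn consolidate(snapshot: &mut Vec<Update>) {
    let mut sums: BTreeMap<String, i64> = BTreeMap::new();
    for (key, diff) in snapshot.drain(..) {
        *sums.entry(key).or_insert(0) += diff;
    }
    snapshot.extend(sums.into_iter().filter(|(_, d)| *d != 0));
}

// Pre-fix behavior: consolidate the whole snapshot for each of K
// timestamps, i.e. O(K * N log N).
fn sync_per_timestamp(batches: &[Vec<Update>]) -> Vec<Update> {
    let mut snapshot = Vec::new();
    for batch in batches {
        snapshot.extend_from_slice(batch);
        consolidate(&mut snapshot);
    }
    snapshot
}

// Post-fix behavior: the snapshot is append-only during the loop and
// consolidated once at the end, i.e. O(N log N).
fn sync_deferred(batches: &[Vec<Update>]) -> Vec<Update> {
    let mut snapshot = Vec::new();
    for batch in batches {
        snapshot.extend_from_slice(batch);
    }
    consolidate(&mut snapshot);
    snapshot
}

fn main() {
    // Model DDL churn: an initial insert, then 100 renames, each a
    // retract of the old name plus an insert of the new one.
    let mut batches: Vec<Vec<Update>> = vec![vec![("db_0".to_string(), 1)]];
    batches.extend((0..100).map(|i| {
        vec![(format!("db_{i}"), -1), (format!("db_{}", i + 1), 1)]
    }));
    let a = sync_per_timestamp(&batches);
    let b = sync_deferred(&batches);
    // Deferring consolidation yields the same final state.
    assert_eq!(a, b);
    assert_eq!(b, vec![("db_100".to_string(), 1)]);
}
```

Since nothing reads the snapshot between timestamps, the two orderings are observationally equivalent; only the intermediate memory footprint differs, which is what the review discussion below is about.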
Nightly triggered: https://buildkite.com/materialize/nightly/builds/16184
Edit: Green
During a 0dt deployment, the read-only envd's memory usage grows proportionally to the number of DDL timestamps replayed during catch-up, not the size of the live catalog. I'm not sure if that's bad enough to warrant a change. Reproducer:
diff --git a/src/catalog/src/durable/metrics.rs b/src/catalog/src/durable/metrics.rs
index e58ada644d..0c9e862645 100644
--- a/src/catalog/src/durable/metrics.rs
+++ b/src/catalog/src/durable/metrics.rs
@@ -12,7 +12,7 @@
use mz_ore::metric;
use mz_ore::metrics::{IntCounter, MetricsRegistry};
use mz_ore::stats::histogram_seconds_buckets;
-use prometheus::{Counter, Histogram, IntGaugeVec};
+use prometheus::{Counter, Histogram, IntGauge, IntGaugeVec};
#[derive(Debug, Clone)]
pub struct Metrics {
@@ -26,6 +26,7 @@ pub struct Metrics {
pub collection_entries: IntGaugeVec,
pub allocate_id_seconds: Histogram,
pub snapshot_consolidations: IntCounter,
+ pub snapshot_max_entries: IntGauge,
}
impl Metrics {
@@ -74,6 +75,14 @@ impl Metrics {
name: "mz_catalog_snapshot_consolidations",
help: "Count of snapshot consolidation passes.",
)),
+ snapshot_max_entries: registry.register(metric!(
+ name: "mz_catalog_snapshot_max_entries",
+ help: "High-water mark of entries in the unconsolidated in-memory \
+ snapshot since process start. Reflects the transient memory \
+ footprint of catalog sync/catch-up: it can exceed the live \
+ catalog size when sync_inner replays many retract+insert \
+ pairs across timestamps before consolidating.",
+ )),
}
}
}
diff --git a/src/catalog/src/durable/persist.rs b/src/catalog/src/durable/persist.rs
index 638e006aa0..c093b0b196 100644
--- a/src/catalog/src/durable/persist.rs
+++ b/src/catalog/src/durable/persist.rs
@@ -665,6 +665,16 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
}
}
+ // Track the high-water mark of the unconsolidated snapshot. During
+ // sync_inner this metric climbs with replayed history rather than
+ // live state, because consolidate() is deferred to the end of the
+ // whole sync. Tests use this to assert the memory footprint scales
+ // as expected under catch-up workloads.
+ let len = i64::try_from(self.snapshot.len()).unwrap_or(i64::MAX);
+ if len > self.metrics.snapshot_max_entries.get() {
+ self.metrics.snapshot_max_entries.set(len);
+ }
+
errors.sort();
if let Some(err) = errors.into_iter().next() {
return Err(err);
diff --git a/src/catalog/tests/read-write.rs b/src/catalog/tests/read-write.rs
index 4a26c9ae59..b3b4d48265 100644
--- a/src/catalog/tests/read-write.rs
+++ b/src/catalog/tests/read-write.rs
@@ -18,8 +18,8 @@ use mz_audit_log::{EventDetails, EventType, EventV1, IdNameV1, VersionedEvent};
use mz_catalog::durable::objects::serialization::proto;
use mz_catalog::durable::objects::{DurableType, IdAlloc};
use mz_catalog::durable::{
- CatalogError, DurableCatalogError, FenceError, Item, Metrics, TestCatalogStateBuilder,
- USER_ITEM_ALLOC_KEY, test_bootstrap_args,
+ CatalogError, Database, DurableCatalogError, FenceError, Item, Metrics,
+ TestCatalogStateBuilder, USER_ITEM_ALLOC_KEY, test_bootstrap_args,
};
use mz_ore::assert_ok;
use mz_ore::collections::HashSet;
@@ -645,3 +645,83 @@ async fn test_persist_sync_consolidation_not_quadratic() {
Box::new(writer).expire().await;
Box::new(reader).expire().await;
}
+
+/// Reader's unconsolidated snapshot during `sync_inner` must stay bounded by
+/// live state, not replayed history. See review of 7da7861c8b.
+#[mz_ore::test(tokio::test)]
+#[cfg_attr(miri, ignore)]
+async fn test_persist_sync_snapshot_stays_bounded_under_churn() {
+ let persist_client = PersistClient::new_for_tests().await;
+ let metrics = Arc::new(Metrics::new(&MetricsRegistry::new()));
+ let state_builder = TestCatalogStateBuilder::new(persist_client)
+ .with_default_deploy_generation()
+ .with_metrics(Arc::clone(&metrics));
+
+ let mut writer = state_builder
+ .clone()
+ .unwrap_build()
+ .await
+ .open(SYSTEM_TIME().into(), &test_bootstrap_args())
+ .await
+ .unwrap()
+ .0;
+ let _ = writer.sync_to_current_updates().await.unwrap();
+
+ let mut txn = writer.transaction().await.unwrap();
+ let (db_id, db_oid) = txn
+ .insert_user_database("churn_db", RoleId::User(1), Vec::new(), &HashSet::new())
+ .unwrap();
+ let _ = txn.get_and_commit_op_updates();
+ let commit_ts = txn.upper();
+ txn.commit(commit_ts).await.unwrap();
+
+ let mut reader = state_builder
+ .unwrap_build()
+ .await
+ .open_read_only(&test_bootstrap_args())
+ .await
+ .unwrap();
+ let _ = reader.sync_to_current_updates().await.unwrap();
+ let peak_before = metrics.snapshot_max_entries.get();
+
+ let num_renames: u64 = 200;
+ let mut db = Database {
+ id: db_id,
+ oid: db_oid,
+ name: "churn_db".to_string(),
+ owner_id: RoleId::User(1),
+ privileges: Vec::new(),
+ };
+ for i in 0..num_renames {
+ let mut txn = writer.transaction().await.unwrap();
+ db.name = format!("churn_db_{i}");
+ txn.update_database(db.id, db.clone()).unwrap();
+ let _ = txn.get_and_commit_op_updates();
+ let commit_ts = txn.upper();
+ txn.commit(commit_ts).await.unwrap();
+ }
+
+ let _ = reader.sync_to_current_updates().await.unwrap();
+
+ let snapshot = reader.snapshot().await.unwrap();
+ let churn_dbs: Vec<_> = snapshot
+ .databases
+ .values()
+ .filter(|d| d.name.starts_with("churn_db"))
+ .collect();
+ assert_eq!(churn_dbs.len(), 1, "{churn_dbs:#?}");
+ assert_eq!(churn_dbs[0].name, format!("churn_db_{}", num_renames - 1));
+
+ let peak_after = metrics.snapshot_max_entries.get();
+ let peak_delta = peak_after - peak_before;
+ let bounded = i64::try_from(num_renames / 4).expect("fits in i64");
+ assert!(
+ peak_delta < bounded,
+ "peak unconsolidated snapshot grew by {peak_delta} over {num_renames} \
+ renames (peak_before={peak_before}, peak_after={peak_after}); \
+ expected < {bounded}"
+ );
+
+ Box::new(writer).expire().await;
+ Box::new(reader).expire().await;
+}

Run with: cargo test -p mz-catalog --test read-write test_persist_sync_snapshot_stays_bounded_under_churn:
thread 'test_persist_sync_snapshot_stays_bounded_under_churn' (3903602) panicked at src/catalog/tests/read-write.rs:718:5:
peak unconsolidated snapshot grew by 387 over 200 renames (peak_before=56, peak_after=443); expected < 50
aljoscha left a comment
Excellent! It's nice that you added the metric and the test.
    snapshot consolidations, suggesting quadratic behavior"
);

Box::new(writer).expire().await;
I assume rust async shenanigans?
This seems ok to me. I think the absolute data sizes involved in each catalog object are pretty small. Also, we usually have plenty of free envd memory, including swap. E.g., envd swap usage was 0 bytes across the entire prod us-east-1 for the past several days. (Also, I wouldn't really expect users to run a hot loop that keeps renaming the same object again and again.)
// Consolidate once after all timestamps, not per-timestamp. This is safe because
// nothing in the loop reads from self.snapshot: the update_applier maintains its
// own internal state (configs, settings, fence tokens) independently. Consolidating
// per-timestamp was O(K * N log N); consolidating once is O(N log N).
Alternatively, consolidate whenever we double the size compared to the last consolidated state. This way you still get the amortized bound while keeping the snapshot within a constant factor of the live catalog size.
Follow-up to the quadratic consolidation fix in the previous commit.

Problem:
While the original fix moved consolidation out of the per-timestamp loop, the snapshot could still grow unboundedly during catch-up when many timestamps churn the same objects (each rename adds a retract+insert pair).

Solution:
Now sync_inner consolidates when the snapshot doubles in size, keeping memory within ~2x the live catalog size while maintaining amortized work.

Validated with:
- snapshot_max_entries metric (high-water mark of unconsolidated snapshot)
- Test that 200 renames of the same database keep the snapshot bounded
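The doubling heuristic in the follow-up commit can be sketched as follows, reusing the same simplified (key, diff) model. The struct and field names here are illustrative assumptions, not the crate's actual sync_inner implementation:

```rust
// Toy model of consolidate-on-doubling: instead of consolidating once at
// the very end (which lets the snapshot grow with replayed history), we
// consolidate whenever the snapshot reaches twice its last consolidated
// size, keeping memory within ~2x live state with amortized cost.
use std::collections::BTreeMap;

type Update = (String, i64);

fn consolidate(snapshot: &mut Vec<Update>) {
    let mut sums: BTreeMap<String, i64> = BTreeMap::new();
    for (key, diff) in snapshot.drain(..) {
        *sums.entry(key).or_insert(0) += diff;
    }
    snapshot.extend(sums.into_iter().filter(|(_, d)| *d != 0));
}

struct Syncer {
    snapshot: Vec<Update>,
    // Snapshot size right after the last consolidation.
    last_consolidated_len: usize,
}

impl Syncer {
    fn apply_batch(&mut self, batch: &[Update]) {
        self.snapshot.extend_from_slice(batch);
        // Trigger when the snapshot doubles; the max(1) floor avoids
        // dividing the work into degenerate zero-size consolidations.
        if self.snapshot.len() >= 2 * self.last_consolidated_len.max(1) {
            consolidate(&mut self.snapshot);
            self.last_consolidated_len = self.snapshot.len();
        }
    }
}

fn main() {
    let mut s = Syncer {
        snapshot: vec![("db".to_string(), 1)],
        last_consolidated_len: 1,
    };
    // Model 200 renames of the same object: each batch is a
    // retract+insert pair that cancels against live state.
    for _ in 0..200 {
        s.apply_batch(&[("db".to_string(), -1), ("db".to_string(), 1)]);
    }
    // The snapshot stays near the live size (1 entry), not 400 updates.
    assert_eq!(s.snapshot, vec![("db".to_string(), 1)]);
}
```

Because each consolidation only fires after the unconsolidated updates have at least matched the consolidated size, the total consolidation work stays amortized while the high-water mark (as tracked by snapshot_max_entries above) is bounded by roughly twice the live catalog size.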