Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion beava-website/project/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@
bv.<span style={S.fn}>get</span>(<span style={S.str}>"SiteMetrics"</span>) <span style={S.cmt}># → {`{median_dwell_1h: 6_250, views_today: 14_207, …}`}</span>{'\n'}
{'\n'}
<span style={S.cmt}># Or over HTTP (the panel above is hitting this exact endpoint):</span>{'\n'}
<span style={S.kw}>$</span> curl https://beava.dev/api/features/site
<span style={S.kw}>$</span> curl https://beava.dev/api/get -d '{`{"table":"SiteMetrics","key":""}`}'
</pre>
<div style={{ display: 'grid', gridTemplateColumns: 'repeat(3, 1fr)', gap: 0, borderTop: '1px solid var(--border)', background: '#fff' }}>
{[
Expand Down
62 changes: 52 additions & 10 deletions crates/beava-server/src/apply_shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,35 @@ impl ApplyShard {
};
}

// `force=true` on a destructive payload: pre-remove the
// conflicting descriptors so the apply path treats them
// as new. `execute_register_with_wal` then emits a
// single `RegistryBump` capturing the new payload's
// nodes (and bumps `registry_version`).
if force && !diff.destructive.is_empty() {
// `force=true` carries the explicit "drop existing state and
// replace fully" intent for any descriptor that already
// exists in the registry. We pre-remove every existing
// descriptor that the new payload would change so the
// legacy compute_diff inside `execute_register_with_wal`
// sees them as fully new (no `changed` entries → no
// `registration_conflict`). Two reasons we can't just gate
// on `diff.destructive`:
//
// 1. `classify_register_diff` classifies new fields on an
// existing event source and new aggregations in an
// existing block as `additive`, not destructive. The
// legacy `compute_diff` flags those same shapes as
// `changed: [ConflictDetail]` (schema_mismatch,
// ops_mismatch). Without pre-removal here, the legacy
// path 409s with force silently dropped — the prod
// bug behind the deploy-hetzner failure.
//
// 2. `NewDescriptor` is the only additive variant we
// skip — it's genuinely-new (not in current
// registry), so there's nothing to pre-remove.
//
// Pre-removal drops compiled chains, aggregations, feature
// index entries, and accumulated state for the named
// descriptors. That state loss is the documented force=true
// semantic; callers who want to add a field without losing
// state should send the request without `force` (additive
// path is allowed by `register_check_force_required`).
if force {
let mut to_remove: Vec<String> = Vec::new();
for entry in &diff.destructive {
match entry {
Expand Down Expand Up @@ -297,12 +320,31 @@ impl ApplyShard {
_ => {}
}
}
// Additive-against-existing: the descriptor already
// exists, so the legacy `compute_diff` would flag it
// as `changed` and 409 unconditionally. Pre-remove
// here so it lands as a clean add.
for entry in &diff.additive {
match entry {
beava_core::registry_diff::DiffEntry::NewField { event, .. } => {
to_remove.push(event.clone());
}
beava_core::registry_diff::DiffEntry::NewAgg { table, .. } => {
to_remove.push(table.clone());
}
// `NewDescriptor` is genuinely-new — not in
// current registry, nothing to pre-remove.
_ => {}
}
}
to_remove.sort();
to_remove.dedup();
self.state
.dev_agg
.registry
.force_remove_descriptors(&to_remove);
if !to_remove.is_empty() {
self.state
.dev_agg
.registry
.force_remove_descriptors(&to_remove);
}
}

// Register is cold path: delegate to the async WAL-backed
Expand Down
61 changes: 61 additions & 0 deletions crates/beava-server/tests/phase13_4_force_register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,3 +378,64 @@ async fn register_destructive_agg_removal_without_force_returns_409() {

ts.shutdown().await.ok();
}

// ─── Test 11 — additive NewField on existing event with force=true succeeds ─
//
// Regression for the production deploy that 409'd after PR #1 merged. The
// pipeline added `session_id` to the `PageView` event source. Two diff
// systems disagreed on the classification:
//
// - `classify_register_diff` (apply_shard pre-flight): NewField on an
// existing event source → `additive`. No destructive entries.
// - `compute_diff` (legacy, inside `execute_register_with_wal`): the
// event source's schema differs → `changed: [ConflictDetail]`,
// unconditionally returns `RegisterOutcome::Conflict` (HTTP 409
// `registration_conflict`) regardless of `force=true` on the wire.
//
// `apply_shard`'s force-handling block at the time of the bug only ran
// `force_remove_descriptors` when `diff.destructive` was non-empty. Pure
// additive changes against existing descriptors slipped through the
// pre-flight, hit the legacy compute_diff inside execute_register, and
// 409'd — even with `force=true` set on the request body.
//
// Fixed behavior: with `force=true`, additive entries that target an
// existing descriptor (NewField on event, NewAgg on table) MUST also be
// pre-removed so execute_register sees them as fully new. The
// caller-explicit `force=true` carries the "drop existing state" intent
// for both destructive AND additive-against-existing changes.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn register_additive_new_field_on_existing_event_with_force_true_succeeds() {
let ts = TestServer::spawn().await.expect("spawn");
let (status_a, body_a) = post_register(&ts, &baseline_payload()).await;
assert!((200..300).contains(&status_a));
let v_a = body_a["registry_version"]
.as_u64()
.unwrap_or_else(|| panic!("baseline response must include registry_version: {body_a:#}"));

// Re-register the SAME shape with one new field added to the existing
// `Tx` event source. `classify_register_diff` will emit
// `additive: [NewField{event:Tx, field:session_id}]`,
// `destructive: []`. With force=true, the request must succeed.
let mut payload_b = baseline_payload();
payload_b["nodes"][0]["schema"]["fields"]["session_id"] = json!("str");
payload_b["force"] = json!(true);

let (status_b, body_b) = post_register(&ts, &payload_b).await;
assert!(
(200..300).contains(&status_b),
"force=true on additive NewField against existing event must succeed, got status={status_b}, body={body_b:#}"
);
assert!(
body_b.get("error").is_none(),
"force=true additive register must not return error envelope, got: {body_b:#}"
);
let v_b = body_b["registry_version"]
.as_u64()
.unwrap_or_else(|| panic!("force=true response must include registry_version: {body_b:#}"));
assert!(
v_b > v_a,
"registry_version must bump after additive force=true apply: was {v_a}, now {v_b}"
);

ts.shutdown().await.ok();
}
Loading