From 60c506d9816838a9c912a4ad84a6235df216be28 Mon Sep 17 00:00:00 2001 From: James Ross Date: Thu, 21 May 2026 11:30:30 -0700 Subject: [PATCH 01/11] feat(core): dispatch installed contract mutations through ticketed ingress --- crates/warp-core/src/coordinator.rs | 62 +++ ...nstalled_contract_intent_pipeline_tests.rs | 379 ++++++++++++++++++ 2 files changed, 441 insertions(+) create mode 100644 crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs diff --git a/crates/warp-core/src/coordinator.rs b/crates/warp-core/src/coordinator.rs index be677be1..25ec703c 100644 --- a/crates/warp-core/src/coordinator.rs +++ b/crates/warp-core/src/coordinator.rs @@ -15,6 +15,8 @@ use crate::engine_impl::{CommitOutcome, Engine, EngineError}; use crate::head::{ HeadEligibility, PlaybackHeadRegistry, RunnableWriterSet, WriterHead, WriterHeadKey, }; +#[cfg(feature = "native_rule_bootstrap")] +use crate::head_inbox::IngressPayload; use crate::head_inbox::{InboxAddress, InboxIngestResult, IngressEnvelope, IngressTarget}; use crate::ident::Hash; use crate::optic_artifact::OpticAdmissionTicket; @@ -102,6 +104,16 @@ pub enum RuntimeError { /// A different admission ticket already staged this witnessed submission. #[error("witnessed submission already has ticketed runtime ingress: {0:?}")] TicketedIngressAlreadyStaged(Hash), + /// Installed contract runtime ingress received malformed canonical intent bytes. + #[error("installed contract invocation is not a canonical EINT intent")] + MalformedInstalledContractIntent, + /// Installed contract runtime ingress named a mutation operation id that no + /// installed package supports. + #[error("unsupported installed contract mutation op id: {op_id}")] + UnsupportedInstalledContractMutation { + /// The unsupported canonical EINT mutation operation id. + op_id: u32, + }, /// Ticketed runtime ingress attempted to claim an envelope that was already /// pending or committed through another ingress path. #[error("ticketed runtime ingress cannot claim duplicate runtime ingress {ingress_id:?} for head {head_key:?}")] @@ -1312,6 +1324,39 @@ impl WorldlineRuntime { Ok(TicketedRuntimeIngressDisposition::Staged { record, ingress }) } + /// Stages a witnessed installed-contract mutation invocation into runtime ingress. + /// + /// This is the package boundary between lawful admission evidence and the + /// scheduler-owned runtime path. It verifies that the canonical EINT mutation + /// operation id is supported by an installed contract package before the work + /// becomes runtime-visible. The method does not tick, dispatch handlers, or + /// execute contracts. + /// + /// # Errors + /// + /// Returns an error when the envelope is not a canonical EINT local intent, + /// no installed contract package supports its mutation operation id, or the + /// underlying ticketed ingress boundary rejects the submission. + #[cfg(feature = "native_rule_bootstrap")] + pub fn ingest_installed_contract_invocation( + &mut self, + authority: &TicketedRuntimeIngressAuthority, + engine: &Engine, + submission_id: Hash, + ticket: &OpticAdmissionTicket, + envelope: IngressEnvelope, + ) -> Result { + let op_id = installed_contract_mutation_op_id(&envelope)?; + if engine + .installed_contract_mutation_package_id(op_id) + .is_none() + { + return Err(RuntimeError::UnsupportedInstalledContractMutation { op_id }); + } + + self.ingest_ticketed_invocation(authority, submission_id, ticket, envelope) + } + /// Resolves an ingress envelope to a specific writer head and stores it in that inbox. /// /// # Errors @@ -1613,6 +1658,14 @@ fn derive_ticketed_runtime_ingress_id( hasher.finalize().into() } +#[cfg(feature = "native_rule_bootstrap")] +fn installed_contract_mutation_op_id(envelope: &IngressEnvelope) -> Result { + let IngressPayload::LocalIntent { intent_bytes, .. } = envelope.payload(); + echo_wasm_abi::unpack_intent_v1(intent_bytes) + .map(|(op_id, _vars)| op_id) + .map_err(|_error| RuntimeError::MalformedInstalledContractIntent) +} + fn derive_scheduler_run_id(next_global_tick: GlobalTick, keys: &[WriterHeadKey]) -> SchedulerRunId { let mut hasher = blake3::Hasher::new(); hasher.update(b"echo.scheduler-run"); @@ -1678,6 +1731,8 @@ fn scheduler_fault_scope_for_error( | RuntimeError::UnknownIntentSubmission(_) | RuntimeError::TicketedIngressSubmissionMismatch(_) | RuntimeError::TicketedIngressAlreadyStaged(_) + | RuntimeError::MalformedInstalledContractIntent + | RuntimeError::UnsupportedInstalledContractMutation { .. } | RuntimeError::TicketedIngressDuplicateRuntimeIngress { .. } => { SchedulerFaultScope::Runtime } @@ -2201,6 +2256,13 @@ fn scheduler_error_cause_digest(err: &RuntimeError) -> Hash { hasher.update(b"ticketed-ingress-already-staged"); hasher.update(submission_id); } + RuntimeError::MalformedInstalledContractIntent => { + hasher.update(b"malformed-installed-contract-intent"); + } + RuntimeError::UnsupportedInstalledContractMutation { op_id } => { + hasher.update(b"unsupported-installed-contract-mutation"); + hasher.update(&op_id.to_le_bytes()); + } RuntimeError::TicketedIngressDuplicateRuntimeIngress { head_key, ingress_id, diff --git a/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs new file mode 100644 index 00000000..9fe41f07 --- /dev/null +++ b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs @@ -0,0 +1,379 @@ +// SPDX-License-Identifier: Apache-2.0 +// © James Ross Ω FLYING•ROBOTS +//! Installed contract intent pipeline tests. +#![cfg(all(feature = "native_rule_bootstrap", feature = "host_test"))] + +use echo_registry_api::{ + ArgDef, ContractArtifactVerificationPolicy, ObjectDef, OpDef, OpKind, RegistryInfo, + RegistryProvider, +}; +use warp_core::{ + make_head_id, make_intent_kind, make_node_id, make_type_id, ContractMutationHandler, + ContractPackageIdentity, Engine, EngineBuilder, GraphStore, GraphView, InboxPolicy, + IngressEnvelope, IngressTarget, IntentSubmissionDisposition, NodeId, NodeRecord, + OpticAdmissionTicket, OpticArtifactHandle, PatternGraph, PlaybackMode, ProvenanceService, + RuntimeError, SchedulerCoordinator, SchedulerKind, TickDelta, TicketedRuntimeIngressAuthority, + WorldlineId, WorldlineRuntime, WorldlineState, WriterHead, WriterHeadKey, + OPTIC_ADMISSION_TICKET_KIND, OPTIC_ARTIFACT_HANDLE_KIND, +}; + +const SCHEMA_SHA256_HEX: &str = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"; +const MUTATION_OP_ID: u32 = 1001; +const UNKNOWN_OP_ID: u32 = 9999; +const MUTATION_VARS: &[u8] = b"amount=42"; +const RESULT_TYPE: &str = "test/toy-counter/increment-result"; +const RESULT_BYTES: &[u8] = b"value=42"; +const MUTATION_RULE_NAME: &str = + "cmd/contract/0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef/1001/increment"; + +static INCREMENT_ARGS: &[ArgDef] = &[ArgDef { + name: "input", + ty: "IncrementInput", + required: true, + list: false, +}]; + +static OPS: &[OpDef] = &[OpDef { + kind: OpKind::Mutation, + name: "increment", + op_id: MUTATION_OP_ID, + args: INCREMENT_ARGS, + result_ty: "CounterValue", + directives_json: "{}", + footprint_certificate: None, +}]; + +struct StaticRegistry; + +impl RegistryProvider for StaticRegistry { + fn info(&self) -> RegistryInfo { + RegistryInfo { + codec_id: "cbor-canon-v1", + registry_version: 1, + schema_sha256_hex: SCHEMA_SHA256_HEX, + } + } + + fn op_by_id(&self, op_id: u32) -> Option<&'static OpDef> { + OPS.iter().find(|op| op.op_id == op_id) + } + + fn all_ops(&self) -> &'static [OpDef] { + OPS + } + + fn all_enums(&self) -> &'static [echo_registry_api::EnumDef] { + &[] + } + + fn all_objects(&self) -> &'static [ObjectDef] { + &[] + } +} + +fn empty_engine() -> Engine { + let mut store = GraphStore::default(); + let root = make_node_id("root"); + store.insert_node( + root, + NodeRecord { + ty: make_type_id("world"), + }, + ); + EngineBuilder::new(store, root) + .scheduler(SchedulerKind::Radix) + .workers(1) + .build() +} + +fn package_identity() -> ContractPackageIdentity<'static> { + ContractPackageIdentity { + package_name: "toy-counter", + package_version: "0.1.0", + artifact_hash_hex: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", + } +} + +fn verification_policy() -> ContractArtifactVerificationPolicy<'static> { + ContractArtifactVerificationPolicy { + codec_id: "cbor-canon-v1", + registry_version: 1, + schema_sha256_hex: SCHEMA_SHA256_HEX, + footprint_certificates: &[], + require_mutation_footprint_certificates: false, + } +} + +fn result_node_id(scope: &NodeId) -> NodeId { + let mut hasher = blake3::Hasher::new(); + hasher.update(b"test.installed-contract.pipeline.result-node"); + hasher.update(scope.as_bytes()); + NodeId(hasher.finalize().into()) +} + +fn contract_matches(view: GraphView<'_>, scope: &NodeId) -> bool { + warp_core::eint_vars_for_op(view, scope, MUTATION_OP_ID) == Some(MUTATION_VARS) +} + +fn contract_execute(view: GraphView<'_>, scope: &NodeId, delta: &mut TickDelta) { + if warp_core::eint_vars_for_op(view, scope, MUTATION_OP_ID) != Some(MUTATION_VARS) { + return; + } + let warp_id = view.warp_id(); + let result = result_node_id(scope); + delta.push(warp_core::WarpOp::UpsertNode { + node: warp_core::NodeKey { + warp_id, + local_id: result, + }, + record: NodeRecord { + ty: make_type_id(RESULT_TYPE), + }, + }); + delta.push(warp_core::WarpOp::SetAttachment { + key: warp_core::AttachmentKey::node_alpha(warp_core::NodeKey { + warp_id, + local_id: result, + }), + value: Some(warp_core::AttachmentValue::Atom( + warp_core::AtomPayload::new( + make_type_id(RESULT_TYPE), + bytes::Bytes::copy_from_slice(RESULT_BYTES), + ), + )), + }); +} + +fn contract_footprint(view: GraphView<'_>, scope: &NodeId) -> warp_core::Footprint { + let mut footprint = warp_core::runtime_ingress_eint_read_footprint(view, scope); + let warp_id = view.warp_id(); + let result = result_node_id(scope); + footprint.n_write.insert_with_warp(warp_id, result); + footprint + .a_write + .insert(warp_core::AttachmentKey::node_alpha(warp_core::NodeKey { + warp_id, + local_id: result, + })); + footprint +} + +fn contract_rule() -> warp_core::RewriteRule { + warp_core::RewriteRule { + id: make_type_id(&format!("rule:{MUTATION_RULE_NAME}")).0, + name: MUTATION_RULE_NAME, + left: PatternGraph { nodes: vec![] }, + matcher: contract_matches, + executor: contract_execute, + compute_footprint: contract_footprint, + factor_mask: 0, + conflict_policy: warp_core::ConflictPolicy::Abort, + join_fn: None, + } +} + +fn install_contract(engine: &mut Engine) { + static REGISTRY: StaticRegistry = StaticRegistry; + engine + .install_contract_package(warp_core::InstalledContractPackage { + identity: package_identity(), + registry: ®ISTRY, + verification_policy: verification_policy(), + mutation_handlers: vec![ContractMutationHandler { + op_id: MUTATION_OP_ID, + rule: contract_rule(), + }], + query_observers: vec![], + }) + .expect("contract package should install"); +} + +fn register_head(runtime: &mut WorldlineRuntime, worldline_id: WorldlineId) -> WriterHeadKey { + let key = WriterHeadKey { + worldline_id, + head_id: make_head_id("default"), + }; + runtime + .register_writer_head(WriterHead::with_routing( + key, + PlaybackMode::Play, + InboxPolicy::AcceptAll, + None, + true, + )) + .expect("writer head should register"); + key +} + +fn runtime_store(runtime: &WorldlineRuntime, worldline_id: WorldlineId) -> &GraphStore { + let frontier = runtime + .worldlines() + .get(&worldline_id) + .expect("worldline should exist"); + frontier + .state() + .warp_state() + .store(&frontier.state().root().warp_id) + .expect("frontier store should exist") +} + +fn provenance_for(runtime: &WorldlineRuntime) -> ProvenanceService { + let mut provenance = ProvenanceService::new(); + for (worldline_id, frontier) in runtime.worldlines().iter() { + provenance + .register_worldline(*worldline_id, frontier.state()) + .expect("provenance should register"); + } + provenance +} + +fn admission_ticket(seed: u8) -> OpticAdmissionTicket { + OpticAdmissionTicket { + kind: OPTIC_ADMISSION_TICKET_KIND.to_owned(), + artifact_handle: OpticArtifactHandle { + kind: OPTIC_ARTIFACT_HANDLE_KIND.to_owned(), + id: format!("installed-contract-pipeline-{seed}"), + }, + artifact_hash: format!("artifact-hash-{seed}"), + operation_id: format!("operation-{seed}"), + requirements_digest: format!("requirements-{seed}"), + canonical_variables_digest: vec![seed], + basis_request_digest: [seed; 32], + aperture_request_digest: [seed.wrapping_add(1); 32], + budget_request_digest: [seed.wrapping_add(2); 32], + law_witness_digest: [seed.wrapping_add(3); 32], + ticket_digest: [seed.wrapping_add(4); 32], + } +} + +fn ticketed_authority() -> TicketedRuntimeIngressAuthority { + TicketedRuntimeIngressAuthority::assume_runtime_owner() +} + +fn pipeline_runtime() -> (WorldlineRuntime, Engine, WorldlineId, WriterHeadKey) { + let mut runtime = WorldlineRuntime::new(); + let mut engine = empty_engine(); + install_contract(&mut engine); + let worldline_id = WorldlineId::from_bytes([1; 32]); + runtime + .register_worldline(worldline_id, WorldlineState::empty()) + .expect("worldline should register"); + let head = register_head(&mut runtime, worldline_id); + (runtime, engine, worldline_id, head) +} + +fn eint_envelope(worldline_id: WorldlineId, op_id: u32, vars: &[u8]) -> IngressEnvelope { + IngressEnvelope::local_intent( + IngressTarget::DefaultWriter { worldline_id }, + make_intent_kind("echo.intent/eint-v1"), + echo_wasm_abi::pack_intent_v1(op_id, vars).expect("EINT should pack"), + ) +} + +#[test] +fn installed_contract_mutation_dispatches_only_through_ticketed_scheduler_tick() { + let (mut runtime, mut engine, worldline_id, head) = pipeline_runtime(); + let envelope = eint_envelope(worldline_id, MUTATION_OP_ID, MUTATION_VARS); + let event_id = NodeId(envelope.ingress_id()); + let result = result_node_id(&event_id); + + let submission = match runtime + .submit_intent(envelope.clone()) + .expect("submission should be witnessed") + { + IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id, + IntentSubmissionDisposition::Duplicate { .. } => { + panic!("first submission should not be duplicate") + } + }; + + assert!( + runtime_store(&runtime, worldline_id) + .node(&result) + .is_none(), + "witnessed submission must not execute installed contract handlers" + ); + + let ticket = admission_ticket(7); + runtime + .ingest_installed_contract_invocation( + &ticketed_authority(), + &engine, + submission, + &ticket, + envelope, + ) + .expect("package-supported ticketed ingress should stage"); + + assert!( + runtime_store(&runtime, worldline_id) + .node(&result) + .is_none(), + "ticketed runtime ingress must stage work without executing" + ); + + let mut provenance = provenance_for(&runtime); + let records = SchedulerCoordinator::super_tick(&mut runtime, &mut provenance, &mut engine) + .expect("scheduler-owned tick should execute installed contract handler"); + + assert_eq!(records.len(), 1); + assert_eq!(records[0].head_key, head); + assert_eq!(records[0].admitted_count, 1); + let store = runtime_store(&runtime, worldline_id); + assert_eq!( + store.node(&result).map(|record| record.ty), + Some(make_type_id(RESULT_TYPE)) + ); + assert!(matches!( + store.node_attachment(&result), + Some(warp_core::AttachmentValue::Atom(payload)) + if payload.type_id == make_type_id(RESULT_TYPE) + && payload.bytes.as_ref() == RESULT_BYTES + )); + assert!(runtime + .receipt_correlation_for_submission(&submission) + .is_some()); +} + +#[test] +fn unsupported_installed_contract_mutation_cannot_enter_ticketed_runtime_ingress() { + let (mut runtime, engine, worldline_id, head) = pipeline_runtime(); + let envelope = eint_envelope(worldline_id, UNKNOWN_OP_ID, MUTATION_VARS); + let submission = match runtime + .submit_intent(envelope.clone()) + .expect("unsupported submission should still be witnessed") + { + IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id, + IntentSubmissionDisposition::Duplicate { .. } => { + panic!("first submission should not be duplicate") + } + }; + let ticket = admission_ticket(8); + + let err = runtime + .ingest_installed_contract_invocation( + &ticketed_authority(), + &engine, + submission, + &ticket, + envelope, + ) + .expect_err("unsupported contract op must not stage runtime ingress"); + + assert!(matches!( + err, + RuntimeError::UnsupportedInstalledContractMutation { + op_id: UNKNOWN_OP_ID + } + )); + assert_eq!(runtime.ticketed_runtime_ingress_count(), 0); + assert_eq!( + runtime + .heads() + .get(&head) + .expect("head should exist") + .inbox() + .pending_count(), + 0 + ); +} From 3daf63834d56457a3562c050afc3c8ac40dc44df Mon Sep 17 00:00:00 2001 From: James Ross Date: Thu, 21 May 2026 11:38:24 -0700 Subject: [PATCH 02/11] feat(core): report intent outcome receipt decisions --- crates/warp-core/src/coordinator.rs | 97 +++++++++++++++++-- crates/warp-core/src/lib.rs | 11 ++- crates/warp-core/tests/inbox.rs | 7 +- ...nstalled_contract_intent_pipeline_tests.rs | 31 ++++-- 4 files changed, 124 insertions(+), 22 deletions(-) diff --git a/crates/warp-core/src/coordinator.rs b/crates/warp-core/src/coordinator.rs index 25ec703c..457a8b9c 100644 --- a/crates/warp-core/src/coordinator.rs +++ b/crates/warp-core/src/coordinator.rs @@ -18,12 +18,13 @@ use crate::head::{ #[cfg(feature = "native_rule_bootstrap")] use crate::head_inbox::IngressPayload; use crate::head_inbox::{InboxAddress, InboxIngestResult, IngressEnvelope, IngressTarget}; -use crate::ident::Hash; +use crate::ident::{Hash, NodeId}; use crate::optic_artifact::OpticAdmissionTicket; use crate::provenance_store::{ HistoryError, ProvenanceCheckpoint, ProvenanceEntry, ProvenanceEventKind, ProvenanceRef, ProvenanceService, ProvenanceStore, ReplayError, }; +use crate::receipt::{TickReceiptDisposition, TickReceiptRejection}; use crate::strand::{ForkBasisRef, Strand, StrandError, StrandId, StrandRegistry, SupportPin}; use crate::worldline::{ApplyError, WorldlineId}; use crate::worldline_registry::WorldlineRegistry; @@ -472,12 +473,39 @@ pub struct ReceiptCorrelationRecord { pub commit_hash: Hash, } +/// Scheduler-owned decision observed for a witnessed intent submission. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum IntentOutcomeDecision { + /// The correlated receipt entry applied during the scheduler-owned tick. + Applied { + /// Entry index inside the correlated tick receipt. + receipt_entry_index: u32, + /// Scheduler rule that produced the entry. + rule_id: Hash, + }, + /// The correlated receipt entry was rejected during the scheduler-owned tick. + Rejected { + /// Entry index inside the correlated tick receipt. + receipt_entry_index: u32, + /// Scheduler rule that produced the entry. + rule_id: Hash, + /// Deterministic rejection reason emitted by the tick receipt. + reason: TickReceiptRejection, + /// Receipt entry indices that blocked this candidate. + blocked_by: Vec, + }, + /// Echo has a receipt correlation, but the retained receipt no longer yields + /// a matching entry for this ingress id. + NoMatchingReceiptEntry { + /// Digest recorded by the correlation index. + tick_receipt_digest: Hash, + }, +} + /// Polling observation for a witnessed intent submission. /// -/// This is intentionally narrower than a final applied/rejected application -/// outcome. Until receipt entries are bound to intent-level semantics, Echo can -/// report whether the submission is unknown, still pending, or decided by a -/// scheduler-owned tick receipt. +/// This is a read-only outcome surface. Echo reports whether the submission is +/// unknown, still pending, or decided by a scheduler-owned tick receipt. #[derive(Clone, Debug, PartialEq, Eq)] pub enum IntentOutcomeObservation { /// Echo has no witnessed submission for the supplied id. @@ -497,7 +525,9 @@ pub enum IntentOutcomeObservation { /// Echo has correlated the submission to a scheduler-owned tick receipt. Decided { /// Scheduler-owned receipt correlation. - correlation: ReceiptCorrelationRecord, + correlation: Box, + /// Applied/rejected decision derived from the retained receipt entry. + decision: IntentOutcomeDecision, }, } @@ -895,8 +925,7 @@ impl WorldlineRuntime { /// Observes the current scheduler-owned outcome posture for a submission. /// /// This is a zero-write polling surface. It does not tick, dispatch - /// handlers, subscribe to streams, or infer applied/rejected semantics from - /// receipt entries. + /// handlers, or subscribe to streams. #[must_use] pub fn observe_intent_outcome(&self, submission_id: &Hash) -> IntentOutcomeObservation { let Some(submission) = self.witnessed_submissions.get(submission_id) else { @@ -906,7 +935,8 @@ impl WorldlineRuntime { }; if let Some(correlation) = self.receipt_correlation_for_submission(submission_id) { return IntentOutcomeObservation::Decided { - correlation: correlation.clone(), + correlation: Box::new(correlation.clone()), + decision: self.intent_outcome_decision(correlation), }; } IntentOutcomeObservation::Pending { @@ -919,6 +949,55 @@ impl WorldlineRuntime { } } + fn intent_outcome_decision( + &self, + correlation: &ReceiptCorrelationRecord, + ) -> IntentOutcomeDecision { + let no_match = || IntentOutcomeDecision::NoMatchingReceiptEntry { + tick_receipt_digest: correlation.tick_receipt_digest, + }; + let Some(tick_index) = correlation.worldline_tick_after.as_u64().checked_sub(1) else { + return no_match(); + }; + let Ok(tick_index) = usize::try_from(tick_index) else { + return no_match(); + }; + let Some(frontier) = self.worldlines.get(&correlation.head_key.worldline_id) else { + return no_match(); + }; + let Some((_snapshot, receipt, _patch)) = frontier.state().tick_history().get(tick_index) + else { + return no_match(); + }; + if receipt.digest() != correlation.tick_receipt_digest { + return no_match(); + } + + let ingress_node = NodeId(correlation.ingress_id); + for (idx, entry) in receipt.entries().iter().enumerate() { + if entry.scope.local_id != ingress_node { + continue; + } + let Ok(receipt_entry_index) = u32::try_from(idx) else { + return no_match(); + }; + return match entry.disposition { + TickReceiptDisposition::Applied => IntentOutcomeDecision::Applied { + receipt_entry_index, + rule_id: entry.rule_id, + }, + TickReceiptDisposition::Rejected(reason) => IntentOutcomeDecision::Rejected { + receipt_entry_index, + rule_id: entry.rule_id, + reason, + blocked_by: receipt.blocked_by(idx).to_vec(), + }, + }; + } + + no_match() + } + /// Returns the current correlation tick. #[must_use] pub fn global_tick(&self) -> GlobalTick { diff --git a/crates/warp-core/src/lib.rs b/crates/warp-core/src/lib.rs index 8a94a74f..04f6069e 100644 --- a/crates/warp-core/src/lib.rs +++ b/crates/warp-core/src/lib.rs @@ -344,11 +344,12 @@ pub use worldline::{ /// Prefer this coordinator/runtime API for new stepping and routing code. pub use coordinator::{ ForkStrandReceipt, ForkStrandRequest, IngressDisposition, IngressSubmissionGeneration, - IntentOutcomeObservation, IntentSubmissionDisposition, IntentSubmissionRecord, - ReceiptCorrelationRecord, RuntimeError, SchedulerCoordinator, SchedulerFaultGeneration, - SchedulerFaultId, SchedulerFaultRecord, SchedulerFaultRecoveryAuthority, SchedulerFaultScope, - SchedulerFaultStatus, SchedulerRunId, StepRecord, TicketedRuntimeIngressAuthority, - TicketedRuntimeIngressDisposition, TicketedRuntimeIngressRecord, WorldlineRuntime, + IntentOutcomeDecision, IntentOutcomeObservation, IntentSubmissionDisposition, + IntentSubmissionRecord, ReceiptCorrelationRecord, RuntimeError, SchedulerCoordinator, + SchedulerFaultGeneration, SchedulerFaultId, SchedulerFaultRecord, + SchedulerFaultRecoveryAuthority, SchedulerFaultScope, SchedulerFaultStatus, SchedulerRunId, + StepRecord, TicketedRuntimeIngressAuthority, TicketedRuntimeIngressDisposition, + TicketedRuntimeIngressRecord, WorldlineRuntime, }; /// Writer-head registry and routing primitives used by the runtime-owned ingress path. pub use head::{ diff --git a/crates/warp-core/tests/inbox.rs b/crates/warp-core/tests/inbox.rs index 885c0490..ca7f6124 100644 --- a/crates/warp-core/tests/inbox.rs +++ b/crates/warp-core/tests/inbox.rs @@ -604,8 +604,11 @@ fn ticketed_submission_outcome_observation_is_decided_after_scheduler_tick() { .clone(); assert!(matches!( runtime.observe_intent_outcome(&submission), - IntentOutcomeObservation::Decided { correlation: observed } - if observed == correlation + IntentOutcomeObservation::Decided { + correlation: observed, + .. + } + if *observed == correlation )); } diff --git a/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs index 9fe41f07..38d3f0b0 100644 --- a/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs +++ b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs @@ -10,11 +10,12 @@ use echo_registry_api::{ use warp_core::{ make_head_id, make_intent_kind, make_node_id, make_type_id, ContractMutationHandler, ContractPackageIdentity, Engine, EngineBuilder, GraphStore, GraphView, InboxPolicy, - IngressEnvelope, IngressTarget, IntentSubmissionDisposition, NodeId, NodeRecord, - OpticAdmissionTicket, OpticArtifactHandle, PatternGraph, PlaybackMode, ProvenanceService, - RuntimeError, SchedulerCoordinator, SchedulerKind, TickDelta, TicketedRuntimeIngressAuthority, - WorldlineId, WorldlineRuntime, WorldlineState, WriterHead, WriterHeadKey, - OPTIC_ADMISSION_TICKET_KIND, OPTIC_ARTIFACT_HANDLE_KIND, + IngressEnvelope, IngressTarget, IntentOutcomeDecision, IntentOutcomeObservation, + IntentSubmissionDisposition, NodeId, NodeRecord, OpticAdmissionTicket, OpticArtifactHandle, + PatternGraph, PlaybackMode, ProvenanceService, RuntimeError, SchedulerCoordinator, + SchedulerKind, TickDelta, TicketedRuntimeIngressAuthority, WorldlineId, WorldlineRuntime, + WorldlineState, WriterHead, WriterHeadKey, OPTIC_ADMISSION_TICKET_KIND, + OPTIC_ARTIFACT_HANDLE_KIND, }; const SCHEMA_SHA256_HEX: &str = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"; @@ -25,6 +26,8 @@ const RESULT_TYPE: &str = "test/toy-counter/increment-result"; const RESULT_BYTES: &[u8] = b"value=42"; const MUTATION_RULE_NAME: &str = "cmd/contract/0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef/1001/increment"; +const MUTATION_RULE_ID_LABEL: &str = + "rule:cmd/contract/0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef/1001/increment"; static INCREMENT_ARGS: &[ArgDef] = &[ArgDef { name: "input", @@ -160,7 +163,7 @@ fn contract_footprint(view: GraphView<'_>, scope: &NodeId) -> warp_core::Footpri fn contract_rule() -> warp_core::RewriteRule { warp_core::RewriteRule { - id: make_type_id(&format!("rule:{MUTATION_RULE_NAME}")).0, + id: make_type_id(MUTATION_RULE_ID_LABEL).0, name: MUTATION_RULE_NAME, left: PatternGraph { nodes: vec![] }, matcher: contract_matches, @@ -333,6 +336,22 @@ fn installed_contract_mutation_dispatches_only_through_ticketed_scheduler_tick() assert!(runtime .receipt_correlation_for_submission(&submission) .is_some()); + + assert_eq!( + runtime.observe_intent_outcome(&submission), + IntentOutcomeObservation::Decided { + correlation: Box::new( + runtime + .receipt_correlation_for_submission(&submission) + .expect("receipt correlation should exist") + .clone(), + ), + decision: IntentOutcomeDecision::Applied { + receipt_entry_index: 0, + rule_id: make_type_id(MUTATION_RULE_ID_LABEL).0, + }, + } + ); } #[test] From 02c39363e7760458394f359c899599ece1fe7a36 Mon Sep 17 00:00:00 2001 From: James Ross Date: Thu, 21 May 2026 11:41:58 -0700 Subject: [PATCH 03/11] test(core): lock explicit conflict retry policy --- ...nstalled_contract_intent_pipeline_tests.rs | 200 ++++++++++++++++-- 1 file changed, 185 insertions(+), 15 deletions(-) diff --git a/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs index 38d3f0b0..ab4d4ca1 100644 --- a/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs +++ b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs @@ -13,21 +13,29 @@ use warp_core::{ IngressEnvelope, IngressTarget, IntentOutcomeDecision, IntentOutcomeObservation, IntentSubmissionDisposition, NodeId, NodeRecord, OpticAdmissionTicket, OpticArtifactHandle, PatternGraph, PlaybackMode, ProvenanceService, RuntimeError, SchedulerCoordinator, - SchedulerKind, TickDelta, TicketedRuntimeIngressAuthority, WorldlineId, WorldlineRuntime, - WorldlineState, WriterHead, WriterHeadKey, OPTIC_ADMISSION_TICKET_KIND, + SchedulerKind, TickDelta, TickReceiptRejection, TicketedRuntimeIngressAuthority, WorldlineId, + WorldlineRuntime, WorldlineState, WriterHead, WriterHeadKey, OPTIC_ADMISSION_TICKET_KIND, OPTIC_ARTIFACT_HANDLE_KIND, }; const SCHEMA_SHA256_HEX: &str = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"; const MUTATION_OP_ID: u32 = 1001; +const CONFLICT_OP_ID: u32 = 1002; const UNKNOWN_OP_ID: u32 = 9999; const MUTATION_VARS: &[u8] = b"amount=42"; +const CONFLICT_VARS_A: &[u8] = b"amount=1"; +const CONFLICT_VARS_B: &[u8] = b"amount=2"; const RESULT_TYPE: &str = "test/toy-counter/increment-result"; const RESULT_BYTES: &[u8] = b"value=42"; const MUTATION_RULE_NAME: &str = "cmd/contract/0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef/1001/increment"; const MUTATION_RULE_ID_LABEL: &str = "rule:cmd/contract/0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef/1001/increment"; +const CONFLICT_RULE_NAME: &str = + "cmd/contract/0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef/1002/conflict"; +const CONFLICT_RULE_ID_LABEL: &str = + "rule:cmd/contract/0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef/1002/conflict"; +const SHARED_CONFLICT_RESULT: &str = "test/toy-counter/shared-conflict-result"; static INCREMENT_ARGS: &[ArgDef] = &[ArgDef { name: "input", @@ -36,15 +44,26 @@ static INCREMENT_ARGS: &[ArgDef] = &[ArgDef { list: false, }]; -static OPS: &[OpDef] = &[OpDef { - kind: OpKind::Mutation, - name: "increment", - op_id: MUTATION_OP_ID, - args: INCREMENT_ARGS, - result_ty: "CounterValue", - directives_json: "{}", - footprint_certificate: None, -}]; +static OPS: &[OpDef] = &[ + OpDef { + kind: OpKind::Mutation, + name: "increment", + op_id: MUTATION_OP_ID, + args: INCREMENT_ARGS, + result_ty: "CounterValue", + directives_json: "{}", + footprint_certificate: None, + }, + OpDef { + kind: OpKind::Mutation, + name: "conflict", + op_id: CONFLICT_OP_ID, + args: INCREMENT_ARGS, + result_ty: "CounterValue", + directives_json: "{}", + footprint_certificate: None, + }, +]; struct StaticRegistry; @@ -175,6 +194,50 @@ fn contract_rule() -> warp_core::RewriteRule { } } +fn shared_conflict_node_id() -> NodeId { + make_node_id(SHARED_CONFLICT_RESULT) +} + +fn conflict_matches(view: GraphView<'_>, scope: &NodeId) -> bool { + warp_core::eint_vars_for_op(view, scope, CONFLICT_OP_ID).is_some() +} + +fn conflict_execute(view: GraphView<'_>, _scope: &NodeId, delta: &mut TickDelta) { + let warp_id = view.warp_id(); + let result = shared_conflict_node_id(); + delta.push(warp_core::WarpOp::UpsertNode { + node: warp_core::NodeKey { + warp_id, + local_id: result, + }, + record: NodeRecord { + ty: make_type_id(RESULT_TYPE), + }, + }); +} + +fn conflict_footprint(view: GraphView<'_>, scope: &NodeId) -> warp_core::Footprint { + let mut footprint = warp_core::runtime_ingress_eint_read_footprint(view, scope); + footprint + .n_write + .insert_with_warp(view.warp_id(), shared_conflict_node_id()); + footprint +} + +fn conflict_rule() -> warp_core::RewriteRule { + warp_core::RewriteRule { + id: make_type_id(CONFLICT_RULE_ID_LABEL).0, + name: CONFLICT_RULE_NAME, + left: PatternGraph { nodes: vec![] }, + matcher: conflict_matches, + executor: conflict_execute, + compute_footprint: conflict_footprint, + factor_mask: 0, + conflict_policy: warp_core::ConflictPolicy::Abort, + join_fn: None, + } +} + fn install_contract(engine: &mut Engine) { static REGISTRY: StaticRegistry = StaticRegistry; engine @@ -182,10 +245,16 @@ fn install_contract(engine: &mut Engine) { identity: package_identity(), registry: ®ISTRY, verification_policy: verification_policy(), - mutation_handlers: vec![ContractMutationHandler { - op_id: MUTATION_OP_ID, - rule: contract_rule(), - }], + mutation_handlers: vec![ + ContractMutationHandler { + op_id: MUTATION_OP_ID, + rule: contract_rule(), + }, + ContractMutationHandler { + op_id: CONFLICT_OP_ID, + rule: conflict_rule(), + }, + ], query_observers: vec![], }) .expect("contract package should install"); @@ -396,3 +465,104 @@ fn unsupported_installed_contract_mutation_cannot_enter_ticketed_runtime_ingress 0 ); } + +#[test] +fn footprint_conflict_is_final_without_hidden_retry() { + let (mut runtime, mut engine, worldline_id, _head) = pipeline_runtime(); + let envelope_a = eint_envelope(worldline_id, CONFLICT_OP_ID, CONFLICT_VARS_A); + let envelope_b = eint_envelope(worldline_id, CONFLICT_OP_ID, CONFLICT_VARS_B); + + let submission_a = match runtime + .submit_intent(envelope_a.clone()) + .expect("first submission should be witnessed") + { + IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id, + IntentSubmissionDisposition::Duplicate { .. } => { + panic!("first submission must not be duplicate") + } + }; + let submission_b = match runtime + .submit_intent(envelope_b.clone()) + .expect("second submission should be witnessed") + { + IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id, + IntentSubmissionDisposition::Duplicate { .. } => { + panic!("second submission must not be duplicate") + } + }; + + runtime + .ingest_installed_contract_invocation( + &ticketed_authority(), + &engine, + submission_a, + &admission_ticket(11), + envelope_a.clone(), + ) + .expect("first conflict candidate should stage"); + runtime + .ingest_installed_contract_invocation( + &ticketed_authority(), + &engine, + submission_b, + &admission_ticket(12), + envelope_b.clone(), + ) + .expect("second conflict candidate should stage"); + + let mut provenance = provenance_for(&runtime); + SchedulerCoordinator::super_tick(&mut runtime, &mut provenance, &mut engine) + .expect("conflict rejection is a lawful tick outcome"); + + let decisions = [ + runtime.observe_intent_outcome(&submission_a), + runtime.observe_intent_outcome(&submission_b), + ]; + let applied = decisions + .iter() + .filter(|decision| { + matches!( + decision, + IntentOutcomeObservation::Decided { + decision: IntentOutcomeDecision::Applied { .. }, + .. + } + ) + }) + .count(); + let rejected = decisions + .iter() + .filter_map(|decision| match decision { + IntentOutcomeObservation::Decided { + decision: + IntentOutcomeDecision::Rejected { + reason, blocked_by, .. + }, + .. + } => Some((*reason, blocked_by.as_slice())), + _ => None, + }) + .collect::>(); + + assert_eq!(applied, 1, "one conflicting candidate should apply"); + assert_eq!(rejected.len(), 1, "one conflicting candidate should reject"); + assert_eq!(rejected[0].0, TickReceiptRejection::FootprintConflict); + assert_eq!( + rejected[0].1, + &[0], + "conflict receipt should attribute the blocking applied candidate" + ); + + assert!(matches!( + runtime + .submit_intent(envelope_b) + .expect("duplicate submit should be observed"), + IntentSubmissionDisposition::Duplicate { submission_id, .. } + if submission_id == submission_b + )); + assert_eq!( + runtime.ticketed_runtime_ingress_count(), + 2, + "duplicate submission must not create a hidden retry ingress" + ); +} From 52c8d288a512cca65931b16fc74f2fcc7e26cd54 Mon Sep 17 00:00:00 2001 From: James Ross Date: Thu, 21 May 2026 11:45:32 -0700 Subject: [PATCH 04/11] feat(core): replay witnessed intent submission history --- crates/warp-core/src/coordinator.rs | 89 +++++++++++++++++++ ...nstalled_contract_intent_pipeline_tests.rs | 70 +++++++++++++++ 2 files changed, 159 insertions(+) diff --git a/crates/warp-core/src/coordinator.rs b/crates/warp-core/src/coordinator.rs index 457a8b9c..cfdd6a99 100644 --- a/crates/warp-core/src/coordinator.rs +++ b/crates/warp-core/src/coordinator.rs @@ -96,6 +96,10 @@ pub enum RuntimeError { /// the runtime counter can represent. #[error("intent submission generation overflow")] IntentSubmissionGenerationOverflow, + /// Replayed witnessed submission material did not match its derived identity + /// or conflicted with an existing witnessed submission. + #[error("witnessed intent submission replay mismatch: {0:?}")] + IntentSubmissionReplayMismatch(Hash), /// Ticketed runtime ingress referenced a submission Echo has not witnessed. #[error("unknown intent submission: {0:?}")] UnknownIntentSubmission(Hash), @@ -757,6 +761,23 @@ impl WorldlineRuntime { self.witnessed_submissions.values() } + /// Returns witnessed submissions in deterministic replay order. + /// + /// Replay order follows Echo-owned submission generation, then submission id + /// as a stable tie-breaker. These records are ingress-history evidence only; + /// importing them does not stage runtime ingress, tick, dispatch handlers, or + /// mutate application state. + #[must_use] + pub fn witnessed_submission_replay_records(&self) -> Vec { + let mut records = self + .witnessed_submissions + .values() + .cloned() + .collect::>(); + records.sort_by_key(|record| (record.submission_generation, record.submission_id)); + records + } + /// Returns the number of witnessed submission records. #[must_use] pub fn witnessed_submission_count(&self) -> usize { @@ -1264,6 +1285,69 @@ impl WorldlineRuntime { Ok(()) } + /// Imports witnessed submission records as replayed ingress history. + /// + /// This restores Echo's semantic submission ledger only. It does not enter + /// envelopes into head inboxes, stage ticketed runtime ingress, advance + /// ticks, dispatch handlers, or execute contracts. + /// + /// # Errors + /// + /// Returns an error when a replay record names an unknown writer head, its + /// submission id does not match the canonical head/ingress-derived identity, + /// it conflicts with already-replayed submission material, or the replayed + /// generation cannot advance the runtime's next generation counter. + pub fn replay_witnessed_submissions(&mut self, records: I) -> Result<(), RuntimeError> + where + I: IntoIterator, + { + let mut records = records.into_iter().collect::>(); + records.sort_by_key(|record| (record.submission_generation, record.submission_id)); + + for record in records { + if self.heads.get(&record.head_key).is_none() { + return Err(RuntimeError::UnknownHead(record.head_key)); + } + let expected_submission_id = + derive_intent_submission_id(record.head_key, record.ingress_id); + if record.submission_id != expected_submission_id { + return Err(RuntimeError::IntentSubmissionReplayMismatch( + record.submission_id, + )); + } + if self + .witnessed_submissions + .get(&record.submission_id) + .is_some_and(|existing| existing != &record) + { + return Err(RuntimeError::IntentSubmissionReplayMismatch( + record.submission_id, + )); + } + if self + .submission_by_target + .get(&(record.head_key, record.ingress_id)) + .is_some_and(|submission_id| *submission_id != record.submission_id) + { + return Err(RuntimeError::IntentSubmissionReplayMismatch( + record.submission_id, + )); + } + let Some(next_generation) = record.submission_generation.checked_increment() else { + return Err(RuntimeError::IntentSubmissionGenerationOverflow); + }; + if next_generation > self.next_submission_generation { + self.next_submission_generation = next_generation; + } + self.submission_by_target + .insert((record.head_key, record.ingress_id), record.submission_id); + self.witnessed_submissions + .insert(record.submission_id, record); + } + + Ok(()) + } + /// Records an accepted intent submission without entering runtime ingress. /// /// This is witnessed Echo ingress history only. It does not store the @@ -1807,6 +1891,7 @@ fn scheduler_fault_scope_for_error( | RuntimeError::Replay(_) | RuntimeError::Strand(_) | RuntimeError::IntentSubmissionGenerationOverflow + | RuntimeError::IntentSubmissionReplayMismatch(_) | RuntimeError::UnknownIntentSubmission(_) | RuntimeError::TicketedIngressSubmissionMismatch(_) | RuntimeError::TicketedIngressAlreadyStaged(_) @@ -2323,6 +2408,10 @@ fn scheduler_error_cause_digest(err: &RuntimeError) -> Hash { RuntimeError::IntentSubmissionGenerationOverflow => { hasher.update(b"intent-submission-generation-overflow"); } + RuntimeError::IntentSubmissionReplayMismatch(submission_id) => { + hasher.update(b"intent-submission-replay-mismatch"); + hasher.update(submission_id); + } RuntimeError::UnknownIntentSubmission(submission_id) => { hasher.update(b"unknown-intent-submission"); hasher.update(submission_id); diff --git a/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs index ab4d4ca1..9d1ea592 100644 --- a/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs +++ b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs @@ -566,3 +566,73 @@ fn footprint_conflict_is_final_without_hidden_retry() { "duplicate submission must not create a hidden retry ingress" ); } + +#[test] +fn witnessed_submission_replay_restores_pending_history_without_runtime_ingress() { + let (mut runtime, _engine, worldline_id, _head) = pipeline_runtime(); + let envelope_a = eint_envelope(worldline_id, MUTATION_OP_ID, MUTATION_VARS); + let envelope_b = eint_envelope(worldline_id, CONFLICT_OP_ID, CONFLICT_VARS_A); + + let submission_a = match runtime + .submit_intent(envelope_a.clone()) + .expect("first submission should be witnessed") + { + IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id, + IntentSubmissionDisposition::Duplicate { .. } => { + panic!("first submission must not be duplicate") + } + }; + let submission_b = match runtime + .submit_intent(envelope_b.clone()) + .expect("second submission should be witnessed") + { + IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id, + IntentSubmissionDisposition::Duplicate { .. } => { + panic!("second submission must not be duplicate") + } + }; + let replay_records = runtime.witnessed_submission_replay_records(); + + let (mut replayed, _engine, _worldline_id, replay_head) = pipeline_runtime(); + replayed + .replay_witnessed_submissions(replay_records) + .expect("witnessed submission replay should import"); + + assert_eq!(replayed.witnessed_submission_count(), 2); + assert_eq!(replayed.ticketed_runtime_ingress_count(), 0); + assert_eq!( + replayed + .heads() + .get(&replay_head) + .expect("replay head should exist") + .inbox() + .pending_count(), + 0, + "replaying witnessed submissions must not stage runtime ingress" + ); + assert_eq!(replayed.global_tick().as_u64(), 0); + + assert!(matches!( + replayed.observe_intent_outcome(&submission_a), + IntentOutcomeObservation::Pending { + submission_id, + ticketed_ingress_id: None, + .. + } if submission_id == submission_a + )); + assert!(matches!( + replayed.observe_intent_outcome(&submission_b), + IntentOutcomeObservation::Pending { + submission_id, + ticketed_ingress_id: None, + .. + } if submission_id == submission_b + )); + assert!(matches!( + replayed + .submit_intent(envelope_a) + .expect("replayed duplicate should be recognized"), + IntentSubmissionDisposition::Duplicate { submission_id, .. } + if submission_id == submission_a + )); +} From 696fec917f633e3e0251b3c96f8517ca30b9bc99 Mon Sep 17 00:00:00 2001 From: James Ross Date: Thu, 21 May 2026 11:47:30 -0700 Subject: [PATCH 05/11] test(core): prove installed intent pipeline replay --- ...nstalled_contract_intent_pipeline_tests.rs | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs index 9d1ea592..aca1bfa1 100644 --- a/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs +++ b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs @@ -636,3 +636,78 @@ fn witnessed_submission_replay_restores_pending_history_without_runtime_ingress( if submission_id == submission_a )); } + +#[test] +fn installed_contract_pipeline_replays_to_same_receipt_and_outcome() { + let (mut original_runtime, mut original_engine, worldline_id, _head) = pipeline_runtime(); + let envelope = eint_envelope(worldline_id, MUTATION_OP_ID, MUTATION_VARS); + let ticket = admission_ticket(21); + let submission = match original_runtime + .submit_intent(envelope.clone()) + .expect("submission should be witnessed") + { + IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id, + IntentSubmissionDisposition::Duplicate { .. } => { + panic!("first submission must not be duplicate") + } + }; + let replay_records = original_runtime.witnessed_submission_replay_records(); + original_runtime + .ingest_installed_contract_invocation( + &ticketed_authority(), + &original_engine, + submission, + &ticket, + envelope.clone(), + ) + .expect("original ticketed ingress should stage"); + let mut original_provenance = provenance_for(&original_runtime); + let original_steps = SchedulerCoordinator::super_tick( + &mut original_runtime, + &mut original_provenance, + &mut original_engine, + ) + .expect("original tick should commit"); + let original_correlation = original_runtime + .receipt_correlation_for_submission(&submission) + .expect("original receipt correlation should exist") + .clone(); + let original_outcome = original_runtime.observe_intent_outcome(&submission); + + let (mut replayed_runtime, mut replayed_engine, _worldline_id, _head) = pipeline_runtime(); + replayed_runtime + .replay_witnessed_submissions(replay_records) + .expect("witnessed submission replay should import"); + replayed_runtime + .ingest_installed_contract_invocation( + &ticketed_authority(), + &replayed_engine, + submission, + &ticket, + envelope, + ) + .expect("replayed ticketed ingress should stage"); + let mut replayed_provenance = provenance_for(&replayed_runtime); + let replayed_steps = SchedulerCoordinator::super_tick( + &mut replayed_runtime, + &mut replayed_provenance, + &mut replayed_engine, + ) + .expect("replayed tick should commit"); + let replayed_correlation = replayed_runtime + .receipt_correlation_for_submission(&submission) + .expect("replayed receipt correlation should exist") + .clone(); + let replayed_outcome = replayed_runtime.observe_intent_outcome(&submission); + + assert_eq!(replayed_steps, original_steps); + assert_eq!( + replayed_correlation.tick_receipt_digest, + original_correlation.tick_receipt_digest + ); + assert_eq!( + replayed_correlation.commit_hash, + original_correlation.commit_hash + ); + assert_eq!(replayed_outcome, original_outcome); +} From d024f24e6349ed02b36ee391cfc6e9c0378b5844 Mon Sep 17 00:00:00 2001 From: James Ross Date: Thu, 21 May 2026 11:50:48 -0700 Subject: [PATCH 06/11] docs: record installed intent pipeline progress --- CHANGELOG.md | 20 +++++-- docs/BEARING.md | 46 ++++++++-------- ...FORM_contract-queryview-observer-bridge.md | 5 +- ...installed-wesley-contract-host-dispatch.md | 52 ++++++++++++------- 4 files changed, 75 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ec84d56a..9126e69c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,19 @@ ### Added +- `warp-core` now connects the installed contract package boundary to the + witnessed intent pipeline. Package-supported canonical EINT mutation ids can + be staged through ticketed runtime ingress only after Echo has witnessed the + submission and verified an installed package owns the op id; unsupported + installed-contract mutation ids are rejected before they become + runtime-visible scheduler work. The new installed-contract intent pipeline + tests prove application submission does not tick or execute, ticketed ingress + stages without executing, scheduler-owned ticks dispatch the installed + mutation handler, conflict rejection is a final tick outcome with blocker + attribution, duplicate submits do not create hidden retries, witnessed + submissions can be replayed back into pending ingress history without staging + inbox work, and the replayed pipeline converges to the same receipt + correlation and observed outcome. - `warp-core` now exposes an installed contract package registry boundary for runtime-owner host adapters. An installed package binds generated registry metadata, schema hash, codec identity, package artifact identity, supported @@ -74,9 +87,10 @@ - `warp-core` now exposes a zero-write `observe_intent_outcome(...)` polling surface over witnessed submission ids. The observation reports `UnknownSubmission`, `Pending` with optional ticketed-ingress identity, or - `Decided` with the scheduler-owned receipt correlation once a ticketed - submission reaches a tick receipt. This does not infer per-candidate - applied/rejected application semantics, stream updates, dispatch installed + `Decided` with the scheduler-owned receipt correlation and typed receipt + decision once a ticketed submission reaches a tick receipt. The decision + reports applied entries or rejected entries with deterministic rejection reason + and blocker attribution. This does not stream updates, dispatch installed handlers, execute contracts outside scheduler-owned ticks, or introduce automatic retry. - `warp-core` now records scheduler-owned receipt correlations for ticketed diff --git a/docs/BEARING.md b/docs/BEARING.md index 07b58489..f2ed22a4 100644 --- a/docs/BEARING.md +++ b/docs/BEARING.md @@ -166,19 +166,19 @@ AdmissionTicket + witnessed submission -> ticketed runtime ingress ## Roadmap Status -| Area | Status | Notes | -| :----------------------------- | :------- | :--------------------------------------------------------------------------------------------------------- | -| WitnessedIntentSubmission | Partial | Runtime records witnessed submissions in memory; durable restart replay remains follow-up work. | -| SchedulerWorkCandidate | Complete | The admission ladder can resolve the scheduler work candidate fixture. | -| LawWitness | Complete | The admission ladder can resolve the law witness fixture. | -| AdmissionTicket | Complete | Echo can issue `OpticAdmissionTicket` evidence without executing. | -| TicketedRuntimeIngress | Complete | Ticketed ingress stages admitted submissions through runtime-owner authority without ticking. | -| ReceiptCorrelation | Complete | Scheduler-owned tick receipts correlate back to ticketed ingress, tickets, and submissions. | -| IntentOutcomeObservation | Partial | Core exposes zero-write pending/decided observation; domain-level applied/rejected semantics remain later. | -| InstalledContractHostDispatch | Partial | Installed package registry gating exists; full normal intent/tick dispatch proof remains next. | -| ConflictPolicy / ExplicitRetry | Partial | Conflict rejection is explicit; user-facing retry policy is still future work. | -| QueryViewObserverBridge | Complete | Core routes QueryView/Query to installed observers, and Wesley emits host helper constructors. | -| Replay/DIND proof | Later | End-to-end replay proof for the full intent/admission/tick pipeline remains future work. | +| Area | Status | Notes | +| :----------------------------- | :------- | :------------------------------------------------------------------------------------------------------------------------------ | +| WitnessedIntentSubmission | Partial | Runtime records witnessed submissions and exports/imports deterministic replay records; durable storage remains follow-up work. | +| SchedulerWorkCandidate | Complete | The admission ladder can resolve the scheduler work candidate fixture. | +| LawWitness | Complete | The admission ladder can resolve the law witness fixture. | +| AdmissionTicket | Complete | Echo can issue `OpticAdmissionTicket` evidence without executing. | +| TicketedRuntimeIngress | Complete | Ticketed ingress stages admitted submissions through runtime-owner authority without ticking. | +| ReceiptCorrelation | Complete | Scheduler-owned tick receipts correlate back to ticketed ingress, tickets, and submissions. | +| IntentOutcomeObservation | Complete | Core exposes zero-write pending/decided observation with applied/rejected receipt decisions and blockers. | +| InstalledContractHostDispatch | Complete | Installed packages can dispatch mutation handlers through witnessed, ticketed, scheduler-owned ticks. | +| ConflictPolicy / ExplicitRetry | Partial | Tick-scale conflict rejection is final and blocker-attributed; user-facing retry helpers remain future. | +| QueryViewObserverBridge | Complete | Core routes QueryView/Query to installed observers, and Wesley emits host helper constructors. | +| Replay/DIND proof | Partial | Local installed intent pipeline replay converges; broader DIND/replay closure remains future work. | ## Future Scope Boundaries @@ -192,24 +192,20 @@ AdmissionTicket + witnessed submission -> ticketed runtime ingress ## Immediate Next Slice -The next slice should prove installed contract mutation dispatch through the -normal witnessed intent and scheduler-owned tick path. Echo now has a single -registry-verified installed package surface that binds schema hash, artifact -hash, codec identity, supported operation ids, mutation handlers, and query -observers before those handlers or observers install into `Engine`. +The local installed-contract intent pipeline now reaches scheduler-owned handler +dispatch and replay convergence. The next slice should move outward to the +contract-aware receipt/reading and consumer-proof boundary: prove an external +Wesley-compiled contract package can use the generic installed mutation and +query surfaces without moving application nouns into `warp-core`. Direct `native_rule_bootstrap` registration remains an internal fixture and transitional engine-test path. It does not provide package identity, registry verification, or generated operation/package binding guarantees. Contract-host proofs that need those guarantees should install through the package boundary. -The next proof should start from canonical generated EINT bytes, pass through -witnessed ingress and package-supported operation lookup, stage through the -existing ticketed runtime path, and run the installed mutation rule only during a -scheduler-owned tick. - -That slice must not implement streaming subscriptions, automatic retry, -execution outside scheduler-owned ticks, or wall-clock cadence semantics. +That next slice must not implement streaming subscriptions, hidden retry, +execution outside scheduler-owned ticks, wall-clock cadence semantics, or +jedit/text-domain APIs inside Echo core. ## Do Not Regress diff --git a/docs/method/backlog/asap/PLATFORM_contract-queryview-observer-bridge.md b/docs/method/backlog/asap/PLATFORM_contract-queryview-observer-bridge.md index 4eeff8b5..401da944 100644 --- a/docs/method/backlog/asap/PLATFORM_contract-queryview-observer-bridge.md +++ b/docs/method/backlog/asap/PLATFORM_contract-queryview-observer-bridge.md @@ -3,8 +3,9 @@ # Contract QueryView Observer Bridge -Status: core observer bridge, generated query helper checkpoint, and installed -contract package registry boundary complete. +Status: core observer bridge, generated query helper checkpoint, installed +contract package registry boundary, and local installed mutation dispatch proof +complete. Depends on: diff --git a/docs/method/backlog/asap/PLATFORM_installed-wesley-contract-host-dispatch.md b/docs/method/backlog/asap/PLATFORM_installed-wesley-contract-host-dispatch.md index 9c2c6954..0ae4e553 100644 --- a/docs/method/backlog/asap/PLATFORM_installed-wesley-contract-host-dispatch.md +++ b/docs/method/backlog/asap/PLATFORM_installed-wesley-contract-host-dispatch.md @@ -3,8 +3,8 @@ # Installed Wesley Contract Host Dispatch -Status: installed package registry boundary exists; full normal intent/tick -dispatch proof remains. +Status: local installed package dispatch proof complete; external consumer +contract proof remains downstream. Depends on: @@ -14,9 +14,11 @@ Depends on: ## Why now -Echo can accept EINT bytes, but it does not yet route a validated generated -contract operation to an installed contract handler inside the normal witnessed -admission, scheduling, and provenance path. +Echo can accept EINT bytes and now routes package-supported generated contract +mutations through witnessed submission, ticketed runtime ingress, and +scheduler-owned ticks. The remaining platform work is proving the same generic +surface with an external Wesley-compiled consumer package while keeping +application nouns out of `warp-core`. ## Current Checkpoint @@ -49,28 +51,42 @@ Direct `native_rule_bootstrap` registration remains available only as an internal fixture and transitional engine-test path. It is not the registry package boundary and does not provide package identity guarantees. -Remaining work is dispatch integration: prove a generated EINT submitted through -normal Echo ingress reaches the installed mutation handler only through the -witnessed/ticketed/scheduler-owned path. +The local dispatch proof now verifies: + +- package-supported EINT mutation ids enter runtime only through witnessed + submission plus ticketed runtime ingress; +- unsupported installed-contract mutation ids are rejected before they become + runtime-visible work; +- handler execution occurs during `SchedulerCoordinator::super_tick(...)`, not + during application submission or ticketed ingress; +- receipt/outcome observation reports applied and rejected tick decisions; +- footprint conflicts are final for that tick attempt, with blocker attribution + and no hidden retry ingress; +- witnessed submission replay restores pending ingress history without staging + inbox work; and +- replayed installed-contract pipeline runs converge to the same receipt + correlation and observed outcome. + +Remaining work moved out of this card: external consumer proof fixtures, +contract-aware receipt/reading polish, and broader DIND replay closure. ## RED -Add a failing test with a tiny generated or hand-rolled contract fixture: +Added failing tests with a tiny generated-shaped contract fixture: - install one mutation op id and generated handler; -- submit generated EINT bytes through `dispatch_intent`; +- submit canonical EINT bytes through witnessed submission; - prove no direct test-only mutation service is called; -- assert worldline/provenance state changes only after scheduler execution. +- assert worldline/provenance state changes only after scheduler execution; +- reject unsupported package op ids before runtime ingress; +- expose receipt-level applied/rejected decisions; and +- prove replay convergence. ## GREEN -Add the minimal generic installed-contract host seam needed to pass the test. - -Candidate surface: - -- package-supported op-id lookup during ingress/admission/runtime handoff; -- generated vars decode; -- artifact/schema identity attached to receipt or ingress metadata. +Added package-supported op-id lookup during the ticketed runtime handoff and +receipt/outcome observation over scheduler-owned decisions. Generated vars +decode and handler dispatch remain inside installed mutation rules. ## Acceptance criteria From 0c50a29b08b7f00cb0cc3bb4de4dad847e9c1ea2 Mon Sep 17 00:00:00 2001 From: James Ross Date: Thu, 21 May 2026 13:03:16 -0700 Subject: [PATCH 07/11] Fix: preserve replayed submission generation continuity --- CHANGELOG.md | 4 +- crates/warp-core/src/coordinator.rs | 9 ++-- ...nstalled_contract_intent_pipeline_tests.rs | 43 ++++++++++++++++--- 3 files changed, 43 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9126e69c..7147f904 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,9 @@ attribution, duplicate submits do not create hidden retries, witnessed submissions can be replayed back into pending ingress history without staging inbox work, and the replayed pipeline converges to the same receipt - correlation and observed outcome. + correlation and observed outcome. Replayed witnessed submission records + preserve generation continuity so the next live submission receives the next + contiguous generation instead of skipping ahead. - `warp-core` now exposes an installed contract package registry boundary for runtime-owner host adapters. An installed package binds generated registry metadata, schema hash, codec identity, package artifact identity, supported diff --git a/crates/warp-core/src/coordinator.rs b/crates/warp-core/src/coordinator.rs index cfdd6a99..f14e1c24 100644 --- a/crates/warp-core/src/coordinator.rs +++ b/crates/warp-core/src/coordinator.rs @@ -1296,7 +1296,7 @@ impl WorldlineRuntime { /// Returns an error when a replay record names an unknown writer head, its /// submission id does not match the canonical head/ingress-derived identity, /// it conflicts with already-replayed submission material, or the replayed - /// generation cannot advance the runtime's next generation counter. + /// generation conflicts with existing replay posture. pub fn replay_witnessed_submissions(&mut self, records: I) -> Result<(), RuntimeError> where I: IntoIterator, @@ -1333,11 +1333,8 @@ impl WorldlineRuntime { record.submission_id, )); } - let Some(next_generation) = record.submission_generation.checked_increment() else { - return Err(RuntimeError::IntentSubmissionGenerationOverflow); - }; - if next_generation > self.next_submission_generation { - self.next_submission_generation = next_generation; + if record.submission_generation > self.next_submission_generation { + self.next_submission_generation = record.submission_generation; } self.submission_by_target .insert((record.head_key, record.ingress_id), record.submission_id); diff --git a/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs index aca1bfa1..0bb662b5 100644 --- a/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs +++ b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs @@ -10,12 +10,12 @@ use echo_registry_api::{ use warp_core::{ make_head_id, make_intent_kind, make_node_id, make_type_id, ContractMutationHandler, ContractPackageIdentity, Engine, EngineBuilder, GraphStore, GraphView, InboxPolicy, - IngressEnvelope, IngressTarget, IntentOutcomeDecision, IntentOutcomeObservation, - IntentSubmissionDisposition, NodeId, NodeRecord, OpticAdmissionTicket, OpticArtifactHandle, - PatternGraph, PlaybackMode, ProvenanceService, RuntimeError, SchedulerCoordinator, - SchedulerKind, TickDelta, TickReceiptRejection, TicketedRuntimeIngressAuthority, WorldlineId, - WorldlineRuntime, WorldlineState, WriterHead, WriterHeadKey, OPTIC_ADMISSION_TICKET_KIND, - OPTIC_ARTIFACT_HANDLE_KIND, + IngressEnvelope, IngressSubmissionGeneration, IngressTarget, IntentOutcomeDecision, + IntentOutcomeObservation, IntentSubmissionDisposition, NodeId, NodeRecord, + OpticAdmissionTicket, OpticArtifactHandle, PatternGraph, PlaybackMode, ProvenanceService, + RuntimeError, SchedulerCoordinator, SchedulerKind, TickDelta, TickReceiptRejection, + TicketedRuntimeIngressAuthority, WorldlineId, WorldlineRuntime, WorldlineState, WriterHead, + WriterHeadKey, OPTIC_ADMISSION_TICKET_KIND, OPTIC_ARTIFACT_HANDLE_KIND, }; const SCHEMA_SHA256_HEX: &str = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"; @@ -637,6 +637,37 @@ fn witnessed_submission_replay_restores_pending_history_without_runtime_ingress( )); } +#[test] +fn witnessed_submission_replay_preserves_generation_continuity() { + let (mut runtime, _engine, worldline_id, _head) = pipeline_runtime(); + let envelope_a = eint_envelope(worldline_id, MUTATION_OP_ID, MUTATION_VARS); + let envelope_b = eint_envelope(worldline_id, CONFLICT_OP_ID, CONFLICT_VARS_A); + let envelope_c = eint_envelope(worldline_id, CONFLICT_OP_ID, CONFLICT_VARS_B); + + runtime + .submit_intent(envelope_a) + .expect("first submission should be witnessed"); + runtime + .submit_intent(envelope_b) + .expect("second submission should be witnessed"); + let replay_records = runtime.witnessed_submission_replay_records(); + + let (mut replayed, _engine, _worldline_id, _head) = pipeline_runtime(); + replayed + .replay_witnessed_submissions(replay_records) + .expect("witnessed submission replay should import"); + + assert!(matches!( + replayed + .submit_intent(envelope_c) + .expect("next live submit should be witnessed"), + IntentSubmissionDisposition::Accepted { + submission_generation, + .. + } if submission_generation == IngressSubmissionGeneration::from_raw(3) + )); +} + #[test] fn installed_contract_pipeline_replays_to_same_receipt_and_outcome() { let (mut original_runtime, mut original_engine, worldline_id, _head) = pipeline_runtime(); From dbc86de970eedaec4572c520b3b568104505caa7 Mon Sep 17 00:00:00 2001 From: James Ross Date: Thu, 21 May 2026 15:38:18 -0700 Subject: [PATCH 08/11] Fix: make witnessed submission replay atomic --- crates/warp-core/src/coordinator.rs | 23 ++++++++--- ...nstalled_contract_intent_pipeline_tests.rs | 40 +++++++++++++++++++ 2 files changed, 58 insertions(+), 5 deletions(-) diff --git a/crates/warp-core/src/coordinator.rs b/crates/warp-core/src/coordinator.rs index f14e1c24..4ca9c87a 100644 --- a/crates/warp-core/src/coordinator.rs +++ b/crates/warp-core/src/coordinator.rs @@ -1303,6 +1303,9 @@ impl WorldlineRuntime { { let mut records = records.into_iter().collect::>(); records.sort_by_key(|record| (record.submission_generation, record.submission_id)); + let mut staged_submission_by_target = BTreeMap::new(); + let mut staged_witnessed_submissions = BTreeMap::new(); + let mut next_submission_generation = self.next_submission_generation; for record in records { if self.heads.get(&record.head_key).is_none() { @@ -1319,6 +1322,9 @@ impl WorldlineRuntime { .witnessed_submissions .get(&record.submission_id) .is_some_and(|existing| existing != &record) + || staged_witnessed_submissions + .get(&record.submission_id) + .is_some_and(|existing| existing != &record) { return Err(RuntimeError::IntentSubmissionReplayMismatch( record.submission_id, @@ -1328,20 +1334,27 @@ impl WorldlineRuntime { .submission_by_target .get(&(record.head_key, record.ingress_id)) .is_some_and(|submission_id| *submission_id != record.submission_id) + || staged_submission_by_target + .get(&(record.head_key, record.ingress_id)) + .is_some_and(|submission_id| *submission_id != record.submission_id) { return Err(RuntimeError::IntentSubmissionReplayMismatch( record.submission_id, )); } - if record.submission_generation > self.next_submission_generation { - self.next_submission_generation = record.submission_generation; + if record.submission_generation > next_submission_generation { + next_submission_generation = record.submission_generation; } - self.submission_by_target + staged_submission_by_target .insert((record.head_key, record.ingress_id), record.submission_id); - self.witnessed_submissions - .insert(record.submission_id, record); + staged_witnessed_submissions.insert(record.submission_id, record); } + self.next_submission_generation = next_submission_generation; + self.submission_by_target + .extend(staged_submission_by_target); + self.witnessed_submissions + .extend(staged_witnessed_submissions); Ok(()) } diff --git a/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs index 0bb662b5..1534f82d 100644 --- a/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs +++ b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs @@ -567,6 +567,46 @@ fn footprint_conflict_is_final_without_hidden_retry() { ); } +#[test] +fn replay_witnessed_submissions_rejects_invalid_batch_without_partial_import() { + let (mut runtime, _engine, worldline_id, _head) = pipeline_runtime(); + let envelope_a = eint_envelope(worldline_id, MUTATION_OP_ID, MUTATION_VARS); + let envelope_b = eint_envelope(worldline_id, CONFLICT_OP_ID, CONFLICT_VARS_A); + + runtime + .submit_intent(envelope_a) + .expect("first submission should be witnessed"); + runtime + .submit_intent(envelope_b) + .expect("second submission should be witnessed"); + let mut replay_records = runtime.witnessed_submission_replay_records(); + replay_records + .get_mut(1) + .expect("second replay record should exist") + .submission_id = [0xA5; 32]; + + let (mut replayed, _engine, replay_worldline_id, _head) = pipeline_runtime(); + assert!(matches!( + replayed.replay_witnessed_submissions(replay_records), + Err(RuntimeError::IntentSubmissionReplayMismatch(_)) + )); + + assert_eq!( + replayed.witnessed_submission_count(), + 0, + "failed replay import must not retain earlier records from the same batch" + ); + assert!(matches!( + replayed + .submit_intent(eint_envelope(replay_worldline_id, MUTATION_OP_ID, MUTATION_VARS)) + .expect("live submission after failed replay should still work"), + IntentSubmissionDisposition::Accepted { + submission_generation, + .. + } if submission_generation == IngressSubmissionGeneration::from_raw(1) + )); +} + #[test] fn witnessed_submission_replay_restores_pending_history_without_runtime_ingress() { let (mut runtime, _engine, worldline_id, _head) = pipeline_runtime(); From 5e730436156b31389e3e70b08ad843f2c83d63ab Mon Sep 17 00:00:00 2001 From: James Ross Date: Thu, 21 May 2026 15:41:19 -0700 Subject: [PATCH 09/11] Fix: validate replayed submission generations --- crates/warp-core/src/coordinator.rs | 20 +++++++ ...nstalled_contract_intent_pipeline_tests.rs | 60 +++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/crates/warp-core/src/coordinator.rs b/crates/warp-core/src/coordinator.rs index 4ca9c87a..8c795adf 100644 --- a/crates/warp-core/src/coordinator.rs +++ b/crates/warp-core/src/coordinator.rs @@ -1305,6 +1305,12 @@ impl WorldlineRuntime { records.sort_by_key(|record| (record.submission_generation, record.submission_id)); let mut staged_submission_by_target = BTreeMap::new(); let mut staged_witnessed_submissions = BTreeMap::new(); + let existing_generation_by_submission = self + .witnessed_submissions + .values() + .map(|record| (record.submission_generation, record.submission_id)) + .collect::>(); + let mut staged_generation_by_submission = BTreeMap::new(); let mut next_submission_generation = self.next_submission_generation; for record in records { @@ -1342,11 +1348,25 @@ impl WorldlineRuntime { record.submission_id, )); } + if record.submission_generation == IngressSubmissionGeneration::ZERO + || existing_generation_by_submission + .get(&record.submission_generation) + .is_some_and(|submission_id| *submission_id != record.submission_id) + || staged_generation_by_submission + .get(&record.submission_generation) + .is_some_and(|submission_id| *submission_id != record.submission_id) + { + return Err(RuntimeError::IntentSubmissionReplayMismatch( + record.submission_id, + )); + } if record.submission_generation > next_submission_generation { next_submission_generation = record.submission_generation; } staged_submission_by_target .insert((record.head_key, record.ingress_id), record.submission_id); + staged_generation_by_submission + .insert(record.submission_generation, record.submission_id); staged_witnessed_submissions.insert(record.submission_id, record); } diff --git a/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs index 1534f82d..8152d070 100644 --- a/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs +++ b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs @@ -607,6 +607,66 @@ fn replay_witnessed_submissions_rejects_invalid_batch_without_partial_import() { )); } +#[test] +fn replay_witnessed_submissions_rejects_zero_generation() { + let (mut runtime, _engine, worldline_id, _head) = pipeline_runtime(); + let envelope = eint_envelope(worldline_id, MUTATION_OP_ID, MUTATION_VARS); + + runtime + .submit_intent(envelope) + .expect("submission should be witnessed"); + let mut replay_records = runtime.witnessed_submission_replay_records(); + replay_records + .get_mut(0) + .expect("replay record should exist") + .submission_generation = IngressSubmissionGeneration::ZERO; + + let (mut replayed, _engine, _worldline_id, _head) = pipeline_runtime(); + assert!(matches!( + replayed.replay_witnessed_submissions(replay_records), + Err(RuntimeError::IntentSubmissionReplayMismatch(_)) + )); + assert_eq!( + replayed.witnessed_submission_count(), + 0, + "zero-generation replay records must not enter witnessed history" + ); +} + +#[test] +fn replay_witnessed_submissions_rejects_duplicate_generations() { + let (mut runtime, _engine, worldline_id, _head) = pipeline_runtime(); + let envelope_a = eint_envelope(worldline_id, MUTATION_OP_ID, MUTATION_VARS); + let envelope_b = eint_envelope(worldline_id, CONFLICT_OP_ID, CONFLICT_VARS_A); + + runtime + .submit_intent(envelope_a) + .expect("first submission should be witnessed"); + runtime + .submit_intent(envelope_b) + .expect("second submission should be witnessed"); + let mut replay_records = runtime.witnessed_submission_replay_records(); + let duplicate_generation = replay_records + .first() + .expect("first replay record should exist") + .submission_generation; + replay_records + .get_mut(1) + .expect("second replay record should exist") + .submission_generation = duplicate_generation; + + let (mut replayed, _engine, _worldline_id, _head) = pipeline_runtime(); + assert!(matches!( + replayed.replay_witnessed_submissions(replay_records), + Err(RuntimeError::IntentSubmissionReplayMismatch(_)) + )); + assert_eq!( + replayed.witnessed_submission_count(), + 0, + "duplicate replay generations must not enter witnessed history" + ); +} + #[test] fn witnessed_submission_replay_restores_pending_history_without_runtime_ingress() { let (mut runtime, _engine, worldline_id, _head) = pipeline_runtime(); From b0abe3da3a47942b0d0c18b8af2fd595adbbd283 Mon Sep 17 00:00:00 2001 From: James Ross Date: Thu, 21 May 2026 15:43:39 -0700 Subject: [PATCH 10/11] Fix: run installed intent pipeline tests in automation --- .github/workflows/ci.yml | 2 ++ .github/workflows/macos-local.yml | 2 ++ crates/warp-core/Cargo.toml | 4 ++++ scripts/verify-local.sh | 4 ++-- 4 files changed, 10 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e2c2d509..e78ce923 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -211,6 +211,8 @@ jobs: run: | cargo test -p warp-core --features host_test --test causal_fact_publication_tests cargo test -p warp-core --features host_test --test optic_invocation_admission_tests + - name: cargo test (warp-core installed contract intent pipeline) + run: cargo test -p warp-core --features native_rule_bootstrap,host_test --test installed_contract_intent_pipeline_tests - name: cargo test --doc (warp-core) run: cargo test -p warp-core --doc - name: PRNG golden regression (warp-math) diff --git a/.github/workflows/macos-local.yml b/.github/workflows/macos-local.yml index e3de51ca..f01d680c 100644 --- a/.github/workflows/macos-local.yml +++ b/.github/workflows/macos-local.yml @@ -65,3 +65,5 @@ jobs: run: | cargo test -p warp-core --features host_test --test causal_fact_publication_tests cargo test -p warp-core --features host_test --test optic_invocation_admission_tests + - name: cargo test (warp-core installed contract intent pipeline) + run: cargo test -p warp-core --features native_rule_bootstrap,host_test --test installed_contract_intent_pipeline_tests diff --git a/crates/warp-core/Cargo.toml b/crates/warp-core/Cargo.toml index 3bed9999..2b5e1d27 100644 --- a/crates/warp-core/Cargo.toml +++ b/crates/warp-core/Cargo.toml @@ -90,6 +90,10 @@ required-features = ["trusted_runtime"] name = "installed_contract_registry_tests" required-features = ["native_rule_bootstrap"] +[[test]] +name = "installed_contract_intent_pipeline_tests" +required-features = ["native_rule_bootstrap", "host_test"] + [build-dependencies] blake3 = "1.0" diff --git a/scripts/verify-local.sh b/scripts/verify-local.sh index 36b712ae..605b95a5 100755 --- a/scripts/verify-local.sh +++ b/scripts/verify-local.sh @@ -1128,7 +1128,7 @@ pre_push_feature_string_for_test_target() { local test_target="$2" case "${crate}:${test_target}" in - warp-core:inbox) + warp-core:inbox|warp-core:installed_contract_intent_pipeline_tests) printf '%s\n' "native_rule_bootstrap,host_test" ;; warp-core:causal_fact_publication_tests|warp-core:optic_invocation_admission_tests) @@ -1385,7 +1385,7 @@ warp_core_feature_args_for_test() { local test_target="$1" case "$test_target" in - inbox) + inbox|installed_contract_intent_pipeline_tests) printf '%s\n' "--features" "native_rule_bootstrap,host_test" ;; causal_fact_publication_tests|optic_invocation_admission_tests) From 95a94c307ca642f467951fd652001541d445791b Mon Sep 17 00:00:00 2001 From: James Ross Date: Thu, 21 May 2026 15:46:58 -0700 Subject: [PATCH 11/11] Fix: make installed pipeline test target clippy-clean --- .../tests/installed_contract_intent_pipeline_tests.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs index 8152d070..a43ae0dd 100644 --- a/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs +++ b/crates/warp-core/tests/installed_contract_intent_pipeline_tests.rs @@ -2,6 +2,7 @@ // © James Ross Ω FLYING•ROBOTS //! Installed contract intent pipeline tests. #![cfg(all(feature = "native_rule_bootstrap", feature = "host_test"))] +#![allow(clippy::expect_used, clippy::panic)] use echo_registry_api::{ ArgDef, ContractArtifactVerificationPolicy, ObjectDef, OpDef, OpKind, RegistryInfo, @@ -497,7 +498,7 @@ fn footprint_conflict_is_final_without_hidden_retry() { &engine, submission_a, &admission_ticket(11), - envelope_a.clone(), + envelope_a, ) .expect("first conflict candidate should stage"); runtime @@ -683,7 +684,7 @@ fn witnessed_submission_replay_restores_pending_history_without_runtime_ingress( } }; let submission_b = match runtime - .submit_intent(envelope_b.clone()) + .submit_intent(envelope_b) .expect("second submission should be witnessed") { IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id,