Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

139 changes: 139 additions & 0 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ fn map_reducer_error(e: ReducerCallError, reducer: &str) -> (StatusCode, String)
log::debug!("Attempt to call {lifecycle:?} lifecycle reducer {reducer}");
StatusCode::BAD_REQUEST
}
ReducerCallError::OutOfOrderInboundMessage { .. } => StatusCode::BAD_REQUEST,
};

log::debug!("Error while invoking reducer {e:#}");
Expand Down Expand Up @@ -216,6 +217,140 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
}
}

/// Path parameters for the `call_from_database` route.
#[derive(Deserialize)]
pub struct CallFromDatabaseParams {
name_or_identity: NameOrIdentity,
reducer: String,
}

/// Query parameters for the `call_from_database` route.
///
/// Both fields are mandatory; a missing field results in a 400 Bad Request.
#[derive(Deserialize)]
pub struct CallFromDatabaseQuery {
/// [`Identity`] of the sending database, parsed from a hex query string.
sender_identity: Identity,
/// The inter-database message ID from the sender's st_outbound_msg.
/// Used for at-most-once delivery via `st_inbound_msg`.
msg_id: u64,
}

/// Call a reducer on behalf of another database, with deduplication.
///
/// Endpoint: `POST /database/:name_or_identity/call-from-database/:reducer`
///
/// Required query params:
/// - `sender_identity` — hex-encoded identity of the sending database.
/// - `msg_id` — the inter-database message ID from the sender's st_outbound_msg.
///
/// Semantics:
/// - The client **must send strictly increasing `msg_id` values per `sender_identity`.**
/// - If a `msg_id` is **less than the last seen msg_id** for that sender, the request
/// is treated as **invalid and rejected with a bad request error**.
/// - If the incoming `msg_id` is equal to the last delivered msg_id, the call is treated
/// as a duplicate and **200 OK is returned without invoking the reducer**.
pub async fn call_from_database<S: ControlStateDelegate + NodeDelegate>(
State(worker_ctx): State<S>,
Extension(auth): Extension<SpacetimeAuth>,
Path(CallFromDatabaseParams {
name_or_identity,
reducer,
}): Path<CallFromDatabaseParams>,
Query(CallFromDatabaseQuery {
sender_identity,
msg_id,
}): Query<CallFromDatabaseQuery>,
TypedHeader(content_type): TypedHeader<headers::ContentType>,
body: axum::body::Bytes,
) -> axum::response::Result<impl IntoResponse> {
// IDC callers send BSATN (application/octet-stream).
if content_type != headers::ContentType::octet_stream() {
return Err((StatusCode::UNSUPPORTED_MEDIA_TYPE, "Expected application/octet-stream").into());
}

let caller_identity = auth.claims.identity;

let args = FunctionArgs::Bsatn(body);
let connection_id = generate_random_connection_id();

let (module, Database { owner_identity, .. }) = find_module_and_database(&worker_ctx, name_or_identity).await?;

// Call client_connected, if defined.
module
.call_identity_connected(auth.into(), connection_id)
.await
.map_err(client_connected_error_to_response)?;

let mut result = module
.call_reducer_from_database(
caller_identity,
Some(connection_id),
None,
None,
None,
&reducer,
args,
sender_identity,
msg_id,
)
.await;

//Wait for durability before sending response
if let Ok(rcr) = result.as_mut()
&& let Some(tx_offset) = rcr.tx_offset.as_mut()
&& let Some(mut durable_offset) = module.durable_tx_offset()
{
let tx_offset = tx_offset.await.map_err(|_| log_and_500("transaction aborted"))?;
durable_offset.wait_for(tx_offset).await.map_err(log_and_500)?;
}

module
.call_identity_disconnected(caller_identity, connection_id)
.await
.map_err(client_disconnected_error_to_response)?;

match result {
Ok(rcr) => {
let (status, body) = match rcr.outcome {
ReducerOutcome::Committed => (
StatusCode::OK,
axum::body::Body::from(rcr.reducer_return_value.unwrap_or_default()),
),
// 422 = reducer ran but returned Err; the IDC actor uses this to distinguish
// reducer failures from other errors (which it retries).
// This is inconsistent with `call` endpoint, which returns 523 status code if
// reducer fails
ReducerOutcome::Failed(errmsg) => (
StatusCode::UNPROCESSABLE_ENTITY,
axum::body::Body::from(errmsg.to_string()),
),
// This will be retried by IDC acttor
ReducerOutcome::BudgetExceeded => {
log::warn!(
"Node's energy budget exceeded for identity: {owner_identity} while executing {reducer}"
);
(
StatusCode::PAYMENT_REQUIRED,
axum::body::Body::from("Module energy budget exhausted."),
)
}
};
Ok((
status,
TypedHeader(SpacetimeEnergyUsed(rcr.energy_used)),
TypedHeader(SpacetimeExecutionDurationMicros(rcr.execution_duration)),
body,
)
.into_response())
}
Err(e) => {
let (status, msg) = map_reducer_error(e, &reducer);
Err((status, msg).into())
}
}
}

fn assert_content_type_json(content_type: headers::ContentType) -> axum::response::Result<()> {
if content_type != headers::ContentType::json() {
Err(axum::extract::rejection::MissingJsonContentType::default().into())
Expand Down Expand Up @@ -1189,6 +1324,8 @@ pub struct DatabaseRoutes<S> {
pub subscribe_get: MethodRouter<S>,
/// POST: /database/:name_or_identity/call/:reducer
pub call_reducer_procedure_post: MethodRouter<S>,
/// POST: /database/:name_or_identity/call-from-database/:reducer?sender_identity=<hex>&msg_id=<u64>
pub call_from_database_post: MethodRouter<S>,
/// GET: /database/:name_or_identity/schema
pub schema_get: MethodRouter<S>,
/// GET: /database/:name_or_identity/logs
Expand Down Expand Up @@ -1220,6 +1357,7 @@ where
identity_get: get(get_identity::<S>),
subscribe_get: get(handle_websocket::<S>),
call_reducer_procedure_post: post(call::<S>),
call_from_database_post: post(call_from_database::<S>),
schema_get: get(schema::<S>),
logs_get: get(logs::<S>),
sql_post: post(sql::<S>),
Expand All @@ -1245,6 +1383,7 @@ where
.route("/identity", self.identity_get)
.route("/subscribe", self.subscribe_get)
.route("/call/:reducer", self.call_reducer_procedure_post)
.route("/call-from-database/:reducer", self.call_from_database_post)
.route("/schema", self.schema_get)
.route("/logs", self.logs_get)
.route("/sql", self.sql_post)
Expand Down
39 changes: 38 additions & 1 deletion crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::idc_actor::{IdcActor, IdcActorConfig, IdcActorSender, IdcActorStarter};
use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule};
use super::scheduler::SchedulerStarter;
use super::wasmtime::WasmtimeRuntime;
Expand All @@ -22,8 +23,10 @@ use crate::util::jobs::{AllocatedJobCore, JobCores};
use crate::worker_metrics::WORKER_METRICS;
use anyhow::{anyhow, bail, Context};
use async_trait::async_trait;
use bytes::Bytes;
use durability::{Durability, EmptyHistory};
use log::{info, trace, warn};
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use scopeguard::defer;
use spacetimedb_commitlog::SizeOnDisk;
Expand Down Expand Up @@ -117,6 +120,8 @@ pub struct HostController {
db_cores: JobCores,
/// The pool of buffers used to build `BsatnRowList`s in subscriptions.
pub bsatn_rlb_pool: BsatnRowListBuilderPool,
/// Local port to be used by `IdcActor` to make remote reducer calls
idc_http_port: OnceCell<u16>,
}

pub(crate) struct HostRuntimes {
Expand All @@ -132,11 +137,13 @@ impl HostRuntimes {
}
}

#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct ReducerCallResult {
pub outcome: ReducerOutcome,
pub reducer_return_value: Option<Bytes>,
pub energy_used: EnergyQuanta,
pub execution_duration: Duration,
pub tx_offset: Option<TransactionOffset>,
}

impl ReducerCallResult {
Expand Down Expand Up @@ -228,6 +235,7 @@ impl HostController {
page_pool: PagePool::new(default_config.page_pool_max_size),
bsatn_rlb_pool: BsatnRowListBuilderPool::new(),
db_cores,
idc_http_port: OnceCell::new(),
}
}

Expand All @@ -236,6 +244,10 @@ impl HostController {
self.program_storage = ps;
}

pub fn set_idc_http_port(&self, port: u16) -> Result<(), u16> {
self.idc_http_port.set(port)
}

/// Get a [`ModuleHost`] managed by this controller, or launch it from
/// persistent state.
///
Expand Down Expand Up @@ -706,6 +718,7 @@ async fn make_module_host(
runtimes: Arc<HostRuntimes>,
replica_ctx: Arc<ReplicaContext>,
scheduler: Scheduler,
idc_sender: IdcActorSender,
program: Program,
energy_monitor: Arc<dyn EnergyMonitor>,
unregister: impl Fn() + Send + Sync + 'static,
Expand All @@ -721,6 +734,7 @@ async fn make_module_host(
let mcc = ModuleCreationContext {
replica_ctx,
scheduler,
idc_sender,
program_hash: program.hash,
energy_monitor,
};
Expand Down Expand Up @@ -758,6 +772,7 @@ struct LaunchedModule {
module_host: ModuleHost,
scheduler: Scheduler,
scheduler_starter: SchedulerStarter,
idc_starter: IdcActorStarter,
}

struct ModuleLauncher<F> {
Expand Down Expand Up @@ -794,10 +809,12 @@ impl<F: Fn() + Send + Sync + 'static> ModuleLauncher<F> {
.await
.map(Arc::new)?;
let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db().clone());
let (idc_starter, idc_sender) = IdcActor::open();
let (program, module_host) = make_module_host(
self.runtimes.clone(),
replica_ctx.clone(),
scheduler.clone(),
idc_sender.clone(),
self.program,
self.energy_monitor,
self.on_panic,
Expand All @@ -814,6 +831,7 @@ impl<F: Fn() + Send + Sync + 'static> ModuleLauncher<F> {
module_host,
scheduler,
scheduler_starter,
idc_starter,
},
))
}
Expand Down Expand Up @@ -879,6 +897,9 @@ struct Host {
/// Handle to the task responsible for cleaning up old views.
/// The task is aborted when [`Host`] is dropped.
view_cleanup_task: AbortHandle,
/// IDC actor: delivers outbound inter-database messages from `st_outbound_msg`.
/// Stopped when [`Host`] is dropped.
_idc_actor: IdcActor,
}

impl Host {
Expand Down Expand Up @@ -1072,6 +1093,7 @@ impl Host {
module_host,
scheduler,
scheduler_starter,
idc_starter,
} = launched;

// Disconnect dangling clients.
Expand All @@ -1098,6 +1120,18 @@ impl Host {
let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();
let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db().clone());

let idc_actor = idc_starter.start(
replica_ctx.relational_db().clone(),
IdcActorConfig {
sender_identity: replica_ctx.database_identity,
http_port: *host_controller
.idc_http_port
.get()
.ok_or_else(|| anyhow!("Port for IDC actor is not initialized"))?,
},
module_host.downgrade(),
);

let module = watch::Sender::new(module_host);

Ok(Host {
Expand All @@ -1107,6 +1141,7 @@ impl Host {
disk_metrics_recorder_task,
tx_metrics_recorder_task,
view_cleanup_task,
_idc_actor: idc_actor,
})
}

Expand Down Expand Up @@ -1179,11 +1214,13 @@ impl Host {
) -> anyhow::Result<UpdateDatabaseResult> {
let replica_ctx = &self.replica_ctx;
let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db().clone());
let (_idc_starter, idc_sender) = IdcActor::open();

let (program, module) = make_module_host(
runtimes,
replica_ctx.clone(),
scheduler.clone(),
idc_sender,
program,
energy_monitor,
on_panic,
Expand Down
Loading
Loading