Skip to content

Commit 3d5fc91

Browse files
authored
Implement distributed reply channels for remote await (#169)
* Implement distributed reply channels for remote await Add a request-response protocol over the mesh so that `await actor.method()` on a remote actor returns the actual reply instead of falling back to fire-and-forget. Wire protocol: - Extend HewWireEnvelope with request_id (field 5, varint) and source_node_id (field 6, varint). Fields are only encoded when non-zero, preserving backward compatibility with existing nodes. Reply routing table (hew_node.rs): - Process-global HashMap<u64, Arc<PendingReply>> keyed by request_id. - Atomic u64 counter for unique request ID generation. - Condvar-based blocking with timeout for the calling thread. Outbound remote ask (hew_node_api_ask): - If local PID: delegates to hew_actor_ask_by_id (new). - If remote PID: registers pending reply, sends envelope with request_id and source_node_id, blocks on condvar (5s timeout), returns reply data or zeroed fallback buffer. Inbound ask handling (node_inbound_router): - Detects request_id > 0 with source_node_id > 0. - Spawns a thread to perform a local blocking ask via hew_actor_ask_by_id, then sends the reply envelope back. Reply receipt (reader_loop in connection.rs): - Detects reply envelopes (request_id > 0, source_node_id == 0). - Deposits payload in the reply routing table, signalling the condvar to wake the blocked caller. Codegen (codegen.cpp ActorAskOpLowering): - Remote i64 targets now call hew_node_api_ask instead of hew_actor_send_by_id, enabling actual return values. New runtime helpers: - hew_actor_ask_by_id: blocking ask by actor ID (not pointer). - hew_reply_wait_with_size: returns both value and size. - InboundRouter extended to 6 params (+ request_id, source_node_id). Tests: - Wire envelope roundtrip with request_id/source_node_id. - Backward compatibility (fields default to zero). - Reply routing table: register, complete, remove, concurrent wake. Closes #165 * Fix unclosed mod tests delimiter from merge conflict The merge of main into issue-165-remote-await lost the closing brace for the mod tests block, causing the Node API wrapper functions and reply table code to be parsed inside the test module. This produced an 'unclosed delimiter' error on all CI platforms.
1 parent 5995696 commit 3d5fc91

7 files changed

Lines changed: 944 additions & 97 deletions

File tree

hew-codegen/src/codegen.cpp

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -969,28 +969,57 @@ struct ActorAskOpLowering : public mlir::OpConversionPattern<hew::ActorAskOp> {
969969
auto clonedArgs = deepCopyOwnedArgs(rewriter, loc, module, op.getArgs(), adaptor.getArgs());
970970
auto [dataPtr, dataSize] = emitPackArgs(rewriter, loc, clonedArgs);
971971

972-
// Remote actors (i64 PID from Node::lookup): use fire-and-forget send
973-
// via hew_actor_send_by_id. Full remote await (request-response) requires
974-
// a distributed reply channel protocol — planned for a future release.
972+
// Remote actors (i64 PID from Node::lookup): distributed ask via
973+
// hew_node_api_ask, which sends the message with a request_id over
974+
// the mesh and blocks until the reply arrives.
975975
if (targetVal.getType() == i64Type) {
976-
auto sendFuncType =
977-
rewriter.getFunctionType({i64Type, i32Type, ptrType, sizeType}, {i32Type});
978-
getOrInsertFuncDecl(module, rewriter, "hew_actor_send_by_id", sendFuncType);
979-
mlir::func::CallOp::create(rewriter, loc, "hew_actor_send_by_id", mlir::TypeRange{i32Type},
980-
mlir::ValueRange{targetVal, msgTypeVal, dataPtr, dataSize});
976+
auto askFuncType = rewriter.getFunctionType({i64Type, i32Type, ptrType, sizeType}, {ptrType});
977+
getOrInsertFuncDecl(module, rewriter, "hew_node_api_ask", askFuncType);
978+
auto call =
979+
mlir::func::CallOp::create(rewriter, loc, "hew_node_api_ask", mlir::TypeRange{ptrType},
980+
mlir::ValueRange{targetVal, msgTypeVal, dataPtr, dataSize});
981+
auto replyPtr = call.getResult(0);
981982

982-
// Return a zero/null result since we can't await remote actors yet.
983983
auto resultType = op.getResult().getType();
984-
mlir::Value zeroResult;
984+
985+
// Void-return handler: free the reply buffer and erase the op.
986+
// free(null) is a safe no-op if the ask failed.
985987
if (llvm::isa<mlir::NoneType>(resultType)) {
988+
auto freeFuncType = rewriter.getFunctionType({ptrType}, {});
989+
getOrInsertFuncDecl(module, rewriter, "free", freeFuncType);
990+
mlir::func::CallOp::create(rewriter, loc, "free", mlir::TypeRange{},
991+
mlir::ValueRange{replyPtr});
986992
rewriter.eraseOp(op);
987993
return mlir::success();
988-
} else if (resultType == ptrType || llvm::isa<mlir::LLVM::LLVMPointerType>(resultType)) {
989-
zeroResult = mlir::LLVM::ZeroOp::create(rewriter, loc, ptrType).getResult();
994+
}
995+
996+
// Non-void: load the result from the reply pointer.
997+
// hew_node_api_ask guarantees a non-null return for non-void asks
998+
// (returns a zeroed allocation on timeout/failure).
999+
mlir::Value resultVal;
1000+
if (resultType == ptrType || llvm::isa<mlir::LLVM::LLVMPointerType>(resultType)) {
1001+
auto loaded = mlir::LLVM::LoadOp::create(rewriter, loc, ptrType, replyPtr);
1002+
resultVal = loaded.getResult();
1003+
} else if (resultType == i32Type || resultType == rewriter.getI64Type() ||
1004+
llvm::isa<mlir::IntegerType>(resultType) ||
1005+
llvm::isa<mlir::FloatType>(resultType) ||
1006+
llvm::isa<mlir::LLVM::LLVMStructType>(resultType)) {
1007+
auto loaded = mlir::LLVM::LoadOp::create(rewriter, loc, resultType, replyPtr);
1008+
resultVal = loaded.getResult();
9901009
} else {
991-
zeroResult = mlir::arith::ConstantIntOp::create(rewriter, loc, resultType, 0);
1010+
auto loaded = mlir::LLVM::LoadOp::create(rewriter, loc, ptrType, replyPtr);
1011+
resultVal = mlir::UnrealizedConversionCastOp::create(rewriter, loc, resultType,
1012+
mlir::ValueRange{loaded.getResult()})
1013+
.getResult(0);
9921014
}
993-
rewriter.replaceOp(op, zeroResult);
1015+
1016+
// Free the malloc'd reply buffer.
1017+
auto freeFuncType = rewriter.getFunctionType({ptrType}, {});
1018+
getOrInsertFuncDecl(module, rewriter, "free", freeFuncType);
1019+
mlir::func::CallOp::create(rewriter, loc, "free", mlir::TypeRange{},
1020+
mlir::ValueRange{replyPtr});
1021+
1022+
rewriter.replaceOp(op, resultVal);
9941023
return mlir::success();
9951024
}
9961025

hew-runtime/src/actor.rs

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1372,6 +1372,118 @@ pub unsafe extern "C" fn hew_actor_ask_with_channel(
13721372
send_result
13731373
}
13741374

1375+
/// Perform a blocking ask against an actor identified by PID.
1376+
///
1377+
/// Looks up the actor in `LIVE_ACTORS`, packs a reply channel into the
1378+
/// message, and waits for the reply. Returns the reply pointer and writes
1379+
/// the reply size to `*out_size`.
1380+
///
1381+
/// Returns null if the actor is not found locally or the send fails.
1382+
///
1383+
/// # Safety
1384+
///
1385+
/// - `data` must point to at least `size` readable bytes, or be null when
1386+
/// `size` is 0.
1387+
/// - `out_size` must be a valid, non-null writable pointer.
1388+
#[cfg(not(target_arch = "wasm32"))]
1389+
pub(crate) unsafe fn hew_actor_ask_by_id(
1390+
actor_id: u64,
1391+
msg_type: i32,
1392+
data: *mut c_void,
1393+
size: usize,
1394+
) -> *mut c_void {
1395+
let ptr_size = std::mem::size_of::<*mut c_void>();
1396+
let Some(total) = size.checked_add(ptr_size) else {
1397+
return std::ptr::null_mut();
1398+
};
1399+
1400+
let ch = reply_channel::hew_reply_channel_new();
1401+
1402+
// Pack: [original_data | reply_channel_ptr]
1403+
// SAFETY: malloc for packed buffer.
1404+
let packed = unsafe { libc::malloc(total) };
1405+
if packed.is_null() {
1406+
// SAFETY: ch was created by hew_reply_channel_new.
1407+
unsafe { reply_channel::hew_reply_channel_free(ch) };
1408+
return ptr::null_mut();
1409+
}
1410+
// SAFETY: copying data into packed buffer; write_unaligned handles the
1411+
// potentially-misaligned channel slot.
1412+
#[expect(
1413+
clippy::cast_ptr_alignment,
1414+
reason = "write_unaligned on the next line handles misalignment"
1415+
)]
1416+
unsafe {
1417+
if size > 0 && !data.is_null() {
1418+
ptr::copy_nonoverlapping(data.cast::<u8>(), packed.cast::<u8>(), size);
1419+
}
1420+
let ch_slot = packed.cast::<u8>().add(size).cast::<*mut c_void>();
1421+
ptr::write_unaligned(ch_slot, ch.cast());
1422+
}
1423+
1424+
// SAFETY: the actor now holds the sender-side reference until it replies.
1425+
unsafe { reply_channel::hew_reply_channel_retain(ch) };
1426+
1427+
// Look up actor and send packed message.
1428+
let sent = {
1429+
let guard = match LIVE_ACTORS.lock() {
1430+
Ok(g) => g,
1431+
Err(e) => e.into_inner(),
1432+
};
1433+
guard.as_ref().is_some_and(|set| {
1434+
set.iter().any(|actor_ptr| {
1435+
let actor = actor_ptr.0;
1436+
if actor.is_null() {
1437+
return false;
1438+
}
1439+
// SAFETY: LIVE_ACTORS pointers are valid while locked.
1440+
let matches = unsafe { (*actor).id == actor_id };
1441+
if matches {
1442+
// SAFETY: actor and packed data are valid.
1443+
let rc = unsafe { actor_send_result_internal(actor, msg_type, packed, total) };
1444+
rc == HewError::Ok as i32
1445+
} else {
1446+
false
1447+
}
1448+
})
1449+
})
1450+
};
1451+
1452+
// SAFETY: packed was malloc'd above.
1453+
unsafe { libc::free(packed) };
1454+
1455+
if !sent {
1456+
// SAFETY: release the sender-side reference retained for the failed send.
1457+
unsafe { reply_channel::hew_reply_channel_free(ch) };
1458+
// SAFETY: ch was created by hew_reply_channel_new.
1459+
unsafe { reply_channel::hew_reply_channel_free(ch) };
1460+
return std::ptr::null_mut();
1461+
}
1462+
1463+
let mut reply_size: usize = 0;
1464+
// SAFETY: ch is valid and single-reader; reply_size is a valid stack pointer.
1465+
let result = unsafe { reply_channel::hew_reply_wait_with_size(ch, &raw mut reply_size) };
1466+
1467+
// Store the reply size in a thread-local so the caller can retrieve it.
1468+
LAST_REPLY_SIZE.set(reply_size);
1469+
1470+
// SAFETY: ch was created by hew_reply_channel_new.
1471+
unsafe { reply_channel::hew_reply_channel_free(ch) };
1472+
1473+
result
1474+
}
1475+
1476+
// Thread-local storage for the reply size from the last `hew_actor_ask_by_id`.
1477+
std::thread_local! {
1478+
static LAST_REPLY_SIZE: std::cell::Cell<usize> = const { std::cell::Cell::new(0) };
1479+
}
1480+
1481+
/// Retrieve the size of the reply data from the most recent
1482+
/// `hew_actor_ask_by_id` call on the current thread.
1483+
pub(crate) unsafe fn hew_reply_data_size(_ptr: *mut c_void) -> usize {
1484+
LAST_REPLY_SIZE.get()
1485+
}
1486+
13751487
// ── Trap / Error ────────────────────────────────────────────────────────
13761488

13771489
/// Trap (panic) an actor: store an error code, close the mailbox, and

hew-runtime/src/connection.rs

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,11 @@ struct ReconnectPlan {
194194
}
195195

196196
/// Inbound message routing callback.
197-
type InboundRouter = unsafe extern "C" fn(u64, i32, *mut u8, usize);
197+
///
198+
/// Parameters: `(target_actor_id, msg_type, data, size, request_id, source_node_id)`.
199+
/// `request_id` > 0 with `source_node_id` > 0 means this is an ask that expects
200+
/// a reply. `request_id` == 0 is fire-and-forget.
201+
type InboundRouter = unsafe extern "C" fn(u64, i32, *mut u8, usize, u64, u16);
198202

199203
// SAFETY: HewConnMgr is only accessed through C ABI functions that
200204
// serialize access via the internal Mutex. The transport pointer is
@@ -673,6 +677,8 @@ unsafe fn hew_conn_encode_envelope(
673677
msg_type,
674678
payload_size: payload_len as u32,
675679
payload,
680+
request_id: 0,
681+
source_node_id: 0,
676682
};
677683
// SAFETY: zeroed is valid for HewWireBuf.
678684
let mut wire_buf: HewWireBuf = unsafe { std::mem::zeroed() };
@@ -892,12 +898,30 @@ fn reader_loop(
892898
let mut envelope: HewWireEnvelope = std::mem::zeroed();
893899
let rc = hew_wire_decode_envelope(&raw mut wire_buf, &raw mut envelope);
894900
if rc == 0 {
895-
router_fn(
896-
envelope.target_actor_id,
897-
envelope.msg_type,
898-
envelope.payload,
899-
envelope.payload_size as usize,
900-
);
901+
// Reply envelopes (request_id > 0, source_node_id == 0) are
902+
// deposited directly into the reply routing table, bypassing
903+
// the normal inbound router.
904+
if envelope.request_id > 0 && envelope.source_node_id == 0 {
905+
let reply_payload =
906+
if envelope.payload_size > 0 && !envelope.payload.is_null() {
907+
std::slice::from_raw_parts(
908+
envelope.payload,
909+
envelope.payload_size as usize,
910+
)
911+
} else {
912+
&[]
913+
};
914+
crate::hew_node::complete_remote_reply(envelope.request_id, reply_payload);
915+
} else {
916+
router_fn(
917+
envelope.target_actor_id,
918+
envelope.msg_type,
919+
envelope.payload,
920+
envelope.payload_size as usize,
921+
envelope.request_id,
922+
envelope.source_node_id,
923+
);
924+
}
901925
}
902926
}
903927
}
@@ -1157,6 +1181,7 @@ pub unsafe extern "C" fn hew_connmgr_add(mgr: *mut HewConnMgr, conn_id: c_int) -
11571181
#[cfg(feature = "quic")]
11581182
{
11591183
// QUIC provides TLS 1.3 encryption — skip Noise when using QUIC transport.
1184+
// SAFETY: mgr.transport is valid while the connection manager is alive.
11601185
unsafe { crate::quic_transport::hew_transport_is_quic(mgr.transport) }
11611186
}
11621187
#[cfg(not(feature = "quic"))]

0 commit comments

Comments
 (0)