diff --git a/kinode/src/eth/mod.rs b/kinode/src/eth/mod.rs index bd6802c96..82fc34998 100644 --- a/kinode/src/eth/mod.rs +++ b/kinode/src/eth/mod.rs @@ -1097,37 +1097,48 @@ async fn kernel_message( body: T, send_to_loop: &MessageSender, ) { - let _ = send_to_loop - .send(KernelMessage { - id: km_id, - source: Address { - node: our.to_string(), - process: ETH_PROCESS_ID.clone(), - }, - target, - rsvp, - message: if req { - Message::Request(Request { + let Err(e) = send_to_loop.try_send(KernelMessage { + id: km_id, + source: Address { + node: our.to_string(), + process: ETH_PROCESS_ID.clone(), + }, + target, + rsvp, + message: if req { + Message::Request(Request { + inherit: false, + expects_response: timeout, + body: serde_json::to_vec(&body).unwrap(), + metadata: None, + capabilities: vec![], + }) + } else { + Message::Response(( + Response { inherit: false, - expects_response: timeout, body: serde_json::to_vec(&body).unwrap(), metadata: None, capabilities: vec![], - }) - } else { - Message::Response(( - Response { - inherit: false, - body: serde_json::to_vec(&body).unwrap(), - metadata: None, - capabilities: vec![], - }, - None, - )) - }, - lazy_load_blob: None, - }) - .await; + }, + None, + )) + }, + lazy_load_blob: None, + }) else { + // not Err -> send successful; done here + return; + }; + // its an Err: handle + match e { + tokio::sync::mpsc::error::TrySendError::Closed(_) => { + panic!("(eth) kernel message sender: receiver closed"); + } + tokio::sync::mpsc::error::TrySendError::Full(_) => { + // TODO: implement backpressure + panic!("(eth) kernel overloaded with messages: TODO: implement backpressure"); + } + } } fn find_index(vec: &Vec<&str>, item: &str) -> Option { diff --git a/kinode/src/main.rs b/kinode/src/main.rs index 6d25037c5..da7a40110 100644 --- a/kinode/src/main.rs +++ b/kinode/src/main.rs @@ -31,7 +31,7 @@ mod terminal; mod timer; mod vfs; -const EVENT_LOOP_CHANNEL_CAPACITY: usize = 10_000; +const EVENT_LOOP_CHANNEL_CAPACITY: usize = 100_000; const EVENT_LOOP_DEBUG_CHANNEL_CAPACITY: usize = 50; const TERMINAL_CHANNEL_CAPACITY: usize = 32; const WEBSOCKET_SENDER_CHANNEL_CAPACITY: usize = 32; diff --git a/lib/src/core.rs b/lib/src/core.rs index 6de12d838..8b8ee2146 100644 --- a/lib/src/core.rs +++ b/lib/src/core.rs @@ -1190,7 +1190,20 @@ impl KernelMessage { } pub async fn send(self, sender: &MessageSender) { - sender.send(self).await.expect("kernel message sender died"); + let Err(e) = sender.try_send(self) else { + // not Err -> send successful; done here + return; + }; + // its an Err: handle + match e { + tokio::sync::mpsc::error::TrySendError::Closed(_) => { + panic!("kernel message sender: receiver closed"); + } + tokio::sync::mpsc::error::TrySendError::Full(_) => { + // TODO: implement backpressure + panic!("kernel overloaded with messages: TODO: implement backpressure"); + } + } } }