diff --git a/Cargo.lock b/Cargo.lock index 41943ebf7..c7285c74a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3227,7 +3227,7 @@ dependencies = [ [[package]] name = "kit" version = "0.1.0" -source = "git+https://github.com/kinode-dao/kit?rev=25b098f#25b098fab136387065d6058162d33c727d277ab8" +source = "git+https://github.com/kinode-dao/kit?rev=0c43430#0c434306fdce55e11d3309959fc4a0fe6ae28def" dependencies = [ "anyhow", "base64 0.21.7", diff --git a/kinode/src/kernel/process.rs b/kinode/src/kernel/process.rs index 4759e2999..1edc104f5 100644 --- a/kinode/src/kernel/process.rs +++ b/kinode/src/kernel/process.rs @@ -112,11 +112,7 @@ impl ProcessState { ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { let res = match self.message_queue.pop_front() { Some(message_from_queue) => message_from_queue, - None => self - .recv_in_process - .recv() - .await - .expect("fatal: process couldn't receive next message"), + None => self.ingest_message().await, }; self.kernel_message_to_process_receive(res) } @@ -138,11 +134,7 @@ impl ProcessState { } // next, wait for the awaited message to arrive loop { - let res = self - .recv_in_process - .recv() - .await - .expect("fatal: process couldn't receive next message"); + let res = self.ingest_message().await; let id = match &res { Ok(km) => km.id, Err(e) => e.id, @@ -155,6 +147,131 @@ impl ProcessState { } } + /// ingest next valid message from kernel. + /// cancel any timeout task associated with this message. + /// if the message is a response, only enqueue if we have an outstanding request for it. + async fn ingest_message(&mut self) -> Result { + loop { + let message = self + .recv_in_process + .recv() + .await + .expect("fatal: process couldn't receive next message"); + + match &message { + Ok(km) => match &km.message { + t::Message::Response(_) => { + if let Some((_context, timeout_handle)) = self.contexts.get_mut(&km.id) { + timeout_handle.abort(); + return message; + } + } + _ => { + return message; + } + }, + Err(e) => { + if let Some((_context, timeout_handle)) = self.contexts.get_mut(&e.id) { + timeout_handle.abort(); + return message; + } + } + } + } + } + + /// Convert a message from the main event loop into a result for the process to receive. + /// If the message is a response or error, get context if we have one. + fn kernel_message_to_process_receive( + &mut self, + incoming: Result, + ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { + let (mut km, context) = match incoming { + Ok(mut km) => match km.message { + t::Message::Request(_) => { + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; + self.prompting_message = Some(km.clone()); + (km, None) + } + t::Message::Response(_) => match self.contexts.remove(&km.id) { + Some((context, _timeout_handle)) => { + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; + self.prompting_message = context.prompting_message; + (km, context.context) + } + None => { + self.last_blob = km.lazy_load_blob; + km.lazy_load_blob = None; + self.prompting_message = Some(km.clone()); + (km, None) + } + }, + }, + Err(e) => match self.contexts.remove(&e.id) { + None => return Err((t::en_wit_send_error(e.error), None)), + Some((context, _timeout_handle)) => { + self.prompting_message = context.prompting_message; + return Err((t::en_wit_send_error(e.error), context.context)); + } + }, + }; + + let pk = signature::UnparsedPublicKey::new( + &signature::ED25519, + self.keypair.as_ref().public_key(), + ); + + // prune any invalid capabilities before handing to process + // where invalid = supposedly issued by us, but not signed properly by us + match &mut km.message { + t::Message::Request(request) => { + request.capabilities.retain(|(cap, sig)| { + // The only time we verify a cap's signature is when a foreign node + // sends us a cap that we (allegedly) issued + if km.source.node != self.metadata.our.node + && cap.issuer.node == self.metadata.our.node + { + match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { + Ok(_) => true, + Err(_) => false, + } + } else { + return true; + } + }); + } + t::Message::Response((response, _)) => { + response.capabilities.retain(|(cap, sig)| { + // The only time we verify a cap's signature is when a foreign node + // sends us a cap that we (allegedly) issued + if km.source.node != self.metadata.our.node + && cap.issuer.node == self.metadata.our.node + { + match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { + Ok(_) => true, + Err(_) => false, + } + } else { + return true; + } + }); + } + }; + + Ok(( + km.source.en_wit(), + match km.message { + t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)), + // NOTE: we throw away whatever context came from the sender, that's not ours + t::Message::Response((response, _sent_context)) => { + wit::Message::Response((t::en_wit_response(response), context)) + } + }, + )) + } + /// takes Request generated by a process and sends it to the main event loop. /// will only fail if process does not have capability to send to target. /// if the request has a timeout (expects response), start a task to track @@ -362,99 +479,6 @@ impl ProcessState { .await .expect("fatal: kernel couldn't send response"); } - - /// Convert a message from the main event loop into a result for the process to receive. - /// If the message is a response or error, get context if we have one. - fn kernel_message_to_process_receive( - &mut self, - incoming: Result, - ) -> Result<(wit::Address, wit::Message), (wit::SendError, Option)> { - let (mut km, context) = match incoming { - Ok(mut km) => match km.message { - t::Message::Request(_) => { - self.last_blob = km.lazy_load_blob; - km.lazy_load_blob = None; - self.prompting_message = Some(km.clone()); - (km, None) - } - t::Message::Response(_) => { - if let Some((context, timeout_handle)) = self.contexts.remove(&km.id) { - timeout_handle.abort(); - self.last_blob = km.lazy_load_blob; - km.lazy_load_blob = None; - self.prompting_message = context.prompting_message; - (km, context.context) - } else { - self.last_blob = km.lazy_load_blob; - km.lazy_load_blob = None; - self.prompting_message = Some(km.clone()); - (km, None) - } - } - }, - Err(e) => match self.contexts.remove(&e.id) { - None => return Err((t::en_wit_send_error(e.error), None)), - Some((context, timeout_handle)) => { - timeout_handle.abort(); - self.prompting_message = context.prompting_message; - return Err((t::en_wit_send_error(e.error), context.context)); - } - }, - }; - - let pk = signature::UnparsedPublicKey::new( - &signature::ED25519, - self.keypair.as_ref().public_key(), - ); - - // prune any invalid capabilities before handing to process - // where invalid = supposedly issued by us, but not signed properly by us - match &mut km.message { - t::Message::Request(request) => { - request.capabilities.retain(|(cap, sig)| { - // The only time we verify a cap's signature is when a foreign node - // sends us a cap that we (allegedly) issued - if km.source.node != self.metadata.our.node - && cap.issuer.node == self.metadata.our.node - { - match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { - Ok(_) => true, - Err(_) => false, - } - } else { - return true; - } - }); - } - t::Message::Response((response, _)) => { - response.capabilities.retain(|(cap, sig)| { - // The only time we verify a cap's signature is when a foreign node - // sends us a cap that we (allegedly) issued - if km.source.node != self.metadata.our.node - && cap.issuer.node == self.metadata.our.node - { - match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) { - Ok(_) => true, - Err(_) => false, - } - } else { - return true; - } - }); - } - }; - - Ok(( - km.source.en_wit(), - match km.message { - t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)), - // NOTE: we throw away whatever context came from the sender, that's not ours - t::Message::Response((response, _sent_context)) => { - wit::Message::Response((t::en_wit_response(response), context)) - } - }, - )) - } } /// create a specific process, and generate a task that will run it.