Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

230 changes: 127 additions & 103 deletions kinode/src/kernel/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,7 @@ impl ProcessState {
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
let res = match self.message_queue.pop_front() {
Some(message_from_queue) => message_from_queue,
None => self
.recv_in_process
.recv()
.await
.expect("fatal: process couldn't receive next message"),
None => self.ingest_message().await,
};
self.kernel_message_to_process_receive(res)
}
Expand All @@ -138,11 +134,7 @@ impl ProcessState {
}
// next, wait for the awaited message to arrive
loop {
let res = self
.recv_in_process
.recv()
.await
.expect("fatal: process couldn't receive next message");
let res = self.ingest_message().await;
let id = match &res {
Ok(km) => km.id,
Err(e) => e.id,
Expand All @@ -155,6 +147,131 @@ impl ProcessState {
}
}

/// ingest next valid message from kernel.
/// cancel any timeout task associated with this message.
/// if the message is a response, only enqueue if we have an outstanding request for it.
async fn ingest_message(&mut self) -> Result<t::KernelMessage, t::WrappedSendError> {
loop {
let message = self
.recv_in_process
.recv()
.await
.expect("fatal: process couldn't receive next message");

match &message {
Ok(km) => match &km.message {
t::Message::Response(_) => {
if let Some((_context, timeout_handle)) = self.contexts.get_mut(&km.id) {
timeout_handle.abort();
return message;
}
}
_ => {
return message;
}
},
Err(e) => {
if let Some((_context, timeout_handle)) = self.contexts.get_mut(&e.id) {
timeout_handle.abort();
return message;
}
}
}
}
}

/// Convert a message from the main event loop into a result for the process to receive.
/// If the message is a response or error, get context if we have one.
fn kernel_message_to_process_receive(
&mut self,
incoming: Result<t::KernelMessage, t::WrappedSendError>,
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
let (mut km, context) = match incoming {
Ok(mut km) => match km.message {
t::Message::Request(_) => {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(km, None)
}
t::Message::Response(_) => match self.contexts.remove(&km.id) {
Some((context, _timeout_handle)) => {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = context.prompting_message;
(km, context.context)
}
None => {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(km, None)
}
},
},
Err(e) => match self.contexts.remove(&e.id) {
None => return Err((t::en_wit_send_error(e.error), None)),
Some((context, _timeout_handle)) => {
self.prompting_message = context.prompting_message;
return Err((t::en_wit_send_error(e.error), context.context));
}
},
};

let pk = signature::UnparsedPublicKey::new(
&signature::ED25519,
self.keypair.as_ref().public_key(),
);

// prune any invalid capabilities before handing to process
// where invalid = supposedly issued by us, but not signed properly by us
match &mut km.message {
t::Message::Request(request) => {
request.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
t::Message::Response((response, _)) => {
response.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
};

Ok((
km.source.en_wit(),
match km.message {
t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)),
// NOTE: we throw away whatever context came from the sender, that's not ours
t::Message::Response((response, _sent_context)) => {
wit::Message::Response((t::en_wit_response(response), context))
}
},
))
}

/// takes Request generated by a process and sends it to the main event loop.
/// will only fail if process does not have capability to send to target.
/// if the request has a timeout (expects response), start a task to track
Expand Down Expand Up @@ -362,99 +479,6 @@ impl ProcessState {
.await
.expect("fatal: kernel couldn't send response");
}

/// Convert a message from the main event loop into a result for the process to receive.
/// If the message is a response or error, get context if we have one.
fn kernel_message_to_process_receive(
&mut self,
incoming: Result<t::KernelMessage, t::WrappedSendError>,
) -> Result<(wit::Address, wit::Message), (wit::SendError, Option<wit::Context>)> {
let (mut km, context) = match incoming {
Ok(mut km) => match km.message {
t::Message::Request(_) => {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(km, None)
}
t::Message::Response(_) => {
if let Some((context, timeout_handle)) = self.contexts.remove(&km.id) {
timeout_handle.abort();
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = context.prompting_message;
(km, context.context)
} else {
self.last_blob = km.lazy_load_blob;
km.lazy_load_blob = None;
self.prompting_message = Some(km.clone());
(km, None)
}
}
},
Err(e) => match self.contexts.remove(&e.id) {
None => return Err((t::en_wit_send_error(e.error), None)),
Some((context, timeout_handle)) => {
timeout_handle.abort();
self.prompting_message = context.prompting_message;
return Err((t::en_wit_send_error(e.error), context.context));
}
},
};

let pk = signature::UnparsedPublicKey::new(
&signature::ED25519,
self.keypair.as_ref().public_key(),
);

// prune any invalid capabilities before handing to process
// where invalid = supposedly issued by us, but not signed properly by us
match &mut km.message {
t::Message::Request(request) => {
request.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
t::Message::Response((response, _)) => {
response.capabilities.retain(|(cap, sig)| {
// The only time we verify a cap's signature is when a foreign node
// sends us a cap that we (allegedly) issued
if km.source.node != self.metadata.our.node
&& cap.issuer.node == self.metadata.our.node
{
match pk.verify(&rmp_serde::to_vec(&cap).unwrap_or_default(), sig) {
Ok(_) => true,
Err(_) => false,
}
} else {
return true;
}
});
}
};

Ok((
km.source.en_wit(),
match km.message {
t::Message::Request(request) => wit::Message::Request(t::en_wit_request(request)),
// NOTE: we throw away whatever context came from the sender, that's not ours
t::Message::Response((response, _sent_context)) => {
wit::Message::Response((t::en_wit_response(response), context))
}
},
))
}
}

/// create a specific process, and generate a task that will run it.
Expand Down