From 382b9174ac540081ece240b9d77ee2b611a2a105 Mon Sep 17 00:00:00 2001 From: Drew Tada Date: Mon, 12 Feb 2024 12:41:11 -0500 Subject: [PATCH 1/2] timer refactor --- Cargo.lock | 2 +- kinode/src/timer.rs | 77 +++++++++++++++++++++++++-------------------- lib/src/core.rs | 6 ++++ 3 files changed, 50 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 41943ebf7..c7285c74a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3227,7 +3227,7 @@ dependencies = [ [[package]] name = "kit" version = "0.1.0" -source = "git+https://github.com/kinode-dao/kit?rev=25b098f#25b098fab136387065d6058162d33c727d277ab8" +source = "git+https://github.com/kinode-dao/kit?rev=0c43430#0c434306fdce55e11d3309959fc4a0fe6ae28def" dependencies = [ "anyhow", "base64 0.21.7", diff --git a/kinode/src/timer.rs b/kinode/src/timer.rs index 12c62eacf..c6ccb0038 100644 --- a/kinode/src/timer.rs +++ b/kinode/src/timer.rs @@ -1,7 +1,7 @@ use anyhow::Result; use lib::types::core::{ Address, KernelMessage, Message, MessageReceiver, MessageSender, PrintSender, Printout, - Response, TIMER_PROCESS_ID, + Response, TIMER_PROCESS_ID, TimerAction, }; use serde::{Deserialize, Serialize}; @@ -11,8 +11,8 @@ use serde::{Deserialize, Serialize}; /// requests made by other nodes. /// /// The interface of the timer module is as follows: -/// One kind of request is accepted: the IPC must be a little-endian byte-representation -/// of an unsigned 64-bit integer, in milliseconds. This request should always expect a Response. +/// One kind of request is accepted: TimerAction::SetTimer(u64), where the u64 is the time to wait +/// in milliseconds. This request should always expect a Response. /// If the request does not expect a Response, the timer will not be set. /// /// A proper Request will trigger the timer module to send a Response. The Response will be @@ -39,44 +39,53 @@ pub async fn timer_service( // we only handle Requests which contain a little-endian u64 as IPC, // except for a special "debug" message, which prints the current state let Message::Request(req) = km.message else { continue }; - if req.body == "debug".as_bytes() { + let Ok(timer_action) = serde_json::from_slice::(&req.body) else { let _ = print_tx.send(Printout { - verbosity: 0, - content: format!("timer service active timers ({}):", timer_map.timers.len()), + verbosity: 1, + content: "timer service received a request with an invalid body".to_string(), }).await; - for (k, v) in timer_map.timers.iter() { + continue + }; + match timer_action { + TimerAction::Debug => { let _ = print_tx.send(Printout { verbosity: 0, - content: format!("{}: {:?}", k, v), + content: format!("timer service active timers ({}):", timer_map.timers.len()), }).await; + for (k, v) in timer_map.timers.iter() { + let _ = print_tx.send(Printout { + verbosity: 0, + content: format!("{}: {:?}", k, v), + }).await; + } + continue + } + TimerAction::SetTimer(timer_millis) => { + // if the timer is set to pop in 0 millis, we immediately respond + // otherwise, store in our persisted map, and spawn a task that + // sleeps for the given time, then sends the response + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + let pop_time = now + timer_millis; + if timer_millis == 0 { + send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await; + continue + } + let _ = print_tx.send(Printout { + verbosity: 1, + content: format!("set timer to pop in {}ms", timer_millis), + }).await; + if !timer_map.contains(pop_time) { + timer_tasks.spawn(async move { + tokio::time::sleep(std::time::Duration::from_millis(timer_millis - 1)).await; + pop_time + }); + } + timer_map.insert(pop_time, km.id, km.rsvp.unwrap_or(km.source)); } - continue - } - let Ok(bytes): Result<[u8; 8], _> = req.body.try_into() else { continue }; - let timer_millis = u64::from_le_bytes(bytes); - // if the timer is set to pop in 0 millis, we immediately respond - // otherwise, store in our persisted map, and spawn a task that - // sleeps for the given time, then sends the response - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - let pop_time = now + timer_millis; - if timer_millis == 0 { - send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await; - continue - } - let _ = print_tx.send(Printout { - verbosity: 1, - content: format!("set timer to pop in {}ms", timer_millis), - }).await; - if !timer_map.contains(pop_time) { - timer_tasks.spawn(async move { - tokio::time::sleep(std::time::Duration::from_millis(timer_millis - 1)).await; - pop_time - }); } - timer_map.insert(pop_time, km.id, km.rsvp.unwrap_or(km.source)); } Some(Ok(time)) = timer_tasks.join_next() => { // when a timer pops, we send the response to the process(es) that set diff --git a/lib/src/core.rs b/lib/src/core.rs index f4bbdf2d3..d84504468 100644 --- a/lib/src/core.rs +++ b/lib/src/core.rs @@ -1486,3 +1486,9 @@ impl From> for SqliteError { } } } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum TimerAction { + Debug, + SetTimer(u64), +} From 452676cd26881dfdd59f48be935243c001df2adf Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 12 Feb 2024 17:41:39 +0000 Subject: [PATCH 2/2] Format Rust code using rustfmt --- kinode/src/timer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kinode/src/timer.rs b/kinode/src/timer.rs index c6ccb0038..9b93a1c2a 100644 --- a/kinode/src/timer.rs +++ b/kinode/src/timer.rs @@ -1,7 +1,7 @@ use anyhow::Result; use lib::types::core::{ Address, KernelMessage, Message, MessageReceiver, MessageSender, PrintSender, Printout, - Response, TIMER_PROCESS_ID, TimerAction, + Response, TimerAction, TIMER_PROCESS_ID, }; use serde::{Deserialize, Serialize};