Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 43 additions & 34 deletions kinode/src/timer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use lib::types::core::{
Address, KernelMessage, Message, MessageReceiver, MessageSender, PrintSender, Printout,
Response, TIMER_PROCESS_ID,
Response, TimerAction, TIMER_PROCESS_ID,
};
use serde::{Deserialize, Serialize};

Expand All @@ -11,8 +11,8 @@ use serde::{Deserialize, Serialize};
/// requests made by other nodes.
///
/// The interface of the timer module is as follows:
/// One kind of request is accepted: the IPC must be a little-endian byte-representation
/// of an unsigned 64-bit integer, in milliseconds. This request should always expect a Response.
/// One kind of request is accepted: TimerAction::SetTimer(u64), where the u64 is the time to wait
/// in milliseconds. This request should always expect a Response.
/// If the request does not expect a Response, the timer will not be set.
///
/// A proper Request will trigger the timer module to send a Response. The Response will be
Expand All @@ -39,44 +39,53 @@ pub async fn timer_service(
// we only handle Requests which contain a little-endian u64 as IPC,
// except for a special "debug" message, which prints the current state
let Message::Request(req) = km.message else { continue };
if req.body == "debug".as_bytes() {
let Ok(timer_action) = serde_json::from_slice::<TimerAction>(&req.body) else {
let _ = print_tx.send(Printout {
verbosity: 0,
content: format!("timer service active timers ({}):", timer_map.timers.len()),
verbosity: 1,
content: "timer service received a request with an invalid body".to_string(),
}).await;
for (k, v) in timer_map.timers.iter() {
continue
};
match timer_action {
TimerAction::Debug => {
let _ = print_tx.send(Printout {
verbosity: 0,
content: format!("{}: {:?}", k, v),
content: format!("timer service active timers ({}):", timer_map.timers.len()),
}).await;
for (k, v) in timer_map.timers.iter() {
let _ = print_tx.send(Printout {
verbosity: 0,
content: format!("{}: {:?}", k, v),
}).await;
}
continue
}
TimerAction::SetTimer(timer_millis) => {
// if the timer is set to pop in 0 millis, we immediately respond
// otherwise, store in our persisted map, and spawn a task that
// sleeps for the given time, then sends the response
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let pop_time = now + timer_millis;
if timer_millis == 0 {
send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await;
continue
}
let _ = print_tx.send(Printout {
verbosity: 1,
content: format!("set timer to pop in {}ms", timer_millis),
}).await;
if !timer_map.contains(pop_time) {
timer_tasks.spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(timer_millis - 1)).await;
pop_time
});
}
timer_map.insert(pop_time, km.id, km.rsvp.unwrap_or(km.source));
}
continue
}
let Ok(bytes): Result<[u8; 8], _> = req.body.try_into() else { continue };
let timer_millis = u64::from_le_bytes(bytes);
// if the timer is set to pop in 0 millis, we immediately respond
// otherwise, store in our persisted map, and spawn a task that
// sleeps for the given time, then sends the response
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let pop_time = now + timer_millis;
if timer_millis == 0 {
send_response(&our, km.id, km.rsvp.unwrap_or(km.source), &kernel_message_sender).await;
continue
}
let _ = print_tx.send(Printout {
verbosity: 1,
content: format!("set timer to pop in {}ms", timer_millis),
}).await;
if !timer_map.contains(pop_time) {
timer_tasks.spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(timer_millis - 1)).await;
pop_time
});
}
timer_map.insert(pop_time, km.id, km.rsvp.unwrap_or(km.source));
}
Some(Ok(time)) = timer_tasks.join_next() => {
// when a timer pops, we send the response to the process(es) that set
Expand Down
6 changes: 6 additions & 0 deletions lib/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1486,3 +1486,9 @@ impl From<tokio::sync::mpsc::error::SendError<CapMessage>> for SqliteError {
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TimerAction {
Debug,
SetTimer(u64),
}