[RSDK-8948] refactor task lifecycles for graceful shutdown #503
Conversation
let mut tasks: FuturesUnordered<_> = FuturesUnordered::new();
nice
log::info!("viam server started");
while let Some(ret) = tasks.next().await {
log::error!("task ran returned {:?}", ret);
let (app_client_tasks_result, server_task_result) = system_task.await;
that's pretty cool how that works
LGTM, however might want to wait for the `poll` function in `viam.rs` to get a second reviewer
Are we running the destructor of LocalRobot before we shut down ViamServer?
we are now
LGTM, left a couple of comments / nits
I did a quick test and always ran into the timeout (120 secs). Am I doing something wrong?
}
}
exec.block_on(force_shutdown(run_result.app_client.clone()));
unreachable!()
nit: so this doesn't get us back to main?
Not sure how I would do that besides putting the shutdown call in `main.rs` (which doesn't seem like a good idea?)
then ok!
micro-rdk/src/common/conn/viam.rs
Outdated
@@ -1307,6 +1447,9 @@ mod tests {
"/viam.app.v1.RobotService/Log" => self.log(body.split_off(5)),
"/viam.app.v1.RobotService/NeedsRestart" => self.needs_restart(body.split_off(5)),
"/viam.app.v1.RobotService/Config" => self.get_config(),
"/viam.app.agent.v1.AgentDeviceService/DeviceAgentConfig" => {
return Err(ServerError::new(GrpcError::RpcInternal, None));
return unimplemented by default?
bump
Just to clarify, do you mean that this shouldn't panic, or that I should just change the error for this endpoint to unimplemented?
micro-rdk/src/common/system.rs
Outdated
match self {
Self::Restart => "restart".to_string(),
Self::DeepSleep(dur) => dur.as_ref().map_or("enter deep sleep".to_string(), |d| {
format!("enter deep sleep for {} microseconds", d.as_micros())
how would you go about configuring other wake up sources?
Was that part of the scope of this? If so, I can add that in a follow-up PR where I change the `DeepSleep` enum variant to be a struct with all of the possible settings. Then you would configure these settings through the agent config (just as you would the active timeout).
nope just a general question
micro-rdk/src/common/system.rs
Outdated
}

impl SystemEvent {
fn action_string(&self) -> String {
should implement Display trait
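For illustration, a minimal sketch of what the suggested `Display` implementation could look like. The enum shape (`Restart`, `DeepSleep(Option<Duration>)`) is an assumption inferred from the `action_string` snippet under review, and the strings are copied from it; this is not necessarily the code that landed in the PR.

```rust
use std::fmt;
use std::time::Duration;

// Assumed shape of the enum, inferred from the reviewed snippet.
pub enum SystemEvent {
    Restart,
    DeepSleep(Option<Duration>),
}

impl fmt::Display for SystemEvent {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Restart => write!(f, "restart"),
            // A duration was supplied: report it in microseconds.
            Self::DeepSleep(Some(d)) => {
                write!(f, "enter deep sleep for {} microseconds", d.as_micros())
            }
            // No duration: sleep until some other wake-up source fires.
            Self::DeepSleep(None) => write!(f, "enter deep sleep"),
        }
    }
}
```

With `Display` in place, the event can be passed straight to formatting macros (for example `format!("received call to {}", event)`), and `to_string()` comes for free, so a separate `action_string` helper would no longer be needed.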
@npmenard you're not doing anything wrong, it's just that the server task waits for a connection for much longer than the timeout. I wasn't sure exactly how to interrupt that, but I can give it another shot.
Yes, we shouldn't wait more than needed (task done, client closed, etc.)
This reverts commit 62feb34.
d0094f0 to 8f72099
I took a first pass through; here are some comments, mostly questions. I'll probably take another look on Tuesday morning to try to understand a few parts better.
micro-rdk/src/common/webrtc/api.rs
Outdated
@@ -545,8 +575,8 @@ pub struct WebRtcApi<C, E> {

impl<C, E> WebRtcApi<C, E>
where
C: Certificate,
E: WebRtcExecutor<Pin<Box<dyn Future<Output = ()>>>> + Clone,
C: Certificate + 'static,
Just curious, why does this need `'static` now?
I was trying something that didn't work out and I forgot to remove this
micro-rdk/src/common/system.rs
Outdated
use super::log::LogUploadTask;
use super::runtime::terminate;

pub(crate) static SHUTDOWN_EVENT: LazyLock<Arc<AsyncMutex<Option<SystemEvent>>>> =
Can we leave a comment that, should we find ourselves back here, it would be a good idea to find a way to make this not global? I don't think it needs a ticket, just a `TODO: Find a way to do this without introducing mutable global state`.
micro-rdk/src/common/system.rs
Outdated
}
}

pub(crate) async fn send_system_change(event: SystemEvent) {
`send_system_event`?
Also, just thinking aloud here, but this makes me wonder if eventually this should be a channel, rather than just a slot. For now, a slot makes sense because both system events are terminal, but I could see a use for a more general facility later.
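To make the slot-versus-channel distinction concrete, here is a rough sketch of the channel variant using only the standard library. The helper names (`system_event_channel`, `drain_events`) are hypothetical, the enum shape is assumed from the reviewed code, and the real code is async, so an async channel would be the more likely fit there.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::time::Duration;

// Assumed shape of the enum, inferred from the reviewed snippet.
#[derive(Debug, PartialEq)]
pub enum SystemEvent {
    Restart,
    DeepSleep(Option<Duration>),
}

/// Hypothetical helper: creates the sending and receiving halves of an
/// event queue. Senders can be cloned and handed to any task.
pub fn system_event_channel() -> (Sender<SystemEvent>, Receiver<SystemEvent>) {
    channel()
}

/// Hypothetical helper: returns every event queued so far, in the order
/// it was sent, without blocking.
pub fn drain_events(rx: &Receiver<SystemEvent>) -> Vec<SystemEvent> {
    rx.try_iter().collect()
}
```

Unlike a single `Option` slot, a channel keeps every event rather than one value, which only matters once non-terminal events exist.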
micro-rdk/src/common/system.rs
Outdated
use super::log::LogUploadTask;
use super::runtime::terminate;

pub(crate) static SHUTDOWN_EVENT: LazyLock<Arc<AsyncMutex<Option<SystemEvent>>>> =
Does `SHUTDOWN_EVENT` need to be pub at all? I think I'd be happier if it were entirely an implementation detail used by the API here.
}

pub(crate) async fn force_shutdown(app_client: Option<AppClient>) {
// flush logs
Should there be a deadline on this? Or is there one already, implicitly, elsewhere?
There is not, I will add one
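As a rough illustration of bounding the flush with a deadline, here is a thread-based stand-in that uses only the standard library. `run_with_deadline` is a hypothetical helper, not part of the PR. Since `force_shutdown` is async, the actual change would more plausibly race the flush future against a timer, so treat this only as a sketch of the idea.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: runs `work` on a helper thread and waits at most
/// `deadline` for it. Returns `Some(result)` if the work finished in
/// time and `None` if the deadline passed (or the work panicked).
pub fn run_with_deadline<T, F>(deadline: Duration, work: F) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The receiver may already be gone if the deadline passed;
        // in that case the send error is deliberately ignored.
        let _ = tx.send(work());
    });
    rx.recv_timeout(deadline).ok()
}
```

The caller can then proceed with the restart or deep sleep whether or not the flush completed. Note that in this stand-in the helper thread is not cancelled on timeout; it is simply abandoned.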
if result != crate::esp32::esp_idf_svc::sys::ESP_OK {
unreachable!("duration requested too long")
}
log::warn!("Esp32 entering deep sleep for {} microseconds!", dur_micros);
Will this log ever be seen?
it will be seen in the monitor
micro-rdk/src/common/system.rs
Outdated
async_io::block_on(Timer::after(dur));
terminate();
} else {
log::error!("sleep from wake up not supported for native builds")
The message needs a little cleanup I think. Also, is it meaningful for execution to continue from here?
It was either this or panic I feel, do we prefer panic?
micro-rdk/src/common/conn/viam.rs
Outdated
let mut app_client_tasks: FuturesUnordered<AppClientTaskRunner> =
FuturesUnordered::new();
log::info!("starting execution of app client tasks");
for task in &self.app_client_tasks {
log::info!("queueing {:?} task", task.name());
Maybe debug?
meant to remove this entirely actually
micro-rdk/src/common/conn/viam.rs
Outdated
@@ -777,16 +884,26 @@ where
},
});
}
let shutdown_was_requested = shutdown_requested_nonblocking().await;
It looks to me like this only checks shutdown before it starts running the app client tasks. Shouldn't it be awaited in parallel with the app client tasks, so that if shutdown is requested the app client tasks stop? This is probably me misunderstanding, so I'm just looking for a little more explanation here, which maybe should get folded into a comment.
That logic is being handled in `TaskRunnerState` and `AppClientTaskRunner`. I'll add some comments
LGTM left a couple of comments. Mainly concerned by the increased stack usage
if let Err(e) = self.serve_incoming_connection(incoming).await {
log::error!("failed to serve incoming connection: {:?}", e)

let mut connection_future =
does this have a penalty on stack consumption vs how it was before?
not sure, I'll profile the stack usage and get back to you on that
There is a penalty of about 1.5 KB of stack, do we feel this is unacceptable?
Resolved offline, this is deemed as "not great, but temporarily acceptable"
// silence errors coming from context cancellation
if *code == 1 {
return Ok(None);
let mut shutdown_poll_future = Box::pin(
same question about stack
@@ -59,6 +60,10 @@ impl WebRTCConnection {
}
pub(crate) async fn run(&mut self) -> Result<(), ServerError> {
loop {
if shutdown_requested_nonblocking().await {
Should we just unilaterally close connections? This basically does that already. We might want to support graceful shutdown of DTLS/HTTP2 connections later.
resolved offline
return Ok(());
} else {
log::error!("reached state of completed tasks without shutdown request, additional investigation required");
return Err(errors::ServerError::ServerInvalidCompletedState);
Can that even happen?
I don't think so. But if it does, it's wrong so I thought we'd want to make sure we know without necessarily producing a coredump
micro-rdk/src/common/system.rs
Outdated

pub(crate) async fn send_system_change(event: SystemEvent) {
log::info!("received call to {}", event);
let _ = SHUTDOWN_EVENT.lock().await.insert(event);
If we have multiple events, they are going to be overwritten, no?
I was thinking that we actually want to respect the latest requested event?
never mind, that decision's ridiculous. I'll fix this
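The thread does not say how the overwrite was fixed. One possible approach, sketched here under assumptions, is to let the first event win and ignore later ones. `EventSlot` is a hypothetical type using a blocking `std::sync::Mutex` for self-containment; the PR itself uses a global guarded by an async mutex.

```rust
use std::sync::Mutex;
use std::time::Duration;

// Assumed shape of the enum, inferred from the reviewed snippet.
#[derive(Clone, Debug, PartialEq)]
pub enum SystemEvent {
    Restart,
    DeepSleep(Option<Duration>),
}

/// Hypothetical single-value slot with first-writer-wins semantics.
pub struct EventSlot(Mutex<Option<SystemEvent>>);

impl EventSlot {
    pub fn new() -> Self {
        Self(Mutex::new(None))
    }

    /// Stores `event` only if the slot is still empty.
    /// Returns `true` if the event was stored, `false` if an earlier
    /// event is already pending and this one was dropped.
    pub fn send(&self, event: SystemEvent) -> bool {
        let mut slot = self.0.lock().unwrap();
        if slot.is_some() {
            return false;
        }
        *slot = Some(event);
        true
    }

    /// Returns a copy of the pending event, if any.
    pub fn current(&self) -> Option<SystemEvent> {
        self.0.lock().unwrap().clone()
    }
}
```

Returning a `bool` from `send` lets the caller log when a later request was ignored instead of silently discarding it.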
micro-rdk/src/common/system.rs
Outdated
}
}
None => {
log::error!("call to shutdown/restart without request to system, restarting");
nit: terminating.
No description provided.