[RSDK-8948] refactor task lifecycles for graceful shutdown #503


Merged

Conversation

gvaradarajan (Member)

No description provided.

@gvaradarajan requested a review from a team as a code owner May 9, 2025 21:31

let mut tasks: FuturesUnordered<_> = FuturesUnordered::new();
Member

nice
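
For orientation, a minimal sketch (illustrative only, not the PR's actual task set) of how a FuturesUnordered collection drains tasks as they complete:

use futures::future::ready;
use futures::stream::{FuturesUnordered, StreamExt};

async fn drain_tasks() {
    // Placeholder tasks; in the PR these are the server and app client tasks.
    let mut tasks: FuturesUnordered<_> = FuturesUnordered::new();
    tasks.push(ready("task a"));
    tasks.push(ready("task b"));
    // Completed tasks are yielded in whatever order they finish.
    while let Some(ret) = tasks.next().await {
        log::info!("task returned {:?}", ret);
    }
}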

log::info!("viam server started");
while let Some(ret) = tasks.next().await {
log::error!("task ran returned {:?}", ret);
let (app_client_tasks_result, server_task_result) = system_task.await;
Member

that's pretty cool how that works
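
A rough illustration of how a joined "system task" can hand back both results in one await, assuming futures_lite::future::zip semantics (the concrete combinator in the PR may differ):

use futures_lite::future;

async fn run_system() {
    let app_client_tasks = async { /* drive app client tasks until shutdown */ };
    let server = async { /* serve incoming connections until shutdown */ };
    // zip polls both futures concurrently and resolves once both have completed,
    // which is what allows destructuring the pair of results in a single await.
    let (app_client_tasks_result, server_task_result) = future::zip(app_client_tasks, server).await;
    let _ = (app_client_tasks_result, server_task_result);
}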

@mattjperez left a comment (Member)

LGTM, however we might want to wait for the poll function in viam.rs to get a second reviewer

@npmenard left a comment (Member)

Are we running the destructor of LocalRobot before we shut down ViamServer?

@gvaradarajan (Member Author)

Are we running the destructor of LocalRobot before we shut down ViamServer?

we are now

@gvaradarajan requested a review from npmenard May 15, 2025 19:27
@npmenard left a comment (Member)

LGTM, left a couple of comments / nits.
I did some quick tests and I always ran into the timeout (120 secs); am I doing something wrong?

}
}
exec.block_on(force_shutdown(run_result.app_client.clone()));
unreachable!()
Member

nit: so this doesn't get us back to main?

Member Author

Not sure how I would do that besides putting the shutdown call in main.rs (which doesn't seem like a good idea?)

Member

then ok!

@@ -1307,6 +1447,9 @@ mod tests {
"/viam.app.v1.RobotService/Log" => self.log(body.split_off(5)),
"/viam.app.v1.RobotService/NeedsRestart" => self.needs_restart(body.split_off(5)),
"/viam.app.v1.RobotService/Config" => self.get_config(),
"/viam.app.agent.v1.AgentDeviceService/DeviceAgentConfig" => {
return Err(ServerError::new(GrpcError::RpcInternal, None));
Member

return unimplemented by default?

Member

bump

Member Author

Just to clarify, do you mean that this shouldn't panic, or that I should just change the error for this endpoint to unimplemented?
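
If the latter, the change would look roughly like the arm below (a sketch; GrpcError::RpcUnimplemented is assumed to be the matching variant name in this codebase):

// Sketch: answer this endpoint with an "unimplemented" status instead of an internal error.
"/viam.app.agent.v1.AgentDeviceService/DeviceAgentConfig" => {
    return Err(ServerError::new(GrpcError::RpcUnimplemented, None));
}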

match self {
Self::Restart => "restart".to_string(),
Self::DeepSleep(dur) => dur.as_ref().map_or("enter deep sleep".to_string(), |d| {
format!("enter deep sleep for {} microseconds", d.as_micros())
Member

how would you go about configuring other wake up sources?

Member Author

Was that part of the scope of this? If so, I can add that in a follow-up PR where I change the DeepSleep enum variant to be a struct with all of the possible settings. Then you would configure these settings through the agent config (just as you would the active timeout).

Member

nope just a general question

}

impl SystemEvent {
fn action_string(&self) -> String {
Member

should implement Display trait
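
A sketch of the suggested Display implementation, derived from the match arms shown above (it assumes DeepSleep carries an Option<Duration>):

use std::fmt;

impl fmt::Display for SystemEvent {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Restart => write!(f, "restart"),
            // A bounded sleep reports its duration in microseconds.
            Self::DeepSleep(Some(d)) => {
                write!(f, "enter deep sleep for {} microseconds", d.as_micros())
            }
            // An unbounded sleep lasts until an external wake-up source fires.
            Self::DeepSleep(None) => write!(f, "enter deep sleep"),
        }
    }
}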

@gvaradarajan (Member Author)

LGTM, left a couple of comments / nits. I did some quick tests and I always ran into the timeout (120 secs); am I doing something wrong?

@npmenard you're not doing anything wrong, it's just that the server task waits for a connection for much longer than the timeout. Wasn't sure exactly how to interrupt that, but I can give it another shot

@npmenard (Member)

LGTM, left a couple of comments / nits. I did some quick tests and I always ran into the timeout (120 secs); am I doing something wrong?

@npmenard you're not doing anything wrong, it's just that the server task waits for a connection for much longer than the timeout. Wasn't sure exactly how to interrupt that, but I can give it another shot

Yes, we shouldn't wait more than needed (task done, client closed, etc.).

@gvaradarajan force-pushed the RSDK-8948-system-lifecycle branch from d0094f0 to 8f72099 May 22, 2025 18:36
@gvaradarajan requested a review from npmenard May 22, 2025 20:42
@acmorrow left a comment (Member)

I took a first pass through; here are some comments, mostly questions. I'll probably take another look on Tuesday morning to try to understand a few parts better.

@@ -545,8 +575,8 @@ pub struct WebRtcApi<C, E> {

impl<C, E> WebRtcApi<C, E>
where
C: Certificate,
E: WebRtcExecutor<Pin<Box<dyn Future<Output = ()>>>> + Clone,
C: Certificate + 'static,
Member

Just curious, why does this need 'static now?

Member Author

I was trying something that didn't work out and I forgot to remove this

use super::log::LogUploadTask;
use super::runtime::terminate;

pub(crate) static SHUTDOWN_EVENT: LazyLock<Arc<AsyncMutex<Option<SystemEvent>>>> =
Member

Can we leave a comment that, should we find ourselves back here, it would be a good idea to find a way to make this not global? I don't think it needs a ticket, just a TODO: Find a way to do this without introducing mutable global state.

}
}

pub(crate) async fn send_system_change(event: SystemEvent) {
Member

send_system_event?

Also, just thinking aloud here, but this makes me wonder if eventually this should be a channel, rather than just a slot. For now, a slot makes sense because both system events are terminal, but I could see a use for a more general facility later.
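
A speculative sketch of the channel idea, assuming futures::channel::mpsc; the current slot works because both events are terminal, but a channel would generalize to multiple queued events. All names below are illustrative:

use futures::channel::mpsc;
use futures::StreamExt;

// Hypothetical: producers get a sender, the lifecycle loop owns the receiver,
// replacing the global Mutex<Option<SystemEvent>> slot.
fn system_event_channel() -> (
    mpsc::UnboundedSender<SystemEvent>,
    mpsc::UnboundedReceiver<SystemEvent>,
) {
    mpsc::unbounded()
}

async fn lifecycle_loop(mut events: mpsc::UnboundedReceiver<SystemEvent>) {
    // Events are handled in the order requested instead of overwriting a single slot.
    while let Some(event) = events.next().await {
        log::info!("handling system event: {}", event);
    }
}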

use super::log::LogUploadTask;
use super::runtime::terminate;

pub(crate) static SHUTDOWN_EVENT: LazyLock<Arc<AsyncMutex<Option<SystemEvent>>>> =
Member

Does SHUTDOWN_EVENT need to be pub at all? I think I'd be happier if it were entirely an implementation detail used by the API here.

}

pub(crate) async fn force_shutdown(app_client: Option<AppClient>) {
// flush logs
Member

Should there be a deadline on this? Or is there already one, implicitly, elsewhere?

Member Author

There is not, I will add one
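
A minimal sketch of one way to bound a shutdown step, assuming async_io::Timer and futures_lite::future::or (both already part of this async stack); the helper name and limit are illustrative:

use std::time::Duration;

use async_io::Timer;
use futures_lite::future;

// Races a shutdown step against a deadline; whichever finishes first resolves the call.
async fn with_deadline<F: core::future::Future<Output = ()>>(step: F, limit: Duration) {
    let timeout = async {
        Timer::after(limit).await;
        log::warn!("shutdown step exceeded {:?}; continuing with shutdown anyway", limit);
    };
    future::or(step, timeout).await;
}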

if result != crate::esp32::esp_idf_svc::sys::ESP_OK {
unreachable!("duration requested too long")
}
log::warn!("Esp32 entering deep sleep for {} microseconds!", dur_micros);
Member

Will this log ever be seen?

Member Author

it will be seen in the monitor

async_io::block_on(Timer::after(dur));
terminate();
} else {
log::error!("sleep from wake up not supported for native builds")
Member

The message needs a little cleanup I think. Also, is it meaningful for execution to continue from here?

Member Author

It was either this or panic, I feel; do we prefer panic?

let mut app_client_tasks: FuturesUnordered<AppClientTaskRunner> =
FuturesUnordered::new();
log::info!("starting execution of app client tasks");
for task in &self.app_client_tasks {
log::info!("queueing {:?} task", task.name());
Member

Maybe debug?

Member Author

meant to remove this entirely actually

@@ -777,16 +884,26 @@ where
},
});
}
let shutdown_was_requested = shutdown_requested_nonblocking().await;
Member

It looks to me like this only checks shutdown before it starts running the app client tasks. Shouldn't it be awaited in parallel with the app client tasks, so that if shutdown is requested the app client tasks stop? This is probably me misunderstanding, so, just looking for a little more explanation here. Which maybe should get folded into a comment.

Member Author

That logic is being handled in TaskRunnerState and AppClientTaskRunner. I'll add some comments
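
A speculative sketch of the pattern described (names illustrative, apart from shutdown_requested_nonblocking, which this PR introduces): each runner re-checks the shutdown flag between iterations, so a request stops tasks at the next loop rather than only before startup.

async fn run_periodic_task<F, Fut>(mut body: F)
where
    F: FnMut() -> Fut,
    Fut: core::future::Future<Output = ()>,
{
    loop {
        // Re-checked on every iteration, so an in-flight shutdown request stops the
        // task after the current run of the body completes.
        if shutdown_requested_nonblocking().await {
            log::info!("shutdown requested; stopping task");
            break;
        }
        body().await;
    }
}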

@npmenard left a comment (Member)

LGTM, left a couple of comments. Mainly concerned by the increased stack usage.

if let Err(e) = self.serve_incoming_connection(incoming).await {
log::error!("failed to serve incoming connection: {:?}", e)

let mut connection_future =
Member

does this have a penalty on stack consumption vs how it was before?

Member Author

not sure, I'll profile the stack usage and get back to you on that

Member Author

There is a penalty of about 1.5 KB of stack, do we feel this is unacceptable?

Member Author

Resolved offline; this is deemed "not great, but temporarily acceptable".

// silence errors coming from context cancellation
if *code == 1 {
return Ok(None);
let mut shutdown_poll_future = Box::pin(
Member

same question about stack

@@ -59,6 +60,10 @@ impl WebRTCConnection {
}
pub(crate) async fn run(&mut self) -> Result<(), ServerError> {
loop {
if shutdown_requested_nonblocking().await {
Member

Should we just unilaterally close connections? This basically does that already. We might want to support graceful shutdown of DTLS/HTTP2 connections later.

Member Author

resolved offline

return Ok(());
} else {
log::error!("reached state of completed tasks without shutdown request, additional investigation required");
return Err(errors::ServerError::ServerInvalidCompletedState);
Member

Can that even happen?

Member Author

I don't think so. But if it does, it's wrong, so I thought we'd want to make sure we know without necessarily producing a core dump.

@@ -1307,6 +1447,9 @@ mod tests {
"/viam.app.v1.RobotService/Log" => self.log(body.split_off(5)),
"/viam.app.v1.RobotService/NeedsRestart" => self.needs_restart(body.split_off(5)),
"/viam.app.v1.RobotService/Config" => self.get_config(),
"/viam.app.agent.v1.AgentDeviceService/DeviceAgentConfig" => {
return Err(ServerError::new(GrpcError::RpcInternal, None));
Member

bump


pub(crate) async fn send_system_change(event: SystemEvent) {
log::info!("received call to {}", event);
let _ = SHUTDOWN_EVENT.lock().await.insert(event);
Member

If we have multiple events, they are going to be overwritten, no?

Member Author

I was thinking that we actually want to respect the latest requested event?

Member Author

never mind, that decision's ridiculous. I'll fix this
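
One possible fix sketch, keeping the first requested event and ignoring later ones (function and static names as in the diff above):

pub(crate) async fn send_system_change(event: SystemEvent) {
    log::info!("received call to {}", event);
    let mut slot = SHUTDOWN_EVENT.lock().await;
    if slot.is_none() {
        // First request wins; later requests are dropped rather than overwriting it.
        let _ = slot.insert(event);
    } else {
        log::warn!("a system event is already pending; ignoring {}", event);
    }
}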

}
}
None => {
log::error!("call to shutdown/restart without request to system, restarting");
Member

nit: terminating.

@gvaradarajan merged commit 63803aa into viamrobotics:main May 27, 2025
6 checks passed