From 416a808c166d8fc2e5ac01ff93e21105aab3902e Mon Sep 17 00:00:00 2001
From: Jocelyn Boullier
Date: Tue, 27 Jun 2023 10:59:44 +0200
Subject: [PATCH] Attempt at fixing sporadic failures of `shuttle-deployer`
 (#980)

* feat(deployer): use joinset to await builder tasks on shutdown

* feat(deployer): use joinset for the DeploymentManager as well

* test(deployment): when testing tests, abort early when different

* test(deployment): use test_states in deployment_to_be_queued

Instead of relying on a one second sleep.

* fix(deployment): properly await spawned tasks

* ref(deployer): move join set of DeploymentManager into struct

* fix(deployer): use tokio::select! to await tasks set in deploy/run queue
---
 deployer/src/deployment/deploy_layer.rs |  41 ++++---
 deployer/src/deployment/mod.rs          |  12 +-
 deployer/src/deployment/queue.rs        | 110 +++++++++--------
 deployer/src/deployment/run.rs          | 154 ++++++++++++++----
 deployer/src/lib.rs                     |   1 +
 5 files changed, 185 insertions(+), 133 deletions(-)
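The thread running through these commits is ownership of spawned work: instead of `tokio::spawn` handles being dropped on the floor, the builder and runner futures now live in a `tokio::task::JoinSet`, so the spawner can await their completion (and observe panics), and anything still in flight is aborted when the set goes away. A minimal, self-contained sketch of spawning into a `JoinSet` and draining it — illustrative only, not deployer code; assumes tokio 1.x with the usual runtime features:

use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();

    // Spawn a few tasks; the set keeps their handles so none are lost.
    for i in 0..3 {
        set.spawn(async move { i * 2 });
    }

    // Drain the set: join_next yields each result as a task finishes and
    // returns None once the set is empty, so this loop also observes panics
    // instead of silently ignoring them.
    while let Some(res) = set.join_next().await {
        match res {
            Ok(value) => println!("task finished with {value}"),
            Err(err) => eprintln!("task failed: {err}"),
        }
    }
}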
diff --git a/deployer/src/deployment/deploy_layer.rs b/deployer/src/deployment/deploy_layer.rs
index 8af5c231c8..1b03b809a3 100644
--- a/deployer/src/deployment/deploy_layer.rs
+++ b/deployer/src/deployment/deploy_layer.rs
@@ -574,9 +574,14 @@ mod tests {
     async fn test_states(id: &Uuid, expected_states: Vec<StateLog>) {
         loop {
             let states = RECORDER.lock().unwrap().get_deployment_states(id);
+            if states == expected_states {
+                return;
+            }
 
-            if *states == expected_states {
-                break;
+            for (actual, expected) in states.iter().zip(&expected_states) {
+                if actual != expected {
+                    return;
+                }
             }
 
             sleep(Duration::from_millis(250)).await;
         }
     }
@@ -585,7 +590,7 @@
 
     #[tokio::test(flavor = "multi_thread")]
     async fn deployment_to_be_queued() {
-        let deployment_manager = get_deployment_manager().await;
+        let deployment_manager = get_deployment_manager();
 
         let queued = get_queue("sleep-async");
         let id = queued.id;
@@ -628,12 +633,8 @@
         // Send kill signal
         deployment_manager.kill(id).await;
 
-        sleep(Duration::from_secs(1)).await;
-
-        let states = RECORDER.lock().unwrap().get_deployment_states(&id);
-
-        assert_eq!(
-            *states,
+        let test = test_states(
+            &id,
             vec![
                 StateLog {
                     id,
                     state: State::Queued,
                 },
                 StateLog {
                     id,
                     state: State::Built,
                 },
                 StateLog {
                     id,
                     state: State::Loading,
                 },
                 StateLog {
                     id,
                     state: State::Running,
                 },
                 StateLog {
                     id,
                     state: State::Stopping,
                 },
                 StateLog {
                     id,
                     state: State::Stopped,
                 },
-            ]
+            ],
         );
+
+        select! {
+            _ = sleep(Duration::from_secs(60)) => {
+                let states = RECORDER.lock().unwrap().get_deployment_states(&id);
+                panic!("states should go into 'Stopped' for a valid service: {:#?}", states);
+            },
+            _ = test => {}
+        };
     }
 
     #[tokio::test(flavor = "multi_thread")]
     async fn deployment_self_stop() {
-        let deployment_manager = get_deployment_manager().await;
+        let deployment_manager = get_deployment_manager();
 
         let queued = get_queue("self-stop");
         let id = queued.id;
@@ -712,7 +721,7 @@
 
     #[tokio::test(flavor = "multi_thread")]
     async fn deployment_bind_panic() {
-        let deployment_manager = get_deployment_manager().await;
+        let deployment_manager = get_deployment_manager();
 
         let queued = get_queue("bind-panic");
         let id = queued.id;
@@ -759,7 +768,7 @@
 
     #[tokio::test(flavor = "multi_thread")]
     async fn deployment_main_panic() {
-        let deployment_manager = get_deployment_manager().await;
+        let deployment_manager = get_deployment_manager();
 
         let queued = get_queue("main-panic");
        let id = queued.id;
@@ -802,7 +811,7 @@
 
     #[tokio::test]
     async fn deployment_from_run() {
-        let deployment_manager = get_deployment_manager().await;
+        let deployment_manager = get_deployment_manager();
 
         let id = Uuid::new_v4();
         deployment_manager
@@ -845,7 +854,7 @@
 
     #[tokio::test]
     async fn scope_with_nil_id() {
-        let deployment_manager = get_deployment_manager().await;
+        let deployment_manager = get_deployment_manager();
 
         let id = Uuid::nil();
         deployment_manager
@@ -872,7 +881,7 @@
         );
     }
 
-    async fn get_deployment_manager() -> DeploymentManager {
+    fn get_deployment_manager() -> DeploymentManager {
         DeploymentManager::builder()
             .build_log_recorder(RECORDER.clone())
             .secret_recorder(RECORDER.clone())
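With the one-second sleep gone, `deployment_to_be_queued` now polls the recorder until the expected state sequence appears and races that poll against a 60-second deadline, so a hang produces a readable panic instead of a flaky assertion. A stripped-down sketch of the same pattern — illustrative only; `states_match`, the placeholder snapshot, and the 250 ms / 60 s values are stand-ins, not the deployer's helpers:

use std::time::Duration;
use tokio::time::sleep;

// Pretend source of observed states; in the real test this is the RECORDER.
async fn states_match(expected: &[&str]) {
    loop {
        let observed = vec!["queued", "built", "running"]; // placeholder snapshot
        if observed == expected {
            return;
        }
        sleep(Duration::from_millis(250)).await;
    }
}

#[tokio::test]
async fn reaches_running() {
    let expected = ["queued", "built", "running"];

    tokio::select! {
        // If the condition is not met within the deadline, fail loudly with
        // whatever was observed so far instead of hanging forever.
        _ = sleep(Duration::from_secs(60)) => panic!("never reached {expected:?}"),
        _ = states_match(&expected) => {}
    }
}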
diff --git a/deployer/src/deployment/mod.rs b/deployer/src/deployment/mod.rs
index fe5aca3139..33141e546d 100644
--- a/deployer/src/deployment/mod.rs
+++ b/deployer/src/deployment/mod.rs
@@ -15,7 +15,10 @@ use crate::{
     persistence::{DeploymentUpdater, ResourceManager, SecretGetter, SecretRecorder, State},
     RuntimeManager,
 };
-use tokio::sync::{mpsc, Mutex};
+use tokio::{
+    sync::{mpsc, Mutex},
+    task::JoinSet,
+};
 use uuid::Uuid;
 
 use self::{deploy_layer::LogRecorder, gateway_client::BuildQueueClient};
@@ -125,8 +128,9 @@ where
         let storage_manager = ArtifactsStorageManager::new(artifacts_path);
 
         let run_send_clone = run_send.clone();
+        let mut set = JoinSet::new();
 
-        tokio::spawn(queue::task(
+        set.spawn(queue::task(
             queue_recv,
             run_send_clone,
             deployment_updater.clone(),
@@ -135,7 +139,7 @@
             storage_manager.clone(),
             queue_client,
         ));
-        tokio::spawn(run::task(
+        set.spawn(run::task(
             run_recv,
             runtime_manager.clone(),
             deployment_updater,
@@ -150,6 +154,7 @@
             run_send,
             runtime_manager,
             storage_manager,
+            _join_set: Arc::new(Mutex::new(set)),
         }
     }
 }
@@ -160,6 +165,7 @@
 pub struct DeploymentManager {
     queue_send: QueueSender,
     run_send: RunSender,
     runtime_manager: Arc<Mutex<RuntimeManager>>,
     storage_manager: ArtifactsStorageManager,
+    _join_set: Arc<Mutex<JoinSet<()>>>,
 }
 
 /// ```no-test
diff --git a/deployer/src/deployment/queue.rs b/deployer/src/deployment/queue.rs
index b6f3e3bb71..a59a20a723 100644
--- a/deployer/src/deployment/queue.rs
+++ b/deployer/src/deployment/queue.rs
@@ -12,6 +12,7 @@ use opentelemetry::global;
 use serde_json::json;
 use shuttle_common::claims::Claim;
 use shuttle_service::builder::{build_workspace, BuiltService};
+use tokio::task::JoinSet;
 use tokio::time::{sleep, timeout};
 use tracing::{debug, debug_span, error, info, instrument, trace, warn, Instrument, Span};
 use tracing_opentelemetry::OpenTelemetrySpanExt;
@@ -40,58 +41,71 @@ pub async fn task(
 ) {
     info!("Queue task started");
 
-    while let Some(queued) = recv.recv().await {
-        let id = queued.id;
-
-        info!("Queued deployment at the front of the queue: {id}");
-
-        let deployment_updater = deployment_updater.clone();
-        let run_send_cloned = run_send.clone();
-        let log_recorder = log_recorder.clone();
-        let secret_recorder = secret_recorder.clone();
-        let storage_manager = storage_manager.clone();
-        let queue_client = queue_client.clone();
-
-        tokio::spawn(async move {
-            let parent_cx = global::get_text_map_propagator(|propagator| {
-                propagator.extract(&queued.tracing_context)
-            });
-            let span = debug_span!("builder");
-            span.set_parent(parent_cx);
-
-            async move {
-                match timeout(
-                    Duration::from_secs(60 * 3), // Timeout after 3 minutes if the build queue hangs or it takes too long for a slot to become available
-                    wait_for_queue(queue_client.clone(), id),
-                )
-                .await
-                {
-                    Ok(_) => {}
-                    Err(err) => return build_failed(&id, err),
-                }
+    let mut tasks = JoinSet::new();
 
-                match queued
-                    .handle(
-                        storage_manager,
-                        deployment_updater,
-                        log_recorder,
-                        secret_recorder,
-                    )
-                    .await
-                {
-                    Ok(built) => {
-                        remove_from_queue(queue_client, id).await;
-                        promote_to_run(built, run_send_cloned).await
-                    }
-                    Err(err) => {
-                        remove_from_queue(queue_client, id).await;
-                        build_failed(&id, err)
+    loop {
+        tokio::select! {
+            Some(queued) = recv.recv() => {
+                let id = queued.id;
+
+                info!("Queued deployment at the front of the queue: {id}");
+
+                let deployment_updater = deployment_updater.clone();
+                let run_send_cloned = run_send.clone();
+                let log_recorder = log_recorder.clone();
+                let secret_recorder = secret_recorder.clone();
+                let storage_manager = storage_manager.clone();
+                let queue_client = queue_client.clone();
+
+                tasks.spawn(async move {
+                    let parent_cx = global::get_text_map_propagator(|propagator| {
+                        propagator.extract(&queued.tracing_context)
+                    });
+                    let span = debug_span!("builder");
+                    span.set_parent(parent_cx);
+
+                    async move {
+                        match timeout(
+                            Duration::from_secs(60 * 3), // Timeout after 3 minutes if the build queue hangs or it takes too long for a slot to become available
+                            wait_for_queue(queue_client.clone(), id),
+                        )
+                        .await
+                        {
+                            Ok(_) => {}
+                            Err(err) => return build_failed(&id, err),
+                        }
+
+                        match queued
+                            .handle(
+                                storage_manager,
+                                deployment_updater,
+                                log_recorder,
+                                secret_recorder,
+                            )
+                            .await
+                        {
+                            Ok(built) => {
+                                remove_from_queue(queue_client, id).await;
+                                promote_to_run(built, run_send_cloned).await
+                            }
+                            Err(err) => {
+                                remove_from_queue(queue_client, id).await;
+                                build_failed(&id, err)
+                            }
+                        }
                     }
+                    .instrument(span)
+                    .await
+                });
+            },
+            Some(res) = tasks.join_next() => {
+                match res {
+                    Ok(_) => (),
+                    Err(err) => error!(error = %err, "an error happened while joining a builder task"),
                 }
             }
-            .instrument(span)
-            .await
-        });
+            else => break
+        }
     }
 }
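Both worker loops now share the same shape: one `tokio::select!` arm accepts new work from the mpsc channel and spawns it onto the `JoinSet`, a second arm reaps finished tasks with `join_next()`, and the `else` branch fires only once every arm is disabled — the channel is closed and the set is empty — which is the clean-shutdown exit. A condensed, self-contained sketch of that loop; illustrative only, with `process`, the `u32` job type, and the channel size as stand-ins for the deployer's build/run plumbing:

use tokio::{sync::mpsc, task::JoinSet};

async fn process(job: u32) {
    // Stand-in for building or running a deployment.
    println!("processing job {job}");
}

async fn worker(mut recv: mpsc::Receiver<u32>) {
    let mut tasks = JoinSet::new();

    loop {
        tokio::select! {
            // New work arrived: spawn it so the loop can keep receiving.
            Some(job) = recv.recv() => {
                tasks.spawn(process(job));
            },
            // A spawned task finished (or panicked); log instead of crashing the loop.
            Some(res) = tasks.join_next() => {
                if let Err(err) = res {
                    eprintln!("task failed to join: {err}");
                }
            },
            // Runs only when both arms are disabled: the sender side is gone
            // and no tasks remain, so the worker can exit.
            else => break,
        }
    }
}

#[tokio::main]
async fn main() {
    let (send, recv) = mpsc::channel(8);
    let worker = tokio::spawn(worker(recv));

    for job in 0..3 {
        send.send(job).await.unwrap();
    }
    drop(send); // closing the channel lets the worker drain its tasks and stop

    worker.await.unwrap();
}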
diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs
index 09ff7ef307..eba5e6f045 100644
--- a/deployer/src/deployment/run.rs
+++ b/deployer/src/deployment/run.rs
@@ -18,7 +18,10 @@ use shuttle_proto::runtime::{
     runtime_client::RuntimeClient, LoadRequest, StartRequest, StopReason, SubscribeStopRequest,
     SubscribeStopResponse,
 };
-use tokio::sync::Mutex;
+use tokio::{
+    sync::Mutex,
+    task::{JoinHandle, JoinSet},
+};
 use tonic::{transport::Channel, Code};
 use tracing::{debug, debug_span, error, info, instrument, trace, warn, Instrument};
 use tracing_opentelemetry::OpenTelemetrySpanExt;
@@ -44,73 +47,92 @@ pub async fn task(
 ) {
     info!("Run task started");
 
-    while let Some(built) = recv.recv().await {
-        let id = built.id;
-
-        info!("Built deployment at the front of run queue: {id}");
-
-        let deployment_updater = deployment_updater.clone();
-        let secret_getter = secret_getter.clone();
-        let resource_manager = resource_manager.clone();
-        let storage_manager = storage_manager.clone();
-
-        let old_deployments_killer = kill_old_deployments(
-            built.service_id,
-            id,
-            active_deployment_getter.clone(),
-            runtime_manager.clone(),
-        );
-        let cleanup = move |response: Option<SubscribeStopResponse>| {
-            debug!(response = ?response, "stop client response: ");
-
-            if let Some(response) = response {
-                match StopReason::from_i32(response.reason).unwrap_or_default() {
-                    StopReason::Request => stopped_cleanup(&id),
-                    StopReason::End => completed_cleanup(&id),
-                    StopReason::Crash => crashed_cleanup(
-                        &id,
-                        Error::Run(anyhow::Error::msg(response.message).into()),
-                    ),
-                }
-            } else {
-                crashed_cleanup(
-                    &id,
-                    Error::Runtime(anyhow::anyhow!(
-                        "stop subscribe channel stopped unexpectedly"
-                    )),
-                )
-            }
-        };
-        let runtime_manager = runtime_manager.clone();
+    let mut set = JoinSet::new();
 
-        tokio::spawn(async move {
-            let parent_cx = global::get_text_map_propagator(|propagator| {
-                propagator.extract(&built.tracing_context)
-            });
-            let span = debug_span!("runner");
-            span.set_parent(parent_cx);
-
-            async move {
-                if let Err(err) = built
-                    .handle(
-                        storage_manager,
-                        secret_getter,
-                        resource_manager,
-                        runtime_manager,
-                        deployment_updater,
-                        old_deployments_killer,
-                        cleanup,
-                    )
+    loop {
+        tokio::select! {
+            Some(built) = recv.recv() => {
+                let id = built.id;
+
+                info!("Built deployment at the front of run queue: {id}");
+
+                let deployment_updater = deployment_updater.clone();
+                let secret_getter = secret_getter.clone();
+                let resource_manager = resource_manager.clone();
+                let storage_manager = storage_manager.clone();
+
+                let old_deployments_killer = kill_old_deployments(
+                    built.service_id,
+                    id,
+                    active_deployment_getter.clone(),
+                    runtime_manager.clone(),
+                );
+                let cleanup = move |response: Option<SubscribeStopResponse>| {
+                    debug!(response = ?response, "stop client response: ");
+
+                    if let Some(response) = response {
+                        match StopReason::from_i32(response.reason).unwrap_or_default() {
+                            StopReason::Request => stopped_cleanup(&id),
+                            StopReason::End => completed_cleanup(&id),
+                            StopReason::Crash => crashed_cleanup(
+                                &id,
+                                Error::Run(anyhow::Error::msg(response.message).into()),
+                            ),
+                        }
+                    } else {
+                        crashed_cleanup(
+                            &id,
+                            Error::Runtime(anyhow::anyhow!(
+                                "stop subscribe channel stopped unexpectedly"
+                            )),
+                        )
+                    }
+                };
+                let runtime_manager = runtime_manager.clone();
+
+                set.spawn(async move {
+                    let parent_cx = global::get_text_map_propagator(|propagator| {
+                        propagator.extract(&built.tracing_context)
+                    });
+                    let span = debug_span!("runner");
+                    span.set_parent(parent_cx);
+
+                    async move {
+                        match built
+                            .handle(
+                                storage_manager,
+                                secret_getter,
+                                resource_manager,
+                                runtime_manager,
+                                deployment_updater,
+                                old_deployments_killer,
+                                cleanup,
+                            )
+                            .await
+                        {
+                            Ok(handle) => handle
+                                .await
+                                .expect("the call to run in built.handle to be done"),
+                            Err(err) => start_crashed_cleanup(&id, err),
+                        };
+
+                        info!("deployment done");
+                    }
+                    .instrument(span)
                     .await
-                {
-                    start_crashed_cleanup(&id, err)
+                });
+            },
+            Some(res) = set.join_next() => {
+                match res {
+                    Ok(_) => (),
+                    Err(err) => {
+                        error!(error = %err, "an error happened while joining a deployment run task")
+                    }
                 }
-                info!("deployment done");
             }
-            .instrument(span)
-            .await
-        });
+            else => break
+        }
     }
 }
@@ -199,7 +221,7 @@ impl Built {
         deployment_updater: impl DeploymentUpdater,
         kill_old_deployments: impl futures::Future<Output = Result<()>>,
         cleanup: impl FnOnce(Option<SubscribeStopResponse>) + Send + 'static,
-    ) -> Result<()> {
+    ) -> Result<JoinHandle<()>> {
         // For alpha this is the path to the users project with an embedded runtime.
         // For shuttle-next this is the path to the compiled .wasm file, which will be
         // used in the load request.
@@ -244,7 +266,7 @@ impl Built {
         )
         .await?;
 
-        tokio::spawn(run(
+        let handler = tokio::spawn(run(
             self.id,
             self.service_name,
             runtime_client,
@@ -253,7 +275,7 @@
             cleanup,
         ));
 
-        Ok(())
+        Ok(handler)
     }
 }
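run.rs makes the ownership chain explicit: `Built::handle` no longer fire-and-forgets the `run(...)` future; it returns the `JoinHandle`, and the runner task spawned on the `JoinSet` awaits that handle, so "deployment done" is only logged once the service has actually finished or panicked. A reduced sketch of handing a `JoinHandle` back to the caller — illustrative only; `launch` and `serve` are stand-ins, not deployer APIs:

use tokio::task::JoinHandle;

// Stand-in for the service's long-running future.
async fn serve(name: String) {
    println!("{name} is running");
}

// Do the setup, spawn the long-running part, and let the caller decide
// where to await completion — mirroring Built::handle now returning
// Result<JoinHandle<()>> instead of ().
fn launch(name: &str) -> JoinHandle<()> {
    tokio::spawn(serve(name.to_string()))
}

#[tokio::main]
async fn main() {
    let handle = launch("my-service");

    // The spawner awaits the handle, so it knows when the service is done
    // (and whether it panicked) instead of assuming success.
    handle.await.expect("service task panicked");
    println!("deployment done");
}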
diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs
index 0d1eadfa43..0448f65778 100644
--- a/deployer/src/lib.rs
+++ b/deployer/src/lib.rs
@@ -29,6 +29,7 @@ pub async fn start(
     runtime_manager: Arc<Mutex<RuntimeManager>>,
     args: Args,
 ) {
+    // when _set is dropped once axum exits, the deployment tasks will be aborted.
     let deployment_manager = DeploymentManager::builder()
         .build_log_recorder(persistence.clone())
         .secret_recorder(persistence.clone())
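The `_join_set` field ties the lifetime of the queue and run tasks to the `DeploymentManager` itself: the set sits behind `Arc<Mutex<...>>` so the manager stays cheaply cloneable, the leading underscore marks it as intentionally unused, and once the last clone is dropped (after axum exits) the set drops and aborts both tasks. A small sketch of that ownership pattern — illustrative only; the `Manager` type and its task are placeholders, not deployer code:

use std::sync::Arc;
use std::time::Duration;
use tokio::{sync::Mutex, task::JoinSet};

#[derive(Clone)]
struct Manager {
    // Never read directly; held only so the background tasks live exactly
    // as long as the manager and are aborted when it is dropped.
    _join_set: Arc<Mutex<JoinSet<()>>>,
}

impl Manager {
    fn new() -> Self {
        let mut set = JoinSet::new();
        set.spawn(async {
            // Stand-in for queue::task / run::task.
            loop {
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        });

        Self {
            _join_set: Arc::new(Mutex::new(set)),
        }
    }
}

#[tokio::main]
async fn main() {
    let manager = Manager::new();
    let clone = manager.clone(); // both clones share the same JoinSet

    drop(manager);
    drop(clone); // last owner gone: the JoinSet drops and aborts the loop task
    tokio::time::sleep(Duration::from_millis(50)).await;
}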