Attempt at fixing sporadic failures of shuttle-deployer (shuttle-hq#980)

* feat(deployer): use joinset to await builder tasks on shutdown

* feat(deployer): use joinset for the DeploymentManager as well

* test(deployment): when checking expected states, abort early if they differ

* test(deployment): use test_states in deployment_to_be_queued

Instead of relying on a one-second sleep.

* fix(deployment): properly await spawned tasks

* ref(deployer): move join set of DeploymentManager into struct

* fix(deployer): use tokio::select! to await the task set in the deploy/run queue (pattern sketched below)
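
The last two bullets describe the core pattern of this commit: a worker loop that owns a `tokio::task::JoinSet` and drives it with `tokio::select!`, so spawned builder tasks are awaited — and their panics observed — instead of being detached with `tokio::spawn` and lost. A minimal, self-contained sketch of that pattern; all names here are illustrative stand-ins, not shuttle code:

```rust
use tokio::{sync::mpsc, task::JoinSet};

async fn worker(mut recv: mpsc::Receiver<u32>) {
    let mut tasks = JoinSet::new();

    loop {
        tokio::select! {
            // A new job arrived: spawn it into the set so it can be awaited later.
            Some(job) = recv.recv() => {
                tasks.spawn(async move {
                    println!("handling job {job}");
                });
            }
            // A previously spawned job finished: surface failures instead of losing them.
            Some(res) = tasks.join_next() => {
                if let Err(err) = res {
                    eprintln!("task failed: {err}");
                }
            }
            // All branches disabled: channel closed and set drained, so shut down.
            else => break,
        }
    }
}

#[tokio::main]
async fn main() {
    let (send, recv) = mpsc::channel(8);
    let handle = tokio::spawn(worker(recv));
    for job in 0..3 {
        send.send(job).await.unwrap();
    }
    drop(send); // closing the channel lets the worker drain and exit
    handle.await.unwrap();
}
```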
Kazy authored and AlphaKeks committed Jul 21, 2023
1 parent e6fd6a0 commit 416a808
Showing 5 changed files with 185 additions and 133 deletions.
41 changes: 25 additions & 16 deletions deployer/src/deployment/deploy_layer.rs
@@ -574,9 +574,14 @@ mod tests {
     async fn test_states(id: &Uuid, expected_states: Vec<StateLog>) {
         loop {
             let states = RECORDER.lock().unwrap().get_deployment_states(id);
-            if states == expected_states {
-                return;
+
+            if *states == expected_states {
+                break;
+            }
+
+            for (actual, expected) in states.iter().zip(&expected_states) {
+                if actual != expected {
+                    return;
+                }
             }
 
             sleep(Duration::from_millis(250)).await;
@@ -585,7 +590,7 @@
 
     #[tokio::test(flavor = "multi_thread")]
     async fn deployment_to_be_queued() {
-        let deployment_manager = get_deployment_manager().await;
+        let deployment_manager = get_deployment_manager();
 
         let queued = get_queue("sleep-async");
         let id = queued.id;
@@ -628,12 +633,8 @@ mod tests {
         // Send kill signal
         deployment_manager.kill(id).await;
 
-        sleep(Duration::from_secs(1)).await;
-
-        let states = RECORDER.lock().unwrap().get_deployment_states(&id);
-
-        assert_eq!(
-            *states,
+        let test = test_states(
+            &id,
             vec![
                 StateLog {
                     id,
@@ -659,13 +660,21 @@ mod tests {
                     id,
                     state: State::Stopped,
                 },
-            ]
+            ],
         );
+
+        select! {
+            _ = sleep(Duration::from_secs(60)) => {
+                let states = RECORDER.lock().unwrap().get_deployment_states(&id);
+                panic!("states should go into 'Stopped' for a valid service: {:#?}", states);
+            },
+            _ = test => {}
+        };
     }
 
     #[tokio::test(flavor = "multi_thread")]
     async fn deployment_self_stop() {
-        let deployment_manager = get_deployment_manager().await;
+        let deployment_manager = get_deployment_manager();
 
         let queued = get_queue("self-stop");
         let id = queued.id;
@@ -712,7 +721,7 @@ mod tests {
 
     #[tokio::test(flavor = "multi_thread")]
     async fn deployment_bind_panic() {
-        let deployment_manager = get_deployment_manager().await;
+        let deployment_manager = get_deployment_manager();
 
         let queued = get_queue("bind-panic");
         let id = queued.id;
@@ -759,7 +768,7 @@ mod tests {
 
     #[tokio::test(flavor = "multi_thread")]
     async fn deployment_main_panic() {
-        let deployment_manager = get_deployment_manager().await;
+        let deployment_manager = get_deployment_manager();
 
     let queued = get_queue("main-panic");
         let id = queued.id;
@@ -802,7 +811,7 @@ mod tests {
 
     #[tokio::test]
     async fn deployment_from_run() {
-        let deployment_manager = get_deployment_manager().await;
+        let deployment_manager = get_deployment_manager();
 
         let id = Uuid::new_v4();
         deployment_manager
Expand Down Expand Up @@ -845,7 +854,7 @@ mod tests {

#[tokio::test]
async fn scope_with_nil_id() {
let deployment_manager = get_deployment_manager().await;
let deployment_manager = get_deployment_manager();

let id = Uuid::nil();
deployment_manager
@@ -872,7 +881,7 @@ mod tests {
         );
     }
 
-    async fn get_deployment_manager() -> DeploymentManager {
+    fn get_deployment_manager() -> DeploymentManager {
         DeploymentManager::builder()
             .build_log_recorder(RECORDER.clone())
             .secret_recorder(RECORDER.clone())
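
The test changes above all follow one pattern: replace a fixed sleep-then-assert with a polling future raced against a generous timeout via `tokio::select!`, so a slow machine never produces a spurious failure and a genuine failure surfaces with a panic instead of a hang. A stripped-down, self-contained sketch of that pattern — the probe and target are made-up stand-ins, not shuttle types:

```rust
use std::time::Duration;
use tokio::{select, time::sleep};

/// Poll until the probed value matches the target, checking every 250 ms.
async fn wait_for(target: u32, probe: impl Fn() -> u32) {
    loop {
        if probe() == target {
            return;
        }
        sleep(Duration::from_millis(250)).await;
    }
}

#[tokio::test]
async fn reaches_target() {
    select! {
        // If polling never succeeds, fail loudly instead of hanging forever.
        _ = sleep(Duration::from_secs(60)) => panic!("value never reached target"),
        _ = wait_for(3, || 3) => {} // trivially satisfied probe, for illustration
    };
}
```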
12 changes: 9 additions & 3 deletions deployer/src/deployment/mod.rs
@@ -15,7 +15,10 @@ use crate::{
     persistence::{DeploymentUpdater, ResourceManager, SecretGetter, SecretRecorder, State},
     RuntimeManager,
 };
-use tokio::sync::{mpsc, Mutex};
+use tokio::{
+    sync::{mpsc, Mutex},
+    task::JoinSet,
+};
 use uuid::Uuid;
 
 use self::{deploy_layer::LogRecorder, gateway_client::BuildQueueClient};
@@ -125,8 +128,9 @@ where
         let storage_manager = ArtifactsStorageManager::new(artifacts_path);
 
         let run_send_clone = run_send.clone();
+        let mut set = JoinSet::new();
 
-        tokio::spawn(queue::task(
+        set.spawn(queue::task(
             queue_recv,
             run_send_clone,
             deployment_updater.clone(),
@@ -135,7 +139,7 @@
             storage_manager.clone(),
             queue_client,
         ));
-        tokio::spawn(run::task(
+        set.spawn(run::task(
             run_recv,
             runtime_manager.clone(),
             deployment_updater,
@@ -150,6 +154,7 @@
             run_send,
             runtime_manager,
             storage_manager,
+            _join_set: Arc::new(Mutex::new(set)),
         }
     }
 }
@@ -160,6 +165,7 @@ pub struct DeploymentManager {
     run_send: RunSender,
     runtime_manager: Arc<Mutex<RuntimeManager>>,
     storage_manager: ArtifactsStorageManager,
+    _join_set: Arc<Mutex<JoinSet<()>>>,
 }
 
 /// ```no-test
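
A note on the `_join_set` field added above: dropping a `JoinSet` aborts every task still running in it, so the set must live as long as `DeploymentManager` itself — hence it is stored in the struct, unused but kept alive. A small sketch demonstrating the abort-on-drop behavior (a toy example, not deployer code):

```rust
use std::time::Duration;
use tokio::{sync::mpsc, task::JoinSet, time::sleep};

#[tokio::main]
async fn main() {
    let (send, mut recv) = mpsc::channel::<()>(1);

    let mut set = JoinSet::new();
    set.spawn(async move {
        sleep(Duration::from_secs(3600)).await;
        let _ = send.send(()).await; // never reached: the set is dropped first
    });

    drop(set); // dropping the JoinSet aborts the spawned task

    // The task was cancelled, so its sender was dropped without sending.
    assert!(recv.recv().await.is_none());
    println!("task aborted when the JoinSet was dropped");
}
```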
110 changes: 62 additions & 48 deletions deployer/src/deployment/queue.rs
@@ -12,6 +12,7 @@ use opentelemetry::global;
 use serde_json::json;
 use shuttle_common::claims::Claim;
 use shuttle_service::builder::{build_workspace, BuiltService};
+use tokio::task::JoinSet;
 use tokio::time::{sleep, timeout};
 use tracing::{debug, debug_span, error, info, instrument, trace, warn, Instrument, Span};
 use tracing_opentelemetry::OpenTelemetrySpanExt;
@@ -40,58 +41,71 @@ pub async fn task(
 ) {
     info!("Queue task started");
 
-    while let Some(queued) = recv.recv().await {
-        let id = queued.id;
-
-        info!("Queued deployment at the front of the queue: {id}");
-
-        let deployment_updater = deployment_updater.clone();
-        let run_send_cloned = run_send.clone();
-        let log_recorder = log_recorder.clone();
-        let secret_recorder = secret_recorder.clone();
-        let storage_manager = storage_manager.clone();
-        let queue_client = queue_client.clone();
-
-        tokio::spawn(async move {
-            let parent_cx = global::get_text_map_propagator(|propagator| {
-                propagator.extract(&queued.tracing_context)
-            });
-            let span = debug_span!("builder");
-            span.set_parent(parent_cx);
-
-            async move {
-                match timeout(
-                    Duration::from_secs(60 * 3), // Timeout after 3 minutes if the build queue hangs or it takes too long for a slot to become available
-                    wait_for_queue(queue_client.clone(), id),
-                )
-                .await
-                {
-                    Ok(_) => {}
-                    Err(err) => return build_failed(&id, err),
-                }
-
-                match queued
-                    .handle(
-                        storage_manager,
-                        deployment_updater,
-                        log_recorder,
-                        secret_recorder,
-                    )
-                    .await
-                {
-                    Ok(built) => {
-                        remove_from_queue(queue_client, id).await;
-                        promote_to_run(built, run_send_cloned).await
-                    }
-                    Err(err) => {
-                        remove_from_queue(queue_client, id).await;
-                        build_failed(&id, err)
-                    }
-                }
-            }
-            .instrument(span)
-            .await
-        });
+    let mut tasks = JoinSet::new();
+
+    loop {
+        tokio::select! {
+            Some(queued) = recv.recv() => {
+                let id = queued.id;
+
+                info!("Queued deployment at the front of the queue: {id}");
+
+                let deployment_updater = deployment_updater.clone();
+                let run_send_cloned = run_send.clone();
+                let log_recorder = log_recorder.clone();
+                let secret_recorder = secret_recorder.clone();
+                let storage_manager = storage_manager.clone();
+                let queue_client = queue_client.clone();
+
+                tasks.spawn(async move {
+                    let parent_cx = global::get_text_map_propagator(|propagator| {
+                        propagator.extract(&queued.tracing_context)
+                    });
+                    let span = debug_span!("builder");
+                    span.set_parent(parent_cx);
+
+                    async move {
+                        match timeout(
+                            Duration::from_secs(60 * 3), // Timeout after 3 minutes if the build queue hangs or it takes too long for a slot to become available
+                            wait_for_queue(queue_client.clone(), id),
+                        )
+                        .await
+                        {
+                            Ok(_) => {}
+                            Err(err) => return build_failed(&id, err),
+                        }
+
+                        match queued
+                            .handle(
+                                storage_manager,
+                                deployment_updater,
+                                log_recorder,
+                                secret_recorder,
+                            )
+                            .await
+                        {
+                            Ok(built) => {
+                                remove_from_queue(queue_client, id).await;
+                                promote_to_run(built, run_send_cloned).await
+                            }
+                            Err(err) => {
+                                remove_from_queue(queue_client, id).await;
+                                build_failed(&id, err)
+                            }
+                        }
+                    }
+                    .instrument(span)
+                    .await
+                });
+            },
+            Some(res) = tasks.join_next() => {
+                match res {
+                    Ok(_) => (),
+                    Err(err) => error!(error = %err, "an error happened while joining a builder task"),
+                }
+            }
+            else => break
+        }
     }
 }
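
The `else => break` arm in the rewritten queue task is what lets it exit cleanly: `tokio::select!` runs the `else` branch only when every other branch is disabled, which here means the channel is closed *and* the `JoinSet` is empty. A compact sketch of just that shutdown behavior (toy types, not deployer code):

```rust
use tokio::{sync::mpsc, task::JoinSet};

#[tokio::main]
async fn main() {
    let (send, mut recv) = mpsc::channel::<u32>(1);
    let mut tasks: JoinSet<()> = JoinSet::new();
    drop(send); // close the channel so recv() yields None

    loop {
        tokio::select! {
            // Disabled: recv() returns None, which fails the Some(_) pattern.
            Some(_) = recv.recv() => unreachable!("channel is closed"),
            // Disabled: join_next() returns None on an empty set.
            Some(_) = tasks.join_next() => unreachable!("set is empty"),
            // Runs once both branches above are disabled.
            else => break,
        }
    }
    println!("queue task shut down cleanly");
}
```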

[Diffs for the remaining 2 of 5 changed files not loaded.]
