Add support for keep alive for workers
The scheduler now supports evicting workers that time out. The WorkerApiServer
refreshes a worker's timeout whenever the worker sends a keep_alive request.
If an evicted worker was executing a job, that job is rescheduled.
allada committed Apr 12, 2022
1 parent e8a349c commit be6f2ee
Showing 11 changed files with 618 additions and 92 deletions.
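
The mechanism, in short: each worker periodically sends a keep_alive request, the server stamps that worker with the current time, and a background pass evicts any worker whose stamp is older than worker_timeout_s; jobs running on an evicted worker are rescheduled. Below is a minimal, self-contained sketch of that bookkeeping — not the repository's actual Scheduler; WorkerPool, refresh, and the plain u64-seconds clock are invented here for illustration.

use std::collections::HashMap;

/// Hypothetical stand-in for the scheduler's worker map: tracks the last
/// keep-alive timestamp (in seconds) for each connected worker.
struct WorkerPool {
    worker_timeout_s: u64,
    last_keep_alive_s: HashMap<String, u64>,
}

impl WorkerPool {
    fn new(worker_timeout_s: u64) -> Self {
        Self {
            worker_timeout_s,
            last_keep_alive_s: HashMap::new(),
        }
    }

    /// Called when a worker connects or sends a keep_alive request.
    fn refresh(&mut self, worker_id: &str, now_s: u64) {
        self.last_keep_alive_s.insert(worker_id.to_string(), now_s);
    }

    /// Evicts every worker whose last keep-alive is at least worker_timeout_s
    /// old and returns their ids so their jobs can be rescheduled.
    fn remove_timedout_workers(&mut self, now_s: u64) -> Vec<String> {
        let timeout = self.worker_timeout_s;
        let mut evicted = Vec::new();
        self.last_keep_alive_s.retain(|worker_id, last_seen_s| {
            if now_s.saturating_sub(*last_seen_s) >= timeout {
                evicted.push(worker_id.clone());
                false // Drop this worker from the pool.
            } else {
                true
            }
        });
        evicted
    }
}

fn main() {
    let mut pool = WorkerPool::new(100);
    pool.refresh("worker-1", 10);
    // One second before the timeout: the worker survives.
    assert!(pool.remove_timedout_workers(109).is_empty());
    // Once the timeout elapses, the worker is evicted.
    assert_eq!(pool.remove_timedout_workers(110), vec!["worker-1".to_string()]);
}

The eviction boundary in this sketch mirrors the behavior checked by the tests further down, where a worker still exists one second before BASE_WORKER_TIMEOUT_S and is gone once it elapses.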
2 changes: 2 additions & 0 deletions cas/grpc_service/BUILD
@@ -81,6 +81,7 @@ rust_library(
"//third_party:futures",
"//third_party:rand",
"//third_party:stdext",
"//third_party:tokio",
"//third_party:tokio_stream",
"//third_party:tonic",
"//util:common",
@@ -126,6 +127,7 @@ rust_test(
deps = [
"//cas/scheduler",
"//cas/scheduler:platform_property_manager",
"//cas/scheduler:worker",
"//proto",
"//third_party:pretty_assertions",
"//third_party:tokio",
34 changes: 31 additions & 3 deletions cas/grpc_service/execution_server.rs
@@ -3,10 +3,11 @@
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use futures::{Stream, StreamExt};
use rand::{thread_rng, Rng};
use tokio::time::interval;
use tokio_stream::wrappers::WatchStream;
use tonic::{Request, Response, Status};

@@ -27,8 +28,11 @@ use store::{Store, StoreManager};
/// Default priority remote execution jobs will get when not provided.
const DEFAULT_EXECUTION_PRIORITY: i64 = 0;

/// Default timeout for workers in seconds.
const DEFAULT_WORKER_TIMEOUT_S: u64 = 5;

struct InstanceInfo {
scheduler: Scheduler,
scheduler: Arc<Scheduler>,
cas_store: Arc<dyn Store>,
platform_property_manager: PlatformPropertyManager,
}
@@ -120,14 +124,38 @@ impl ExecutionServer {
.clone()
.unwrap_or(HashMap::new()),
);
let mut worker_timeout_s = exec_cfg.worker_timeout_s;
if worker_timeout_s == 0 {
worker_timeout_s = DEFAULT_WORKER_TIMEOUT_S;
}
let scheduler = Arc::new(Scheduler::new(worker_timeout_s));
let weak_scheduler = Arc::downgrade(&scheduler);
instance_infos.insert(
instance_name.to_string(),
InstanceInfo {
scheduler: Scheduler::new(),
scheduler,
cas_store,
platform_property_manager,
},
);
tokio::spawn(async move {
let mut ticker = interval(Duration::from_secs(1));
loop {
ticker.tick().await;
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Error: system time is now behind unix epoch");
match weak_scheduler.upgrade() {
Some(scheduler) => {
if let Err(e) = scheduler.remove_timedout_workers(timestamp.as_secs()).await {
log::error!("Error while running remove_timedout_workers : {:?}", e);
}
}
// If we fail to upgrade, our service is probably destroyed, so return.
None => return,
}
}
});
}
Ok(Self { instance_infos })
}
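
Two details in the hunk above are easy to miss: a worker_timeout_s of 0 in the config falls back to DEFAULT_WORKER_TIMEOUT_S, and the spawned eviction loop holds only a Weak pointer to the scheduler, so the background task cannot keep the scheduler alive by itself and exits on the first tick after the last Arc is dropped. Here is a stripped-down sketch of that Weak-upgrade ticker pattern, with a hypothetical Scheduler and reap() standing in for remove_timedout_workers, and assuming a tokio runtime with timers enabled.

use std::sync::{Arc, Weak};
use std::time::Duration;

use tokio::time::interval;

/// Hypothetical stand-in for the real Scheduler.
struct Scheduler;

impl Scheduler {
    fn reap(&self) {
        println!("checking for timed-out workers");
    }
}

/// Spawns a ticker that holds only a Weak pointer, so dropping the last Arc
/// stops the loop on its next tick instead of leaking the task forever.
fn spawn_reaper(weak_scheduler: Weak<Scheduler>) {
    tokio::spawn(async move {
        let mut ticker = interval(Duration::from_secs(1));
        loop {
            ticker.tick().await;
            match weak_scheduler.upgrade() {
                Some(scheduler) => scheduler.reap(),
                // Scheduler was dropped; shut the task down.
                None => return,
            }
        }
    });
}

#[tokio::main]
async fn main() {
    let scheduler = Arc::new(Scheduler);
    spawn_reaper(Arc::downgrade(&scheduler));
    tokio::time::sleep(Duration::from_secs(3)).await;
    drop(scheduler); // The reaper notices on its next tick and exits.
    tokio::time::sleep(Duration::from_secs(2)).await;
}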
208 changes: 176 additions & 32 deletions cas/grpc_service/tests/worker_api_server_test.rs
@@ -1,56 +1,200 @@
// Copyright 2022 Nathan (Blaise) Bruer. All rights reserved.

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use tokio_stream::StreamExt;
use tonic::Request;

use error::ResultExt;
use error::{Error, ResultExt};
use platform_property_manager::PlatformPropertyManager;
use proto::com::github::allada::turbo_cache::remote_execution::{
update_for_worker, worker_api_server::WorkerApi, SupportedProperties,
update_for_worker, worker_api_server::WorkerApi, KeepAliveRequest, SupportedProperties,
};
use scheduler::Scheduler;
use worker_api_server::WorkerApiServer;
use worker::WorkerId;
use worker_api_server::{ConnectWorkerStream, NowFn, WorkerApiServer};

const BASE_NOW_S: u64 = 10;
const BASE_WORKER_TIMEOUT_S: u64 = 100;

struct TestContext {
scheduler: Arc<Scheduler>,
worker_api_server: WorkerApiServer,
connection_worker_stream: ConnectWorkerStream,
worker_id: WorkerId,
}

fn static_now_fn() -> Result<Duration, Error> {
Ok(Duration::from_secs(BASE_NOW_S))
}

async fn setup_api_server(worker_timeout: u64, now_fn: NowFn) -> Result<TestContext, Error> {
let platform_properties = HashMap::new();
let scheduler = Arc::new(Scheduler::new(worker_timeout));
let worker_api_server = WorkerApiServer::new_with_now_fn(
Arc::new(PlatformPropertyManager::new(platform_properties)),
scheduler.clone(),
now_fn,
);

let supported_properties = SupportedProperties::default();
let mut connection_worker_stream = worker_api_server
.connect_worker(Request::new(supported_properties))
.await?
.into_inner();

let maybe_first_message = connection_worker_stream.next().await;
assert!(maybe_first_message.is_some(), "Expected first message from stream");
let first_update = maybe_first_message
.unwrap()
.err_tip(|| "Expected success result")?
.update
.err_tip(|| "Expected update field to be populated")?;
let worker_id = match first_update {
update_for_worker::Update::ConnectionResult(connection_result) => connection_result.worker_id,
other => unreachable!("Expected ConnectionResult, got {:?}", other),
};

const UUID_SIZE: usize = 36;
assert_eq!(worker_id.len(), UUID_SIZE, "Worker ID should be 36 characters");

Ok(TestContext {
scheduler,
worker_api_server,
connection_worker_stream,
worker_id: worker_id.try_into()?,
})
}

#[cfg(test)]
pub mod connect_worker_tests {
use super::*;
use pretty_assertions::assert_eq; // Must be declared in every module.

#[tokio::test]
pub async fn connect_worker_adds_worker_to_scheduler_test() -> Result<(), Box<dyn std::error::Error>> {
let platform_properties = HashMap::new();
let scheduler = Arc::new(Scheduler::new());
let worker_api_server = WorkerApiServer::new(
Arc::new(PlatformPropertyManager::new(platform_properties)),
scheduler.clone(),
);

let supported_properties = SupportedProperties::default();
let mut update_for_worker_stream = worker_api_server
.connect_worker(Request::new(supported_properties))
.await?
.into_inner();

let maybe_first_message = update_for_worker_stream.next().await;
assert!(maybe_first_message.is_some(), "Expected first message from stream");
let first_update = maybe_first_message
.unwrap()
.err_tip(|| "Expected success result")?
.update
.err_tip(|| "Expected update field to be populated")?;
let worker_id = match first_update {
update_for_worker::Update::ConnectionResult(connection_result) => connection_result.worker_id,
other => unreachable!("Expected ConnectionResult, got {:?}", other),
let test_context = setup_api_server(BASE_WORKER_TIMEOUT_S, Box::new(static_now_fn)).await?;

let worker_exists = test_context
.scheduler
.contains_worker_for_test(&test_context.worker_id)
.await;
assert!(worker_exists, "Expected worker to exist in worker map");

Ok(())
}
}

#[cfg(test)]
pub mod keep_alive_tests {
use super::*;
use pretty_assertions::assert_eq; // Must be declared in every module.

#[tokio::test]
pub async fn server_times_out_workers_test() -> Result<(), Box<dyn std::error::Error>> {
let test_context = setup_api_server(BASE_WORKER_TIMEOUT_S, Box::new(static_now_fn)).await?;

let mut now_timestamp = BASE_NOW_S;
{
// Now change time to 1 second before timeout and ensure the worker is still in the pool.
now_timestamp += BASE_WORKER_TIMEOUT_S - 1;
test_context.scheduler.remove_timedout_workers(now_timestamp).await?;
let worker_exists = test_context
.scheduler
.contains_worker_for_test(&test_context.worker_id)
.await;
assert!(worker_exists, "Expected worker to exist in worker map");
}
{
// Now add 1 second and our worker should have been evicted due to timeout.
now_timestamp += 1;
test_context.scheduler.remove_timedout_workers(now_timestamp).await?;
let worker_exists = test_context
.scheduler
.contains_worker_for_test(&test_context.worker_id)
.await;
assert!(!worker_exists, "Expected worker to not exist in map");
}

Ok(())
}

#[tokio::test]
pub async fn server_does_not_timeout_if_keep_alive_test() -> Result<(), Box<dyn std::error::Error>> {
let now_timestamp = Arc::new(Mutex::new(BASE_NOW_S));
let now_timestamp_clone = now_timestamp.clone();
let add_and_return_timestamp = move |add_amount: u64| -> u64 {
let mut locked_now_timestamp = now_timestamp.lock().unwrap();
*locked_now_timestamp += add_amount;
*locked_now_timestamp
};

const UUID_SIZE: usize = 36;
assert_eq!(worker_id.len(), UUID_SIZE, "Worker ID should be 36 characters");
let test_context = setup_api_server(
BASE_WORKER_TIMEOUT_S,
Box::new(move || Ok(Duration::from_secs(*now_timestamp_clone.lock().unwrap()))),
)
.await?;
{
// Now change time to 1 second before timeout and ensure the worker is still in the pool.
let timestamp = add_and_return_timestamp(BASE_WORKER_TIMEOUT_S - 1);
test_context.scheduler.remove_timedout_workers(timestamp).await?;
let worker_exists = test_context
.scheduler
.contains_worker_for_test(&test_context.worker_id)
.await;
assert!(worker_exists, "Expected worker to exist in worker map");
}
{
// Now send keep alive.
test_context
.worker_api_server
.keep_alive(Request::new(KeepAliveRequest {
worker_id: test_context.worker_id.to_string(),
}))
.await
.err_tip(|| "Error sending keep alive")?;
}
{
// Now add 1 second and our worker should still exist in our map.
let timestamp = add_and_return_timestamp(1);
test_context.scheduler.remove_timedout_workers(timestamp).await?;
let worker_exists = test_context
.scheduler
.contains_worker_for_test(&test_context.worker_id)
.await;
assert!(worker_exists, "Expected worker to exist in map");
}

let worker_exists = scheduler.contains_worker_for_test(&worker_id.try_into()?).await;
assert!(worker_exists, "Expected worker to exist in worker map");
Ok(())
}

#[tokio::test]
pub async fn worker_receives_keep_alive_request_test() -> Result<(), Box<dyn std::error::Error>> {
let mut test_context = setup_api_server(BASE_WORKER_TIMEOUT_S, Box::new(static_now_fn)).await?;

// Send keep alive to client.
test_context
.scheduler
.send_keep_alive_to_worker_for_test(&test_context.worker_id)
.await
.err_tip(|| "Could not send keep alive to worker")?;

{
// Read stream and ensure it was a keep alive message.
let maybe_message = test_context.connection_worker_stream.next().await;
assert!(maybe_message.is_some(), "Expected next message in stream to exist");
let update_message = maybe_message
.unwrap()
.err_tip(|| "Expected success result")?
.update
.err_tip(|| "Expected update field to be populated")?;
assert_eq!(
update_message,
update_for_worker::Update::KeepAlive(()),
"Expected KeepAlive message"
);
}

Ok(())
}
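
The tests above never sleep on a wall clock: WorkerApiServer::new_with_now_fn accepts a NowFn closure, so each test hands in a timestamp it controls and advances it explicitly before calling remove_timedout_workers. The sketch below illustrates the same clock-injection idea in isolation; is_timed_out and the ClockFn alias are hypothetical simplifications (judging from static_now_fn above, the real NowFn returns Result<Duration, Error>).

use std::sync::{Arc, Mutex};
use std::time::Duration;

/// Simplified stand-in for the server's NowFn: any closure that reports "now".
type ClockFn = Box<dyn Fn() -> Duration + Send + Sync>;

/// Hypothetical helper that decides whether a worker has timed out, using
/// whatever clock it was handed.
fn is_timed_out(now_fn: &ClockFn, last_keep_alive: Duration, timeout: Duration) -> bool {
    now_fn() - last_keep_alive >= timeout
}

fn main() {
    // Share a mutable timestamp and hand out a closure over it, mirroring the
    // add_and_return_timestamp pattern used in the tests above.
    let now_s = Arc::new(Mutex::new(10u64));
    let now_s_clone = now_s.clone();
    let clock: ClockFn = Box::new(move || Duration::from_secs(*now_s_clone.lock().unwrap()));

    let last_keep_alive = Duration::from_secs(10);
    let timeout = Duration::from_secs(100);

    *now_s.lock().unwrap() = 109; // One second before the timeout.
    assert!(!is_timed_out(&clock, last_keep_alive, timeout));

    *now_s.lock().unwrap() = 110; // Timeout reached.
    assert!(is_timed_out(&clock, last_keep_alive, timeout));
}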
