Skip to content

Commit

Permalink
feat(cubestore): readiness and liveness probes
Browse files Browse the repository at this point in the history
Available only on router nodes via `/livez` and `/readyz`.
Use `CUBESTORE_STATUS_BIND_ADDR` or `CUBESTORE_STATUS_PORT` for
endpoint configuration. Default is `0.0.0.0:3031`.

Runs as a separate HTTP server to simplify implementation. Main HTTP
server does waits for metastore from remote before it starts listening
on a socket.

The checks are very basic. Liveness check tries to execute a trivial
query. Readiness check after metastore got loaded from remote and
can serve another trivial query.
  • Loading branch information
ilya-biryukov committed Aug 25, 2021
1 parent 4283c73 commit 888b0f1
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 42 deletions.
8 changes: 6 additions & 2 deletions rust/cubestore/src/bin/cubestored.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use cubestore::app_metrics;
use cubestore::config::{Config, CubeServices};
use cubestore::http::status::serve_status_probes;
use cubestore::telemetry::track_event;
use cubestore::util::logger::init_cube_logger;
use cubestore::util::metrics::init_metrics;
Expand Down Expand Up @@ -38,9 +39,12 @@ fn main() {
cubestore::util::respawn::init();

let runtime = Builder::new_multi_thread().enable_all().build().unwrap();

runtime.block_on(async move {
let services = config.configure().await;
config.configure_injector().await;

serve_status_probes(&config);

let services = config.cube_services().await;

track_event("Cube Store Start".to_string(), HashMap::new()).await;

Expand Down
3 changes: 2 additions & 1 deletion rust/cubestore/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::ack_error;
use crate::cluster::message::NetworkMessage;
use crate::cluster::transport::{ClusterTransport, MetaStoreTransport, WorkerConnection};
use crate::config::injection::DIService;
use crate::config::is_router;
#[allow(unused_imports)]
use crate::config::{Config, ConfigObj};
use crate::import::ImportService;
Expand Down Expand Up @@ -654,7 +655,7 @@ impl ClusterImpl {
}

pub fn is_select_worker(&self) -> bool {
self.config_obj.worker_bind_address().is_some()
!is_router(self.config_obj.as_ref())
}

pub async fn wait_for_worker_to_close(&self) {
Expand Down
80 changes: 46 additions & 34 deletions rust/cubestore/src/config/injection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ use std::future::Future;
use std::pin::Pin;
use std::raw::TraitObject;
use std::sync::{Arc, Weak};
use tokio::sync::Mutex;
use tokio::sync::RwLock;

pub struct Injector {
this: Weak<Injector>,
services: RwLock<HashMap<String, Arc<RwLock<Option<Arc<dyn DIService>>>>>>,
init_guards: RwLock<HashMap<String, Arc<Mutex<()>>>>,
services: RwLock<HashMap<String, Arc<dyn DIService>>>,
factories: RwLock<
HashMap<
String,
Expand All @@ -26,6 +28,7 @@ impl Injector {
pub fn new() -> Arc<Self> {
Arc::new_cyclic(|this| Self {
this: this.clone(),
init_guards: RwLock::new(HashMap::new()),
services: RwLock::new(HashMap::new()),
factories: RwLock::new(HashMap::new()),
})
Expand Down Expand Up @@ -61,10 +64,10 @@ impl Injector {
})
}),
);
self.services
self.init_guards
.write()
.await
.insert(name, Arc::new(RwLock::new(None)));
.insert(name, Arc::new(Mutex::new(())));
}

pub async fn register<F>(
Expand All @@ -81,57 +84,66 @@ impl Injector {
Box::pin(async move { fn_to_move(i.clone()).await })
}),
);
self.services
self.init_guards
.write()
.await
.insert(name.to_string(), Arc::new(RwLock::new(None)));
.insert(name.to_string(), Arc::new(Mutex::new(())));
}
}

impl Injector {
pub async fn get_service<T: ?Sized + Send + Sync + 'static>(&self, name: &str) -> Arc<T> {
if self
.services
if let Some(s) = self.try_get_service(name).await {
return s;
}

let pending = self
.init_guards
.read()
.await
.get(name)
.expect(&format!("Service is not found: {}", name))
.read()
.await
.is_none()
{
let service_opt_lock = {
let map_lock = self.services.read().await;
map_lock.get(name).unwrap().clone()
};
// println!("Locking service: {}", name);
// TODO cycle depends lead to dead lock here
let mut service_opt = service_opt_lock.write().await;
if service_opt.is_none() {
let factories = self.factories.read().await;
let factory = factories
.get(name)
.expect(&format!("Service not found: {}", name));
let service = factory(self.this.upgrade().unwrap()).await;
// println!("Setting service: {}", name);
*service_opt = Some(service);
}
.clone();
// println!("Locking service: {}", name);
// TODO cycle depends lead to dead lock here
let _l = pending.lock().await;

if let Some(s) = self.try_get_service(name).await {
return s;
}
let map_lock = self.services.read().await;
let opt_lock = map_lock.get(name).unwrap();
let arc = opt_lock

let factories = self.factories.read().await;
let factory = factories
.get(name)
.expect(&format!("Service not found: {}", name));
let service = factory(self.this.upgrade().unwrap()).await;
// println!("Setting service: {}", name);
self.services
.write()
.await
.insert(name.to_string(), service.clone());
return service.clone().downcast(service).unwrap();
}

pub async fn try_get_service<T: ?Sized + Send + Sync + 'static>(
&self,
name: &str,
) -> Option<Arc<T>> {
self.services
.read()
.await
.as_ref()
.expect("Unexpected state")
.clone();
arc.downcast::<T>(arc.clone()).unwrap()
.get(name)
.map(|s| s.clone().downcast(s.clone()).unwrap())
}

pub async fn get_service_typed<T: ?Sized + Send + Sync + 'static>(&self) -> Arc<T> {
self.get_service(type_name::<T>()).await
}

pub async fn try_get_service_typed<T: ?Sized + Send + Sync + 'static>(&self) -> Option<Arc<T>> {
self.try_get_service(type_name::<T>()).await
}

pub async fn has_service<T: ?Sized + Send + Sync + 'static>(&self, name: &str) -> bool {
self.factories.read().await.contains_key(name)
}
Expand Down
25 changes: 20 additions & 5 deletions rust/cubestore/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ pub trait ConfigObj: DIService {

fn bind_address(&self) -> &Option<String>;

fn status_bind_address(&self) -> &Option<String>;

fn http_bind_address(&self) -> &Option<String>;

fn query_timeout(&self) -> u64;
Expand Down Expand Up @@ -255,6 +257,7 @@ pub struct ConfigObjImpl {
pub select_worker_pool_size: usize,
pub job_runners_count: usize,
pub bind_address: Option<String>,
pub status_bind_address: Option<String>,
pub http_bind_address: Option<String>,
pub query_timeout: u64,
/// Must be set to 2*query_timeout in prod, only for overrides in tests.
Expand Down Expand Up @@ -307,6 +310,10 @@ impl ConfigObj for ConfigObjImpl {
&self.bind_address
}

fn status_bind_address(&self) -> &Option<String> {
&self.status_bind_address
}

fn http_bind_address(&self) -> &Option<String> {
&self.http_bind_address
}
Expand Down Expand Up @@ -460,6 +467,9 @@ impl Config {
.ok()
.unwrap_or(format!("0.0.0.0:{}", env_parse("CUBESTORE_PORT", 3306))),
),
status_bind_address: Some(env::var("CUBESTORE_STATUS_BIND_ADDR").ok().unwrap_or(
format!("0.0.0.0:{}", env_parse("CUBESTORE_STATUS_PORT", 3031)),
)),
http_bind_address: Some(env::var("CUBESTORE_HTTP_BIND_ADDR").ok().unwrap_or(
format!("0.0.0.0:{}", env_parse("CUBESTORE_HTTP_PORT", 3030)),
)),
Expand Down Expand Up @@ -513,6 +523,7 @@ impl Config {
select_worker_pool_size: 0,
job_runners_count: 4,
bind_address: None,
status_bind_address: None,
http_bind_address: None,
query_timeout,
not_used_timeout: 2 * query_timeout,
Expand Down Expand Up @@ -728,11 +739,7 @@ impl Config {
.await;
}

if self
.injector
.has_service_typed::<dyn MetaStoreTransport>()
.await
{
if uses_remote_metastore(&self.injector).await {
self.injector
.register_typed::<dyn MetaStore, _, _, _>(async move |i| {
let transport = ClusterMetaStoreClient::new(i.get_service_typed().await);
Expand Down Expand Up @@ -956,3 +963,11 @@ impl Config {
}

type LoopHandle = JoinHandle<Result<(), CubeError>>;

pub async fn uses_remote_metastore(i: &Injector) -> bool {
i.has_service_typed::<dyn MetaStoreTransport>().await
}

pub fn is_router(c: &dyn ConfigObj) -> bool {
!c.worker_bind_address().is_some()
}
2 changes: 2 additions & 0 deletions rust/cubestore/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod status;

use std::sync::Arc;

use warp::{Filter, Rejection, Reply};
Expand Down
96 changes: 96 additions & 0 deletions rust/cubestore/src/http/status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use crate::config::injection::Injector;
use crate::config::{is_router, uses_remote_metastore, Config};
use crate::metastore::MetaStore;
use crate::sql::SqlService;
use crate::CubeError;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use warp::http::StatusCode;
use warp::Filter;

pub fn serve_status_probes(c: &Config) {
let addr = match c.config_obj().status_bind_address() {
Some(a) => a.clone(),
None => return,
};

let p = match RouterProbes::try_new(c) {
Some(p) => p,
None => return,
};

let pc = p.clone();
let l = warp::path!("livez").and_then(move || {
let pc = pc.clone();
async move { status_probe_reply("liveness", pc.is_live().await) }
});
let r = warp::path!("readyz").and_then(move || {
let p = p.clone();
async move { status_probe_reply("readiness", p.is_ready().await) }
});

let addr: SocketAddr = addr.parse().expect("cannot parse status probe address");
match warp::serve(l.or(r)).try_bind_ephemeral(addr) {
Ok((addr, f)) => {
log::info!("Serving status probes at {}", addr);
tokio::spawn(f);
}
Err(e) => {
log::error!("Failed to serve status probes at {}: {}", addr, e);
}
}
}

pub fn status_probe_reply(probe: &str, r: Result<(), CubeError>) -> Result<StatusCode, Infallible> {
match r {
Ok(()) => Ok(StatusCode::OK),
Err(e) => {
log::warn!("{} probe failed: {}", probe, e.display_with_backtrace());
Ok(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}

#[derive(Clone)]
struct RouterProbes {
services: Arc<Injector>,
}

impl RouterProbes {
pub fn try_new(config: &Config) -> Option<RouterProbes> {
if !is_router(config.config_obj().as_ref()) {
return None;
}
Some(RouterProbes {
services: config.injector(),
})
}

pub async fn is_live(&self) -> Result<(), CubeError> {
if let Some(s) = self
.services
.try_get_service_typed::<dyn SqlService>()
.await
{
s.exec_query("SELECT 1").await?;
}
Ok(())
}

pub async fn is_ready(&self) -> Result<(), CubeError> {
if uses_remote_metastore(&self.services).await {
return Ok(());
}
let m = match self.services.try_get_service_typed::<dyn MetaStore>().await {
None => return Err(CubeError::internal("metastore is not ready".to_string())),
Some(m) => m,
};
// Check metastore is not stalled.
m.get_schemas().await?;
// It is tempting to check worker connectivity on the router, but we cannot do this now.
// Workers connect to the router for warmup, so router must be ready before workers are up.
// TODO: warmup explicitly with router request instead?
Ok(())
}
}
6 changes: 6 additions & 0 deletions rust/cubestore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,12 @@ impl From<tokio::sync::AcquireError> for CubeError {
}
}

impl From<warp::Error> for CubeError {
fn from(v: warp::Error) -> Self {
return CubeError::from_error(v);
}
}

impl Into<ArrowError> for CubeError {
fn into(self) -> ArrowError {
ArrowError::ExternalError(Box::new(self))
Expand Down

0 comments on commit 888b0f1

Please sign in to comment.