Skip to content

Commit

Permalink
define pool health prometheus metrics, extract types and re-use in api…
Browse files Browse the repository at this point in the history
… module;
  • Loading branch information
greenhat committed Jun 5, 2023
1 parent 9185323 commit bcb2605
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 48 deletions.
29 changes: 5 additions & 24 deletions core/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::SocketAddr;
use std::sync::Arc;

use crate::box_kind::{OracleBoxWrapper, PoolBox};
use crate::monitor::check_pool_health;
use crate::node_interface::node_api::NodeApi;
use crate::oracle_config::ORACLE_CONFIG;
use crate::oracle_state::{DataSourceError, LocalDatapointState, OraclePool};
Expand Down Expand Up @@ -238,31 +239,10 @@ async fn pool_health(oracle_pool: Arc<OraclePool>) -> Result<Json<serde_json::Va
Ok(Json(json))
}
/// Builds the pool health report and returns it as a JSON value.
///
/// Asks the node for the current block height, delegates the health
/// evaluation to `check_pool_health`, and serializes the resulting report.
///
/// # Errors
/// Propagates failures from the node height query and from `check_pool_health`.
fn pool_health_sync(oracle_pool: Arc<OraclePool>) -> Result<serde_json::Value, ApiError> {
    let node_api = NodeApi::new(ORACLE_CONFIG.node_api_key.clone(), &ORACLE_CONFIG.node_url);
    let current_height = (node_api.node.current_block_height()? as u32).into();
    let pool_health = check_pool_health(oracle_pool, current_height)?;
    // NOTE(review): the report only derives `serde::Serialize` over plain
    // fields, so serialization is presumed infallible here — confirm.
    Ok(serde_json::to_value(pool_health).unwrap())
}

pub async fn start_rest_server(
Expand Down Expand Up @@ -292,6 +272,7 @@ pub async fn start_rest_server(
.allow_methods([axum::http::Method::GET]),
);
let addr = SocketAddr::from(([0, 0, 0, 0], api_port));
log::info!("Starting REST server on {}", addr);
axum::Server::try_bind(&addr)?
.serve(app.into_make_service())
.await?;
Expand Down
17 changes: 7 additions & 10 deletions core/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod explorer_api;
mod logging;
mod metrics;
mod migrate;
mod monitor;
mod node_interface;
mod oracle_config;
mod oracle_state;
Expand Down Expand Up @@ -64,6 +65,7 @@ use ergo_lib::ergotree_ir::chain::token::TokenId;
use log::error;
use log::LevelFilter;
use metrics::start_metrics_server;
use metrics::update_metrics;
use node_interface::assert_wallet_unlocked;
use node_interface::node_api::NodeApi;
use oracle_config::ORACLE_CONFIG;
Expand All @@ -75,7 +77,6 @@ use pool_commands::refresh::RefreshActionError;
use pool_commands::PoolCommandError;
use pool_config::DEFAULT_POOL_CONFIG_FILE_NAME;
use pool_config::POOL_CONFIG;
use prometheus::Registry;
use scans::get_scans_file_path;
use scans::wait_for_node_rescan;
use spec_token::RewardTokenId;
Expand Down Expand Up @@ -325,7 +326,6 @@ fn main() {
let node_scan_registry =
NodeScanRegistry::ensure_node_registered_scans(&node_api, pool_config).unwrap();
let oracle_pool = Arc::new(OraclePool::new(&node_scan_registry).unwrap());
let metrics_registry = Arc::new(Registry::new());
let datapoint_source = RuntimeDataPointSource::new(
POOL_CONFIG.data_point_source,
ORACLE_CONFIG.data_point_source_custom_script.clone(),
Expand All @@ -346,23 +346,20 @@ fn main() {
});
}
if let Some(metrics_port) = ORACLE_CONFIG.metrics_port {
let metrics_registry_clone = metrics_registry.clone();
tokio_runtime.spawn(async move {
if let Err(e) = start_metrics_server(metrics_registry_clone, metrics_port).await
{
if let Err(e) = start_metrics_server(metrics_port).await {
error!("An error occurred while starting the metrics server: {}", e);
std::process::exit(exitcode::SOFTWARE);
}
});
}
loop {
if let Err(e) = main_loop_iteration(
&oracle_pool,
oracle_pool.clone(),
read_only,
&datapoint_source,
&node_api,
action_report_storage.clone(),
&metrics_registry,
) {
error!("error: {:?}", e);
}
Expand Down Expand Up @@ -492,12 +489,11 @@ fn handle_pool_command(command: Command, node_api: &NodeApi) {
}

fn main_loop_iteration(
oracle_pool: &OraclePool,
oracle_pool: Arc<OraclePool>,
read_only: bool,
datapoint_source: &RuntimeDataPointSource,
node_api: &NodeApi,
report_storage: Arc<RwLock<ActionReportStorage>>,
metrics_registry: &Registry,
) -> std::result::Result<(), anyhow::Error> {
if !node_api.node.wallet_status()?.unlocked {
return Err(anyhow!("Wallet is locked!"));
Expand Down Expand Up @@ -525,7 +521,7 @@ fn main_loop_iteration(
log::debug!("Height {height}. Building action for command: {:?}", cmd);
let build_action_tuple_res = build_action(
cmd,
oracle_pool,
&oracle_pool,
node_api,
height,
network_change_address.address(),
Expand All @@ -540,6 +536,7 @@ fn main_loop_iteration(
}
};
}
update_metrics(oracle_pool)?;
Ok(())
}

Expand Down
107 changes: 97 additions & 10 deletions core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,101 @@ use axum::response::Response;
use axum::routing::get;
use axum::Router;
use ergo_node_interface::scanning::NodeError;
use once_cell::sync::Lazy;
use prometheus::Encoder;
use prometheus::Registry;
use prometheus::IntGaugeVec;
use prometheus::Opts;
use prometheus::TextEncoder;
use reqwest::StatusCode;
use tower_http::cors::CorsLayer;

async fn serve_metrics(registry: Arc<Registry>) -> impl IntoResponse {
use crate::monitor::check_pool_health;
use crate::monitor::PoolHealth;
use crate::monitor::PoolStatus;
use crate::node_interface::node_api::NodeApi;
use crate::oracle_config::ORACLE_CONFIG;
use crate::oracle_state::OraclePool;

/// Gauge intended to expose the last posted rate, labelled by pool.
///
/// Created and registered with the default Prometheus registry on first access.
// NOTE(review): the previous metric name, namespace, subsystem and label
// ("ACTIVE_INDEXERS", "graphcast", "poi_radio", "deployment") were copied from
// an unrelated project; the name and help text below are derived from the
// static's identifier — confirm the intended semantics.
pub static LAST_POSTED_RATE: Lazy<IntGaugeVec> = Lazy::new(|| {
    let m = IntGaugeVec::new(
        Opts::new("last_posted_rate", "The last posted rate"),
        &["pool"],
    )
    .expect("Failed to create last_posted_rate gauge");
    prometheus::register(Box::new(m.clone())).expect("Failed to register last_posted_rate gauge");
    m
});

/// Gauge holding the pool box height, labelled by pool.
///
/// Built and registered with the default Prometheus registry on first access.
static POOL_BOX_HEIGHT: Lazy<IntGaugeVec> = Lazy::new(|| {
    let opts = Opts::new("pool_box_height", "The height of the pool box");
    let gauge = IntGaugeVec::new(opts, &["pool"]).unwrap();
    let collector = Box::new(gauge.clone());
    prometheus::register(collector).expect("Failed to register");
    gauge
});

/// Gauge holding the current height, labelled by pool.
///
/// Built and registered with the default Prometheus registry on first access.
static CURRENT_HEIGHT: Lazy<IntGaugeVec> = Lazy::new(|| {
    let opts = Opts::new("current_height", "The current height");
    let gauge = IntGaugeVec::new(opts, &["pool"]).unwrap();
    let collector = Box::new(gauge.clone());
    prometheus::register(collector).expect("Failed to register");
    gauge
});

/// Gauge holding the epoch length, labelled by pool.
///
/// Built and registered with the default Prometheus registry on first access.
static EPOCH_LENGTH: Lazy<IntGaugeVec> = Lazy::new(|| {
    let opts = Opts::new("epoch_length", "The epoch length");
    let gauge = IntGaugeVec::new(opts, &["pool"]).unwrap();
    let collector = Box::new(gauge.clone());
    prometheus::register(collector).expect("Failed to register");
    gauge
});

/// Gauge holding the pool health status (1 = Ok, 0 = Down), labelled by pool.
///
/// Built and registered with the default Prometheus registry on first access.
static POOL_STATUS: Lazy<IntGaugeVec> = Lazy::new(|| {
    let opts = Opts::new(
        "pool_health_status",
        "The health status of the pool, 1 for Ok and 0 for Down",
    );
    let gauge = IntGaugeVec::new(opts, &["pool"]).unwrap();
    let collector = Box::new(gauge.clone());
    prometheus::register(collector).expect("Failed to register");
    gauge
});

/// Publishes a pool health report to the Prometheus gauges.
///
/// Every gauge is written under the single label value `"pool"`. The status
/// gauge is set to 1 for `PoolStatus::Ok` and 0 for `PoolStatus::Down`.
pub fn update_pool_health(pool_health: &PoolHealth) {
    let labels = ["pool"];
    let details = &pool_health.details;

    let box_height: i64 = details.pool_box_height.into();
    POOL_BOX_HEIGHT.with_label_values(&labels).set(box_height);

    let chain_height: i64 = details.current_height.into();
    CURRENT_HEIGHT.with_label_values(&labels).set(chain_height);

    let epoch_len: i64 = details.epoch_length.into();
    EPOCH_LENGTH.with_label_values(&labels).set(epoch_len);

    let status_value = match pool_health.status {
        PoolStatus::Ok => 1,
        PoolStatus::Down => 0,
    };
    POOL_STATUS.with_label_values(&labels).set(status_value);
}

/// Refreshes the pool health gauges from the current node and pool state.
///
/// Creates a node API client from the oracle config, reads the current block
/// height, evaluates pool health and pushes the result into the gauges.
///
/// # Errors
/// Propagates failures from the node height query and from `check_pool_health`.
pub fn update_metrics(oracle_pool: Arc<OraclePool>) -> Result<(), anyhow::Error> {
    let api = NodeApi::new(ORACLE_CONFIG.node_api_key.clone(), &ORACLE_CONFIG.node_url);
    let raw_height = api.node.current_block_height()?;
    let report = check_pool_health(oracle_pool, (raw_height as u32).into())?;
    update_pool_health(&report);
    Ok(())
}

async fn serve_metrics() -> impl IntoResponse {
let registry = prometheus::default_registry();
let metric_families = registry.gather();
let mut buffer = vec![];
let encoder = TextEncoder::new();
Expand All @@ -26,15 +114,14 @@ async fn serve_metrics(registry: Arc<Registry>) -> impl IntoResponse {
.unwrap()
}

pub async fn start_metrics_server(reg: Arc<Registry>, port_num: u16) -> Result<(), anyhow::Error> {
let app = Router::new()
.route("/metrics", get(move || serve_metrics(reg.clone())))
.layer(
CorsLayer::new()
.allow_origin(tower_http::cors::Any)
.allow_methods([axum::http::Method::GET]),
);
pub async fn start_metrics_server(port_num: u16) -> Result<(), anyhow::Error> {
let app = Router::new().route("/metrics", get(serve_metrics)).layer(
CorsLayer::new()
.allow_origin(tower_http::cors::Any)
.allow_methods([axum::http::Method::GET]),
);
let addr = SocketAddr::from(([0, 0, 0, 0], port_num));
log::info!("Starting metrics server on {}", addr);
axum::Server::try_bind(&addr)?
.serve(app.into_make_service())
.await?;
Expand Down
61 changes: 61 additions & 0 deletions core/src/monitor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::sync::Arc;

use crate::box_kind::PoolBox;
use crate::oracle_state::OraclePool;
use crate::oracle_types::BlockHeight;
use crate::oracle_types::EpochLength;
use crate::pool_config::POOL_CONFIG;

/// Outcome of a pool health check.
#[derive(Debug, serde::Serialize)]
pub enum PoolStatus {
    /// The pool box is recent enough relative to the current height.
    Ok,
    /// The pool box is older than the allowed window.
    Down,
}

/// The inputs that a pool health verdict was computed from.
#[derive(Debug, serde::Serialize)]
pub struct PoolHealthDetails {
    /// Creation height of the pool box.
    pub pool_box_height: BlockHeight,
    /// Block height the check was evaluated against (supplied by the caller).
    pub current_height: BlockHeight,
    /// Epoch length taken from the pool configuration's contract parameters.
    pub epoch_length: EpochLength,
}

/// Pool health report: the verdict plus the values it was derived from.
#[derive(Debug, serde::Serialize)]
pub struct PoolHealth {
    /// Overall health verdict.
    pub status: PoolStatus,
    /// Heights and epoch length used to reach the verdict.
    pub details: PoolHealthDetails,
}

pub fn check_pool_health(
oracle_pool: Arc<OraclePool>,
current_height: BlockHeight,
) -> Result<PoolHealth, anyhow::Error> {
let pool_conf = &POOL_CONFIG;
let pool_box_height = oracle_pool
.get_pool_box_source()
.get_pool_box()?
.get_box()
.creation_height
.into();
let epoch_length = pool_conf
.refresh_box_wrapper_inputs
.contract_inputs
.contract_parameters()
.epoch_length()
.0
.into();
let acceptable_pool_box_delay_blocks = 3;
let is_healthy =
pool_box_height >= current_height - epoch_length - acceptable_pool_box_delay_blocks;
Ok(PoolHealth {
status: if is_healthy {
PoolStatus::Ok
} else {
PoolStatus::Down
},
details: PoolHealthDetails {
pool_box_height,
current_height,
epoch_length,
},
})
}
35 changes: 31 additions & 4 deletions core/src/oracle_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use derive_more::Sub;
use serde::Deserialize;
use serde::Serialize;

/// Block height, stored as a `u32` and (de)serialized transparently as that number.
#[derive(PartialEq, PartialOrd, Eq, Ord, Debug, Serialize, Deserialize, Copy, Clone, From)]
#[serde(transparent)]
pub struct BlockHeight(pub u32);

Expand All @@ -21,6 +21,21 @@ impl std::ops::Sub<EpochLength> for BlockHeight {
}
}

/// Adds an epoch length to a block height.
///
/// A negative epoch length moves the height backwards.
///
/// # Panics
/// Panics if the result does not fit in a `u32`, instead of wrapping around
/// (same policy as `Add<u32>`).
impl std::ops::Add<EpochLength> for BlockHeight {
    type Output = BlockHeight;
    fn add(self, other: EpochLength) -> BlockHeight {
        let height = if other.0 >= 0 {
            // Non-negative i32 always fits in u32, so the cast is lossless.
            self.0.checked_add(other.0 as u32)
        } else {
            self.0.checked_sub(other.0.unsigned_abs())
        };
        // Unwrap here to panic on overflow/underflow instead of wrapping around
        BlockHeight(height.unwrap())
    }
}

impl std::ops::Add<u32> for BlockHeight {
type Output = BlockHeight;
fn add(self, other: u32) -> BlockHeight {
// Unwrap here to panic on overflow instead of wrapping around
BlockHeight(self.0.checked_add(other).unwrap())
}
}

impl std::ops::Sub<u32> for BlockHeight {
type Output = BlockHeight;
fn sub(self, other: u32) -> BlockHeight {
Expand All @@ -29,21 +44,33 @@ impl std::ops::Sub<u32> for BlockHeight {
}
}

impl From<BlockHeight> for i64 {
fn from(block_height: BlockHeight) -> Self {
block_height.0 as i64
}
}

impl std::fmt::Display for BlockHeight {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

/// Epoch length, stored as an `i32` and (de)serialized transparently as that number.
#[derive(PartialEq, PartialOrd, Eq, Ord, Debug, Serialize, Deserialize, Copy, Clone, From)]
#[serde(transparent)]
pub struct EpochLength(pub i32);

#[derive(PartialEq, PartialOrd, Eq, Ord, Debug, Serialize, Deserialize, Copy, Clone)]
impl From<EpochLength> for i64 {
fn from(epoch_length: EpochLength) -> Self {
epoch_length.0 as i64
}
}

/// Epoch counter, stored as a `u32` and (de)serialized transparently as that number.
#[derive(PartialEq, PartialOrd, Eq, Ord, Debug, Serialize, Deserialize, Copy, Clone, From)]
#[serde(transparent)]
pub struct EpochCounter(pub u32);

/// Minimum datapoints value, stored as an `i32` and (de)serialized transparently as that number.
#[derive(PartialEq, PartialOrd, Eq, Ord, Debug, Serialize, Deserialize, Copy, Clone, From)]
#[serde(transparent)]
pub struct MinDatapoints(pub i32);

Expand Down

0 comments on commit bcb2605

Please sign in to comment.