Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export Prometheus metrics #285

Merged
merged 21 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0a274ea
draft prometheus metrics serving via axum;
greenhat Jun 2, 2023
4c7c4fd
remove get_core_api_port() and pass the port number to axum service;
greenhat Jun 2, 2023
9185323
add port number for metrics server in OracleConfig;
greenhat Jun 5, 2023
0b9e187
define pool health prometheus metrics, extact types and re-use in api…
greenhat Jun 5, 2023
eb2ef41
rename pool status metric;
greenhat Jun 5, 2023
acbf6a7
add namespace and subsystem to prometheus metrics;
greenhat Jun 6, 2023
a38a52e
add oracle metrics to prometheus exporter;
greenhat Jun 6, 2023
b0ccaca
fix formatting;
greenhat Jun 6, 2023
bda76e8
fix collected oracle box label in prometheus metrics;
greenhat Jun 6, 2023
de3532c
add pool box rate to prometheus metrics;
greenhat Jun 7, 2023
ede669c
add node's wallet balance to prometheus metrics;
greenhat Jun 7, 2023
9d1b7cf
add reward token amount in the pool box to the prometheus metrics;
greenhat Jun 7, 2023
84eee88
add reward tokens in buyback box to prometheus metrics;
greenhat Jun 8, 2023
f559a7c
add all oracles box height with addresses to prometheus metrics and h…
greenhat Jun 9, 2023
c2c46eb
add active oracle boxes to the prometheus metrics and health REST API;
greenhat Jun 9, 2023
6404dd8
add active oracles count to prometheus metrics;
greenhat Jun 9, 2023
09d42c5
switch from IntGaugeVec to IntGauge for relevant metrics;
greenhat Jun 12, 2023
94a5f77
temporarily report oracle down in metrics to test alerts;
greenhat Jun 12, 2023
fe4a81b
restore oracle health metric;
greenhat Jun 12, 2023
583f715
set HealthStatus enum invariant integer values;
greenhat Jun 12, 2023
098d391
add grafana_dashboard.json;
greenhat Jun 12, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 16 additions & 10 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
[package]
name = "oracle-core"
version = "2.0.0-beta9"
authors = ["Robert Kornacki <11645932+robkorn@users.noreply.github.com>", "@greenhat", "@kettlebell", "@SethDusek"]
authors = [
"Robert Kornacki <11645932+robkorn@users.noreply.github.com>",
"@greenhat",
"@kettlebell",
"@SethDusek",
]
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down Expand Up @@ -29,19 +34,20 @@ ergo-lib = { workspace = true }
ergo-node-interface = { git = "https://github.com/ergoplatform/ergo-node-interface-rust", rev = "143c2a3dc8fb772d1af37f1f1e1924067c6aad14" }
# ergo-node-interface = { version = "0.4" }
derive_more = "0.99"
clap = {version = "4.2.4", features = ["derive"]}
clap = { version = "4.2.4", features = ["derive"] }
exitcode = "1.1.2"
lazy_static = "1.4.0"
once_cell = "1.15.0"
futures = "0.3"
prometheus = "0.13"

[dev-dependencies]
ergo-lib = { workspace = true, features = ["arbitrary"]}
proptest = {version = "1.0.0"}
proptest-derive = {version = "0.3.0"}
sigma-test-util = {version = "0.3.0"}
ergo-chain-sim = {version = "0.1.0", path="../ergo-chain-sim"}
env_logger = {version = "0.10.0"}
tokio-test = {version = "0.4"}
pretty_assertions = {workspace = true}
ergo-lib = { workspace = true, features = ["arbitrary"] }
proptest = { version = "1.0.0" }
proptest-derive = { version = "0.3.0" }
sigma-test-util = { version = "0.3.0" }
ergo-chain-sim = { version = "0.1.0", path = "../ergo-chain-sim" }
env_logger = { version = "0.10.0" }
tokio-test = { version = "0.4" }
pretty_assertions = { workspace = true }
expect-test = "1.0.1"
142 changes: 14 additions & 128 deletions core/src/address_util.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use ergo_lib::ergotree_ir::{
chain::address::{Address, AddressEncoder, AddressEncoderError},
mir::constant::{Constant, Literal},
serialization::{SigmaParsingError, SigmaSerializable, SigmaSerializationError},
sigma_protocol::sigma_boolean::ProveDlog,
};
use ergo_lib::ergo_chain_types::EcPoint;
use ergo_lib::ergotree_ir::chain::address::Address;
use ergo_lib::ergotree_ir::chain::address::AddressEncoderError;
use ergo_lib::ergotree_ir::chain::address::NetworkAddress;
use ergo_lib::ergotree_ir::chain::address::NetworkPrefix;
use ergo_lib::ergotree_ir::serialization::SigmaParsingError;
use ergo_lib::ergotree_ir::serialization::SigmaSerializationError;
use thiserror::Error;

#[derive(Error, Debug)]
Expand All @@ -22,126 +23,11 @@ pub enum AddressUtilError {
Base16DecodeError(#[from] base16::DecodeError),
}

/// Given a P2S Ergo address, extract the hex-encoded serialized ErgoTree (script)
pub fn address_to_tree(address: &str) -> Result<String, AddressUtilError> {
let address_parsed = AddressEncoder::unchecked_parse_network_address_from_str(address)?;
let script = address_parsed.address().script()?;
Ok(base16::encode_lower(&script.sigma_serialize_bytes()?))
}

/// Given a P2S Ergo address, convert it to a hex-encoded Sigma byte array constant
pub fn address_to_bytes(address: &str) -> Result<String, AddressUtilError> {
let address_parsed = AddressEncoder::unchecked_parse_network_address_from_str(address)?;
let script = address_parsed.address().script()?;
Ok(base16::encode_lower(
&Constant::from(script.sigma_serialize_bytes()?).sigma_serialize_bytes()?,
))
}

/// Given an Ergo P2PK Address, convert it to a raw hex-encoded EC point
/// and prepend the type bytes so it is encoded and ready
/// to be used in a register.
pub fn address_to_raw_for_register(address: &str) -> Result<String, AddressUtilError> {
let address_parsed = AddressEncoder::unchecked_parse_network_address_from_str(address)?;
match address_parsed.address() {
Address::P2Pk(ProveDlog { h }) => Ok(base16::encode_lower(
&Constant::from(*h).sigma_serialize_bytes()?,
)),
Address::P2SH(_) | Address::P2S(_) => Err(AddressUtilError::ExpectedP2PK),
}
}

/// Given an Ergo P2PK Address, convert it to a raw hex-encoded EC point
pub fn address_to_raw(address: &str) -> Result<String, AddressUtilError> {
let address_parsed = AddressEncoder::unchecked_parse_network_address_from_str(address)?;
match address_parsed.address() {
Address::P2Pk(_) => Ok(base16::encode_lower(
&address_parsed.address().content_bytes(),
)),
Address::P2SH(_) | Address::P2S(_) => Err(AddressUtilError::ExpectedP2PK),
}
}

/// Given a raw hex-encoded EC point, convert it to a P2PK address
pub fn raw_to_address(raw: &str) -> Result<Address, AddressUtilError> {
let bytes = base16::decode(raw)?;
Address::p2pk_from_pk_bytes(&bytes).map_err(Into::into)
}

/// Given a raw hex-encoded EC point from a register (thus with type encoded characters in front),
/// convert it to a P2PK address
pub fn raw_from_register_to_address(raw: &str) -> Result<Address, AddressUtilError> {
let bytes = base16::decode(raw)?;
let constant = Constant::sigma_parse_bytes(&bytes)?;
if let Literal::GroupElement(h) = constant.v {
Ok(Address::P2Pk(ProveDlog { h }))
} else {
Err(AddressUtilError::ExpectedP2PK)
}
}

#[cfg(test)]
mod test {
use ergo_lib::ergotree_ir::chain::address::{AddressEncoder, NetworkPrefix};

use crate::address_util::{
address_to_bytes, address_to_raw, address_to_raw_for_register, address_to_tree,
raw_from_register_to_address, raw_to_address,
};

// Test serialization for default address argument of /utils/addressToRaw
#[test]
fn test_address_to_raw_for_register() {
assert_eq!(
"07028333f9f7454f8d5ff73dbac9833767ed6fc3a86cf0a73df946b32ea9927d9197",
address_to_raw_for_register("3WwbzW6u8hKWBcL1W7kNVMr25s2UHfSBnYtwSHvrRQt7DdPuoXrt")
.unwrap()
);
assert_eq!(
"028333f9f7454f8d5ff73dbac9833767ed6fc3a86cf0a73df946b32ea9927d9197",
address_to_raw("3WwbzW6u8hKWBcL1W7kNVMr25s2UHfSBnYtwSHvrRQt7DdPuoXrt").unwrap()
);
}
#[test]
fn test_address_raw_roundtrip() {
let address = AddressEncoder::new(NetworkPrefix::Testnet)
.parse_address_from_str("3WwbzW6u8hKWBcL1W7kNVMr25s2UHfSBnYtwSHvrRQt7DdPuoXrt")
.unwrap();
assert_eq!(
address,
raw_to_address(
&address_to_raw("3WwbzW6u8hKWBcL1W7kNVMr25s2UHfSBnYtwSHvrRQt7DdPuoXrt").unwrap()
)
.unwrap()
);
}
#[test]
fn test_address_raw_register_roundtrip() {
let address = AddressEncoder::new(NetworkPrefix::Testnet)
.parse_address_from_str("3WwbzW6u8hKWBcL1W7kNVMr25s2UHfSBnYtwSHvrRQt7DdPuoXrt")
.unwrap();
assert_eq!(
address,
raw_from_register_to_address(
&address_to_raw_for_register(
"3WwbzW6u8hKWBcL1W7kNVMr25s2UHfSBnYtwSHvrRQt7DdPuoXrt"
)
.unwrap()
)
.unwrap()
);
}

// test serialization of "sigmaProp(true)" script
#[test]
fn test_address_to_tree() {
assert_eq!(
"10010101d17300",
address_to_tree("Ms7smJwLGbUAjuWQ").unwrap()
);
assert_eq!(
"0e0710010101d17300",
address_to_bytes("Ms7smJwLGbUAjuWQ").unwrap()
);
}
pub fn pks_to_network_addresses(
pks: Vec<EcPoint>,
network_prefix: NetworkPrefix,
) -> Vec<NetworkAddress> {
pks.into_iter()
.map(|pk| NetworkAddress::new(network_prefix, &Address::P2Pk(pk.into())))
.collect()
}
105 changes: 33 additions & 72 deletions core/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use std::convert::From;
use std::net::SocketAddr;
use std::sync::Arc;

use crate::box_kind::{OracleBoxWrapper, PoolBox};
use crate::node_interface::node_api::NodeApi;
use crate::oracle_config::{get_core_api_port, ORACLE_CONFIG};
use crate::box_kind::PoolBox;
use crate::monitor::{check_oracle_health, check_pool_health, PoolHealth};
use crate::node_interface::node_api::{NodeApi, NodeApiError};
use crate::oracle_config::ORACLE_CONFIG;
use crate::oracle_state::{DataSourceError, LocalDatapointState, OraclePool};
use crate::pool_config::POOL_CONFIG;
use axum::http::StatusCode;
Expand Down Expand Up @@ -135,26 +136,8 @@ fn pool_status_sync(oracle_pool: Arc<OraclePool>) -> Result<Json<serde_json::Val
.epoch_length();
let pool_box_height = pool_box.get_box().creation_height;
let epoch_end_height = pool_box_height + epoch_length.0 as u32;

let posted_boxes = oracle_pool
.get_posted_datapoint_boxes_source()
.get_posted_datapoint_boxes()?;
let posted_count_current_epoch = posted_boxes
.into_iter()
.filter(|b| b.get_box().creation_height >= pool_box_height)
.count();

let collected_boxes = oracle_pool
.get_collected_datapoint_boxes_source()
.get_collected_datapoint_boxes()?;
let collected_count_previous_epoch = collected_boxes
.into_iter()
.filter(|b| b.get_box().creation_height == pool_box_height)
.count();

let active_oracle_count = collected_count_previous_epoch + posted_count_current_epoch;
let pool_health = pool_health_sync(oracle_pool)?;

let active_oracle_count = pool_health.details.active_oracles.len();
let json = Json(json!({
"latest_pool_datapoint": pool_box.rate(),
"latest_pool_box_height": pool_box_height,
Expand Down Expand Up @@ -202,72 +185,43 @@ fn oracle_health_sync(oracle_pool: Arc<OraclePool>) -> Result<serde_json::Value,
.get_pool_box_source()
.get_pool_box()?
.get_box()
.creation_height;
let mut check_details = json!({
"pool_box_height": pool_box_height,
});
let is_healthy = match oracle_pool
.get_local_datapoint_box_source()
.get_local_oracle_datapoint_box()?
{
Some(b) => match b {
OracleBoxWrapper::Posted(posted_box) => {
let creation_height = posted_box.get_box().creation_height;
check_details["posted_box_height"] = json!(creation_height);
creation_height > pool_box_height
}
OracleBoxWrapper::Collected(collected_box) => {
let creation_height = collected_box.get_box().creation_height;
check_details["collected_box_height"] = json!(creation_height);
creation_height == pool_box_height
}
},
None => false,
};
let json = json!({
"status": if is_healthy { "OK" } else { "DOWN" },
"details": check_details,
});
Ok(json)
.creation_height
.into();
let oracle_health = check_oracle_health(oracle_pool, pool_box_height)?;
Ok(serde_json::to_value(oracle_health).unwrap())
}

async fn pool_health(oracle_pool: Arc<OraclePool>) -> Result<Json<serde_json::Value>, ApiError> {
let json = task::spawn_blocking(|| pool_health_sync(oracle_pool))
let json = task::spawn_blocking(|| pool_health_sync_json(oracle_pool))
.await
.unwrap()?;
Ok(Json(json))
}
fn pool_health_sync(oracle_pool: Arc<OraclePool>) -> Result<serde_json::Value, ApiError> {
let pool_conf = &POOL_CONFIG;

fn pool_health_sync(oracle_pool: Arc<OraclePool>) -> Result<PoolHealth, ApiError> {
let node_api = NodeApi::new(ORACLE_CONFIG.node_api_key.clone(), &ORACLE_CONFIG.node_url);
let current_height = node_api.node.current_block_height()? as u32;
let current_height = (node_api.node.current_block_height()? as u32).into();
let pool_box_height = oracle_pool
.get_pool_box_source()
.get_pool_box()?
.get_box()
.creation_height;
let epoch_length = pool_conf
.refresh_box_wrapper_inputs
.contract_inputs
.contract_parameters()
.epoch_length()
.0 as u32;
let check_details = json!({
"pool_box_height": pool_box_height,
"current_block_height": current_height,
"epoch_length": epoch_length,
});
let is_healthy = pool_box_height >= current_height - epoch_length;
let json = json!({
"status": if is_healthy { "OK" } else { "DOWN" },
"details": check_details,
});
Ok(json)
.creation_height
.into();
let network_prefix = node_api.get_change_address()?.network();
let pool_health =
check_pool_health(current_height, pool_box_height, oracle_pool, network_prefix)?;
Ok(pool_health)
}

fn pool_health_sync_json(oracle_pool: Arc<OraclePool>) -> Result<serde_json::Value, ApiError> {
let pool_health = pool_health_sync(oracle_pool)?;
Ok(serde_json::to_value(pool_health).unwrap())
}

pub async fn start_rest_server(
repost_receiver: Receiver<bool>,
oracle_pool: Arc<OraclePool>,
api_port: u16,
) -> Result<(), anyhow::Error> {
let op_clone = oracle_pool.clone();
let op_clone2 = oracle_pool.clone();
Expand All @@ -290,7 +244,8 @@ pub async fn start_rest_server(
.allow_origin(tower_http::cors::Any)
.allow_methods([axum::http::Method::GET]),
);
let addr = SocketAddr::from(([0, 0, 0, 0], get_core_api_port().parse().unwrap()));
let addr = SocketAddr::from(([0, 0, 0, 0], api_port));
log::info!("Starting REST server on {}", addr);
axum::Server::try_bind(&addr)?
.serve(app.into_make_service())
.await?;
Expand Down Expand Up @@ -322,3 +277,9 @@ impl From<anyhow::Error> for ApiError {
ApiError(format!("Error: {:?}", err))
}
}

impl From<NodeApiError> for ApiError {
fn from(err: NodeApiError) -> Self {
ApiError(format!("NodeApiError: {:?}", err))
}
}
17 changes: 12 additions & 5 deletions core/src/box_kind/oracle_box.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,10 @@ impl OracleBox for OracleBoxWrapper {
}

fn public_key(&self) -> EcPoint {
self.get_box()
.get_register(NonMandatoryRegisterId::R4.into())
.unwrap()
.try_extract_into::<EcPoint>()
.unwrap()
match self {
OracleBoxWrapper::Posted(p) => p.public_key().clone(),
OracleBoxWrapper::Collected(c) => c.public_key().clone(),
}
}

fn get_box(&self) -> &ErgoBox {
Expand Down Expand Up @@ -276,6 +275,14 @@ impl CollectedOracleBox {
pub fn get_box(&self) -> &ErgoBox {
&self.ergo_box
}

pub fn public_key(&self) -> EcPoint {
self.ergo_box
.get_register(NonMandatoryRegisterId::R4.into())
.unwrap()
.try_extract_into::<EcPoint>()
.unwrap()
}
}

#[derive(Clone, Debug)]
Expand Down