Prestwich/monitor #214

Open
wants to merge 80 commits into
base: main
dfa911b
wip: monitor
prestwich Jul 15, 2022
60bcc68
feat: basic init logic
prestwich Jul 16, 2022
3e5d0ed
feat: full init logic
prestwich Jul 16, 2022
575cc4b
features: a lot
prestwich Jul 17, 2022
960c327
feature: event annotation
prestwich Jul 17, 2022
7d7f2f0
feature: terminal step
prestwich Jul 17, 2022
e727661
wip
prestwich Jul 18, 2022
02ac15b
chore: rebuild
prestwich Jul 18, 2022
e12e84e
feature: dispatch producer
prestwich Jul 18, 2022
4b872ac
feature: basic dispatch tracking
prestwich Jul 18, 2022
c567491
feature: basic event latency tracking
prestwich Jul 19, 2022
4371025
bug: fix hardcoded event name
prestwich Jul 19, 2022
4a5a7ef
wip: dispatch_wait
prestwich Jul 19, 2022
d7de862
fix: broken event loops
prestwich Jul 19, 2022
b17b8ac
feature: update-to-relay wait metrics
prestwich Jul 20, 2022
a5c4bc8
chore: lock
prestwich Jul 20, 2022
b8c0cb0
refactor: more functions, more fun
prestwich Jul 20, 2022
5c513ae
refactor: Faucets<'a>
prestwich Jul 20, 2022
611b3fa
fix: one less panic in producer
prestwich Jul 20, 2022
0865966
refactor: use faucets in more places
prestwich Jul 20, 2022
51aa0c1
feature: relay to process metrics
prestwich Jul 20, 2022
dcccbd6
chore: remove unused imports
prestwich Jul 20, 2022
f60c2b4
wip
prestwich Jul 21, 2022
5702cda
feature: pipe process-once semantics
prestwich Jul 21, 2022
77154ec
refactor: between uses pipes
prestwich Jul 21, 2022
08587f6
feature: e2e bootup
prestwich Jul 21, 2022
2d04988
chore: update dockerfile
prestwich Jul 21, 2022
d5748db
chore: move magic numbers to named consts
prestwich Jul 21, 2022
23b8fc6
fix: remove debug todo!()
prestwich Jul 21, 2022
a27e4fb
chore: update Cargo.toml
prestwich Jul 22, 2022
85989ee
feature: graceful resume for getlogs pollers
prestwich Jul 22, 2022
4f9d5fe
refactor: dispatch wait uses pipes
prestwich Jul 24, 2022
25874b5
refactor: remove handles
prestwich Jul 24, 2022
94a9a96
refactor: change spawns to run_until_panic
prestwich Jul 24, 2022
509a066
fix: run_until_panic no longer invokes itself
prestwich Jul 24, 2022
fe6ead2
fix: improved logging in run_until_panic
prestwich Jul 24, 2022
c14a401
fix: terminate all channels to prevent memory leak
prestwich Jul 24, 2022
1279ca1
feature: default to info logging
prestwich Jul 25, 2022
b1b185e
refactor: make namespace a module constant in metrics
prestwich Jul 26, 2022
c91e5ce
fix: reasonable-ish buckets for histograms
prestwich Jul 26, 2022
2806282
chore: differentiate e2e time buckets
prestwich Jul 26, 2022
eb4d9c2
chore: undo rename of const var
prestwich Jul 26, 2022
10f8de4
fix: agent http servers now respond to all reqs with 301 to metrics e…
prestwich Jul 26, 2022
aed13a7
feature: replica label on update-to-relay timers
prestwich Jul 26, 2022
dd5ff64
feature: finish function on pipe
prestwich Jul 26, 2022
7b930e0
fix: standardize on f64 seconds, not millis
prestwich Jul 26, 2022
9138012
feature: more useful output in between and produce tracing
prestwich Jul 26, 2022
ac2f11c
feature: set MONITOR_NETWORKS to filter monitor watching networks
prestwich Jul 26, 2022
0d49c5c
fix: better distinguish between recoverable and non-recoverable errors
prestwich Jul 26, 2022
fee18e2
fix: switch back to json logging
prestwich Jul 26, 2022
5a98856
fix: producers properly update From
prestwich Jul 26, 2022
7f5d6cd
fix: prevent james from accidentally committing pretty over and over
prestwich Jul 26, 2022
be35c4e
fix: one more unintended crashloop
prestwich Jul 26, 2022
fce1067
feature: TaskResult to differentiate panics from unrecoverable task e…
prestwich Jul 27, 2022
fc2bfcf
feature: log count of producers
prestwich Jul 27, 2022
406256f
test: add recovery tests
prestwich Jul 27, 2022
8a64090
feature: quieter logging for unremarkable errors
prestwich Jul 27, 2022
3a1a932
refactor: can't send is an unrecoverable, not a panic
prestwich Jul 27, 2022
8268a25
feature: noisy_sleep to prevent producer sync
prestwich Jul 27, 2022
aa67e2a
fix: do not double count events on block range edges
prestwich Jul 27, 2022
ea25702
fix: prevent overly verbose logging of terminal
prestwich Jul 27, 2022
f9083fe
fix: prevent double logging recoverable bails
prestwich Jul 27, 2022
a0db4fd
fix: prevent producer panic on tip request
prestwich Jul 27, 2022
27911eb
feature: enhanced debug logging for all tasks
prestwich Jul 27, 2022
81579b9
fix: prevent panics in e2e
prestwich Jul 27, 2022
a6c01b1
fix: update_wait uses pipe now
prestwich Jul 28, 2022
93ff757
fix: e2e no longer crashes on untracked network
prestwich Jul 28, 2022
da8466e
fix: new macro had inverted if condition
prestwich Jul 28, 2022
4ed2b28
fix: register e2e metric
prestwich Jul 28, 2022
0e12527
feature: lots of nice traces for update and relay wait loops
prestwich Jul 28, 2022
5799565
nit: minor tracing event updates
prestwich Jul 28, 2022
87d6631
feature: gauge for unprocessed dispatches
prestwich Jul 28, 2022
c5b260d
chore: additional comment
prestwich Jul 28, 2022
e617521
refactor: associate e2e metrics with both home and remote
prestwich Jul 29, 2022
3e59236
fix: use a float literal instead of a cast
prestwich Jul 29, 2022
db5bfbd
lint: missing space
prestwich Jul 29, 2022
d644a78
feature: dispatch queue length gauge
prestwich Jul 29, 2022
7ea43dc
feature: gauges for updates not relayed
prestwich Jul 29, 2022
24347f7
chore: update help text for unrelayed updates
prestwich Jul 29, 2022
ce27242
refactor: break tools out from monitor into agent-utils
prestwich Aug 10, 2022
1,200 changes: 507 additions & 693 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -5,6 +5,7 @@ resolver = "2"

members = [
"accumulator",
"agent-utils",
"nomad-types",
"nomad-core",
"nomad-base",
@@ -15,6 +16,7 @@ members = [
"agents/relayer",
"agents/watcher",
"agents/processor",
"agents/monitor",
"tools/kms-cli",
"tools/nomad-cli",
"tools/balance-exporter",
1 change: 1 addition & 0 deletions Dockerfile
@@ -14,6 +14,7 @@ COPY ${TARGET_DIR}/release/updater \
${TARGET_DIR}/release/watcher \
${TARGET_DIR}/release/processor \
${TARGET_DIR}/release/kathy \
${TARGET_DIR}/release/monitor \
${TARGET_DIR}/release/kms-cli \
${TARGET_DIR}/release/nomad-cli ./

26 changes: 26 additions & 0 deletions agent-utils/Cargo.toml
@@ -0,0 +1,26 @@
[package]
name = "agent-utils"
version = "0.1.0"
edition = "2021"
authors = ["James Prestwich <james@nomad.xyz>"]
description = "Utils for building better agents"
repository = "https://github.com/nomad-xyz/rust"
license = "MIT OR Apache-2.0"
keywords = ["Ethereum", "Nomad"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.0.1", features = ["rt", "macros"] }
tracing = "0.1.35"
ethers = { git = "https://github.com/gakonst/ethers-rs", branch = "master", default-features = false }

tracing-subscriber = "0.2.15"
eyre = "0.6.8"
warp = "0.3.2"
async-trait = "0.1.56"
futures-util = "0.3.21"
tracing-test = "0.2.3"

nomad-core = { path = "../nomad-core", default-features = false }
nomad-xyz-configuration = { version = "0.1.0-rc.25", path = "../configuration" }
11 changes: 11 additions & 0 deletions agent-utils/src/aliases.rs
@@ -0,0 +1,11 @@
use std::collections::HashMap;

use tokio::{sync::mpsc, task::JoinHandle};

pub type Restartable<Task> = JoinHandle<crate::TaskResult<Task>>;

pub type Faucet<T> = mpsc::UnboundedReceiver<T>;
pub type Sink<T> = mpsc::UnboundedSender<T>;

pub type NetworkMap<'a, T> = HashMap<&'a str, T>;
pub type HomeReplicaMap<'a, T> = HashMap<&'a str, HashMap<&'a str, T>>;
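
For readers unfamiliar with the alias names, here is a minimal sketch of how a `Sink`/`Faucet` pair and a `HomeReplicaMap` might be used. It swaps in `std::sync::mpsc` for the `tokio::sync::mpsc` unbounded channels used above, purely so that it runs without external crates. The `build_counters` and `drain` helpers and the network names are hypothetical and do not appear in this PR.

```rust
use std::collections::HashMap;
use std::sync::mpsc;

// Stand-ins for the aliases above. Assumption: std channels replace the
// tokio unbounded channels only so the sketch has no dependencies.
pub type Faucet<T> = mpsc::Receiver<T>;
pub type Sink<T> = mpsc::Sender<T>;

pub type NetworkMap<'a, T> = HashMap<&'a str, T>;
pub type HomeReplicaMap<'a, T> = HashMap<&'a str, HashMap<&'a str, T>>;

/// Hypothetical helper: one zeroed counter per (home, replica) pair,
/// skipping the pairing of a network with itself.
pub fn build_counters<'a>(networks: &[&'a str]) -> HomeReplicaMap<'a, u64> {
    let mut map: HomeReplicaMap<'a, u64> = HashMap::new();
    for &home in networks {
        let replicas: NetworkMap<'a, u64> = networks
            .iter()
            .filter(|&&replica| replica != home)
            .map(|&replica| (replica, 0))
            .collect();
        map.insert(home, replicas);
    }
    map
}

/// Hypothetical helper: collect everything currently buffered in a faucet
/// without blocking.
pub fn drain<T>(faucet: &Faucet<T>) -> Vec<T> {
    faucet.try_iter().collect()
}
```

The outer key is the home network and the inner key is the replica network, so a lookup reads as `counters["ethereum"]["moonbeam"]`.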
44 changes: 44 additions & 0 deletions agent-utils/src/init.rs
@@ -0,0 +1,44 @@
use nomad_xyz_configuration::{get_builtin, NomadConfig};
use tracing::Level;
use tracing_subscriber::EnvFilter;

pub fn config_from_file() -> Option<NomadConfig> {
std::env::var("CONFIG_PATH")
.ok()
.and_then(|path| NomadConfig::from_file(path).ok())
}

pub fn config_from_env() -> Option<NomadConfig> {
std::env::var("RUN_ENV")
.ok()
.and_then(|env| get_builtin(&env))
.map(ToOwned::to_owned)
}

pub fn config() -> eyre::Result<NomadConfig> {
config_from_file()
.or_else(config_from_env)
.ok_or_else(|| eyre::eyre!("Unable to load config from file or env"))
}

pub fn init_tracing() {
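// NOTE: `with_env_filter` replaces the filter installed by `with_max_level`,
// so the effective level comes from `RUST_LOG` rather than `Level::INFO`.
let builder = tracing_subscriber::FmtSubscriber::builder()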
.with_max_level(Level::INFO)
.with_env_filter(EnvFilter::from_default_env())
.with_level(true);
if std::env::var("MONITOR_PRETTY").is_ok() {
builder.pretty().init()
} else {
builder.json().init()
}
}

pub fn networks_from_env() -> Option<Vec<String>> {
std::env::var("MONITOR_NETWORKS")
.ok()
.map(|s| s.split(',').map(ToOwned::to_owned).collect())
}

pub fn rpc_from_env(network: &str) -> Option<String> {
std::env::var(format!("{}_CONNECTION_URL", network.to_uppercase())).ok()
}
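
The environment handling in `init.rs` comes down to three small pieces of logic: splitting `MONITOR_NETWORKS` on commas, deriving the `<NETWORK>_CONNECTION_URL` variable name, and falling back from the file config to the built-in config. The sketch below isolates them as pure functions so the edge cases are easy to see. The function names `parse_networks`, `connection_url_key`, and `first_available` are hypothetical, and the error type is a plain `String` rather than `eyre::Report`.

```rust
/// Split a comma-separated network list, mirroring the `split(',')`
/// call in `networks_from_env`. No trimming is applied.
pub fn parse_networks(raw: &str) -> Vec<String> {
    raw.split(',').map(ToOwned::to_owned).collect()
}

/// Build the env var name that `rpc_from_env` reads for a network.
pub fn connection_url_key(network: &str) -> String {
    format!("{}_CONNECTION_URL", network.to_uppercase())
}

/// Hypothetical stand-in for `config()`: the first source that yields a
/// value wins, and the fallback closure runs only when the first is `None`.
pub fn first_available<T, F>(primary: Option<T>, fallback: F) -> Result<T, String>
where
    F: FnOnce() -> Option<T>,
{
    primary
        .or_else(fallback)
        .ok_or_else(|| "Unable to load config from file or env".to_owned())
}
```

Because the split does no trimming, a value such as `ethereum, moonbeam` would yield a second entry with a leading space, and an empty `MONITOR_NETWORKS` would yield a single empty network name. Whether callers need to guard against that depends on how the list is consumed, which this diff does not show.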
181 changes: 181 additions & 0 deletions agent-utils/src/lib.rs
@@ -0,0 +1,181 @@
pub mod aliases;
pub mod init;
pub mod macros;
pub mod pipe;
pub mod utils;

use std::panic;

pub use aliases::*;

use tokio::task::JoinHandle;

#[derive(Debug)]
pub enum TaskResult<T> {
Recoverable {
task: T,
err: eyre::Report,
},
Unrecoverable {
err: eyre::Report,
worth_logging: bool,
},
}

pub trait ProcessStep: std::fmt::Display {
fn spawn(self) -> Restartable<Self>
where
Self: 'static + Send + Sync + Sized;

/// Run the task until it panics. Errors result in a task restart with the
/// same channels. This means that an error causes the task to lose only
/// the data that is in-scope when it faults.
fn run_until_panic(self) -> JoinHandle<()>
where
Self: 'static + Send + Sync + Sized,
{
let task_description = format!("{}", self);
tokio::spawn(async move {
let mut handle = self.spawn();
loop {
let result = handle.await;

let again = match result {
Ok(TaskResult::Recoverable { task, err }) => {
tracing::warn!(
error = %err,
task = task_description.as_str(),
"Restarting task",
);
task
}

Ok(TaskResult::Unrecoverable { err, worth_logging }) => {
if worth_logging {
tracing::error!(err = %err, task = task_description.as_str(), "Unrecoverable error encountered");
} else {
tracing::trace!(err = %err, task = task_description.as_str(), "Unrecoverable error encountered");
}
break;
}

Err(e) => {
let panic_res = e.try_into_panic();

if panic_res.is_err() {
tracing::trace!(
task = task_description.as_str(),
"Internal task cancelled",
);
break;
}
let p = panic_res.unwrap();
tracing::error!(task = task_description.as_str(), "Internal task panicked");
panic::resume_unwind(p);
}
};

utils::noisy_sleep(15_000).await;
handle = again.spawn();
}
})
}
}

#[cfg(test)]
mod test {
use crate::{ProcessStep, TaskResult};

struct RecoverableTask;
impl std::fmt::Display for RecoverableTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "RecoverableTask")
}
}

impl ProcessStep for RecoverableTask {
fn spawn(self) -> crate::Restartable<Self>
where
Self: 'static + Send + Sync + Sized,
{
tokio::spawn(async move {
TaskResult::Recoverable {
task: self,
err: eyre::eyre!("This error was recoverable"),
}
})
}
}

#[tokio::test]
#[tracing_test::traced_test]
async fn test_recovery() {
let handle = RecoverableTask.run_until_panic();
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
handle.abort();
let result = handle.await;

assert!(logs_contain("RecoverableTask"));
assert!(logs_contain("Restarting task"));
assert!(logs_contain("This error was recoverable"));
assert!(result.is_err() && result.unwrap_err().is_cancelled());
}

struct UnrecoverableTask;
impl std::fmt::Display for UnrecoverableTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "UnrecoverableTask")
}
}

impl ProcessStep for UnrecoverableTask {
fn spawn(self) -> crate::Restartable<Self>
where
Self: 'static + Send + Sync + Sized,
{
tokio::spawn(async move {
TaskResult::Unrecoverable {
err: eyre::eyre!("This error was unrecoverable"),
worth_logging: true,
}
})
}
}

#[tokio::test]
#[tracing_test::traced_test]
async fn test_unrecoverable() {
let handle = UnrecoverableTask.run_until_panic();
let result = handle.await;
assert!(logs_contain("UnrecoverableTask"));
assert!(logs_contain("Unrecoverable error encountered"));
assert!(logs_contain("This error was unrecoverable"));
assert!(result.is_ok());
}

struct PanicTask;
impl std::fmt::Display for PanicTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PanicTask")
}
}

impl ProcessStep for PanicTask {
fn spawn(self) -> crate::Restartable<Self>
where
Self: 'static + Send + Sync + Sized,
{
tokio::spawn(async move { panic!("intentional panic :)") })
}
}

#[tokio::test]
#[tracing_test::traced_test]
async fn test_panic() {
let handle = PanicTask.run_until_panic();
let result = handle.await;
assert!(logs_contain("PanicTask"));
assert!(logs_contain("Internal task panicked"));
assert!(result.is_err() && result.unwrap_err().is_panic());
}
}
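
The restart semantics of `run_until_panic` can be sketched without an async runtime. The version below is a simplified assumption, not the code in this PR: it uses `std::thread` in place of `tokio::spawn`, a `String` in place of `eyre::Report`, and omits logging and the `utils::noisy_sleep(15_000)` pause between restarts. The names `Outcome`, `Step`, `run_until_unrecoverable`, and `Flaky` are hypothetical.

```rust
use std::thread;

/// Simplified stand-in for `TaskResult`; errors are plain strings here.
pub enum Outcome<T> {
    Recoverable { task: T, err: String },
    Unrecoverable { err: String },
}

/// Simplified stand-in for `ProcessStep`: one attempt consumes the task.
pub trait Step: Sized + Send + 'static {
    fn run(self) -> Outcome<Self>;
}

/// Re-run the task until it reports an unrecoverable error. Returns the
/// number of restarts and the final error. A panic inside an attempt is
/// re-raised on the calling thread, as `resume_unwind` does above.
pub fn run_until_unrecoverable<S: Step>(mut task: S) -> (usize, String) {
    let mut restarts = 0;
    loop {
        let handle = thread::spawn(move || task.run());
        match handle.join() {
            // The task hands itself back, so it restarts with its state.
            Ok(Outcome::Recoverable { task: again, err: _ }) => {
                restarts += 1;
                task = again;
            }
            Ok(Outcome::Unrecoverable { err }) => return (restarts, err),
            Err(payload) => std::panic::resume_unwind(payload),
        }
    }
}

/// Hypothetical task: fails recoverably until its budget runs out.
pub struct Flaky {
    pub remaining: u32,
}

impl Step for Flaky {
    fn run(mut self) -> Outcome<Self> {
        if self.remaining == 0 {
            return Outcome::Unrecoverable { err: "gave up".to_owned() };
        }
        self.remaining -= 1;
        Outcome::Recoverable { task: self, err: "transient failure".to_owned() }
    }
}
```

The key design point carried over from the trait is that a recoverable result returns ownership of the task, so channels and other long-lived state survive the restart and only the data in scope at the time of the fault is lost.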
68 changes: 68 additions & 0 deletions agent-utils/src/macros.rs
@@ -0,0 +1,68 @@
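// Bails out of a restartable task with a recoverable error when `$cond`
// holds, handing the task back to the caller so it can be respawned.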
#[macro_export]
macro_rules! bail_task_if {
($cond:expr, $self:ident, $err:expr,) => {
$crate::bail_task_if!($cond, $self, $err)
};
($cond:expr, $self:ident, $err:expr) => {
if $cond {
let err = eyre::eyre!($err);
return $crate::TaskResult::Recoverable { task: $self, err };
}
};
}

#[macro_export]
macro_rules! unwrap_channel_item_unrecoverable {
($channel_item:ident, $self:ident,) => {{
$crate::unwrap_channel_item_unrecoverable!($channel_item, $self)
}};
($channel_item:ident, $self:ident) => {{
if $channel_item.is_none() {
tracing::debug!(
task = %$self, "inbound channel broke"
);
return $crate::TaskResult::Unrecoverable{err: eyre::eyre!("inbound channel broke"), worth_logging: false}
}
$channel_item.unwrap()
}};
}

#[macro_export]
macro_rules! unwrap_pipe_item_unrecoverable {
($pipe_item:ident, $self:ident,) => {{
$crate::unwrap_pipe_item_unrecoverable!($pipe_item, $self)
}};
($pipe_item:ident, $self:ident) => {{
if $pipe_item.is_err() {
tracing::debug!(
task = %$self, "inbound pipe broke"
);
return $crate::TaskResult::Unrecoverable{err: eyre::eyre!("inbound pipe broke"), worth_logging: false}
}
$pipe_item.unwrap()
}};
}

#[macro_export]
macro_rules! unwrap_result_recoverable {
($result:ident, $self:ident,) => {{
$crate::unwrap_result_recoverable!($result, $self)
}};
($result:ident, $self:ident) => {{
$crate::bail_task_if!($result.is_err(), $self, $result.unwrap_err());
$result.unwrap()
}};
}

#[macro_export]
macro_rules! send_unrecoverable {
($tx:expr, $item:expr, $self:ident) => {
if $tx.send($item).is_err() {
return $crate::TaskResult::Unrecoverable {
err: eyre::eyre!("outbound channel broke"),
worth_logging: false,
};
}
};
}
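
To illustrate how the early-return macros are meant to be used inside a task body, here is a minimal sketch of the `bail_task_if!` pattern. It is an assumption-laden reduction: `TaskResult` carries a `String` instead of `eyre::Report`, the macro is local rather than `#[macro_export]`, and the `Poller` type with its `step` method is hypothetical.

```rust
/// Simplified stand-in for `TaskResult`; errors are plain strings here.
#[derive(Debug)]
pub enum TaskResult<T> {
    Recoverable { task: T, err: String },
    Unrecoverable { err: String, worth_logging: bool },
}

// Same shape as `bail_task_if!` above: when the condition holds, return
// early and hand the task back inside a recoverable result.
macro_rules! bail_task_if {
    ($cond:expr, $self:ident, $err:expr) => {
        if $cond {
            return TaskResult::Recoverable {
                task: $self,
                err: $err.to_string(),
            };
        }
    };
}

/// Hypothetical task that tracks the last block it has seen.
#[derive(Debug, PartialEq)]
pub struct Poller {
    pub last_block: u64,
}

impl Poller {
    /// One step of work. Bails recoverably if the chain tip is behind the
    /// last block already seen; otherwise reports that it has finished.
    pub fn step(self, tip: u64) -> TaskResult<Self> {
        bail_task_if!(tip < self.last_block, self, "tip moved backwards");
        TaskResult::Unrecoverable {
            err: "finished".to_owned(),
            worth_logging: false,
        }
    }
}
```

Passing `self` as an `ident` is what lets the macro move the task into the result. The condition is evaluated before that move, so it can still borrow from `self`.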