Skip to content

Commit

Permalink
refactor: break tools out from monitor into agent-utils
Browse files Browse the repository at this point in the history
  • Loading branch information
prestwich committed Aug 11, 2022
1 parent 24347f7 commit ce27242
Show file tree
Hide file tree
Showing 24 changed files with 392 additions and 329 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ resolver = "2"

members = [
"accumulator",
"agent-utils",
"nomad-types",
"nomad-core",
"nomad-base",
Expand Down
26 changes: 26 additions & 0 deletions agent-utils/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "agent-utils"
version = "0.1.0"
edition = "2021"
authors = ["James Prestwich <james@nomad.xyz>"]
description = "Utils for building better agents"
repository = "https://github.com/nomad-xyz/rust"
license = "MIT OR Apache-2.0"
keywords = ["Ethereum", "Nomad"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.0.1", features = ["rt", "macros"] }
tracing = "0.1.35"
ethers = { git = "https://github.com/gakonst/ethers-rs", branch = "master", default-features = false }

tracing-subscriber = "0.2.15"
eyre = "0.6.8"
warp = "0.3.2"
async-trait = "0.1.56"
futures-util = "0.3.21"
tracing-test = "0.2.3"

nomad-core = { path = "../nomad-core", default-features = false }
nomad-xyz-configuration = { version = "0.1.0-rc.25", path = "../configuration" }
11 changes: 11 additions & 0 deletions agent-utils/src/aliases.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use std::collections::HashMap;

use tokio::{sync::mpsc, task::JoinHandle};

/// Join handle of a spawned task whose output is a [`crate::TaskResult`]
/// carrying the task type `Task`.
pub type Restartable<Task> = JoinHandle<crate::TaskResult<Task>>;

/// Receiving half of an unbounded tokio mpsc channel.
pub type Faucet<T> = mpsc::UnboundedReceiver<T>;
/// Sending half of an unbounded tokio mpsc channel.
pub type Sink<T> = mpsc::UnboundedSender<T>;

/// Map from a borrowed string key to `T`.
/// NOTE(review): the key is presumably a network name -- confirm against callers.
pub type NetworkMap<'a, T> = HashMap<&'a str, T>;
/// Two-level map from a borrowed string key to a second borrowed string key to `T`.
/// NOTE(review): presumably home name, then replica name -- confirm against callers.
pub type HomeReplicaMap<'a, T> = HashMap<&'a str, HashMap<&'a str, T>>;
44 changes: 44 additions & 0 deletions agent-utils/src/init.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use nomad_xyz_configuration::{get_builtin, NomadConfig};
use tracing::Level;
use tracing_subscriber::EnvFilter;

/// Load a [`NomadConfig`] from the file named by the `CONFIG_PATH`
/// environment variable.
///
/// Returns `None` when the variable cannot be read or when
/// `NomadConfig::from_file` fails; the underlying error is discarded.
pub fn config_from_file() -> Option<NomadConfig> {
    let path = std::env::var("CONFIG_PATH").ok()?;
    NomadConfig::from_file(path).ok()
}

/// Load a [`NomadConfig`] by passing the value of the `RUN_ENV` environment
/// variable to `get_builtin`.
///
/// Returns `None` when the variable cannot be read or when `get_builtin`
/// has no entry for it. The borrowed config is converted into an owned copy
/// before being returned.
pub fn config_from_env() -> Option<NomadConfig> {
    let env = std::env::var("RUN_ENV").ok()?;
    let builtin = get_builtin(&env)?;
    Some(builtin.to_owned())
}

/// Resolve the agent configuration.
///
/// Tries [`config_from_file`] first and only falls back to
/// [`config_from_env`] when the file lookup yields nothing.
///
/// # Errors
///
/// Returns an error when neither source produces a config.
pub fn config() -> eyre::Result<NomadConfig> {
    match config_from_file().or_else(config_from_env) {
        Some(loaded) => Ok(loaded),
        None => Err(eyre::eyre!("Unable to load config from file or env")),
    }
}

/// Install a global `tracing` fmt subscriber.
///
/// The subscriber is built with an `INFO` max level, an [`EnvFilter`] taken
/// from the default environment, and level output enabled. If the
/// `MONITOR_PRETTY` environment variable can be read the pretty formatter is
/// used; otherwise events are emitted as JSON.
///
/// NOTE(review): the env filter is applied after `with_max_level`, so it
/// presumably supersedes the `INFO` cap -- confirm against the
/// `tracing-subscriber` builder documentation.
pub fn init_tracing() {
    let env_filter = EnvFilter::from_default_env();
    let subscriber = tracing_subscriber::FmtSubscriber::builder()
        .with_max_level(Level::INFO)
        .with_env_filter(env_filter)
        .with_level(true);

    // The two formatters produce different builder types, so each arm must
    // finish initialisation on its own.
    match std::env::var("MONITOR_PRETTY") {
        Ok(_) => subscriber.pretty().init(),
        Err(_) => subscriber.json().init(),
    }
}

/// Read the comma-separated `MONITOR_NETWORKS` environment variable.
///
/// Each entry is trimmed of surrounding whitespace and empty entries are
/// dropped, so values such as `"a, b"`, `"a,,b"` or a trailing comma do not
/// produce names like `" b"` or `""`.
///
/// Returns `None` when the variable cannot be read. A variable that is set
/// but contains no names yields `Some` of an empty vector.
pub fn networks_from_env() -> Option<Vec<String>> {
    let raw = std::env::var("MONITOR_NETWORKS").ok()?;
    Some(
        raw.split(',')
            .map(str::trim)
            .filter(|name| !name.is_empty())
            .map(ToOwned::to_owned)
            .collect(),
    )
}

/// Look up the RPC connection URL for `network`.
///
/// The environment variable name is the upper-cased network name followed by
/// `_CONNECTION_URL` (for example `ETHEREUM_CONNECTION_URL`). Returns `None`
/// when that variable cannot be read.
pub fn rpc_from_env(network: &str) -> Option<String> {
    let key = format!("{}_CONNECTION_URL", network.to_uppercase());
    match std::env::var(key) {
        Ok(url) => Some(url),
        Err(_) => None,
    }
}
181 changes: 181 additions & 0 deletions agent-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
pub mod aliases;
pub mod init;
pub mod macros;
pub mod pipe;
pub mod utils;

use std::panic;

pub use aliases::*;

use tokio::task::JoinHandle;

/// Outcome returned by a spawned task, telling the supervising loop in
/// `ProcessStep::run_until_panic` whether the task can be started again.
#[derive(Debug)]
pub enum TaskResult<T> {
/// The task failed but handed itself back; the supervisor logs `err` at
/// `warn` level and spawns `task` again.
Recoverable {
/// The task value to respawn.
task: T,
/// The error that interrupted the task.
err: eyre::Report,
},
/// The task failed and cannot be restarted; the supervisor stops looping.
Unrecoverable {
/// The error that ended the task.
err: eyre::Report,
/// When `true` the supervisor logs `err` at `error` level, otherwise at
/// `trace` level.
worth_logging: bool,
},
}

pub trait ProcessStep: std::fmt::Display {
fn spawn(self) -> Restartable<Self>
where
Self: 'static + Send + Sync + Sized;

/// Run the task until it panics. Errors result in a task restart with the
/// same channels. This means that an error causes the task to lose only
/// the data that is in-scope when it faults.
fn run_until_panic(self) -> JoinHandle<()>
where
Self: 'static + Send + Sync + Sized,
{
let task_description = format!("{}", self);
tokio::spawn(async move {
let mut handle = self.spawn();
loop {
let result = handle.await;

let again = match result {
Ok(TaskResult::Recoverable { task, err }) => {
tracing::warn!(
error = %err,
task = task_description.as_str(),
"Restarting task",
);
task
}

Ok(TaskResult::Unrecoverable { err, worth_logging }) => {
if worth_logging {
tracing::error!(err = %err, task = task_description.as_str(), "Unrecoverable error encountered");
} else {
tracing::trace!(err = %err, task = task_description.as_str(), "Unrecoverable error encountered");
}
break;
}

Err(e) => {
let panic_res = e.try_into_panic();

if panic_res.is_err() {
tracing::trace!(
task = task_description.as_str(),
"Internal task cancelled",
);
break;
}
let p = panic_res.unwrap();
tracing::error!(task = task_description.as_str(), "Internal task panicked");
panic::resume_unwind(p);
}
};

utils::noisy_sleep(15_000).await;
handle = again.spawn();
}
})
}
}

// Tests for the `run_until_panic` supervising loop: one fixture task per
// outcome (recoverable error, unrecoverable error, panic).
#[cfg(test)]
mod test {
use crate::{ProcessStep, TaskResult};

// Fixture whose spawned future immediately returns `TaskResult::Recoverable`,
// handing itself back so it can be respawned.
struct RecoverableTask;
impl std::fmt::Display for RecoverableTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "RecoverableTask")
}
}

impl ProcessStep for RecoverableTask {
fn spawn(self) -> crate::Restartable<Self>
where
Self: 'static + Send + Sync + Sized,
{
tokio::spawn(async move {
TaskResult::Recoverable {
task: self,
err: eyre::eyre!("This error was recoverable"),
}
})
}
}

// A recoverable failure is logged as a restart and the supervisor keeps
// running: it is aborted after two seconds and the join error must report a
// cancellation rather than a normal exit or a panic.
#[tokio::test]
#[tracing_test::traced_test]
async fn test_recovery() {
let handle = RecoverableTask.run_until_panic();
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
handle.abort();
let result = handle.await;

assert!(logs_contain("RecoverableTask"));
assert!(logs_contain("Restarting task"));
assert!(logs_contain("This error was recoverable"));
assert!(result.is_err() && result.unwrap_err().is_cancelled());
}

// Fixture whose spawned future immediately returns
// `TaskResult::Unrecoverable` with logging enabled.
struct UnrecoverableTask;
impl std::fmt::Display for UnrecoverableTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "UnrecoverableTask")
}
}

impl ProcessStep for UnrecoverableTask {
fn spawn(self) -> crate::Restartable<Self>
where
Self: 'static + Send + Sync + Sized,
{
tokio::spawn(async move {
TaskResult::Unrecoverable {
err: eyre::eyre!("This error was unrecoverable"),
worth_logging: true,
}
})
}
}

// An unrecoverable failure is logged and ends the supervisor normally, so
// joining it succeeds.
#[tokio::test]
#[tracing_test::traced_test]
async fn test_unrecoverable() {
let handle = UnrecoverableTask.run_until_panic();
let result = handle.await;
assert!(logs_contain("UnrecoverableTask"));
assert!(logs_contain("Unrecoverable error encountered"));
assert!(logs_contain("This error was unrecoverable"));
assert!(result.is_ok());
}

// Fixture whose spawned future panics immediately.
struct PanicTask;
impl std::fmt::Display for PanicTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PanicTask")
}
}

impl ProcessStep for PanicTask {
fn spawn(self) -> crate::Restartable<Self>
where
Self: 'static + Send + Sync + Sized,
{
tokio::spawn(async move { panic!("intentional panic :)") })
}
}

// A panic in the inner task is logged and resumed in the supervisor, so
// joining the supervisor reports a panic.
#[tokio::test]
#[tracing_test::traced_test]
async fn test_panic() {
let handle = PanicTask.run_until_panic();
let result = handle.await;
assert!(logs_contain("PanicTask"));
assert!(logs_contain("Internal task panicked"));
assert!(result.is_err() && result.unwrap_err().is_panic());
}
}
8 changes: 4 additions & 4 deletions agents/monitor/src/macros.rs → agent-utils/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ macro_rules! bail_task_if {
($cond:expr, $self:ident, $err:expr) => {
if $cond {
let err = eyre::eyre!($err);
return $crate::steps::TaskResult::Recoverable { task: $self, err };
return $crate::TaskResult::Recoverable { task: $self, err };
}
};
}
Expand All @@ -22,7 +22,7 @@ macro_rules! unwrap_channel_item_unrecoverable {
tracing::debug!(
task = %$self, "inbound channel broke"
);
return $crate::steps::TaskResult::Unrecoverable{err: eyre::eyre!("inbound channel broke"), worth_logging: false}
return $crate::TaskResult::Unrecoverable{err: eyre::eyre!("inbound channel broke"), worth_logging: false}
}
$channel_item.unwrap()
}};
Expand All @@ -38,7 +38,7 @@ macro_rules! unwrap_pipe_item_unrecoverable {
tracing::debug!(
task = %$self, "inbound pipe broke"
);
return $crate::steps::TaskResult::Unrecoverable{err: eyre::eyre!("inbound pipe broke"), worth_logging: false}
return $crate::TaskResult::Unrecoverable{err: eyre::eyre!("inbound pipe broke"), worth_logging: false}
}
$pipe_item.unwrap()
}};
Expand All @@ -59,7 +59,7 @@ macro_rules! unwrap_result_recoverable {
macro_rules! send_unrecoverable {
($tx:expr, $item:expr, $self:ident) => {
if $tx.send($item).is_err() {
return $crate::steps::TaskResult::Unrecoverable {
return $crate::TaskResult::Unrecoverable {
err: eyre::eyre!("outbound channel broke"),
worth_logging: false,
};
Expand Down
42 changes: 42 additions & 0 deletions agent-utils/src/pipe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::fmt::Debug;

use eyre::bail;

use crate::aliases::*;

/// A pass-through stage between an inbound and an outbound channel that holds
/// at most one item at a time.
#[derive(Debug)]
pub struct Pipe<T> {
/// Inbound channel from which the next item is received.
rx: Faucet<T>,
/// Outbound channel to which the held item is forwarded.
tx: Sink<T>,
/// The item currently held, if any.
contents: Option<T>,
}

impl<T> Pipe<T>
where
    T: Debug + Send + Sync + 'static,
{
    /// Build a pipe from its inbound and outbound channel halves, optionally
    /// pre-loaded with an item.
    pub fn new(rx: Faucet<T>, tx: Sink<T>, contents: Option<T>) -> Self {
        Self { rx, tx, contents }
    }

    /// Borrow the item currently held by the pipe, if any.
    pub fn read(&self) -> Option<&T> {
        self.contents.as_ref()
    }

    /// Forward the held item (if any) to the outbound channel, leaving the
    /// pipe empty.
    ///
    /// # Errors
    ///
    /// Returns an error when sending on the outbound channel fails. The item
    /// has already been taken out of the pipe at that point.
    pub fn finish(&mut self) -> eyre::Result<()> {
        let item = match self.contents.take() {
            Some(item) => item,
            None => return Ok(()),
        };
        self.tx.send(item)?;
        Ok(())
    }

    /// Forward the held item, then wait for the next inbound item and return
    /// a reference to it.
    ///
    /// # Errors
    ///
    /// Returns an error when forwarding fails (see [`Pipe::finish`]) or when
    /// the inbound channel yields `None`.
    pub async fn next(&mut self) -> eyre::Result<&T> {
        self.finish()?;

        self.contents = self.rx.recv().await;
        match self.contents.as_ref() {
            Some(item) => Ok(item),
            None => bail!("rx broke"),
        }
    }
}

0 comments on commit ce27242

Please sign in to comment.