Execute and orchestrate command line utils.
Based on io streams and optionally ipc-channel orchestrator is intended for starting and orchestrating programs.
This repo cannot be used for running production critical tasks, rather it is for local dev or non critical orchestration.
Working on linux and Mac OS X, Windows is not supported due to dependency on ipc_channels.
use tokio::process::{Command};
use ipc_orchestrator::orchestrator;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut orchestrator = orchestrator().ipc(false);
orchestrator.start("start", &mut Command::new("echo"));
orchestrator.connect().await
}
cargo run --example=orchestrate
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut orchestrator = orchestrator().ipc(true);
// Start pipeline: generate random f64 [0;1) -> sum -> write to stdout every 10_000 times
let mut cmd = Command::new("cargo");
orchestrator.start("generate", cmd.arg("run").arg("--example=generate"))
.expect("failed to start generate");
let mut cmd = Command::new("cargo");
orchestrator.start("sum", cmd.arg("run").arg("--example=sum"))
.expect("failed to start sum");
let mut cmd = Command::new("cargo");
orchestrator.start("mul", cmd.arg("run").arg("--example=write"))
.expect("failed to start mul");
// Connect log handlers and IPC handlers
let mut orchestra = match orchestrator.connect().await {
Err(_) => std::process::exit(1),
Ok(o) => o,
};
// Route IPC messages
orchestra.pipe_bridges("generate", "sum")?;
orchestra.pipe_bridges("sum", "write")?;
// Killing it hard since some spawned futures might still run
match orchestra.run().await {
Err(_) => std::process::exit(1),
_ => Ok(()),
}
}
use tokio::process::{Command, ChildStdout};
use ipc_orchestrator::Orchestrator;
use std::sync::atomic::{AtomicBool, Ordering};
static CALLED: AtomicBool = AtomicBool::new(false);
use tokio::io::{AsyncBufReadExt, BufReader};
// custom logs processor
async fn mock_log_handler(reader: ChildStdout, name: String) -> anyhow::Result<()> {
let mut reader = BufReader::new(reader).lines();
assert_eq!(reader.next_line().await?.unwrap(), "testbed");
CALLED.store(true, Ordering::Relaxed);
Ok(())
}
#[tokio::main]
async fn main() {
let mut orchestrator = Orchestrator::from_handlers(mock_log_handler).ipc(false);
let mut cmd = Command::new("echo");
cmd.arg("testbed");
orchestrator.start("start", &mut cmd);
let orchestra = orchestrator.connect().await.unwrap();
// it supposes never existing processes
// hence it will give error on when any process exit or stdout was closed
orchestra.run().await.unwrap_err();
assert!(CALLED.load(Ordering::Relaxed));
}