diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..d7c3f756 --- /dev/null +++ b/.gitignore @@ -0,0 +1,21 @@ +# Generated by Cargo +# will have compiled files and executables +/target/ +**/target + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +/target +**/*.rs.bk + +*.bc + +bcs + +# Intellij stuff +.idea/ diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000..92915c6a --- /dev/null +++ b/.travis.yml @@ -0,0 +1,8 @@ +language: rust +rust: + - nightly +matrix: + fast_finish: true +script: + - cargo build --verbose --all + - cargo test --verbose --all diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 00000000..2e407f30 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,70 @@ +[package] +name = "bastion" +version = "0.1.0" +description = "Fault-tolerant Runtime for Rust applications" +authors = ["Mahmut Bulut "] +keywords = ["fault-tolerant", "runtime", "actor", "system"] +categories = [] +homepage = "https://github.com/vertexclique/bastion" +repository = "https://github.com/vertexclique/bastion" +documentation = "https://docs.rs/bastion" +readme = "README.md" +license = "MIT" +edition = "2018" +exclude = [ + ".github/*", + "examples/*", + "graphstore/*", + "tests/*", + "img/*", + "ci/*", + "benches/*", + "doc/*", + "*.png", + "*.dot", + "*.yml", + "*.toml", + "*.md" +] + +[badges] +travis-ci = { repository = "vertexclique/bastion", branch = "master" } +maintenance = { status = "actively-developed" } + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = "^0.1" +tokio-executor = "0.1.8" +tokio-threadpool = "0.1.15" +futures = "0.1.28" +log = "0.4.7" +env_logger = "0.6.1" +crossbeam-channel = "0.3.8" +ratelimit = "0.4.4" +backtrace = "0.3.32" +ego-tree = "0.6.0" +lazy_static = "1.3.0" +objekt = "0.1.2" +signal-hook = "0.1.10" + +#futures-preview = "=0.3.0-alpha.16" +uuid = { version = "0.7", features = ["serde", "v4"] } + +[dev-dependencies] +reqwest = "0.9.19" + +[profile.bench] +panic = "unwind" +opt-level = 3 +debug = false +rpath = false +lto = false +debug-assertions = false +codegen-units = 1 + +[profile.debug] +panic = "unwind" + +[profile.release] +panic = "unwind" diff --git a/LICENSE b/LICENSE new file mode 100644 index 00000000..5cc03ccc --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) Mahmut Bulut + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 00000000..5376adf0 --- /dev/null +++ b/README.md @@ -0,0 +1,144 @@ +
+
+
+ +----------------- + +

Bastion: Fault-tolerant Runtime for Rust applications

+ +*The breeze of the cold winter had come. This time of the year, Crashers arrive in the village. These are the giants who can't be evaded. They born to destroy and take people to the Paniks Heights. Suddenly, The developer descried the blurry silhouettes of Crashers from afar.* + +*With cold and cracked voice, he whispered:* + +*It is time to go to **Bastion Fort**.* + +--- + + + + + + + + + + + + + + + + + + + + + + + + + +
Latest Release + + Crates.io + +
License + + Crates.io + +
Build Status + + travis build status + +
Downloads + + Crates.io + +
Gitter + + + +
+ +--- + +Bastion is a Fault-tolerant Runtime for Rust applications. +It detect panics during runs of your code and serves a runtime to +prevent abrubt exits. Also, it enables you to continue serving in case of +a failure. You can select your own recovery scenario, scale your workers and +define whole application on top of it. + +--- + +## Usage + +Bastion comes with a default one-for-one strategy root supervisor. +You can use this to launch automatically supervised tasks. + +Check [root supervisor](https://github.com/vertexclique/bastion/blob/master/examples/root_spv.rs) example in examples. + +[Examples](https://github.com/vertexclique/bastion/blob/master/examples) cover all the use cases in the frame of the crate. + +In most simple way you can use Bastion like here: +```rust +use bastion::bastion::Bastion; +use bastion::spawn::RuntimeSpawn; +use bastion::child::Message; + +fn main() { + Bastion::platform(); + + let message = String::from("Some message to be passed"); + + Bastion::spawn( + |_context, msg: Box| { + let received_msg = msg.as_any().downcast_ref::().unwrap(); + + println!("Received message: {:?}", received_msg); + println!("root supervisor - spawn_at_root - 1"); + }, + message, + ); + + Bastion::start() +} +``` + +## Structure of the Runtime + +Runtime is structured by the user. Only root supervision comes in batteries-included fashion. +Worker code, worker group redundancy, supervisors and their supervision strategies are defined by the user. + +You can see overall architecture of the framework here: +![](img/bastion-arch.png) + + +## License + +License is [MIT](https://github.com/vertexclique/bastion/blob/master/LICENSE) + +## Documentation + +Official documentation is hosted on [docs.rs](https://docs.rs/bastion). + +## Getting Help +Please head to our [Gitter](https://gitter.im/bastionframework/community) or use [StackOverflow](https://stackoverflow.com/questions/tagged/bastionframework) + +## Discussion and Development +We use [Gitter](https://gitter.im/bastionframework/community) for development discussions. Also please don't hesitate to open issues on GitHub ask for features, report bugs, comment on design and more! +More interaction and more ideas are better! + +## Contributing to Bastion [![Open Source Helpers](https://www.codetriage.com/vertexclique/bastion/badges/users.svg)](https://www.codetriage.com/vertexclique/bastion) + +All contributions, bug reports, bug fixes, documentation improvements, enhancements and ideas are welcome. + +A detailed overview on how to contribute can be found in the [CONTRIBUTING guide](.github/CONTRIBUTING.md) on GitHub. + +### Thanks + +Thanks to my dear mom (Günnur Bulut) who is an artist with many things to do but +spending her efforts without any hesitation on small things that I requested +(Like this logo). My shining star. + +Also thanks to my friend [Berkan Yavrı](http://github.com/yavrib) who came with the idea of making this. +Debated over the approaches that I took, spent time on thinking of this project with me. diff --git a/examples/root_spv.rs b/examples/root_spv.rs new file mode 100644 index 00000000..920a1d02 --- /dev/null +++ b/examples/root_spv.rs @@ -0,0 +1,26 @@ +use bastion::bastion::Bastion; +use bastion::child::Message; +use bastion::context::BastionContext; +use bastion::spawn::RuntimeSpawn; + +fn main() { + Bastion::platform(); + + let message = String::from("Some message to be passed"); + + Bastion::spawn( + |context: BastionContext, msg: Box| { + // Message can be casted and reused here. + let received_msg = msg.as_any().downcast_ref::().unwrap(); + + println!("Received message: {:?}", received_msg); + println!("root supervisor - spawn_at_root - 1"); + + // Rebind to the system + context.hook(); + }, + message, + ); + + Bastion::start() +} diff --git a/examples/spv_one_for_all.rs b/examples/spv_one_for_all.rs new file mode 100644 index 00000000..a1766a27 --- /dev/null +++ b/examples/spv_one_for_all.rs @@ -0,0 +1,47 @@ +use bastion::bastion::Bastion; +use bastion::context::BastionContext; +use bastion::supervisor::SupervisionStrategy; +use std::{fs, thread}; + +fn main() { + Bastion::platform(); + + let message = "Supervision Message".to_string(); + let message2 = "Some Other Message".to_string(); + + // Name of the supervisor, and system of the new supervisor + // By default if you don't specify Supervisors use "One for One". + // We are going to take a look at "One For All" strategy. + Bastion::supervisor("background-worker", "new-system") + .strategy(SupervisionStrategy::OneForAll) + .children( + |p: BastionContext, _msg| { + println!("File below doesn't exist so it will panic."); + fs::read_to_string("kakafoni").unwrap(); + + // Hook to rebind to the system. + p.hook(); + }, + message, + 1_i32, + ) + .children( + |p: BastionContext, _msg| { + // No early exit + let mut i = 0; + loop { + i = i + 1; + // Start everyone under this supervisor. Immediately for all of them. + println!("Going to fail {} :: {:?}", i, thread::current()); + + // Hook to rebind to the system. + p.clone().hook(); + } + }, + message2, + 2_i32, + ) + .launch(); + + Bastion::start() +} diff --git a/examples/spv_one_for_one.rs b/examples/spv_one_for_one.rs new file mode 100644 index 00000000..eab5eacd --- /dev/null +++ b/examples/spv_one_for_one.rs @@ -0,0 +1,51 @@ +use bastion::bastion::Bastion; +use bastion::context::BastionContext; +use bastion::supervisor::SupervisionStrategy; +use std::{fs, thread}; + +fn main() { + Bastion::platform(); + + let message = "Supervision Message".to_string(); + let message2 = "Some Other Message".to_string(); + + // Name of the supervisor, and system of the new supervisor + // By default if you don't specify Supervisors use "One for One". + // Let's look at "One for One". + Bastion::supervisor("background-worker", "new-system") + .strategy(SupervisionStrategy::OneForOne) + .children( + |p: BastionContext, _msg| { + println!("File below doesn't exist so it will panic."); + fs::read_to_string("kakafoni").unwrap(); + + // Hook to rebind to the system. + p.hook(); + }, + message, + 1_i32, + ) + .children( + |p: BastionContext, _msg| { + // No early exit + let mut i = 0; + loop { + i = i + 1; + println!( + "This is not going to fail. \ + One for One strategy behavior. {} :: {:?}", + i, + thread::current() + ); + + // Hook to rebind to the system. + p.clone().hook(); + } + }, + message2, + 2_i32, + ) + .launch(); + + Bastion::start() +} diff --git a/examples/spv_rest_for_one.rs b/examples/spv_rest_for_one.rs new file mode 100644 index 00000000..20013632 --- /dev/null +++ b/examples/spv_rest_for_one.rs @@ -0,0 +1,55 @@ +use bastion::bastion::Bastion; +use bastion::context::BastionContext; +use bastion::supervisor::SupervisionStrategy; +use core::time; +use std::{fs, thread}; + +fn main() { + Bastion::platform(); + + let message = "Supervision Message".to_string(); + let message2 = "Some Other Message".to_string(); + + // Name of the supervisor, and system of the new supervisor + // By default if you don't specify Supervisors use "One for One". + // We are going to take a look at "Rest for One" strategy. + Bastion::supervisor("background-worker", "new-system") + .strategy(SupervisionStrategy::RestForOne) + .children( + |p: BastionContext, _msg| { + println!("File below doesn't exist so it will panic."); + fs::read_to_string("kakafoni").unwrap(); + + // Hook to rebind to the system. + p.hook(); + }, + message, + 1_i32, + ) + .children( + |p: BastionContext, _msg| { + // No early exit + let mut i = 0; + loop { + i = i + 1; + // Going to fail with other children in this group. + // Difference between "Rest for One" and "One for All" is: + // * One for All sends PoisonPill to everyone. + // * Rest for One sends only PoisonPill to rest other than related group. + println!("Going to fail {} :: {:?}", i, thread::current()); + + // Behave like some heavy-lifting work occuring. + let ten_millis = time::Duration::from_millis(2000); + thread::sleep(ten_millis); + + // Hook to rebind to the system. + p.clone().hook(); + } + }, + message2, // Message which will be passed around. + 1_i32, // How many child will be replicated from the worker closure. + ) + .launch(); + + Bastion::start() +} diff --git a/img/bastion-arch.png b/img/bastion-arch.png new file mode 100644 index 00000000..30b2b911 Binary files /dev/null and b/img/bastion-arch.png differ diff --git a/img/bastion.png b/img/bastion.png new file mode 100644 index 00000000..3c57ea2e Binary files /dev/null and b/img/bastion.png differ diff --git a/src/bastion.rs b/src/bastion.rs new file mode 100644 index 00000000..5f618f02 --- /dev/null +++ b/src/bastion.rs @@ -0,0 +1,322 @@ +use crate::child::{BastionChildren, BastionClosure, Message}; +use crate::config::BastionConfig; +use crate::context::BastionContext; +use crate::messages::PoisonPill; +use crate::runtime_manager::RuntimeManager; +use crate::runtime_system::RuntimeSystem; +use crate::spawn::RuntimeSpawn; +use crate::supervisor::{SupervisionStrategy, Supervisor}; +use crate::tramp::Tramp; +use crossbeam_channel::unbounded; +use ego_tree::{NodeRef, Tree}; +use env_logger::Builder; +use futures::future::poll_fn; +use lazy_static::lazy_static; +use log::LevelFilter; +use signal_hook::{iterator::Signals, SIGINT}; +use std::panic::AssertUnwindSafe; +use std::sync::{Arc, Mutex}; +use tokio::prelude::future::FutureResult; +use tokio::prelude::*; +use tokio::runtime::Runtime; +use uuid::Uuid; + +lazy_static! { + // Platform which contains runtime system. + pub static ref PLATFORM: Arc> = Arc::new(Mutex::new(RuntimeSystem::start())); + + // Fault induced supervisors queue + pub static ref FAULTED: Arc>> = + Arc::new(Mutex::new(Vec::::new())); +} + +pub struct Bastion { + pub config: BastionConfig, + log_builder: Builder, +} + +impl Bastion { + pub fn platform_from_config(config: BastionConfig) -> Self { + let log_builder = Builder::from_default_env(); + + let mut platform = Bastion { + config, + log_builder, + }; + + platform + .log_builder + .format(|buf, record| { + writeln!(buf, "[HIVE] - [{}] - {}", record.level(), record.args()) + }) + .filter(None, platform.config.log_level) + .is_test(platform.config.in_test) + .init(); + + platform + } + + pub fn platform() -> Self { + let default_config = BastionConfig { + log_level: LevelFilter::Info, + in_test: false, + }; + + Bastion::platform_from_config(default_config) + } + + pub fn supervisor(name: &'static str, system: &'static str) -> Supervisor { + let sp = Supervisor::default().props(name.into(), system.into()); + Bastion::traverse(sp) + } + + // + // Placement projection in the supervision tree + // + fn traverse_registry( + mut registry: Tree, + root: NodeRef, + new_supervisor: &Supervisor, + ) { + for i in root.children() { + let curn = new_supervisor.urn.clone(); + let k = new_supervisor.clone(); + if i.value().urn == curn { + let mut ancestor = registry.get_mut(i.id()).unwrap(); + ancestor.append(k); + break; + } + Bastion::traverse_registry(registry.clone(), i, &*new_supervisor) + } + } + + fn traverse(ns: Supervisor) -> Supervisor { + let runtime = PLATFORM.lock().unwrap(); + let arcreg = runtime.registry.clone(); + let registry = arcreg.lock().unwrap(); + Bastion::traverse_registry(registry.clone(), registry.root(), &ns); + + ns + } + + pub(crate) fn fault_recovery(given: Supervisor, message_box: Box) { + // Clone supervisor for trampoline bouncing + let trampoline_spv = given.clone(); + + // Push supervisor for next trampoline + let fark = FAULTED.clone(); + let mut faulted_ones = fark.lock().unwrap(); + faulted_ones.push(given.clone()); + + debug!("Fault induced supervisors: {:?}", faulted_ones); + + let restart_needed = match trampoline_spv.strategy { + SupervisionStrategy::OneForOne => { + let killed = trampoline_spv.killed; + debug!( + "One for One – Children restart triggered for :: {:?}", + killed + ); + killed + } + SupervisionStrategy::OneForAll => { + trampoline_spv.descendants.iter().for_each(|children| { + let tx = children.tx.as_ref().unwrap().clone(); + debug!( + "One for All – Restart triggered for all :: {:?}", + children.id + ); + tx.send(PoisonPill::new()).unwrap_or(()); + }); + + // Don't make avalanche effect, send messages and wait for all to come back. + let killed_processes = trampoline_spv.killed.clone(); + debug!( + "One for All – Restart triggered for killed :: {:?}", + killed_processes + ); + killed_processes + } + SupervisionStrategy::RestForOne => { + // Find the rest in the group of killed one. + trampoline_spv.killed.iter().for_each(|killed| { + let mut rest_to_kill = trampoline_spv.descendants.clone(); + rest_to_kill.retain(|i| !killed.id.contains(&i.id)); + + rest_to_kill.iter().for_each(|children| { + let tx = children.tx.as_ref().unwrap().clone(); + debug!( + "Rest for One – Restart triggered for rest :: {:?}", + children.id + ); + tx.send(PoisonPill::new()).unwrap_or(()); + }); + }); + + let killed_processes = trampoline_spv.killed.clone(); + debug!( + "Rest for One – Restart triggered for killed :: {:?}", + killed_processes + ); + + killed_processes + } + }; + + debug!("Restart Needed for – {:?}", restart_needed); + + Tramp::Traverse(restart_needed).execute(|desc| { + let message_clone = objekt::clone_box(&*message_box); + match &desc { + n if n.is_empty() => Tramp::Complete(n.to_vec()), + n => { + let mut rest_child = n.clone(); + if let Some(children) = rest_child.pop() { + //: Box<(dyn BastionClosure + 'static)> = + let bt = objekt::clone_box(&*children.thunk); + let message_box = objekt::clone_box(&*message_box); + let tx = children.tx.as_ref().unwrap().clone(); + let rx = children.rx.clone().unwrap(); + + let f = future::lazy(move || { + bt( + BastionContext { + bcast_rx: Some(rx.clone()), + bcast_tx: Some(tx.clone()), + }, + message_box, + ); + future::ok::<(), ()>(()) + }); + + let k = AssertUnwindSafe(f).catch_unwind().then( + |result| -> FutureResult<(), ()> { + if let Err(err) = result { + error!("Panic happened in restarted - {:?}", err); + let fark = FAULTED.clone(); + let mut faulted_ones = fark.lock().unwrap(); + let faulted = faulted_ones.pop().unwrap(); + + // Make trampoline to re-enter + drop(faulted_ones); + drop(fark); + Bastion::fault_recovery(faulted, message_clone); + } + future::ok(()) + }, + ); + + let ark = PLATFORM.clone(); + let mut runtime = ark.lock().unwrap(); + let shared_runtime: &mut Runtime = &mut runtime.runtime; + shared_runtime.spawn(k); + } + + Tramp::Traverse(rest_child) + } + } + }); + } + + pub fn start() { + Bastion::runtime_shutdown_callback() + } +} + +impl RuntimeSpawn for Bastion { + fn spawn(thunk: F, msg: M) -> BastionChildren + where + F: BastionClosure, + M: Message, + { + let bt = Box::new(thunk); + let msg_box = Box::new(msg); + let (p, c) = unbounded(); + let child = BastionChildren { + id: Uuid::new_v4().to_string(), + tx: Some(p), + rx: Some(c), + redundancy: 1_i32, + msg: objekt::clone_box(&*msg_box), + thunk: objekt::clone_box(&*bt), + }; + + let message_clone = objekt::clone_box(&*msg_box); + let if_killed = child.clone(); + let ret_val = child.clone(); + + { + let ark = PLATFORM.clone(); + let runtime = ark.lock().unwrap(); + let mut registry = runtime.registry.lock().unwrap(); + let mut rootn = registry.root_mut(); + let root: &mut Supervisor = rootn.value(); + + root.descendants.push(child); + } + + let tx = ret_val.tx.as_ref().unwrap().clone(); + let rx = ret_val.rx.clone().unwrap(); + + let f = future::lazy(move || { + bt( + BastionContext { + bcast_rx: Some(rx.clone()), + bcast_tx: Some(tx.clone()), + }, + msg_box, + ); + future::ok::<(), ()>(()) + }); + + let k = AssertUnwindSafe(f) + .catch_unwind() + .then(|result| -> FutureResult<(), ()> { + let ark = PLATFORM.clone(); + let runtime = ark.lock().unwrap(); + let mut registry = runtime.registry.lock().unwrap(); + let mut rootn = registry.root_mut(); + let mut root = rootn.value().clone(); + + root.killed.push(if_killed); + + // Enable re-entrant code + drop(registry); + drop(runtime); + if let Err(err) = result { + error!("Panic happened in root level child - {:?}", err); + Bastion::fault_recovery(root.clone(), message_clone); + } + future::ok(()) + }); + + let ark = PLATFORM.clone(); + let mut runtime = ark.lock().unwrap(); + let shared_runtime: &mut Runtime = &mut runtime.runtime; + shared_runtime.spawn(k); + + ret_val + } +} + +type Never = (); +const CLOSE_OVER: Result, Never> = Ok(Async::Ready(())); + +impl RuntimeManager for Bastion { + fn runtime_shutdown_callback() { + let mut entered = tokio_executor::enter().expect("main thread_local runtime lock"); + let signals = Signals::new(&[SIGINT]).unwrap(); + + entered + .block_on(poll_fn(|| { + for sig in signals.forever() { + match sig { + signal_hook::SIGINT => break, + _ => unreachable!(), + } + } + CLOSE_OVER + })) + .expect("cannot shutdown"); + } +} diff --git a/src/child.rs b/src/child.rs new file mode 100644 index 00000000..d2c9d42d --- /dev/null +++ b/src/child.rs @@ -0,0 +1,79 @@ +use crate::context::BastionContext; +use core::fmt; +use core::fmt::{Debug, Formatter}; +use crossbeam_channel::{Receiver, Sender}; +use std::any::Any; +use uuid::Uuid; + +pub trait Shell: Send + Sync + objekt::Clone + Any + 'static {} +impl Shell for T where T: Send + Sync + objekt::Clone + Any + 'static {} + +pub trait Message: Shell + Debug { + fn as_any(&self) -> &dyn Any; +} +impl Message for T +where + T: Shell + Debug, +{ + fn as_any(&self) -> &dyn Any { + self + } +} + +pub trait BastionClosure: Fn(BastionContext, Box) + Shell {} +impl BastionClosure for T where T: Fn(BastionContext, Box) + Shell {} + +pub struct BastionChildren { + pub id: String, + pub redundancy: i32, + pub tx: Option>>, + pub rx: Option>>, + pub thunk: Box, + pub msg: Box, +} + +impl PartialEq for BastionChildren { + fn eq(&self, other: &Self) -> bool { + self.id == other.id && self.redundancy == other.redundancy + } +} + +impl Debug for BastionChildren { + fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> { + // TODO: add thunk ref address here + write!( + f, + "(ID :: {:?}, Redundancy :: {:?})", + self.id, self.redundancy + ) + } +} + +impl Default for BastionChildren { + fn default() -> Self { + let e = Box::new(|_: BastionContext, _: Box| {}); + let m = Box::new("default".to_string()); + + BastionChildren { + id: Uuid::new_v4().to_string(), + redundancy: 1_i32, + tx: None, + rx: None, + thunk: e, + msg: m, + } + } +} + +impl Clone for BastionChildren { + fn clone(&self) -> Self { + BastionChildren { + id: self.id.to_string(), + tx: self.tx.clone(), + rx: self.rx.clone(), + thunk: objekt::clone_box(&*self.thunk), + redundancy: self.redundancy, + msg: objekt::clone_box(&*self.msg), + } + } +} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 00000000..f027de15 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,7 @@ +use log::LevelFilter; + +#[derive(Clone, Copy)] +pub struct BastionConfig { + pub log_level: LevelFilter, + pub in_test: bool, +} diff --git a/src/context.rs b/src/context.rs new file mode 100644 index 00000000..1291a432 --- /dev/null +++ b/src/context.rs @@ -0,0 +1,50 @@ +use crate::child::Message; +use crate::messages::PoisonPill; +use crossbeam_channel::{Receiver, Sender}; +use ratelimit::Limiter; +use std::any::Any; +use std::time::Duration; + +#[derive(Clone)] +pub struct BastionContext { + pub bcast_tx: Option>>, + pub bcast_rx: Option>>, +} + +impl BastionContext { + fn dispatch_clock() -> Limiter { + ratelimit::Builder::new() + .capacity(1) + .quantum(1) + .interval(Duration::new(0, 100)) + .build() + } + + pub fn hook(self) { + let mut dc = BastionContext::dispatch_clock(); + dc.wait(); + let rx = self.bcast_rx.clone().unwrap(); + if let Ok(message) = rx.try_recv() { + let msg: &dyn Any = message.as_any(); + if msg.is::() { + dc.wait(); + panic!("PoisonPill"); + } + } + } + + pub fn blocking_hook(self) { + let mut dc = BastionContext::dispatch_clock(); + dc.wait(); + let rx = self.bcast_rx.clone().unwrap(); + loop { + if let Ok(message) = rx.try_recv() { + let msg: &dyn Any = message.as_any(); + if msg.is::() { + dc.wait(); + panic!("PoisonPill"); + } + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 00000000..e8f57c17 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,24 @@ +// Because if we can't trust, we can't make. +#![forbid(unsafe_code)] +#![feature(const_fn)] +#![feature(unboxed_closures)] +#![feature(fn_traits)] +#![feature(clamp)] +#![feature(panic_info_message)] + +#[macro_use] +extern crate log; +extern crate env_logger; + +pub mod child; +pub mod context; + +pub mod messages; +mod runtime_manager; +pub mod spawn; +pub mod supervisor; +pub mod tramp; + +pub mod bastion; +pub mod config; +pub mod runtime_system; diff --git a/src/macros.rs b/src/macros.rs new file mode 100644 index 00000000..8c67ae08 --- /dev/null +++ b/src/macros.rs @@ -0,0 +1,8 @@ +//macro_rules! supervisor(bastion) { +// (eval $e:expr) => {{ +// { +// let val: usize = $e; // Force types to be integers +// println!("{} = {}", stringify!{$e}, val); +// } +// }}; +//} diff --git a/src/messages.rs b/src/messages.rs new file mode 100644 index 00000000..900cde85 --- /dev/null +++ b/src/messages.rs @@ -0,0 +1,14 @@ +use std::any::Any; + +#[derive(Clone, Debug, Default, PartialEq)] +pub struct PoisonPill; + +impl PoisonPill { + pub fn new() -> Box { + Box::new(PoisonPill::default()) + } + + pub fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/runtime_manager.rs b/src/runtime_manager.rs new file mode 100644 index 00000000..95c30e3b --- /dev/null +++ b/src/runtime_manager.rs @@ -0,0 +1,9 @@ +use std::any::Any; + +pub(crate) trait RuntimeManager { + fn runtime_shutdown_callback(); +} + +pub(crate) trait FaultRecovery { + fn panic_dispatcher(failure: Box); +} diff --git a/src/runtime_system.rs b/src/runtime_system.rs new file mode 100644 index 00000000..1eb5a1aa --- /dev/null +++ b/src/runtime_system.rs @@ -0,0 +1,38 @@ +use crate::runtime_manager::FaultRecovery; +use crate::supervisor::Supervisor; +use ego_tree::Tree; +use std::any::Any; +use std::sync::{Arc, Mutex}; +use tokio::runtime::{Builder, Runtime}; + +pub struct RuntimeSystem { + pub runtime: Runtime, + pub registry: Arc>>, +} + +impl RuntimeSystem { + pub fn start() -> Self { + let runtime: Runtime = Builder::new() + .panic_handler(|err| RuntimeSystem::panic_dispatcher(err)) + .before_stop(|| { + debug!("Executing thread stopping"); + }) + .build() + .unwrap(); // Builder panic isn't a problem since we haven't started. + + let root_supervisor = Supervisor::default().props("root".into(), "root".into()); + + let tree = ego_tree::Tree::new(root_supervisor); + + let registry_tree = Arc::new(Mutex::new(tree)); + + RuntimeSystem { + runtime, + registry: registry_tree, + } + } +} + +impl FaultRecovery for RuntimeSystem { + fn panic_dispatcher(_failure: Box) {} +} diff --git a/src/spawn.rs b/src/spawn.rs new file mode 100644 index 00000000..fbf6fd4b --- /dev/null +++ b/src/spawn.rs @@ -0,0 +1,8 @@ +use crate::child::{BastionChildren, BastionClosure, Message}; + +pub trait RuntimeSpawn { + fn spawn(thunk: F, msg: M) -> BastionChildren + where + F: BastionClosure, + M: Message; +} diff --git a/src/supervisor.rs b/src/supervisor.rs new file mode 100644 index 00000000..85e8d8fb --- /dev/null +++ b/src/supervisor.rs @@ -0,0 +1,142 @@ +use crate::child::{BastionChildren, BastionClosure, Message}; +use crate::context::BastionContext; +use crossbeam_channel::unbounded; +use std::cmp::Ordering; +use std::panic::AssertUnwindSafe; +use tokio::prelude::future::FutureResult; +use tokio::prelude::*; +use uuid::Uuid; + +#[derive(Clone, PartialOrd, PartialEq, Eq, Debug)] +pub struct SupervisorURN { + pub sys: String, // Supervisor System Name + pub name: String, // Supervisor Name + pub res: String, // Supervisor Identifier +} + +impl Default for SupervisorURN { + fn default() -> Self { + let uuid_gen = Uuid::new_v4(); + SupervisorURN { + sys: "bastion".to_owned(), + name: "default-supervisor".to_owned(), + res: uuid_gen.to_string(), + } + } +} + +impl Ord for SupervisorURN { + fn cmp(&self, other: &Self) -> Ordering { + self.sys + .cmp(&other.sys) + .then(self.name.cmp(&other.name)) + .then(self.res.cmp(&other.res)) + } +} + +#[derive(Clone, Debug)] +pub enum SupervisionStrategy { + OneForOne, + OneForAll, + RestForOne, +} + +impl Default for SupervisionStrategy { + fn default() -> Self { + SupervisionStrategy::OneForOne + } +} + +#[derive(Default, Clone, Debug)] +pub struct Supervisor { + pub urn: SupervisorURN, + pub(crate) descendants: Vec, + pub(crate) killed: Vec, + pub(crate) strategy: SupervisionStrategy, +} + +impl Supervisor { + pub fn props(mut self, name: String, system: String) -> Self { + let mut urn = SupervisorURN::default(); + urn.name = name; + self.urn = urn; + self.urn.sys = system; + self + } + + pub fn strategy(mut self, strategy: SupervisionStrategy) -> Self { + self.strategy = strategy; + self + } + + pub fn children(mut self, thunk: F, msg: M, scale: i32) -> Self + where + F: BastionClosure, + M: Message, + { + let bt = Box::new(thunk); + let msg_box = Box::new(msg); + let (p, c) = unbounded(); + + let children = BastionChildren { + id: Uuid::new_v4().to_string(), + tx: Some(p), + rx: Some(c), + redundancy: scale, + msg: objekt::clone_box(&*msg_box), + thunk: objekt::clone_box(&*bt), + }; + + self.descendants.push(children); + + self + } + + pub fn launch(self) { + for descendant in &self.descendants { + let descendant = descendant.clone(); + + for child_id in 0..descendant.redundancy { + let tx = descendant.tx.as_ref().unwrap().clone(); + let rx = descendant.rx.clone().unwrap(); + + let nt = objekt::clone_box(&*descendant.thunk); + let msgr = objekt::clone_box(&*descendant.msg); + let msgr_panic_handler = objekt::clone_box(&*descendant.msg); + let mut if_killed = descendant.clone(); + if_killed.id = format!("{}::{}", if_killed.id, child_id); + + let mut this_spv = self.clone(); + + let f = future::lazy(move || { + nt( + BastionContext { + bcast_rx: Some(rx.clone()), + bcast_tx: Some(tx.clone()), + }, + msgr, + ); + future::ok::<(), ()>(()) + }); + + let k = AssertUnwindSafe(f) + .catch_unwind() + .then(|result| -> FutureResult<(), ()> { + this_spv.killed.push(if_killed); + + // Already re-entrant code + if let Err(err) = result { + error!("Panic happened in supervised child - {:?}", err); + crate::bastion::Bastion::fault_recovery(this_spv, msgr_panic_handler); + } + future::ok(()) + }); + + let ark = crate::bastion::PLATFORM.clone(); + let mut runtime = ark.lock().unwrap(); + let shared_runtime = &mut runtime.runtime; + shared_runtime.spawn(k); + } + } + } +} diff --git a/src/tramp.rs b/src/tramp.rs new file mode 100644 index 00000000..0369634c --- /dev/null +++ b/src/tramp.rs @@ -0,0 +1,22 @@ +pub enum Tramp { + Traverse(R), + Complete(R), +} + +impl Tramp { + pub fn execute(mut self, f: F) -> R + where + F: Fn(R) -> Tramp, + { + loop { + match self { + Tramp::Traverse(value) => { + self = f(value); + } + Tramp::Complete(value) => { + return value; + } + } + } + } +} diff --git a/tests/lib.rs b/tests/lib.rs new file mode 100644 index 00000000..01294f41 --- /dev/null +++ b/tests/lib.rs @@ -0,0 +1,185 @@ +#[cfg(test)] +mod tests { + use bastion::bastion::Bastion; + use bastion::bastion::PLATFORM; + use bastion::config::BastionConfig; + use bastion::context::BastionContext; + use bastion::spawn::RuntimeSpawn; + use bastion::supervisor::SupervisionStrategy; + use log::LevelFilter; + use std::borrow::{Borrow, BorrowMut}; + use std::sync::Once; + use std::{fs, thread, time}; + use tokio::prelude::*; + use tokio::runtime::{Builder, Runtime}; + + static INIT: Once = Once::new(); + + fn init() { + INIT.call_once(|| { + let config = BastionConfig { + log_level: LevelFilter::Debug, + in_test: true, + }; + let bastion = Bastion::platform_from_config(config); + }); + } + + fn awaiting(time: u64) { + let ten_millis = time::Duration::from_millis(time); + thread::sleep(ten_millis); + } + + #[test] + fn spawn_at_root() { + init(); + + let message = "Kokojombo".to_string(); + let message2 = "Kokojombo Two".to_string(); + + Bastion::spawn( + |p, msg| { + println!("root supervisor - spawn_at_root - 1"); + }, + message, + ); + + Bastion::spawn( + |p, msg| { + println!("root supervisor - spawn_at_root - 2"); + }, + message2, + ); + + Bastion::supervisor("k", "m"); + + awaiting(10); + } + + #[test] + fn spawn_both_root_and_supervisor() { + init(); + + let message = "Kokojombo".to_string(); + let message2 = "Kokojombo Two".to_string(); + + Bastion::spawn( + |p, msg| { + println!("root supervisor - panic_roll_starting - 1"); + fs::read_to_string("kakafoni").unwrap(); + }, + message, + ); + + awaiting(500); + } + + #[test] + fn spawn_with_supervisor_one_for_one() { + init(); + + let message = "Supervision Message".to_string(); + let message2 = "Some Other Message".to_string(); + + Bastion::supervisor("background-worker", "new-system") + .children( + |_p, _msg| { + println!("new supervisor - panic_roll_starting - 1"); + fs::read_to_string("kakafoni").unwrap(); + }, + message, + 1_i32, + ) + .children( + |p: BastionContext, _msg| { + // No early exit + let mut i = 0; + loop { + i = i + 1; + println!("CONTINUING {} :: {:?}", i, thread::current()); + awaiting(200); + p.clone().hook(); + } + }, + message2, + 2_i32, + ) + .launch(); + + awaiting(500); + } + + #[test] + fn spawn_with_supervisor_rest_for_one() { + init(); + + let panicked_message = "Panicked Children Message".to_string(); + let stable_message = "Stable Children Message".to_string(); + + Bastion::supervisor("background-worker", "new-system") + .strategy(SupervisionStrategy::RestForOne) + .children( + |p, msg| { + println!("new supervisor - panic_process - 1"); + fs::read_to_string("THERE_IS_NO_FILE_NAMED_THIS_AMIRITE").unwrap(); + }, + panicked_message, + 1_i32, + ) + .children( + |p: BastionContext, msg| { + println!("new supervisor - stable_process - 1"); + // No early exit + let mut i = 0; + loop { + i = i + 1; + println!("CONTINUING {} :: {:?}", i, thread::current()); + awaiting(200); + p.clone().hook(); + } + }, + stable_message, + 1_i32, + ) + .launch(); + + awaiting(500); + } + + #[test] + fn spawn_with_supervisor_one_for_all() { + init(); + + let panicked_message = "Panicked Children Message".to_string(); + let stable_message = "Stable Children Message".to_string(); + + Bastion::supervisor("background-worker", "new-system") + .strategy(SupervisionStrategy::OneForAll) + .children( + |p, msg| { + println!("new supervisor - panic_process - 1"); + fs::read_to_string("THERE_IS_NO_FILE_NAMED_THIS_AMIRITE").unwrap(); + }, + panicked_message, + 1_i32, + ) + .children( + |p: BastionContext, msg| { + println!("new supervisor - stable_process - 1"); + // No early exit + let mut i = 0; + loop { + i = i + 1; + println!("CONTINUING {} :: {:?}", i, thread::current()); + awaiting(200); + p.clone().hook(); + } + }, + stable_message, + 1_i32, + ) + .launch(); + + awaiting(500); + } +}