Skip to content

Commit

Permalink
Merge pull request #47 from bastion-rs/lightproc
Browse files Browse the repository at this point in the history
LightProc Implementation
  • Loading branch information
vertexclique committed Oct 24, 2019
2 parents e2f72ed + 7049d70 commit d3636e3
Show file tree
Hide file tree
Showing 17 changed files with 1,521 additions and 4 deletions.
2 changes: 1 addition & 1 deletion bastion/Cargo.toml
Expand Up @@ -4,7 +4,7 @@ version = "0.2.1-alpha.0"
description = "Fault-tolerant Runtime for Rust applications"
authors = ["Mahmut Bulut <vertexclique@gmail.com>"]
keywords = ["fault-tolerant", "runtime", "actor", "system"]
categories = []
categories = ["concurrency", "asynchronous"]
homepage = "https://github.com/bastion-rs/bastion"
repository = "https://github.com/bastion-rs/bastion"
documentation = "https://docs.rs/bastion"
Expand Down
16 changes: 13 additions & 3 deletions lightproc/Cargo.toml
@@ -1,7 +1,17 @@
[package]
name = "lightproc"
version = "0.2.1-alpha.0"
version = "0.3.0-alpha.0"
description = "Lightweight process abstraction for Rust"
authors = ["Mahmut Bulut <vertexclique@gmail.com>"]
keywords = ["fault-tolerant", "runtime", "actor", "system"]
categories = []
keywords = ["fault-tolerant", "runtime", "actor", "system", "lightweight-process"]
categories = ["concurrency", "asynchronous"]
edition = "2018"

[dependencies]
crossbeam-utils = "0.6"
pin-utils = "0.1.0-alpha.4"

[dev-dependencies]
crossbeam = "0.7"
futures-preview = "=0.3.0-alpha.19"
lazy_static = "1.3.0"
57 changes: 57 additions & 0 deletions lightproc/examples/proc_panic.rs
@@ -0,0 +1,57 @@
use crossbeam::channel::{unbounded, Sender};
use futures::executor;
use lazy_static::lazy_static;
use lightproc::prelude::*;
use std::future::Future;
use std::thread;

fn spawn_on_thread<F, R>(future: F) -> RecoverableHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
lazy_static! {
// A channel that holds scheduled procs.
static ref QUEUE: Sender<LightProc> = {
let (sender, receiver) = unbounded::<LightProc>();

// Start the executor thread.
thread::spawn(move || {
for proc in receiver {
proc.run();
}
});

sender
};
}

let schedule = |t| QUEUE.send(t).unwrap();
let (proc, handle) = LightProc::recoverable(
future,
schedule,
ProcStack::default()
.with_pid(1)
.with_before_start(|| {
println!("Before start");
})
.with_after_complete(|| {
println!("After complete");
})
.with_after_panic(|| {
println!("After panic");
}),
);

proc.schedule();

handle
}

fn main() {
let handle = spawn_on_thread(async {
panic!("Panic here!");
});

executor::block_on(handle);
}
51 changes: 51 additions & 0 deletions lightproc/examples/proc_run.rs
@@ -0,0 +1,51 @@
use crossbeam::channel;
use futures::executor;
use lightproc::prelude::*;
use std::future::Future;
use std::sync::Arc;
use std::thread;

fn spawn_on_thread<F, R>(fut: F) -> ProcHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (sender, receiver) = channel::unbounded();
let sender = Arc::new(sender);
let s = Arc::downgrade(&sender);

let future = async move {
let _ = sender;
fut.await
};

let schedule = move |t| s.upgrade().unwrap().send(t).unwrap();
let (proc, handle) = LightProc::build(
future,
schedule,
ProcStack::default()
.with_pid(1)
.with_before_start(|| {
println!("Before start");
})
.with_after_complete(|| {
println!("After complete");
}),
);

proc.schedule();

thread::spawn(move || {
for proc in receiver {
proc.run();
}
});

handle
}

fn main() {
executor::block_on(spawn_on_thread(async {
println!("Hello, world!");
}));
}
37 changes: 37 additions & 0 deletions lightproc/src/catch_unwind.rs
@@ -0,0 +1,37 @@
use pin_utils::unsafe_pinned;
use std::any::Any;
use std::future::Future;
use std::panic::{catch_unwind, AssertUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::task::{Context, Poll};

#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CatchUnwind<Fut>
where
Fut: Future,
{
future: Fut,
}

impl<Fut> CatchUnwind<Fut>
where
Fut: Future + UnwindSafe,
{
unsafe_pinned!(future: Fut);

pub(super) fn new(future: Fut) -> CatchUnwind<Fut> {
CatchUnwind { future }
}
}

impl<Fut> Future for CatchUnwind<Fut>
where
Fut: Future + UnwindSafe,
{
type Output = Result<Fut::Output, Box<dyn Any + Send>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
catch_unwind(AssertUnwindSafe(|| self.future().poll(cx)))?.map(Ok)
}
}
31 changes: 31 additions & 0 deletions lightproc/src/layout_helpers.rs
@@ -0,0 +1,31 @@
use std::alloc::Layout;
use std::io::{Error, ErrorKind};

#[inline]
pub(crate) fn extend(layout: Layout, next: Layout) -> (Layout, usize) {
let new_align = std::cmp::max(layout.align(), next.align());
let pad = padding_needed_for(layout, next.align());

let offset = layout
.size()
.checked_add(pad)
.ok_or(Error::new(
ErrorKind::Other,
"Padding overflow check failed",
))
.unwrap();
let new_size = offset
.checked_add(next.size())
.ok_or(Error::new(ErrorKind::Other, "New size can't be computed"))
.unwrap();

let layout = Layout::from_size_align(new_size, new_align).unwrap();
(layout, offset)
}

#[inline]
pub(crate) fn padding_needed_for(layout: Layout, align: usize) -> usize {
let len = layout.size();
let len_rounded_up = len.wrapping_add(align).wrapping_sub(1) & !align.wrapping_sub(1);
len_rounded_up.wrapping_sub(len)
}
22 changes: 22 additions & 0 deletions lightproc/src/lib.rs
@@ -0,0 +1,22 @@
mod layout_helpers;
mod proc_data;
mod proc_layout;
mod proc_vtable;
mod raw_proc;
mod state;

pub mod catch_unwind;
pub mod lightproc;
pub mod proc_ext;
pub mod proc_handle;
pub mod proc_stack;
pub mod recoverable_handle;

pub mod prelude {
pub use crate::catch_unwind::*;
pub use crate::lightproc::*;
pub use crate::proc_ext::*;
pub use crate::proc_handle::*;
pub use crate::proc_stack::*;
pub use crate::recoverable_handle::*;
}
122 changes: 122 additions & 0 deletions lightproc/src/lightproc.rs
@@ -0,0 +1,122 @@
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
use std::ptr::NonNull;

use crate::proc_data::ProcData;
use crate::proc_ext::ProcFutureExt;
use crate::proc_handle::ProcHandle;
use crate::proc_stack::*;
use crate::raw_proc::RawProc;
use crate::recoverable_handle::RecoverableHandle;
use std::panic::AssertUnwindSafe;

pub struct LightProc {
/// A pointer to the heap-allocated proc.
pub(crate) raw_proc: NonNull<()>,
}

unsafe impl Send for LightProc {}
unsafe impl Sync for LightProc {}

impl LightProc {
pub fn recoverable<F, R, S>(
future: F,
schedule: S,
stack: ProcStack,
) -> (LightProc, RecoverableHandle<R>)
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
S: Fn(LightProc) + Send + Sync + 'static,
{
let recovery_future = AssertUnwindSafe(future).catch_unwind();
let (proc, handle) = Self::build(recovery_future, schedule, stack);
(proc, RecoverableHandle(handle))
}

pub fn build<F, R, S>(future: F, schedule: S, stack: ProcStack) -> (LightProc, ProcHandle<R>)
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
S: Fn(LightProc) + Send + Sync + 'static,
{
let raw_proc = RawProc::allocate(stack, future, schedule);
let proc = LightProc { raw_proc: raw_proc };
let handle = ProcHandle {
raw_proc: raw_proc,
_marker: PhantomData,
};
(proc, handle)
}

pub fn schedule(self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;
mem::forget(self);

unsafe {
((*pdata).vtable.schedule)(ptr);
}
}

pub fn run(self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;
mem::forget(self);

unsafe {
((*pdata).vtable.run)(ptr);
}
}

pub fn cancel(&self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;

unsafe {
(*pdata).cancel();
}
}

pub fn stack(&self) -> &ProcStack {
let offset = ProcData::offset_stack();
let ptr = self.raw_proc.as_ptr();

unsafe {
let raw = (ptr as *mut u8).add(offset) as *const ProcStack;
&*raw
}
}
}

impl Drop for LightProc {
fn drop(&mut self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;

unsafe {
// Cancel the proc.
(*pdata).cancel();

// Drop the future.
((*pdata).vtable.drop_future)(ptr);

// Drop the proc reference.
((*pdata).vtable.decrement)(ptr);
}
}
}

impl fmt::Debug for LightProc {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;

f.debug_struct("LightProc")
.field("pdata", unsafe { &(*pdata) })
.field("stack", self.stack())
.finish()
}
}

0 comments on commit d3636e3

Please sign in to comment.