Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LightProc Implementation #47

Merged
merged 39 commits into from Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
a48bec7
Stub
vertexclique Oct 12, 2019
2ff7604
Initial lightproc development
vertexclique Oct 15, 2019
206b2af
Future layout allocation
vertexclique Oct 15, 2019
4561833
Realloc at the assignment side
vertexclique Oct 15, 2019
5cb0ccc
Fetch relative offsets for data
vertexclique Oct 16, 2019
2114610
cargo fix
vertexclique Oct 16, 2019
252893c
Realloc pointer alignment and Debug impls
vertexclique Oct 16, 2019
a846834
Remove debug lines
vertexclique Oct 16, 2019
7deedca
Test
vertexclique Oct 17, 2019
e076792
Offset access
vertexclique Oct 20, 2019
e2ec8e9
refactor
vertexclique Oct 20, 2019
1b0f39b
Merge pull request #50 from bastion-rs/lightproc-dev
vertexclique Oct 20, 2019
3838dec
After complete callback done
vertexclique Oct 20, 2019
2c548c2
Before start callback
vertexclique Oct 20, 2019
22367c8
Cargo fix
vertexclique Oct 20, 2019
929b1e9
Cargo fmt
vertexclique Oct 20, 2019
97f9e9a
Callback clone for sharing callbacks from stack
vertexclique Oct 21, 2019
a3aba3b
After panic callback is added
vertexclique Oct 24, 2019
000d42e
Format all the code
vertexclique Oct 24, 2019
8778921
Cargo fix
vertexclique Oct 24, 2019
e7d16b7
Remove unnecessary panic helpers
vertexclique Oct 24, 2019
58e0c72
Builder pattern
vertexclique Oct 24, 2019
5f358ef
Fix and format
vertexclique Oct 24, 2019
7321b6d
Add categories
vertexclique Oct 24, 2019
a6e7e9f
Expose organization
vertexclique Oct 24, 2019
9e29a09
Add bastion categories too
vertexclique Oct 24, 2019
a9042b2
Update lightproc/Cargo.toml
vertexclique Oct 24, 2019
3fb375a
Import reorganization
vertexclique Oct 24, 2019
3f4fae0
Expose catch_unwind
vertexclique Oct 24, 2019
9c19e6d
Update lightproc/src/layout_helpers.rs
vertexclique Oct 24, 2019
a758aea
Example organization
vertexclique Oct 24, 2019
b793f74
Update lightproc/src/layout_helpers.rs
vertexclique Oct 24, 2019
dcb9bc7
Update lightproc/src/raw_proc.rs
vertexclique Oct 24, 2019
523525e
Update lightproc/Cargo.toml
vertexclique Oct 24, 2019
30dfd4d
Update lightproc/Cargo.toml
vertexclique Oct 24, 2019
5be1840
Update lightproc/Cargo.toml
vertexclique Oct 24, 2019
b9b8e1c
Remove global proc_future type
vertexclique Oct 24, 2019
36d1ace
Order imports
vertexclique Oct 24, 2019
7049d70
Rename task naming to proc
vertexclique Oct 24, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 12 additions & 2 deletions lightproc/Cargo.toml
Expand Up @@ -3,5 +3,15 @@ name = "lightproc"
version = "0.2.1-alpha.0"
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
description = "Lightweight process abstraction for Rust"
authors = ["Mahmut Bulut <vertexclique@gmail.com>"]
keywords = ["fault-tolerant", "runtime", "actor", "system"]
categories = []
keywords = ["fault-tolerant", "runtime", "actor", "system", "lightweight-process"]
categories = ["concurrency", "asynchronous"]
edition = "2018"

[dependencies]
crossbeam-utils = "0.6.6"
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
pin-utils = "0.1.0-alpha.4"

[dev-dependencies]
crossbeam = "0.7.1"
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
futures-preview = "0.3.0-alpha.17"
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
lazy_static = "1.3.0"
61 changes: 61 additions & 0 deletions lightproc/examples/proc_panic.rs
@@ -0,0 +1,61 @@
use std::future::Future;

use std::thread;

use crossbeam::channel::{unbounded, Sender};
use futures::executor;
use lazy_static::lazy_static;
use lightproc::prelude::*;
vertexclique marked this conversation as resolved.
Show resolved Hide resolved

use lightproc::recoverable_handle::RecoverableHandle;
vertexclique marked this conversation as resolved.
Show resolved Hide resolved

fn spawn_on_thread<F, R>(future: F) -> RecoverableHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
lazy_static! {
// A channel that holds scheduled tasks.
static ref QUEUE: Sender<LightProc> = {
let (sender, receiver) = unbounded::<LightProc>();

// Start the executor thread.
thread::spawn(move || {
for proc in receiver {
proc.run();
}
});

sender
};
}

let schedule = |t| QUEUE.send(t).unwrap();
let (proc, handle) = LightProc::recoverable(
future,
schedule,
ProcStack::default()
.with_pid(1)
.with_before_start(|| {
println!("Before start");
})
.with_after_complete(|| {
println!("After complete");
})
.with_after_panic(|| {
println!("After panic");
}),
);

proc.schedule();

handle
}

fn main() {
let handle = spawn_on_thread(async {
panic!("Panic here!");
});

executor::block_on(handle);
}
54 changes: 54 additions & 0 deletions lightproc/examples/proc_run.rs
@@ -0,0 +1,54 @@
//! A function that runs a future to completion on a dedicated thread.

use std::future::Future;
use std::sync::Arc;
use std::thread;

use crossbeam::channel;
use futures::executor;
use lightproc::prelude::*;
vertexclique marked this conversation as resolved.
Show resolved Hide resolved

fn spawn_on_thread<F, R>(fut: F) -> ProcHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (sender, receiver) = channel::unbounded();
let sender = Arc::new(sender);
let s = Arc::downgrade(&sender);

let future = async move {
let _ = sender;
fut.await
};

let schedule = move |t| s.upgrade().unwrap().send(t).unwrap();
let (proc, handle) = LightProc::build(
future,
schedule,
ProcStack::default()
.with_pid(1)
.with_before_start(|| {
println!("Before start");
})
.with_after_complete(|| {
println!("After complete");
}),
);

proc.schedule();

thread::spawn(move || {
for proc in receiver {
proc.run();
}
});
r3v2d0g marked this conversation as resolved.
Show resolved Hide resolved

handle
}

fn main() {
executor::block_on(spawn_on_thread(async {
println!("Hello, world!");
}));
}
37 changes: 37 additions & 0 deletions lightproc/src/catch_unwind.rs
@@ -0,0 +1,37 @@
use pin_utils::unsafe_pinned;
use std::any::Any;
use std::future::Future;
use std::panic::{catch_unwind, AssertUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::task::{Context, Poll};

#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CatchUnwind<Fut>
where
Fut: Future,
{
future: Fut,
}

impl<Fut> CatchUnwind<Fut>
where
Fut: Future + UnwindSafe,
{
unsafe_pinned!(future: Fut);

pub(super) fn new(future: Fut) -> CatchUnwind<Fut> {
CatchUnwind { future }
}
}

impl<Fut> Future for CatchUnwind<Fut>
where
Fut: Future + UnwindSafe,
{
type Output = Result<Fut::Output, Box<dyn Any + Send>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
catch_unwind(AssertUnwindSafe(|| self.future().poll(cx)))?.map(Ok)
}
}
31 changes: 31 additions & 0 deletions lightproc/src/layout_helpers.rs
@@ -0,0 +1,31 @@
use std::alloc::Layout;
use std::io::{Error, ErrorKind};

#[inline]
pub fn extend(layout: Layout, next: Layout) -> (Layout, usize) {
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
let new_align = std::cmp::max(layout.align(), next.align());
let pad = padding_needed_for(layout, next.align());

let offset = layout
.size()
.checked_add(pad)
.ok_or(Error::new(
ErrorKind::Other,
"Padding overflow check failed",
))
.unwrap();
let new_size = offset
.checked_add(next.size())
.ok_or(Error::new(ErrorKind::Other, "New size can't be computed"))
.unwrap();

let layout = Layout::from_size_align(new_size, new_align).unwrap();
(layout, offset)
}

#[inline]
pub fn padding_needed_for(layout: Layout, align: usize) -> usize {
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
let len = layout.size();
let len_rounded_up = len.wrapping_add(align).wrapping_sub(1) & !align.wrapping_sub(1);
len_rounded_up.wrapping_sub(len)
}
18 changes: 18 additions & 0 deletions lightproc/src/lib.rs
@@ -0,0 +1,18 @@
pub mod catch_unwind;
pub mod layout_helpers;
pub mod lightproc;
pub mod proc_data;
pub mod proc_ext;
pub mod proc_handle;
pub mod proc_layout;
pub mod proc_stack;
pub mod proc_vtable;
pub mod raw_proc;
pub mod recoverable_handle;
pub mod state;
vertexclique marked this conversation as resolved.
Show resolved Hide resolved

pub mod prelude {
pub use crate::lightproc::*;
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
pub use crate::proc_handle::*;
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
pub use crate::proc_stack::*;
vertexclique marked this conversation as resolved.
Show resolved Hide resolved
}
122 changes: 122 additions & 0 deletions lightproc/src/lightproc.rs
@@ -0,0 +1,122 @@
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
use std::ptr::NonNull;

use crate::proc_data::ProcData;
use crate::proc_ext::ProcFutureExt;
use crate::proc_handle::ProcHandle;
use crate::proc_stack::*;
use crate::raw_proc::RawProc;
use crate::recoverable_handle::RecoverableHandle;
use std::panic::AssertUnwindSafe;
Comment on lines +1 to +13
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use crate::proc_data::ProcData;
use crate::proc_ext::ProcFutureExt;
use crate::proc_handle::ProcHandle;
use crate::proc_stack::*;
use crate::raw_proc::RawProc;
use crate::recoverable_handle::RecoverableHandle;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
use std::panic::AssertUnwindSafe;
use std::ptr::NonNull;


pub struct LightProc {
/// A pointer to the heap-allocated task.
pub(crate) raw_proc: NonNull<()>,
}

unsafe impl Send for LightProc {}
unsafe impl Sync for LightProc {}

impl LightProc {
pub fn recoverable<F, R, S>(
future: F,
schedule: S,
stack: ProcStack,
) -> (LightProc, RecoverableHandle<R>)
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
S: Fn(LightProc) + Send + Sync + 'static,
{
let recovery_future = AssertUnwindSafe(future).catch_unwind();
let (proc, handle) = Self::build(recovery_future, schedule, stack);
(proc, RecoverableHandle(handle))
}

pub fn build<F, R, S>(future: F, schedule: S, stack: ProcStack) -> (LightProc, ProcHandle<R>)
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
S: Fn(LightProc) + Send + Sync + 'static,
{
let raw_task = RawProc::allocate(stack, future, schedule);
let task = LightProc { raw_proc: raw_task };
let handle = ProcHandle {
raw_proc: raw_task,
_marker: PhantomData,
};
(task, handle)
}

pub fn schedule(self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;
mem::forget(self);

unsafe {
((*pdata).vtable.schedule)(ptr);
}
}

pub fn run(self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;
mem::forget(self);

unsafe {
((*pdata).vtable.run)(ptr);
}
}

pub fn cancel(&self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;

unsafe {
(*pdata).cancel();
}
}

pub fn stack(&self) -> &ProcStack {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn stack(&self) -> &ProcStack {
fn stack(&self) -> &ProcStack {

let offset = ProcData::offset_stack();
let ptr = self.raw_proc.as_ptr();

unsafe {
let raw = (ptr as *mut u8).add(offset) as *const ProcStack;
&*raw
}
}
}

impl Drop for LightProc {
fn drop(&mut self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;

unsafe {
// Cancel the task.
(*pdata).cancel();

// Drop the future.
((*pdata).vtable.drop_future)(ptr);

// Drop the task reference.
((*pdata).vtable.decrement)(ptr);
}
}
}

impl fmt::Debug for LightProc {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;

f.debug_struct("Task")
.field("pdata", unsafe { &(*pdata) })
.field("stack", self.stack())
.finish()
}
}