Skip to content

Commit

Permalink
Merge pull request #50 from bastion-rs/lightproc-dev
Browse files Browse the repository at this point in the history
Lightproc dev
  • Loading branch information
vertexclique committed Oct 20, 2019
2 parents a846834 + e2ec8e9 commit 1b0f39b
Show file tree
Hide file tree
Showing 13 changed files with 1,144 additions and 205 deletions.
5 changes: 5 additions & 0 deletions lightproc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,9 @@ categories = []
edition = "2018"

[dependencies]
crossbeam-utils = "0.6.6"
rustc-hash = "1.0.1"

[dev-dependencies]
crossbeam = "0.7.1"
futures-preview = "0.3.0-alpha.17"
47 changes: 47 additions & 0 deletions lightproc/examples/proc_run.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//! A function that runs a future to completion on a dedicated thread.

use std::future::Future;
use std::sync::Arc;
use std::thread;

use crossbeam::channel;
use futures::executor;
use lightproc::prelude::*;

fn spawn_on_thread<F, R>(fut: F) -> ProcHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (sender, receiver) = channel::unbounded();
let sender = Arc::new(sender);
let s = Arc::downgrade(&sender);

let future = async move {
let _ = sender;
fut.await
};

let schedule = move |t| s.upgrade().unwrap().send(t).unwrap();
let (proc, handle) = LightProc::build(
future,
schedule,
ProcStack::default()
);

proc.schedule();

thread::spawn(move || {
for proc in receiver {
proc.run();
}
});

handle
}

fn main() {
executor::block_on(spawn_on_thread(async {
println!("Hello, world!");
}));
}
45 changes: 0 additions & 45 deletions lightproc/examples/task_build.rs

This file was deleted.

5 changes: 4 additions & 1 deletion lightproc/src/layout_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ pub fn extend(layout: Layout, next: Layout) -> (Layout, usize) {
.unwrap();
let new_size = offset
.checked_add(next.size())
.ok_or(Error::new(ErrorKind::Other, "New size can't be computed"))
.ok_or(Error::new(
ErrorKind::Other,
"New size can't be computed",
))
.unwrap();

let layout = Layout::from_size_align(new_size, new_align).unwrap();
Expand Down
3 changes: 2 additions & 1 deletion lightproc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ pub mod state;

pub mod prelude {
pub use crate::lightproc::*;
pub use crate::proc_layout::*;
pub use crate::stack::*;
pub use crate::proc_handle::*;
}
164 changes: 71 additions & 93 deletions lightproc/src/lightproc.rs
Original file line number Diff line number Diff line change
@@ -1,129 +1,107 @@
use std::alloc;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData as marker;
use std::marker::PhantomData;
use std::mem;
use std::ptr::NonNull;

use crate::proc_layout::ProcLayout;

use crate::layout_helpers::extend;

use std::alloc::Layout;
use crate::proc_data::ProcData;
use crate::raw_proc::RawProc;
use crate::proc_handle::ProcHandle;
use crate::stack::ProcStack;
use crate::stack::*;

#[derive(Debug)]
pub struct LightProc<T> {
pub(crate) raw_proc: NonNull<()>,

pub(crate) proc_layout: ProcLayout,
pub(crate) _private: marker<T>,
pub struct LightProc {
/// A pointer to the heap-allocated task.
pub(crate) raw_proc: NonNull<()>,
}

unsafe impl<T> Send for LightProc<T> {}
unsafe impl<T> Sync for LightProc<T> {}
unsafe impl Send for LightProc {}
unsafe impl Sync for LightProc {}

impl<T> LightProc<T> {
pub fn new() -> LightProc<T> {
let proc_layout = ProcLayout::default();
impl LightProc {
pub fn build<F, R, S>(future: F, schedule: S, stack: ProcStack) -> (LightProc, ProcHandle<R>)
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
S: Fn(LightProc) + Send + Sync + 'static
{
let raw_task = RawProc::<F, R, S>::allocate(stack, future, schedule);
let task = LightProc {
raw_proc: raw_task
};
let handle = ProcHandle {
raw_proc: raw_task,
_marker: PhantomData,
};
(task, handle)
}

pub fn schedule(self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;
mem::forget(self);

unsafe {
LightProc {
raw_proc: NonNull::new(alloc::alloc(proc_layout.layout) as *mut ()).unwrap(),
proc_layout,
_private: marker,
}
((*pdata).vtable.schedule)(ptr);
}
}

pub fn with_future<F, R>(mut self, f: F) -> Self
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let fut_mem = Layout::new::<F>();
let (new_layout, offset_f) = extend(self.proc_layout.layout, fut_mem);
self.proc_layout.offset_table.insert("future", offset_f);

self.reallocate(new_layout);

let rawp =
RawProc::<F, R, usize, usize>::from_ptr(
self.raw_proc.as_ptr(), &self.proc_layout);
pub fn run(self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;
mem::forget(self);

unsafe {
rawp.future.write(f);
((*pdata).vtable.run)(ptr);
}

self
}

pub fn with_schedule<S>(mut self, s: S) -> Self
where
S: Fn(LightProc<T>) + Send + Sync + 'static,
T: Send + 'static,
{
let sched_mem = Layout::new::<S>();
let (new_layout, offset_s) = extend(self.proc_layout.layout, sched_mem);
self.proc_layout.offset_table.insert("schedule", offset_s);

self.reallocate(new_layout);

let rawp =
RawProc::<usize, usize, S, T>::from_ptr(
self.raw_proc.as_ptr(), &self.proc_layout);
pub fn cancel(&self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;

unsafe {
(rawp.schedule as *mut S).write(s);
(*pdata).cancel();
}

self
}

pub fn with_stack(mut self, st: T) -> Self
where T: Send + 'static,
{
let stack_mem = Layout::new::<T>();
let (new_layout, offset_st) = extend(self.proc_layout.layout, stack_mem);
self.proc_layout.offset_table.insert("stack", offset_st);

self.reallocate(new_layout);

let rawp =
RawProc::<usize, usize, usize, T>::from_ptr(
self.raw_proc.as_ptr(), &self.proc_layout);
pub fn stack(&self) -> &ProcStack {
let offset = ProcData::offset_stack();
let ptr = self.raw_proc.as_ptr();

unsafe {
rawp.stack.write(st);
let raw = (ptr as *mut u8).add(offset) as *const ProcStack;
&*raw
}

self
}
}

pub fn returning<R>(mut self) -> (LightProc<T>, ProcHandle<R, T>) {
let raw_proc = self.raw_proc;
let proc = LightProc {
raw_proc,
proc_layout: self.proc_layout,
_private: marker,
};
let handle = ProcHandle {
raw_proc,
_private: marker,
};
(proc, handle)
}
impl Drop for LightProc {
fn drop(&mut self) {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;

fn reallocate(&mut self, added: Layout) {
unsafe {
let pointer = alloc::realloc(
self.raw_proc.as_ptr() as *mut u8,
self.proc_layout.layout,
added.size(),
);
self.raw_proc = NonNull::new(pointer as *mut ()).unwrap()
// Cancel the task.
(*pdata).cancel();

// Drop the future.
((*pdata).vtable.drop_future)(ptr);

// Drop the task reference.
((*pdata).vtable.decrement)(ptr);
}
}
}

impl fmt::Debug for LightProc {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ptr = self.raw_proc.as_ptr();
let pdata = ptr as *const ProcData;

self.proc_layout.layout = added;
f.debug_struct("Task")
.field("pdata", unsafe { &(*pdata) })
.field("stack", self.stack())
.finish()
}
}

0 comments on commit 1b0f39b

Please sign in to comment.