Skip to content

Commit

Permalink
[GIE/Engine]: add direct executor implementation; (#2234)
Browse files Browse the repository at this point in the history
  • Loading branch information
bmmcq committed Nov 24, 2022
1 parent e071ce4 commit 428c785
Showing 1 changed file with 168 additions and 31 deletions.
199 changes: 168 additions & 31 deletions interactive_engine/executor/engine/pegasus/executor/src/reactor.rs
Expand Up @@ -72,7 +72,7 @@ enum RunTask {
Users(GeneralTask),
}

pub struct ExecutorRuntime {
pub struct PooledExecutorRuntime {
pub max_core: usize,
current_core: usize,
task_rx: Receiver<TaskPackage>,
Expand Down Expand Up @@ -255,14 +255,14 @@ fn work_loop(
);
}

impl ExecutorRuntime {
impl PooledExecutorRuntime {
fn new(core: usize, task_rx: Receiver<TaskPackage>) -> Self {
assert!(core > 0);
let mut in_flows = Vec::with_capacity(core);
for _ in 0..core {
in_flows.push(Arc::new(SegQueue::new()));
}
ExecutorRuntime {
PooledExecutorRuntime {
max_core: core,
current_core: 1,
task_rx,
Expand Down Expand Up @@ -342,15 +342,6 @@ impl ExecutorRuntime {
}
}

#[inline]
fn set_core_size(&mut self, core: usize) {
if core < 1 {
error!("core pool size can't less than 1; keep default;");
} else {
self.max_core = core;
}
}

#[inline]
fn try_fork_new_thread(
&mut self, queue: &WorkStealQueue<RunTask>, not_readies: &Arc<SegQueue<GeneralTask>>,
Expand Down Expand Up @@ -397,17 +388,17 @@ enum TaskPackage {
Batch(Vec<GeneralTask>),
}

pub struct ExecutorProxy {
pub struct PooledExecutorProxy {
task_tx: Sender<TaskPackage>,
}

impl ExecutorProxy {
impl PooledExecutorProxy {
fn new(task_tx: Sender<TaskPackage>) -> Self {
ExecutorProxy { task_tx }
Self { task_tx }
}
}

impl Executor for ExecutorProxy {
impl Executor for PooledExecutorProxy {
fn spawn<T: Task + 'static>(&self, task: T) -> Result<(), RejectError<T>> {
if SHUTDOWN_HOOK.load(Ordering::SeqCst) {
Err(RejectError(task))
Expand Down Expand Up @@ -445,23 +436,166 @@ impl Executor for ExecutorProxy {
}
}

impl Clone for ExecutorProxy {
impl Clone for PooledExecutorProxy {
fn clone(&self) -> Self {
ExecutorProxy { task_tx: self.task_tx.clone() }
PooledExecutorProxy { task_tx: self.task_tx.clone() }
}
}

pub struct DirectExecutorProxy {
seq: AtomicUsize,
tx: Sender<JoinHandle<()>>,
}

impl DirectExecutorProxy {
fn new(tx: Sender<JoinHandle<()>>) -> Self {
Self { seq: AtomicUsize::new(0), tx }
}
}

impl Executor for DirectExecutorProxy {
fn spawn<T: Task + 'static>(&self, mut task: T) -> Result<(), RejectError<T>> {
let seq = self.seq.fetch_add(1, Ordering::SeqCst);
let g = std::thread::Builder::new()
.name(format!("direct_task_{}", seq))
.spawn(move || loop {
match task.check_ready() {
TaskState::Finished => {
break;
}
TaskState::NotReady => {
std::thread::yield_now();
}
TaskState::Ready => match task.execute() {
TaskState::Finished => {
break;
}
TaskState::NotReady => {
std::thread::yield_now();
}
TaskState::Ready => {
continue;
}
},
}
})
.expect("spawn new thread fail;");

self.tx.send(g).ok();
Ok(())
}

fn spawn_batch<T: Task + 'static, I: IntoIterator<Item = T>>(
&self, tasks: I,
) -> Result<(), RejectError<()>> {
for task in tasks {
self.spawn(task).ok();
}
Ok(())
}
}

pub struct DirectExecutorRuntime(Receiver<JoinHandle<()>>);

impl DirectExecutorRuntime {
fn new(rx: Receiver<JoinHandle<()>>) -> Self {
Self(rx)
}

fn start(self) {
std::thread::Builder::new()
.name("direct-boss".into())
.spawn(move || {
while SHUTDOWN_HOOK.load(Ordering::SeqCst) {
match self.0.recv_timeout(Duration::from_secs(1)) {
Ok(g) => {
let name = g
.thread()
.name()
.map(|n| n.to_owned())
.unwrap_or_else(|| "unknown".into());
match g.join() {
Ok(_) => {
debug!("execute {} finished;", name);
}
Err(_err) => {
error!("fail to execute {}; ", name);
// std::panic::resume_unwind(err);
}
}
}
Err(_e) => {
// do nothing and continue;
}
}
}
})
.expect("start direct fail;")
.join()
.ok();
}
}

pub enum ExecutorProxy {
Direct(DirectExecutorProxy),
Pooled(PooledExecutorProxy),
}

impl Executor for ExecutorProxy {
fn spawn<T: Task + 'static>(&self, task: T) -> Result<(), RejectError<T>> {
match self {
ExecutorProxy::Direct(e) => e.spawn(task),
ExecutorProxy::Pooled(e) => e.spawn(task),
}
}

fn spawn_batch<T: Task + 'static, I: IntoIterator<Item = T>>(
&self, tasks: I,
) -> Result<(), RejectError<()>> {
match self {
ExecutorProxy::Direct(e) => e.spawn_batch(tasks),
ExecutorProxy::Pooled(e) => e.spawn_batch(tasks),
}
}
}

pub enum ExecutorRuntime {
Direct(DirectExecutorRuntime),
Pooled(PooledExecutorRuntime),
}

impl ExecutorRuntime {
fn start(self) {
match self {
ExecutorRuntime::Direct(e) => e.start(),
ExecutorRuntime::Pooled(e) => e.start(),
}
}
}

static CORE_POOL_SIZE: &'static str = "PEGASUS_CORE_POOL_SIZE";

pub fn init_executor() -> (Mutex<Option<ExecutorRuntime>>, ExecutorProxy) {
let (tx, rx) = crossbeam_channel::unbounded();
let cpus = num_cpus::get();
let core = ::std::env::var(CORE_POOL_SIZE)
.map(|value| value.parse::<usize>().unwrap_or(cpus))
.unwrap_or(cpus);
let runtime = Mutex::new(Some(ExecutorRuntime::new(core, rx)));
let proxy = ExecutorProxy::new(tx);
(runtime, proxy)
.map(|value| {
value
.parse::<usize>()
.unwrap_or_else(|_| num_cpus::get())
})
.unwrap_or_else(|_| num_cpus::get());
if core > 0 {
let (tx, rx) = crossbeam_channel::unbounded();
let executor = PooledExecutorRuntime::new(core, rx);
let runtime = Mutex::new(Some(ExecutorRuntime::Pooled(executor)));
let proxy = ExecutorProxy::Pooled(PooledExecutorProxy::new(tx));
(runtime, proxy)
} else {
let (tx, rx) = crossbeam_channel::unbounded();
let executor = DirectExecutorRuntime::new(rx);
let runtime = Mutex::new(Some(ExecutorRuntime::Direct(executor)));
let proxy = ExecutorProxy::Direct(DirectExecutorProxy::new(tx));
(runtime, proxy)
}
}

struct ExecutorGuard {
Expand Down Expand Up @@ -515,6 +649,8 @@ fn start_executor() {
} else {
error!("Global executor runtime is already started;");
}
} else {
error!("can't start executor as the executor is in shutdown, try start later;")
}
}

Expand Down Expand Up @@ -586,12 +722,13 @@ pub fn is_shutdown() -> bool {
/// It is recommended that use the environment variable `export PEGASUS_CORE_POOL_SIZE = n` to
/// set the thread size;
pub fn set_core_pool_size(core: usize) {
let mut lock = EXECUTOR.0.lock().expect("Executor lock poison");
if let Some(executor) = lock.as_mut() {
executor.set_core_size(core);
} else {
error!("Global executor runtime is already started;");
}
std::env::set_var(CORE_POOL_SIZE, core.to_string());
// let mut lock = EXECUTOR.0.lock().expect("Executor lock poison");
// if let Some(executor) = lock.as_mut() {
// executor.set_core_size(core);
// } else {
// error!("Global executor runtime is already started;");
// }
}

/// Spawn a new task to the reactor [`Executor`];
Expand Down

0 comments on commit 428c785

Please sign in to comment.