diff --git a/interactive_engine/executor/engine/pegasus/executor/src/reactor.rs b/interactive_engine/executor/engine/pegasus/executor/src/reactor.rs index f9ea6143bea6..f0c7ea5672ce 100644 --- a/interactive_engine/executor/engine/pegasus/executor/src/reactor.rs +++ b/interactive_engine/executor/engine/pegasus/executor/src/reactor.rs @@ -72,7 +72,7 @@ enum RunTask { Users(GeneralTask), } -pub struct ExecutorRuntime { +pub struct PooledExecutorRuntime { pub max_core: usize, current_core: usize, task_rx: Receiver, @@ -255,14 +255,14 @@ fn work_loop( ); } -impl ExecutorRuntime { +impl PooledExecutorRuntime { fn new(core: usize, task_rx: Receiver) -> Self { assert!(core > 0); let mut in_flows = Vec::with_capacity(core); for _ in 0..core { in_flows.push(Arc::new(SegQueue::new())); } - ExecutorRuntime { + PooledExecutorRuntime { max_core: core, current_core: 1, task_rx, @@ -342,15 +342,6 @@ impl ExecutorRuntime { } } - #[inline] - fn set_core_size(&mut self, core: usize) { - if core < 1 { - error!("core pool size can't less than 1; keep default;"); - } else { - self.max_core = core; - } - } - #[inline] fn try_fork_new_thread( &mut self, queue: &WorkStealQueue, not_readies: &Arc>, @@ -397,17 +388,17 @@ enum TaskPackage { Batch(Vec), } -pub struct ExecutorProxy { +pub struct PooledExecutorProxy { task_tx: Sender, } -impl ExecutorProxy { +impl PooledExecutorProxy { fn new(task_tx: Sender) -> Self { - ExecutorProxy { task_tx } + Self { task_tx } } } -impl Executor for ExecutorProxy { +impl Executor for PooledExecutorProxy { fn spawn(&self, task: T) -> Result<(), RejectError> { if SHUTDOWN_HOOK.load(Ordering::SeqCst) { Err(RejectError(task)) @@ -445,23 +436,166 @@ impl Executor for ExecutorProxy { } } -impl Clone for ExecutorProxy { +impl Clone for PooledExecutorProxy { fn clone(&self) -> Self { - ExecutorProxy { task_tx: self.task_tx.clone() } + PooledExecutorProxy { task_tx: self.task_tx.clone() } + } +} + +pub struct DirectExecutorProxy { + seq: AtomicUsize, + tx: Sender>, +} + +impl DirectExecutorProxy { + fn new(tx: Sender>) -> Self { + Self { seq: AtomicUsize::new(0), tx } + } +} + +impl Executor for DirectExecutorProxy { + fn spawn(&self, mut task: T) -> Result<(), RejectError> { + let seq = self.seq.fetch_add(1, Ordering::SeqCst); + let g = std::thread::Builder::new() + .name(format!("direct_task_{}", seq)) + .spawn(move || loop { + match task.check_ready() { + TaskState::Finished => { + break; + } + TaskState::NotReady => { + std::thread::yield_now(); + } + TaskState::Ready => match task.execute() { + TaskState::Finished => { + break; + } + TaskState::NotReady => { + std::thread::yield_now(); + } + TaskState::Ready => { + continue; + } + }, + } + }) + .expect("spawn new thread fail;"); + + self.tx.send(g).ok(); + Ok(()) + } + + fn spawn_batch>( + &self, tasks: I, + ) -> Result<(), RejectError<()>> { + for task in tasks { + self.spawn(task).ok(); + } + Ok(()) + } +} + +pub struct DirectExecutorRuntime(Receiver>); + +impl DirectExecutorRuntime { + fn new(rx: Receiver>) -> Self { + Self(rx) + } + + fn start(self) { + std::thread::Builder::new() + .name("direct-boss".into()) + .spawn(move || { + while SHUTDOWN_HOOK.load(Ordering::SeqCst) { + match self.0.recv_timeout(Duration::from_secs(1)) { + Ok(g) => { + let name = g + .thread() + .name() + .map(|n| n.to_owned()) + .unwrap_or_else(|| "unknown".into()); + match g.join() { + Ok(_) => { + debug!("execute {} finished;", name); + } + Err(_err) => { + error!("fail to execute {}; ", name); + // std::panic::resume_unwind(err); + } + } + } + Err(_e) => { + // do nothing and continue; + } + } + } + }) + .expect("start direct fail;") + .join() + .ok(); + } +} + +pub enum ExecutorProxy { + Direct(DirectExecutorProxy), + Pooled(PooledExecutorProxy), +} + +impl Executor for ExecutorProxy { + fn spawn(&self, task: T) -> Result<(), RejectError> { + match self { + ExecutorProxy::Direct(e) => e.spawn(task), + ExecutorProxy::Pooled(e) => e.spawn(task), + } + } + + fn spawn_batch>( + &self, tasks: I, + ) -> Result<(), RejectError<()>> { + match self { + ExecutorProxy::Direct(e) => e.spawn_batch(tasks), + ExecutorProxy::Pooled(e) => e.spawn_batch(tasks), + } + } +} + +pub enum ExecutorRuntime { + Direct(DirectExecutorRuntime), + Pooled(PooledExecutorRuntime), +} + +impl ExecutorRuntime { + fn start(self) { + match self { + ExecutorRuntime::Direct(e) => e.start(), + ExecutorRuntime::Pooled(e) => e.start(), + } } } static CORE_POOL_SIZE: &'static str = "PEGASUS_CORE_POOL_SIZE"; pub fn init_executor() -> (Mutex>, ExecutorProxy) { - let (tx, rx) = crossbeam_channel::unbounded(); - let cpus = num_cpus::get(); let core = ::std::env::var(CORE_POOL_SIZE) - .map(|value| value.parse::().unwrap_or(cpus)) - .unwrap_or(cpus); - let runtime = Mutex::new(Some(ExecutorRuntime::new(core, rx))); - let proxy = ExecutorProxy::new(tx); - (runtime, proxy) + .map(|value| { + value + .parse::() + .unwrap_or_else(|_| num_cpus::get()) + }) + .unwrap_or_else(|_| num_cpus::get()); + if core > 0 { + let (tx, rx) = crossbeam_channel::unbounded(); + let executor = PooledExecutorRuntime::new(core, rx); + let runtime = Mutex::new(Some(ExecutorRuntime::Pooled(executor))); + let proxy = ExecutorProxy::Pooled(PooledExecutorProxy::new(tx)); + (runtime, proxy) + } else { + let (tx, rx) = crossbeam_channel::unbounded(); + let executor = DirectExecutorRuntime::new(rx); + let runtime = Mutex::new(Some(ExecutorRuntime::Direct(executor))); + let proxy = ExecutorProxy::Direct(DirectExecutorProxy::new(tx)); + (runtime, proxy) + } } struct ExecutorGuard { @@ -515,6 +649,8 @@ fn start_executor() { } else { error!("Global executor runtime is already started;"); } + } else { + error!("can't start executor as the executor is in shutdown, try start later;") } } @@ -586,12 +722,13 @@ pub fn is_shutdown() -> bool { /// It is recommended that use the environment variable `export PEGASUS_CORE_POOL_SIZE = n` to /// set the thread size; pub fn set_core_pool_size(core: usize) { - let mut lock = EXECUTOR.0.lock().expect("Executor lock poison"); - if let Some(executor) = lock.as_mut() { - executor.set_core_size(core); - } else { - error!("Global executor runtime is already started;"); - } + std::env::set_var(CORE_POOL_SIZE, core.to_string()); + // let mut lock = EXECUTOR.0.lock().expect("Executor lock poison"); + // if let Some(executor) = lock.as_mut() { + // executor.set_core_size(core); + // } else { + // error!("Global executor runtime is already started;"); + // } } /// Spawn a new task to the reactor [`Executor`];