diff --git a/README.md b/README.md
index c97f03c3..497c986e 100644
--- a/README.md
+++ b/README.md
@@ -189,6 +189,9 @@ fn main() {
 - [Coroutine Overview](core/docs/en/coroutine.md)
 - [Scalable Stack Overview](core/docs/en/scalable-stack.md)
 - [Monitor Overview](core/docs/en/monitor.md)
+- [Work Steal Overview](core/docs/en/work-steal.md)
+- [Ordered Work Steal Overview](core/docs/en/ordered-work-steal.md)
+- [Coroutine Pool Overview](core/docs/en/coroutine-pool.md)
 
 [我有故事,你有酒吗?](https://github.com/acl-dev/open-coroutine-docs)
 
diff --git a/core/docs/en/coroutine-pool.md b/core/docs/en/coroutine-pool.md
new file mode 100644
index 00000000..6d5bb33d
--- /dev/null
+++ b/core/docs/en/coroutine-pool.md
@@ -0,0 +1,110 @@
+---
+title: Coroutine Pool Overview
+date: 2025-01-18 10:00:00
+author: loongs-zhang
+---
+
+# Coroutine Pool Overview
+
+## Usage
+
+```rust
+use open_coroutine_core::co_pool::CoroutinePool;
+
+fn main() -> std::io::Result<()> {
+    let mut pool = CoroutinePool::default();
+    assert!(pool.is_empty());
+    pool.submit_task(
+        Some(String::from("task_name")),
+        |_| {
+            println!("Hello, world!");
+            Some(2)
+        },
+        None,
+        None,
+    )?;
+    assert!(!pool.is_empty());
+    pool.try_schedule_task()
+}
+```
+
+## Why coroutine pool?
+
+Pooling coroutines brings several significant advantages:
+
+1. Resource management: The coroutine pool manages the creation, destruction, and reuse of coroutines. By using a
+   coroutine pool, a certain number of coroutines can be created in advance and kept in the pool for use when needed.
+   This avoids frequently creating and destroying coroutines, reduces unnecessary resource waste, and improves system
+   performance.
+
+2. Avoid coroutine starvation: With a coroutine pool, coroutines are continuously fed with tasks, so they do not sit
+   idle after completing a task.
+
+3. Concurrency control: By configuring the coroutine pool, the number of concurrent coroutines can be limited to avoid
+   overloading the system with too many coroutines.
+
+4. Improve code maintainability: A coroutine pool separates task execution from coroutine management, making the code
+   clearer and more maintainable. The execution logic can focus on the task itself, while the creation and management
+   of coroutines are handled by the coroutine pool.
+
+## How it works
+
+In open-coroutine-core, the coroutine pool is lazy, which means that if you don't call `try_timeout_schedule_task`,
+tasks will not be executed.
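+
+As a minimal sketch of this laziness, reusing only the calls from the usage example above (the task names and printed
+messages are purely illustrative), the snippet below queues two tasks and then runs them with a single explicit
+scheduling call:
+
+```rust
+use open_coroutine_core::co_pool::CoroutinePool;
+
+fn main() -> std::io::Result<()> {
+    let mut pool = CoroutinePool::default();
+    // Submitting tasks only queues them; the lazy pool runs nothing yet.
+    for i in 0..2 {
+        pool.submit_task(
+            Some(format!("lazy-task-{i}")),
+            move |_| {
+                println!("task {i} is running");
+                Some(2)
+            },
+            None,
+            None,
+        )?;
+    }
+    assert!(!pool.is_empty());
+    // Only this call actually schedules and executes the queued tasks.
+    pool.try_schedule_task()
+}
+```
+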
+Please refer to the sequence diagram below for details:
+
+```mermaid
+sequenceDiagram
+    actor Schedule Thread
+    participant CoroutinePool
+    participant WorkerCoroutine
+    participant Task
+    participant CoroutineCreator
+
+    Schedule Thread ->>+ CoroutinePool: CoroutinePool::try_timeout_schedule_task
+    alt the coroutine pool is stopped
+        CoroutinePool ->>+ Schedule Thread: return error
+    end
+    alt the task queue in the coroutine pool is empty
+        CoroutinePool ->>+ Schedule Thread: return success
+    end
+    alt create worker coroutines
+        CoroutinePool ->>+ WorkerCoroutine: create worker coroutines only if the coroutine pool has not reached its maximum pool size
+    end
+    CoroutinePool ->>+ WorkerCoroutine: schedule the worker coroutines
+    alt run tasks
+        WorkerCoroutine ->>+ Task: try to poll a task
+        alt poll success
+            Task ->>+ Task: run the task
+            alt in execution
+                Task ->>+ WorkerCoroutine: be preempted or enter syscall
+                WorkerCoroutine ->>+ WorkerCoroutine: The coroutine state changes to Suspend/Syscall
+                WorkerCoroutine ->>+ CoroutineCreator: Listener::on_state_changed
+                CoroutineCreator ->>+ WorkerCoroutine: create worker coroutines only if the coroutine pool has not reached its maximum pool size
+            end
+            alt run success
+                Task ->>+ WorkerCoroutine: Task exited normally
+            end
+            alt run fail
+                Task ->>+ WorkerCoroutine: Task exited abnormally
+                WorkerCoroutine ->>+ WorkerCoroutine: The coroutine state changes to Error
+                WorkerCoroutine ->>+ CoroutineCreator: Listener::on_state_changed
+                CoroutineCreator ->>+ CoroutineCreator: reduce the current coroutine count
+                CoroutineCreator ->>+ WorkerCoroutine: recreate worker coroutine only if the coroutine pool has not reached its maximum pool size
+            end
+        end
+        alt poll fail
+            Task ->>+ WorkerCoroutine: increase count and yield to the next coroutine
+            WorkerCoroutine ->>+ WorkerCoroutine: block for a while if the count has reached the current size of the coroutine pool
+        end
+        WorkerCoroutine ->>+ WorkerCoroutine: try to poll the next task
+    end
+    alt recycle coroutines
+        WorkerCoroutine ->>+ WorkerCoroutine: the schedule has exceeded the timeout
+        WorkerCoroutine ->>+ CoroutinePool: has the coroutine pool exceeded the minimum pool size?
+        CoroutinePool ->>+ WorkerCoroutine: yes
+        WorkerCoroutine ->>+ WorkerCoroutine: exit
+    end
+    WorkerCoroutine ->>+ CoroutinePool: return if timeout or schedule fail
+    CoroutinePool ->>+ Schedule Thread: This schedule has ended
+    Schedule Thread ->>+ Schedule Thread: ......
+```
diff --git a/core/docs/en/monitor.md b/core/docs/en/monitor.md
index 3102d39b..12f3df94 100644
--- a/core/docs/en/monitor.md
+++ b/core/docs/en/monitor.md
@@ -39,11 +39,6 @@ fn main() -> std::io::Result<()> {
 }
 ```
 
-## What is monitor?
-
-The `monitor` mod implements the `preemptive` feature for open-coroutine, which allows the coroutine to be preempted
-when it is running for a long time.
-
 ## Why preempt?
 
 After a `Coroutine::resume_with`, a coroutine may occupy the scheduling thread for a long time, thereby slowing down
@@ -53,6 +48,11 @@ automatically suspends coroutines that are stuck in long-term execution and allo
 The coroutine occupies scheduling threads for a long time in two scenarios: getting stuck in heavy computing or syscall.
 The following only solves the problem of getting stuck in heavy computing.
 
+## What is monitor?
+
+The `monitor` mod implements the `preemptive` feature for open-coroutine, which allows the coroutine to be preempted
+when it is running for a long time.
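+
+Conceptually, the monitor acts like a watchdog: it tracks how long a coroutine has been running and suspends it once it
+has been running for too long. The plain-`std` sketch below only illustrates that watchdog timing idea with a
+cooperative flag; the real `monitor` mod suspends the coroutine without its cooperation, as described in the next
+section:
+
+```rust
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::thread;
+use std::time::{Duration, Instant};
+
+fn main() {
+    // Raised by the watchdog once the time slice is exceeded.
+    let exceeded = Arc::new(AtomicBool::new(false));
+    let flag = exceeded.clone();
+
+    // The "monitor": after 10ms of wall-clock time, mark the worker as overdue.
+    let monitor = thread::spawn(move || {
+        thread::sleep(Duration::from_millis(10));
+        flag.store(true, Ordering::Relaxed);
+    });
+
+    // A compute-bound loop standing in for a coroutine stuck in heavy computing.
+    // Here it checks the flag itself; the real monitor forces the suspension instead.
+    let start = Instant::now();
+    let mut spins: u64 = 0;
+    while !exceeded.load(Ordering::Relaxed) {
+        spins = spins.wrapping_add(1);
+    }
+    println!("suspended after {:?} and {spins} spins", start.elapsed());
+    monitor.join().unwrap();
+}
+```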
+
 ## How it works
 
 ```mermaid
diff --git a/core/docs/en/ordered-work-steal.md b/core/docs/en/ordered-work-steal.md
index 95b973e7..000feeeb 100644
--- a/core/docs/en/ordered-work-steal.md
+++ b/core/docs/en/ordered-work-steal.md
@@ -22,8 +22,11 @@ priority, the earlier it will be popped up.
 
 ## What is ordered work steal queue?
 
 An ordered work steal queue consists of a global queue and multiple local queues, the global queue is unbounded, while
-the local queue has a bounded SkipList with collections. To ensure high performance, the number of local queues is
-usually equal to the number of threads.
+the local queue is backed by a bounded `SkipList`. To ensure high performance, the number of local queues is usually
+equal to the number of threads. It's worth mentioning that if all threads prioritize local tasks, there is an extreme
+situation where tasks in the shared queue never get a chance to be scheduled. To avoid this imbalance, open-coroutine
+follows goroutine's design: every time a thread has scheduled 60 tasks from its local queue, it is forced to pop the
+`highest priority task` from the shared queue.
 
 ## How `push` works
diff --git a/core/docs/en/overview.md b/core/docs/en/overview.md
index e7d0081a..bba06adf 100644
--- a/core/docs/en/overview.md
+++ b/core/docs/en/overview.md
@@ -26,3 +26,6 @@ replacement for IO thread pools, see [why better](../en/why-better.md).
 - [Coroutine Overview](../en/coroutine.md)
 - [Scalable Stack Overview](../en/scalable-stack.md)
 - [Monitor Overview](../en/monitor.md)
+- [Work Steal Overview](../en/work-steal.md)
+- [Ordered Work Steal Overview](../en/ordered-work-steal.md)
+- [Coroutine Pool Overview](../en/coroutine-pool.md)
diff --git a/core/docs/en/work-steal.md b/core/docs/en/work-steal.md
index 96281690..f007ded4 100644
--- a/core/docs/en/work-steal.md
+++ b/core/docs/en/work-steal.md
@@ -22,7 +22,9 @@ better to let them help other threads work.
 
 A work steal queue consists of a global queue and multiple local queues, the global queue is unbounded, while the local
 queue has a bounded RingBuffer. To ensure high performance, the number of local queues is usually equal to the number of
-threads.
+threads. It's worth mentioning that if all threads prioritize local tasks, there is an extreme situation where tasks in
+the shared queue never get a chance to be scheduled. To avoid this imbalance, open-coroutine follows goroutine's design:
+every time a thread has scheduled 60 tasks from its local queue, it is forced to pop a task from the shared queue.
 
 ## How `push` works
diff --git a/core/src/co_pool/mod.rs b/core/src/co_pool/mod.rs
index 2c5067d8..79a552ff 100644
--- a/core/src/co_pool/mod.rs
+++ b/core/src/co_pool/mod.rs
@@ -32,6 +32,7 @@ pub struct CoroutinePool<'p> {
     //协程池状态
     state: Cell,
     //任务队列
+    #[doc = include_str!("../../docs/en/ordered-work-steal.md")]
     task_queue: OrderedLocalQueue<'p, Task<'p>>,
     //工作协程组
     workers: Scheduler<'p>,
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 4293ed54..67990812 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -71,6 +71,7 @@ mod monitor;
 pub mod scheduler;
 
 /// Coroutine pool abstraction and impl.
+#[doc = include_str!("../docs/en/coroutine-pool.md")]
 pub mod co_pool;
 
 /// net abstraction and impl.
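
To make the scheduling rule added in the work-steal overviews above concrete, here is a small, self-contained sketch of
the "check the shared queue every 60 local pops" policy. It is illustrative only, not the crate's actual queue
implementation; the `Worker` struct and the integer task type are hypothetical:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Pop policy sketch: prefer the local queue, but every 60th pop looks at the
/// shared (global) queue first so that globally submitted tasks cannot starve.
struct Worker {
    local: VecDeque<u64>,
    tick: u32,
}

impl Worker {
    fn pop(&mut self, shared: &Mutex<VecDeque<u64>>) -> Option<u64> {
        self.tick += 1;
        if self.tick % 60 == 0 {
            // Forced look at the shared queue, mirroring goroutine's scheduler.
            if let Some(task) = shared.lock().unwrap().pop_front() {
                return Some(task);
            }
        }
        // Otherwise take a local task first, falling back to the shared queue
        // once the local queue is drained.
        self.local
            .pop_front()
            .or_else(|| shared.lock().unwrap().pop_front())
    }
}

fn main() {
    let shared = Mutex::new(VecDeque::from(vec![100, 101]));
    let mut worker = Worker { local: (0..70).collect(), tick: 0 };
    while let Some(task) = worker.pop(&shared) {
        println!("run task {task}");
    }
}
```

In the ordered variant, the forced pop takes the `highest priority task` from the shared queue instead of its front.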
diff --git a/core/src/scheduler.rs b/core/src/scheduler.rs
index eb9067b9..2970c34e 100644
--- a/core/src/scheduler.rs
+++ b/core/src/scheduler.rs
@@ -85,6 +85,7 @@ pub struct Scheduler<'s> {
     name: String,
     stack_size: AtomicUsize,
     listeners: VecDeque<&'s dyn Listener<(), Option>>,
+    #[doc = include_str!("../docs/en/ordered-work-steal.md")]
     ready: OrderedLocalQueue<'s, SchedulableCoroutine<'s>>,
     suspend: BinaryHeap>,
     syscall: DashMap<&'s str, SchedulableCoroutine<'s>>,
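
The `#[doc]` attribute above attaches the ordered work-steal overview to the scheduler's `ready` queue. As a reminder of
what "ordered" means when popping, the sketch below uses a `BTreeMap` as a stand-in for a priority-ordered queue
(assuming here that a smaller value means higher priority); the crate's real `OrderedLocalQueue` is the skip-list-based
structure described in the overview, not this map:

```rust
use std::collections::BTreeMap;

fn main() {
    // Keys are priorities, values are the queued "tasks".
    let mut queue: BTreeMap<u8, &str> = BTreeMap::new();
    queue.insert(3, "low priority task");
    queue.insert(1, "high priority task");
    queue.insert(2, "medium priority task");

    // Tasks come out in priority order: the smallest key is popped first.
    while let Some((priority, task)) = queue.pop_first() {
        println!("priority {priority}: {task}");
    }
}
```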