3 changes: 3 additions & 0 deletions README.md
@@ -189,6 +189,9 @@ fn main() {
- [Coroutine Overview](core/docs/en/coroutine.md)
- [Scalable Stack Overview](core/docs/en/scalable-stack.md)
- [Monitor Overview](core/docs/en/monitor.md)
- [Work Steal Overview](core/docs/en/work-steal.md)
- [Ordered Work Steal Overview](core/docs/en/ordered-work-steal.md)
- [Coroutine Pool Overview](core/docs/en/coroutine-pool.md)

[我有故事,你有酒吗?](https://github.com/acl-dev/open-coroutine-docs)

110 changes: 110 additions & 0 deletions core/docs/en/coroutine-pool.md
@@ -0,0 +1,110 @@
---
title: Coroutine Pool Overview
date: 2025-01-18 10:00:00
author: loongs-zhang
---

# Coroutine Pool Overview

## Usage

```rust
use open_coroutine_core::co_pool::CoroutinePool;

fn main() -> std::io::Result<()> {
    let mut pool = CoroutinePool::default();
    assert!(pool.is_empty());
    pool.submit_task(
        Some(String::from("task_name")),
        |_| {
            println!("Hello, world!");
            Some(2)
        },
        None,
        None,
    )?;
    assert!(!pool.is_empty());
    pool.try_schedule_task()
}
```

## Why coroutine pool?

Pooling coroutines brings several significant advantages:

1. Resource management: The coroutine pool can manage the creation, destruction, and reuse of coroutines. By using a
coroutine pool, a certain number of coroutines can be created in advance and stored in the pool for use when needed.
This can avoid frequent creation and destruction of coroutines, reduce unnecessary resource waste, and improve system
performance.

2. Avoid coroutine starvation: with a coroutine pool, coroutines are continuously fed tasks, avoiding situations where
   they sit idle after finishing their work.

3. Concurrency control: By setting the parameters of the coroutine pool, the number of concurrent coroutines can be
limited to avoid overloading the system due to too many coroutines.

4. Improve code maintainability: Using coroutine pools can separate task execution from coroutine management, making the
code clearer and more maintainable. The execution logic of a task can be focused on the task itself, while the
creation and management of coroutines are handled by the coroutine pool.
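The reuse argument above can be sketched with a plain thread pool, threads standing in for coroutines. All names here are illustrative, not part of the open-coroutine API:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Run `tasks` closures on a pool of `workers` reusable threads and
/// return how many tasks actually executed.
fn run_pool(workers: usize, tasks: usize) -> usize {
    let (tx, rx) = mpsc::channel::<Box<dyn FnOnce() + Send>>();
    let rx = Arc::new(Mutex::new(rx));
    let done = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || loop {
                // Workers block here between tasks instead of being destroyed.
                let task = match rx.lock().unwrap().recv() {
                    Ok(t) => t,
                    Err(_) => break, // queue closed: shut down
                };
                task();
            })
        })
        .collect();

    for _ in 0..tasks {
        let done = Arc::clone(&done);
        tx.send(Box::new(move || {
            done.fetch_add(1, Ordering::SeqCst);
        }))
        .unwrap();
    }
    drop(tx); // close the queue so idle workers exit
    for h in handles {
        h.join().unwrap();
    }
    done.load(Ordering::SeqCst)
}

fn main() {
    // 2 workers service 8 tasks: worker creation cost is paid twice, not 8 times.
    assert_eq!(run_pool(2, 8), 8);
}
```

The same trade applies to coroutines, only with far cheaper workers.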

## How it works

In open-coroutine-core, the coroutine pool is lazy: tasks are not executed until you call `try_timeout_schedule_task`.
Please refer to the sequence diagram below for details:
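As a rough illustration of this laziness (a toy model, not the crate's real `CoroutinePool`), submitted tasks just sit in a queue until a schedule call drains it:

```rust
/// Toy model of a lazy pool: `submit_task` only queues work;
/// nothing runs until `try_schedule_task` is called.
struct LazyPool {
    tasks: Vec<Box<dyn FnOnce() -> Option<usize>>>,
    results: Vec<Option<usize>>,
}

impl LazyPool {
    fn new() -> Self {
        Self { tasks: Vec::new(), results: Vec::new() }
    }

    fn submit_task(&mut self, f: impl FnOnce() -> Option<usize> + 'static) {
        self.tasks.push(Box::new(f)); // queued, not executed
    }

    fn try_schedule_task(&mut self) {
        // Only here does queued work actually run.
        for task in self.tasks.drain(..) {
            self.results.push(task());
        }
    }
}

fn main() {
    let mut pool = LazyPool::new();
    pool.submit_task(|| Some(2));
    assert!(pool.results.is_empty()); // submitted but not yet executed
    pool.try_schedule_task();
    assert_eq!(pool.results, vec![Some(2)]);
}
```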

```mermaid
sequenceDiagram
actor Schedule Thread
participant CoroutinePool
participant WorkerCoroutine
participant Task
participant CoroutineCreator

Schedule Thread ->>+ CoroutinePool: CoroutinePool::try_timeout_schedule_task
alt the coroutine pool is stopped
CoroutinePool ->>+ Schedule Thread: return error
end
alt the task queue in the coroutine pool is empty
CoroutinePool ->>+ Schedule Thread: return success
end
alt create worker coroutines
CoroutinePool ->>+ WorkerCoroutine: create worker coroutines only if the coroutine pool has not reached its maximum pool size
end
CoroutinePool ->>+ WorkerCoroutine: schedule the worker coroutines
alt run tasks
WorkerCoroutine ->>+ Task: try poll a task
alt poll success
Task ->>+ Task: run the task
alt in execution
Task ->>+ WorkerCoroutine: be preempted or enter syscall
WorkerCoroutine ->>+ WorkerCoroutine: The coroutine state changes to Suspend/Syscall
WorkerCoroutine ->>+ CoroutineCreator: Listener::on_state_changed
CoroutineCreator ->>+ WorkerCoroutine: create worker coroutines only if the coroutine pool has not reached its maximum pool size
end
alt run success
Task ->>+ WorkerCoroutine: Task exited normally
end
alt run fail
Task ->>+ WorkerCoroutine: Task exited abnormally
WorkerCoroutine ->>+ WorkerCoroutine: The coroutine state changes to Error
WorkerCoroutine ->>+ CoroutineCreator: Listener::on_state_changed
CoroutineCreator ->>+ CoroutineCreator: reduce the current coroutine count
CoroutineCreator ->>+ WorkerCoroutine: recreate worker coroutine only if the coroutine pool has not reached its maximum pool size
end
end
alt poll fail
Task ->>+ WorkerCoroutine: increase count and yield to the next coroutine
WorkerCoroutine ->>+ WorkerCoroutine: block for a while if the count has reached the current size of coroutine pool
end
WorkerCoroutine ->>+ WorkerCoroutine: try poll the next task
end
alt recycle coroutines
WorkerCoroutine ->>+ WorkerCoroutine: the schedule has exceeded its timeout
WorkerCoroutine ->>+ CoroutinePool: has the coroutine pool exceeded the minimum pool size?
CoroutinePool ->>+ WorkerCoroutine: yes
WorkerCoroutine ->>+ WorkerCoroutine: exit
end
WorkerCoroutine ->>+ CoroutinePool: return if timeout or schedule fail
CoroutinePool ->>+ Schedule Thread: This schedule has ended
Schedule Thread ->>+ Schedule Thread: ......
```
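The recycle branch at the end of the diagram can be sketched as follows (hypothetical names, with a plain `VecDeque` standing in for the task queue and a counter standing in for the pool size):

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Simplified worker loop mirroring the diagram: poll tasks until the
/// schedule times out, then shrink the pool only if it is still above
/// its minimum size. Names are illustrative, not the crate's real API.
fn worker_loop(
    queue: &mut VecDeque<Box<dyn FnOnce()>>,
    timeout: Duration,
    current_size: &mut usize,
    min_size: usize,
) {
    let deadline = Instant::now() + timeout;
    loop {
        if let Some(task) = queue.pop_front() {
            task(); // poll success: run the task
        }
        // poll fail: in the real pool the coroutine yields and may block
        // for a while; here we just loop until the deadline.
        if Instant::now() >= deadline {
            // this scheduling round ends; recycle one worker only if the
            // pool is above its minimum size
            if *current_size > min_size {
                *current_size -= 1;
            }
            return;
        }
    }
}

fn main() {
    let mut queue: VecDeque<Box<dyn FnOnce()>> = VecDeque::new();
    queue.push_back(Box::new(|| println!("ran")));
    let mut current_size = 2;
    worker_loop(&mut queue, Duration::from_millis(1), &mut current_size, 1);
    assert!(queue.is_empty());
    assert_eq!(current_size, 1); // one worker recycled down to the minimum
}
```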
10 changes: 5 additions & 5 deletions core/docs/en/monitor.md
@@ -39,11 +39,6 @@ fn main() -> std::io::Result<()> {
}
```

## Why preempt?

After a `Coroutine::resume_with`, a coroutine may occupy the scheduling thread for a long time, thereby slowing down
@@ -53,6 +48,11 @@ automatically suspends coroutines that are stuck in long-term execution and allo
The coroutine occupies scheduling threads for a long time in two scenarios: getting stuck in heavy computation or in a
syscall. The following only addresses the heavy-computation case.

## What is monitor?

The `monitor` mod implements the `preemptive` feature for open-coroutine, which allows the coroutine to be preempted
when it is running for a long time.

## How it works

```mermaid
7 changes: 5 additions & 2 deletions core/docs/en/ordered-work-steal.md
@@ -22,8 +22,11 @@ priority, the earlier it will be popped up.
## What is ordered work steal queue?

An ordered work steal queue consists of a global queue and multiple local queues; the global queue is unbounded, while
each local queue is a bounded `SkipList`-based collection. To ensure high performance, the number of local queues is
usually equal to the number of threads. It's worth mentioning that if all threads prioritize local tasks, there is an
extreme case where tasks on the shared queue never get a chance to be scheduled. To avoid this imbalance, following
Go's goroutine scheduler, every time a thread has scheduled 60 tasks from its local queue, it is forced to pop the
`highest priority task` from the shared queue.
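The pop order described above can be sketched with a `BTreeMap` standing in for the `SkipList` (illustrative names and a simplified fairness counter; the crate's real queue differs):

```rust
use std::collections::BTreeMap;

/// Pop the entry with the smallest key (highest priority).
fn pop_min(q: &mut BTreeMap<u64, &'static str>) -> Option<&'static str> {
    let key = *q.keys().next()?;
    q.remove(&key)
}

/// Stand-in for the ordered queue: priority-keyed local and shared maps
/// plus a tick counter enforcing the 60-task fairness rule.
struct OrderedQueue {
    local: BTreeMap<u64, &'static str>,
    shared: BTreeMap<u64, &'static str>,
    tick: u32,
}

impl OrderedQueue {
    fn pop(&mut self) -> Option<&'static str> {
        self.tick += 1;
        // After 60 local pops, the 61st is forced to come from the shared
        // queue so shared tasks cannot starve.
        if self.tick % 61 == 0 {
            if let Some(t) = pop_min(&mut self.shared) {
                return Some(t);
            }
        }
        pop_min(&mut self.local).or_else(|| pop_min(&mut self.shared))
    }
}

fn main() {
    let mut q = OrderedQueue {
        local: BTreeMap::from([(5, "low"), (1, "urgent")]),
        shared: BTreeMap::from([(3, "shared")]),
        tick: 0,
    };
    assert_eq!(q.pop(), Some("urgent")); // lowest key pops first
    assert_eq!(q.pop(), Some("low"));
    assert_eq!(q.pop(), Some("shared")); // falls back to the shared queue
}
```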

## How `push` works

3 changes: 3 additions & 0 deletions core/docs/en/overview.md
@@ -26,3 +26,6 @@ replacement for IO thread pools, see [why better](../en/why-better.md).
- [Coroutine Overview](../en/coroutine.md)
- [Scalable Stack Overview](../en/scalable-stack.md)
- [Monitor Overview](../en/monitor.md)
- [Work Steal Overview](../en/work-steal.md)
- [Ordered Work Steal Overview](../en/ordered-work-steal.md)
- [Coroutine Pool Overview](../en/coroutine-pool.md)
4 changes: 3 additions & 1 deletion core/docs/en/work-steal.md
@@ -22,7 +22,9 @@ better to let them help other threads work.

A work steal queue consists of a global queue and multiple local queues; the global queue is unbounded, while each
local queue is a bounded `RingBuffer`. To ensure high performance, the number of local queues is usually equal to the
number of threads. It's worth mentioning that if all threads prioritize local tasks, there is an extreme case where
tasks on the shared queue never get a chance to be scheduled. To avoid this imbalance, following Go's goroutine
scheduler, every time a thread has scheduled 60 tasks from its local queue, it is forced to pop a task from the shared
queue.
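A minimal sketch of this pop order, assuming a steal-half policy and the 60-task fairness rule (illustrative names only, not the crate's API):

```rust
use std::collections::VecDeque;

/// One worker's view of a work-steal queue: a bounded local queue
/// (VecDeque standing in for the RingBuffer) plus a fairness counter.
struct Worker {
    local: VecDeque<u32>,
    tick: u32,
}

/// Pop order: forced shared pop every 61st call, then local queue,
/// then steal half of a sibling's tasks, then the shared queue.
fn pop(me: &mut Worker, sibling: &mut Worker, shared: &mut VecDeque<u32>) -> Option<u32> {
    me.tick += 1;
    if me.tick % 61 == 0 {
        // fairness: after 60 local pops, take from the shared queue first
        if let Some(t) = shared.pop_front() {
            return Some(t);
        }
    }
    if let Some(t) = me.local.pop_front() {
        return Some(t);
    }
    // local queue empty: steal half of a sibling's tasks
    let n = sibling.local.len() / 2;
    for _ in 0..n {
        if let Some(t) = sibling.local.pop_back() {
            me.local.push_front(t);
        }
    }
    me.local.pop_front().or_else(|| shared.pop_front())
}

fn main() {
    let mut a = Worker { local: VecDeque::new(), tick: 0 };
    let mut b = Worker { local: VecDeque::from([1, 2, 3, 4]), tick: 0 };
    let mut shared = VecDeque::from([9]);
    // `a` has nothing local, so it steals half of `b`'s tasks.
    assert!(pop(&mut a, &mut b, &mut shared).is_some());
    assert_eq!(b.local.len(), 2);
}
```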

## How `push` works

1 change: 1 addition & 0 deletions core/src/co_pool/mod.rs
@@ -32,6 +32,7 @@ pub struct CoroutinePool<'p> {
//协程池状态
state: Cell<PoolState>,
//任务队列
#[doc = include_str!("../../docs/en/ordered-work-steal.md")]
task_queue: OrderedLocalQueue<'p, Task<'p>>,
//工作协程组
workers: Scheduler<'p>,
1 change: 1 addition & 0 deletions core/src/lib.rs
@@ -71,6 +71,7 @@ mod monitor;
pub mod scheduler;

/// Coroutine pool abstraction and impl.
#[doc = include_str!("../docs/en/coroutine-pool.md")]
pub mod co_pool;

/// net abstraction and impl.
1 change: 1 addition & 0 deletions core/src/scheduler.rs
@@ -85,6 +85,7 @@ pub struct Scheduler<'s> {
name: String,
stack_size: AtomicUsize,
listeners: VecDeque<&'s dyn Listener<(), Option<usize>>>,
#[doc = include_str!("../docs/en/ordered-work-steal.md")]
ready: OrderedLocalQueue<'s, SchedulableCoroutine<'s>>,
suspend: BinaryHeap<SuspendItem<'s>>,
syscall: DashMap<&'s str, SchedulableCoroutine<'s>>,