diff --git a/README.md b/README.md index 7fb25fd1..3effbf95 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ [![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/acl-dev/open-coroutine.svg)](http://isitmaintained.com/project/acl-dev/open-coroutine "Average time to resolve an issue") [![Percentage of issues still open](http://isitmaintained.com/badge/open/acl-dev/open-coroutine.svg)](http://isitmaintained.com/project/acl-dev/open-coroutine "Percentage of issues still open") -The `open-coroutine` is a simple, efficient and generic stackful-coroutine library. +The `open-coroutine` is a simple, efficient and generic stackfull-coroutine library, you can use this as a performance replacement for IO thread pools. English | [中文](README_ZH.md) @@ -25,11 +25,79 @@ English | [中文](README_ZH.md) ## 🕊 Roadmap +- [ ] add docs; +- [ ] add performance [benchmark](https://github.com/TechEmpower/FrameworkBenchmarks/wiki/Project-Information-Framework-Tests-Overview); - [ ] cancel coroutine/task; - [ ] add metrics; - [ ] add synchronization toolkit; - [ ] support and compatibility for AF_XDP socket; +## 🏠 Architecture + +```mermaid +graph TD + subgraph ApplicationFramework + Tower + Actix-Web + Rocket + warp + axum + end + subgraph MessageQueue + RocketMQ + Pulsar + end + subgraph RemoteProcedureCall + Dubbo + Tonic + gRPC-rs + Volo + end + subgraph Database + MySQL + Oracle + end + subgraph NetworkFramework + Tokio + monoio + async-std + smol + end + subgraph open-coroutine-architecture + subgraph core + Preemptive + ScalableStack + WorkSteal + Priority + end + subgraph hook + HookSyscall + end + subgraph macros + open-coroutine::main + end + subgraph open-coroutine + end + hook -->|depends on| core + open-coroutine -->|depends on| hook + open-coroutine -->|depends on| macros + end + subgraph OperationSystem + Linux + macOS + Windows + end + ApplicationFramework -->|maybe depends on| RemoteProcedureCall + ApplicationFramework -->|maybe depends on| MessageQueue + ApplicationFramework -->|maybe depends on| Database + MessageQueue -->|depends on| NetworkFramework + RemoteProcedureCall -->|depends on| NetworkFramework + NetworkFramework -->|runs on| OperationSystem + NetworkFramework -->|can depends on| open-coroutine-architecture + Database -->|runs on| OperationSystem + open-coroutine-architecture -->|runs on| OperationSystem +``` + ## 📖 Quick Start ### step1: add dependency to your Cargo.toml @@ -60,7 +128,9 @@ fn main() { } ``` -### create a task with priority(optional) +## 🪽 Advanced Usage + +### create a task with priority ```rust #[open_coroutine::main] @@ -71,7 +141,7 @@ fn main() { } ``` -### wait until the task is completed or timed out(optional) +### wait until the task is completed or timed out ```rust #[open_coroutine::main] @@ -83,7 +153,7 @@ fn main() { } ``` -### scalable stack(optional) +### scalable stack ```rust #[open_coroutine::main] @@ -112,3 +182,15 @@ fn main() { - [Monitor Overview](core/docs/en/monitor.md) [我有故事,你有酒吗?](https://github.com/acl-dev/open-coroutine-docs) + +## 🙏 Credits + +This crate was inspired by the following projects: + +- [acl](https://github.com/acl-dev/acl) +- [coost](https://github.com/idealvin/coost) +- [golang](https://github.com/golang/go) +- [stacker](https://github.com/rust-lang/stacker) +- [monoio](https://github.com/bytedance/monoio) +- [compio](https://github.com/compio-rs/compio) +- [may](https://github.com/Xudong-Huang/may) diff --git a/README_ZH.md b/README_ZH.md index c526583a..e0383fb5 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -8,7 +8,7 @@ [![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/acl-dev/open-coroutine.svg)](http://isitmaintained.com/project/acl-dev/open-coroutine "解决issue的平均时间") [![Percentage of issues still open](http://isitmaintained.com/badge/open/acl-dev/open-coroutine.svg)](http://isitmaintained.com/project/acl-dev/open-coroutine "仍未关闭issue的百分比") -`open-coroutine`是一个简单、高效、通用的有栈协程库。 +`open-coroutine`是一个简单、高效、通用的有栈协程库,您可以将其用作IO线程池的性能替代。 [English](README.md) | 中文 @@ -25,11 +25,79 @@ ## 🕊 未来计划 +- [ ] 完善文档; +- [ ] 增加性能[基准测试](https://github.com/TechEmpower/FrameworkBenchmarks/wiki/Project-Information-Framework-Tests-Overview); - [ ] 取消协程/任务; - [ ] 增加性能指标; - [ ] 增加并发工具包; - [ ] 支持AF_XDP套接字; +## 🏠 架构设计 + +```mermaid +graph TD + subgraph ApplicationFramework + Tower + Actix-Web + Rocket + warp + axum + end + subgraph MessageQueue + RocketMQ + Pulsar + end + subgraph RemoteProcedureCall + Dubbo + Tonic + gRPC-rs + Volo + end + subgraph Database + MySQL + Oracle + end + subgraph NetworkFramework + Tokio + monoio + async-std + smol + end + subgraph open-coroutine-architecture + subgraph core + Preemptive + ScalableStack + WorkSteal + Priority + end + subgraph hook + HookSyscall + end + subgraph macros + open-coroutine::main + end + subgraph open-coroutine + end + hook -->|depends on| core + open-coroutine -->|depends on| hook + open-coroutine -->|depends on| macros + end + subgraph OperationSystem + Linux + macOS + Windows + end + ApplicationFramework -->|maybe depends on| RemoteProcedureCall + ApplicationFramework -->|maybe depends on| MessageQueue + ApplicationFramework -->|maybe depends on| Database + MessageQueue -->|depends on| NetworkFramework + RemoteProcedureCall -->|depends on| NetworkFramework + NetworkFramework -->|runs on| OperationSystem + NetworkFramework -->|can depends on| open-coroutine-architecture + Database -->|runs on| OperationSystem + open-coroutine-architecture -->|runs on| OperationSystem +``` + ## 📖 快速接入 ### step1: 在你的Cargo.toml中添加依赖 @@ -60,7 +128,9 @@ fn main() { } ``` -### 创建具有优先级的任务(可选) +## 🪽 进阶使用 + +### 创建具有优先级的任务 ```rust #[open_coroutine::main] @@ -71,7 +141,7 @@ fn main() { } ``` -### 等待任务完成或超时(可选) +### 等待任务完成或超时 ```rust #[open_coroutine::main] @@ -83,7 +153,7 @@ fn main() { } ``` -### 扩容栈(可选) +### 扩容栈 ```rust #[open_coroutine::main] @@ -112,3 +182,15 @@ fn main() { - [语言选择](docs/cn/why-rust.md) [我有故事,你有酒吗?](https://github.com/acl-dev/open-coroutine-docs) + +## 🙏 鸣谢 + +这个crate的灵感来自以下项目: + +- [acl](https://github.com/acl-dev/acl) +- [coost](https://github.com/idealvin/coost) +- [golang](https://github.com/golang/go) +- [stacker](https://github.com/rust-lang/stacker) +- [monoio](https://github.com/bytedance/monoio) +- [compio](https://github.com/compio-rs/compio) +- [may](https://github.com/Xudong-Huang/may) \ No newline at end of file diff --git a/core/docs/en/coroutine.md b/core/docs/en/coroutine.md index 80f5fbdd..fc4e7d33 100644 --- a/core/docs/en/coroutine.md +++ b/core/docs/en/coroutine.md @@ -49,19 +49,19 @@ The above is excerpted from [corosensei](https://github.com/Amanieu/corosensei). ## Coroutine VS Thread -| | coroutine | thread | -|-------------------|-----------|---------| -| switch efficiency | ✅ Higher | ❌ High | -| memory efficiency | KB/MB | KB/MB | -| scheduled by OS | ❌ | ✅ | -| stack grow | ✅ | ❌ | +| | coroutine | thread | +|-------------------|----------------|----------| +| switch efficiency | ✅ Higher | ❌ High | +| memory usage | ✅ Bytes/KB/MB | ❌ KB/MB | +| scheduled by OS | ❌ | ✅ | +| stack grow | ✅ | ❌ | ## Stackfull VS Stackless | | stackfull | stackless | |-------------------|-----------|-----------| | switch efficiency | ❌ High | ✅ Higher | -| memory efficiency | ❌ KB/MB | ✅ Bytes | +| memory usage | ❌ KB/MB | ✅ Bytes | | limitations | ✅ Few | ❌ Many | In general, if the requirements for resource utilization and switching performance are not very strict, using a @@ -112,3 +112,5 @@ coroutines may flow between multiple threads which makes `ThreadLocal` invalid. introduce `CoroutineLocal`. It's similar to `ThreadLocal`'s approach of providing replicas, but `CoroutineLocal` has upgraded the replicas to the coroutine level, which means each coroutine has its own local variables. These local variables will be dropped together when the coroutine is dropped. + +## [Scalable Stack Overview](scalable-stack.md) diff --git a/core/docs/en/monitor.md b/core/docs/en/monitor.md index 07649af3..3102d39b 100644 --- a/core/docs/en/monitor.md +++ b/core/docs/en/monitor.md @@ -33,6 +33,7 @@ fn main() -> std::io::Result<()> { // is not enabled, it will remain stuck in a dead loop after resume. let mut coroutine: Coroutine<(), (), ()> = co!(|_, ()| { loop {} })?; assert_eq!(CoroutineState::Suspend((), 0), coroutine.resume()?); + // will never reach if the preemptive feature is not enabled assert_eq!(CoroutineState::Suspend((), 0), coroutine.state()); Ok(()) } @@ -43,7 +44,7 @@ fn main() -> std::io::Result<()> { The `monitor` mod implements the `preemptive` feature for open-coroutine, which allows the coroutine to be preempted when it is running for a long time. -## Why preempt +## Why preempt? After a `Coroutine::resume_with`, a coroutine may occupy the scheduling thread for a long time, thereby slowing down other coroutines scheduled by that scheduling thread. To solve this problem, we introduce preemptive scheduling, which diff --git a/core/docs/en/overview.md b/core/docs/en/overview.md new file mode 100644 index 00000000..e7e47029 --- /dev/null +++ b/core/docs/en/overview.md @@ -0,0 +1,20 @@ +## open-coroutine overview + +[![crates.io](https://img.shields.io/crates/v/open-coroutine.svg)](https://crates.io/crates/open-coroutine) +[![docs.rs](https://img.shields.io/badge/docs-release-blue)](https://docs.rs/open-coroutine) +[![LICENSE](https://img.shields.io/github/license/acl-dev/open-coroutine.svg?style=flat-square)](https://github.com/acl-dev/open-coroutine/blob/master/LICENSE-APACHE) +[![Build Status](https://github.com/acl-dev/open-coroutine/workflows/CI/badge.svg)](https://github.com/acl-dev/open-coroutine/actions) +[![Codecov](https://codecov.io/github/acl-dev/open-coroutine/graph/badge.svg?token=MSM3R7CBEX)](https://codecov.io/github/acl-dev/open-coroutine) +[![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/acl-dev/open-coroutine.svg)](http://isitmaintained.com/project/acl-dev/open-coroutine "Average time to resolve an issue") +[![Percentage of issues still open](http://isitmaintained.com/badge/open/acl-dev/open-coroutine.svg)](http://isitmaintained.com/project/acl-dev/open-coroutine "Percentage of issues still open") + +The `open-coroutine` is a simple, efficient and generic stackfull-coroutine library, you can use this as a performance replacement for IO thread pools. + +[//]: # (todo 增加英文版本的文档) +- [Background](../../../docs/cn/background.md) +- [Why rust](../../../docs/cn/why-rust.md) +- [Why better]() +- [Quick Start](../../../README.md) +- [Coroutine Overview](../en/coroutine.md) +- [Scalable Stack Overview](../en/scalable-stack.md) +- [Monitor Overview](../en/monitor.md) diff --git a/core/docs/en/scalable-stack.md b/core/docs/en/scalable-stack.md new file mode 100644 index 00000000..6b435675 --- /dev/null +++ b/core/docs/en/scalable-stack.md @@ -0,0 +1,62 @@ +--- +title: Scalable Stack Overview +date: 2025-01-08 08:44:00 +author: loongs-zhang +--- + +# Scalable Stack Overview + +## Usage + +```rust +use open_coroutine_core::co; +use open_coroutine_core::common::constants::CoroutineState; +use open_coroutine_core::coroutine::suspender::Suspender; +use open_coroutine_core::coroutine::Coroutine; + +fn main() -> std::io::Result<()> { + let mut co = co!(|_: &Suspender<(), i32>, ()| { + fn recurse(i: u32, p: &mut [u8; 10240]) { + // You can also use `maybe_grow` in thread. + Coroutine::<(), i32, ()>::maybe_grow(|| { + // Ensure the stack allocation isn't optimized away. + unsafe { std::ptr::read_volatile(&p) }; + if i > 0 { + recurse(i - 1, &mut [0; 10240]); + } + }) + .expect("allocate stack failed") + } + // Use ~500KB of stack. + recurse(50, &mut [0; 10240]); + })?; + assert_eq!(co.resume()?, CoroutineState::Complete(())); + Ok(()) +} +``` + +## Why scalable stack? + +The default stack size of coroutine is 128KB, which is sufficient for most scenarios, but there are still some scenarios +that are not applicable, such as recursive algorithms. The scalable stack enables annotating fixed points in programs +where the stack may want to grow larger. Spills over to the heap if the stack has hit its limit. + +## How it works + +```mermaid +flowchart TD + Cond1{In coroutine} + Cond2{Approach the limit} + Cond3{Is the first growing up?} + C[Create a new stack] + RC[Run code on current stack] + RN1[Run code on new stack] + maybe_grow --> Cond1 + Cond1 -- Yes --> Cond2 + Cond2 -- Yes --> C + Cond2 -- No --> RC + C --> RN1 + Cond1 -- No --> Cond3 + Cond3 -- Yes --> C + Cond3 -- No --> Cond2 +``` diff --git a/core/src/coroutine/korosensei.rs b/core/src/coroutine/korosensei.rs index 66c9cdbe..b6eee7ea 100644 --- a/core/src/coroutine/korosensei.rs +++ b/core/src/coroutine/korosensei.rs @@ -1,9 +1,9 @@ -use crate::catch; use crate::common::constants::CoroutineState; use crate::coroutine::listener::Listener; use crate::coroutine::local::CoroutineLocal; use crate::coroutine::suspender::Suspender; use crate::coroutine::StackInfo; +use crate::{catch, warn}; use corosensei::stack::{DefaultStack, Stack}; use corosensei::trap::TrapHandlerRegs; use corosensei::CoroutineResult; @@ -318,8 +318,9 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> { /// /// The closure `f` is guaranteed to run on a stack with at least `red_zone` bytes, and it will be /// run on the current stack if there's space available. - #[allow(clippy::inline_always)] #[inline(always)] + #[allow(clippy::inline_always)] + #[doc = include_str!("../../docs/en/scalable-stack.md")] pub fn maybe_grow_with R>( red_zone: usize, stack_size: usize, @@ -366,6 +367,7 @@ impl Drop for Coroutine<'_, Param, Yield, Return> { //for test_yield case if self.inner.started() && !self.inner.done() { unsafe { self.inner.force_reset() }; + warn!("Coroutine {} is dropped without complete", self.name()); } } } diff --git a/core/src/coroutine/mod.rs b/core/src/coroutine/mod.rs index 8e577615..cfb8c73c 100644 --- a/core/src/coroutine/mod.rs +++ b/core/src/coroutine/mod.rs @@ -120,13 +120,14 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> { /// /// This function is intended to be called at manually instrumented points in a program where /// recursion is known to happen quite a bit. This function will check to see if we're within - /// `32 * 1024` bytes of the end of the stack, and if so it will allocate a new stack of at least + /// `20 * 1024` bytes of the end of the stack, and if so it will allocate a new stack of at least /// `128 * 1024` bytes. /// /// The closure `f` is guaranteed to run on a stack with at least `32 * 1024` bytes, and it will be /// run on the current stack if there's space available. - #[allow(clippy::inline_always)] #[inline(always)] + #[allow(clippy::inline_always)] + #[doc = include_str!("../../docs/en/scalable-stack.md")] pub fn maybe_grow R>(callback: F) -> std::io::Result { Self::maybe_grow_with( crate::common::default_red_zone(), diff --git a/core/src/lib.rs b/core/src/lib.rs index d960d745..4293ed54 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -51,7 +51,7 @@ clippy::separated_literal_suffix, // conflicts with clippy::unseparated_literal_suffix clippy::single_char_lifetime_names, // TODO: change lifetime names )] -//! see `https://github.com/acl-dev/open-coroutine` +#![doc = include_str!("../docs/en/overview.md")] /// Common traits and impl. pub mod common; diff --git a/core/src/syscall/unix/mod.rs b/core/src/syscall/unix/mod.rs index 7e9da196..86189a9a 100644 --- a/core/src/syscall/unix/mod.rs +++ b/core/src/syscall/unix/mod.rs @@ -959,18 +959,7 @@ pub extern "C" fn send_time_limit(fd: c_int) -> u64 { } panic!("getsockopt failed: {error}"); } - let mut time_limit = u64::try_from(tv.tv_sec) - .expect("overflow") - .saturating_mul(1_000_000_000) - .saturating_add( - u64::try_from(tv.tv_usec) - .expect("overflow") - .saturating_mul(1_000), - ); - if 0 == time_limit { - // 取消超时 - time_limit = u64::MAX; - } + let time_limit = get_time_limit(&tv); assert!(SEND_TIME_LIMIT.insert(fd, time_limit).is_none()); time_limit }, @@ -999,21 +988,26 @@ pub extern "C" fn recv_time_limit(fd: c_int) -> u64 { } panic!("getsockopt failed: {error}"); } - let mut time_limit = u64::try_from(tv.tv_sec) - .expect("overflow") - .saturating_mul(1_000_000_000) - .saturating_add( - u64::try_from(tv.tv_usec) - .expect("overflow") - .saturating_mul(1_000), - ); - if 0 == time_limit { - // 取消超时 - time_limit = u64::MAX; - } + let time_limit = get_time_limit(&tv); assert!(RECV_TIME_LIMIT.insert(fd, time_limit).is_none()); time_limit }, |v| *v.value(), ) } + +pub(crate) fn get_time_limit(tv: &libc::timeval) -> u64 { + let mut time_limit = u64::try_from(tv.tv_sec) + .expect("overflow") + .saturating_mul(1_000_000_000) + .saturating_add( + u64::try_from(tv.tv_usec) + .expect("overflow") + .saturating_mul(1_000), + ); + if 0 == time_limit { + // 取消超时 + time_limit = u64::MAX; + } + time_limit +} diff --git a/core/src/syscall/unix/setsockopt.rs b/core/src/syscall/unix/setsockopt.rs index 245d67bc..c069f66b 100644 --- a/core/src/syscall/unix/setsockopt.rs +++ b/core/src/syscall/unix/setsockopt.rs @@ -1,6 +1,7 @@ use std::ffi::{c_int, c_void}; -use libc::socklen_t; +use libc::{socklen_t, timeval}; use once_cell::sync::Lazy; +use crate::syscall::get_time_limit; use crate::syscall::unix::{RECV_TIME_LIMIT, SEND_TIME_LIMIT}; #[must_use] @@ -52,25 +53,9 @@ impl SetsockoptSyscall for NioSetsockoptSyscall { let r= self.inner.setsockopt(fn_ptr, socket, level, name, value, option_len); if 0 == r && libc::SOL_SOCKET == level { if libc::SO_SNDTIMEO == name { - let tv = unsafe { &*value.cast::() }; - let mut time_limit = u64::try_from(tv.tv_sec).expect("overflow") - .saturating_mul(1_000_000_000) - .saturating_add(u64::try_from(tv.tv_usec).expect("overflow").saturating_mul(1_000)); - if 0 == time_limit { - // 取消超时 - time_limit = u64::MAX; - } - assert!(SEND_TIME_LIMIT.insert(socket, time_limit).is_none()); + assert!(SEND_TIME_LIMIT.insert(socket, get_time_limit(unsafe { &*value.cast::() })).is_none()); } else if libc::SO_RCVTIMEO == name { - let tv = unsafe { &*value.cast::() }; - let mut time_limit = u64::try_from(tv.tv_sec).expect("overflow") - .saturating_mul(1_000_000_000) - .saturating_add(u64::try_from(tv.tv_usec).expect("overflow").saturating_mul(1_000)); - if 0 == time_limit { - // 取消超时 - time_limit = u64::MAX; - } - assert!(RECV_TIME_LIMIT.insert(socket, time_limit).is_none()); + assert!(RECV_TIME_LIMIT.insert(socket, get_time_limit(unsafe { &*value.cast::() })).is_none()); } } r diff --git a/core/tests/coroutine.rs b/core/tests/coroutine.rs index bbeb6033..11c3eaae 100644 --- a/core/tests/coroutine.rs +++ b/core/tests/coroutine.rs @@ -112,8 +112,10 @@ fn thread_stack_growth() { }) .expect("allocate stack failed") } - // Use 10MB of stack. - recurse(1000, &mut [0; 10240]); + // Use ~500KB of stack. + recurse(50, &mut [0; 10240]); + // Use ~500KB of stack. + recurse(50, &mut [0; 10240]); } #[test]