diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..e2dd34f0 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,82 @@ +### 0.6.x + +- [x] support custom task and coroutine priority. +- [x] support scalable stack + +### 0.5.x + +- [x] refactor syscall state, distinguish between state and innerState + +### 0.4.x + +- [x] Supports and is compatible with io_uring in terms of local file IO +- [x] elegant shutdown +- [x] use log instead of println +- [x] enhance `#[open_coroutine::main]` macro +- [x] refactor hook impl, no need to publish dylibs now +- [x] `Monitor` follow the `thread-per-core` guideline +- [x] `EventLoop` follow the `thread-per-core` guideline + +### 0.3.x + +- [x] ~~support `genawaiter` as low_level stackless coroutine (can't support it due to hook)~~ +- [x] use `corosensei` as low_level coroutine +- [x] support backtrace +- [x] support `#[open_coroutine::co]` macro +- [x] refactor `WorkStealQueue` + +### 0.2.x + +- [x] use correct `epoll_event` struct +- [x] use `rayon` for parallel computing +- [x] support `#[open_coroutine::main]` macro +- [x] hook almost all `read` syscall +
+ read syscalls + + - [x] recv + - [x] readv + - [x] pread + - [x] preadv + - [x] recvfrom + - [x] recvmsg + +
+ +- [x] hook almost all `write` syscall +
+ write syscalls + + - [x] send + - [x] writev + - [x] sendto + - [x] sendmsg + - [x] pwrite + - [x] pwritev + +
+ +- [x] hook other syscall +
+ other syscalls + + - [x] sleep + - [x] usleep + - [x] nanosleep + - [x] connect + - [x] listen + - [x] accept + - [x] shutdown + - [x] poll + - [x] select + +
+ +### 0.1.x + +- [x] basic suspend/resume supported +- [x] use jemalloc as memory pool +- [x] higher level coroutine abstraction supported +- [x] preemptive scheduling supported +- [x] work stealing supported +- [x] sleep system call hooks supported diff --git a/README.md b/README.md index 70ccdddf..20812e39 100644 --- a/README.md +++ b/README.md @@ -10,17 +10,24 @@ The `open-coroutine` is a simple, efficient and generic stackful-coroutine library. -
- -
+## πŸš€ Features -[ζˆ‘ζœ‰ζ•…δΊ‹,δ½ ζœ‰ι…’ε—?](https://github.com/acl-dev/open-coroutine-docs) +- [x] Preemptive(`not supported in windows`): even if the coroutine enters a dead loop, it can still be seized, see [example](https://github.com/loongs-zhang/open-coroutine/blob/master/open-coroutine/examples/preemptive.rs); +- [x] Hook: you are free to use most of the slow system calls in coroutine; +- [x] Scalable: the size of the coroutine stack supports unlimited expansion, and immediately shrinks to the original size after use, see [example](https://github.com/loongs-zhang/open-coroutine/blob/master/open-coroutine/examples/scalable_stack.rs); +- [x] io_uring(`only in linux`): supports and is compatible with io_uring in terms of local file IO and network IO. If it's not supported on your system, it will fall back to non-blocking IO; +- [x] Priority: support custom task and coroutine priority; +- [x] Work Stealing: internally using a lock free work steel queue; +- [x] Compatibility: the implementation of open-coroutine is no async, but it is compatible with async, which means you can use this crate in tokeno/sync-std/smol/...; +- [x] Platforms: running on Linux, MacOS and Windows; -## Status +## πŸ•Š Roadmap -Still under development, please `do not` use this library in the `production` environment ! +- [ ] support `#[open_coroutine::all_join]` and `#[open_coroutine::any_join]` macro to wait coroutines; +- [ ] add synchronization toolkit; +- [ ] support and compatibility for AF_XDP socket; -## How to use this library ? +## πŸ“– Quick Start ### step1: add dependency to your Cargo.toml @@ -39,253 +46,41 @@ fn main() { } ``` -### step3: enjoy the performance improvement brought by open-coroutine ! - -## Examples - -### Amazing preemptive schedule - -Note: not supported for windows +### step3: create a task ```rust #[open_coroutine::main] -fn main() -> std::io::Result<()> { - cfg_if::cfg_if! { - if #[cfg(all(unix, feature = "preemptive-schedule"))] { - use open_coroutine_core::scheduler::Scheduler; - use std::sync::{Arc, Condvar, Mutex}; - use std::time::Duration; - - static mut TEST_FLAG1: bool = true; - static mut TEST_FLAG2: bool = true; - let pair = Arc::new((Mutex::new(true), Condvar::new())); - let pair2 = Arc::clone(&pair); - let handler = std::thread::Builder::new() - .name("preemptive".to_string()) - .spawn(move || { - let scheduler = Scheduler::new(); - _ = scheduler.submit( - |_, _| { - println!("coroutine1 launched"); - while unsafe { TEST_FLAG1 } { - println!("loop1"); - _ = unsafe { libc::usleep(10_000) }; - } - println!("loop1 end"); - 1 - }, - None, - ); - _ = scheduler.submit( - |_, _| { - println!("coroutine2 launched"); - while unsafe { TEST_FLAG2 } { - println!("loop2"); - _ = unsafe { libc::usleep(10_000) }; - } - println!("loop2 end"); - unsafe { TEST_FLAG1 = false }; - 2 - }, - None, - ); - _ = scheduler.submit( - |_, _| { - println!("coroutine3 launched"); - unsafe { TEST_FLAG2 = false }; - 3 - }, - None, - ); - scheduler.try_schedule(); - - let (lock, cvar) = &*pair2; - let mut pending = lock.lock().unwrap(); - *pending = false; - // notify the condvar that the value has changed. - cvar.notify_one(); - }) - .expect("failed to spawn thread"); - - // wait for the thread to start up - let (lock, cvar) = &*pair; - let result = cvar - .wait_timeout_while( - lock.lock().unwrap(), - Duration::from_millis(3000), - |&mut pending| pending, - ) - .unwrap(); - if result.1.timed_out() { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - "preemptive schedule failed", - )) - } else { - unsafe { - handler.join().unwrap(); - assert!(!TEST_FLAG1); - } - Ok(()) - } - } else { - println!("please enable preemptive-schedule feature"); - Ok(()) - } - } +fn main() { + let task = open_coroutine::task!(|param| { + assert_eq!(param, 1); + }, 1); + task.timeout_join(std::time::Duration::from_secs(1)).expect("timeout"); } ``` -outputs - -```text -coroutine1 launched -loop1 -coroutine2 launched -loop2 -coroutine3 launched -loop1 -loop2 end -loop1 end -``` - -### Arbitrary use of blocking syscalls +### step4: scalable stack(optional) ```rust #[open_coroutine::main] fn main() { - std::thread::sleep(std::time::Duration::from_secs(1)); + _ = open_coroutine::task!(|_| { + fn recurse(i: u32, p: &mut [u8; 10240]) { + open_coroutine::maybe_grow!(|| { + // Ensure the stack allocation isn't optimized away. + unsafe { _ = std::ptr::read_volatile(&p) }; + if i > 0 { + recurse(i - 1, &mut [0; 10240]); + } + }) + .expect("allocate stack failed") + } + println!("[coroutine] launched"); + // Use ~500KB of stack. + recurse(50, &mut [0; 10240]); + }, ()); } ``` -outputs - -```text -nanosleep hooked -``` - -## Features - -### todo - -- [ ] support and compatibility for AF_XDP socket -- [ ] hook other syscall maybe interrupt by signal -
- syscalls - - - [ ] open - - [ ] chdir - - [ ] chroot - - [ ] readlink - - [ ] stat - - [ ] dup - - [ ] dup2 - - [ ] umask - - [ ] mount - - [ ] umount - - [ ] mknod - - [ ] fcntl - - [ ] truncate - - [ ] ftruncate - - [ ] setjmp - - [ ] longjmp - - [ ] chown - - [ ] lchown - - [ ] fchown - - [ ] chmod - - [ ] fchmod - - [ ] fchmodat - - [ ] semop - - [ ] ppoll - - [ ] pselect - - [ ] io_getevents - - [ ] semop - - [ ] semtimedop - - [ ] msgrcv - - [ ] msgsnd +## βš“ Learn -
-- [ ] support `#[open_coroutine::join]` macro to wait coroutines - -### 0.6.x - -- [x] support custom task and coroutine priority. -- [x] support scalable stack - -### 0.5.x - -- [x] refactor syscall state, distinguish between state and innerState - -### 0.4.x - -- [x] Supports and is compatible with io_uring in terms of local file IO -- [x] elegant shutdown -- [x] use log instead of println -- [x] enhance `#[open_coroutine::main]` macro -- [x] refactor hook impl, no need to publish dylibs now -- [x] `Monitor` follow the `thread-per-core` guideline -- [x] `EventLoop` follow the `thread-per-core` guideline - -### 0.3.x - -- [x] ~~support `genawaiter` as low_level stackless coroutine (can't support it due to hook)~~ -- [x] use `corosensei` as low_level coroutine -- [x] support backtrace -- [x] support `#[open_coroutine::co]` macro -- [x] refactor `WorkStealQueue` - -### 0.2.x - -- [x] use correct `epoll_event` struct -- [x] use `rayon` for parallel computing -- [x] support `#[open_coroutine::main]` macro -- [x] hook almost all `read` syscall -
- read syscalls - - - [x] recv - - [x] readv - - [x] pread - - [x] preadv - - [x] recvfrom - - [x] recvmsg - -
- -- [x] hook almost all `write` syscall -
- write syscalls - - - [x] send - - [x] writev - - [x] sendto - - [x] sendmsg - - [x] pwrite - - [x] pwritev - -
- -- [x] hook other syscall -
- other syscalls - - - [x] sleep - - [x] usleep - - [x] nanosleep - - [x] connect - - [x] listen - - [x] accept - - [x] shutdown - - [x] poll - - [x] select - -
- -### 0.1.x - -- [x] basic suspend/resume supported -- [x] use jemalloc as memory pool -- [x] higher level coroutine abstraction supported -- [x] preemptive scheduling supported -- [x] work stealing supported -- [x] sleep system call hooks supported +[ζˆ‘ζœ‰ζ•…δΊ‹,δ½ ζœ‰ι…’ε—?](https://github.com/acl-dev/open-coroutine-docs) diff --git a/open-coroutine/Cargo.toml b/open-coroutine/Cargo.toml index 5f63daaa..576084a7 100644 --- a/open-coroutine/Cargo.toml +++ b/open-coroutine/Cargo.toml @@ -37,6 +37,7 @@ cargo_metadata.workspace = true [dev-dependencies] tempfile.workspace = true +cfg-if.workspace = true [features] default = ["open-coroutine-hook/default", "open-coroutine-core/default"] diff --git a/open-coroutine/examples/preemptive.rs b/open-coroutine/examples/preemptive.rs new file mode 100644 index 00000000..00321099 --- /dev/null +++ b/open-coroutine/examples/preemptive.rs @@ -0,0 +1,99 @@ +/// outputs: +/// ``` +/// coroutine1 launched +/// loop1 +/// coroutine2 launched +/// loop2 +/// coroutine3 launched +/// loop1 +/// loop2 end +/// loop1 end +/// ``` +pub fn main() -> std::io::Result<()> { + cfg_if::cfg_if! { + if #[cfg(all(unix, feature = "preemptive"))] { + use open_coroutine_core::scheduler::Scheduler; + use std::sync::{Arc, Condvar, Mutex}; + use std::time::Duration; + + static mut TEST_FLAG1: bool = true; + static mut TEST_FLAG2: bool = true; + let pair = Arc::new((Mutex::new(true), Condvar::new())); + let pair2 = Arc::clone(&pair); + let handler = std::thread::Builder::new() + .name("preemptive".to_string()) + .spawn(move || { + let mut scheduler = Scheduler::default(); + _ = scheduler.submit_co( + |_, _| { + println!("coroutine1 launched"); + while unsafe { TEST_FLAG1 } { + println!("loop1"); + _ = unsafe { libc::usleep(10_000) }; + } + println!("loop1 end"); + None + }, + None, + None, + ); + _ = scheduler.submit_co( + |_, _| { + println!("coroutine2 launched"); + while unsafe { TEST_FLAG2 } { + println!("loop2"); + _ = unsafe { libc::usleep(10_000) }; + } + println!("loop2 end"); + unsafe { TEST_FLAG1 = false }; + None + }, + None, + None, + ); + _ = scheduler.submit_co( + |_, _| { + println!("coroutine3 launched"); + unsafe { TEST_FLAG2 = false }; + None + }, + None, + None, + ); + scheduler.try_schedule().expect("schedule failed"); + + let (lock, cvar) = &*pair2; + let mut pending = lock.lock().unwrap(); + *pending = false; + // notify the condvar that the value has changed. + cvar.notify_one(); + }) + .expect("failed to spawn thread"); + + // wait for the thread to start up + let (lock, cvar) = &*pair; + let result = cvar + .wait_timeout_while( + lock.lock().unwrap(), + Duration::from_millis(3000), + |&mut pending| pending, + ) + .unwrap(); + if result.1.timed_out() { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "preemptive schedule failed", + )) + } else { + unsafe { + handler.join().unwrap(); + assert!(!TEST_FLAG1); + } + Ok(()) + } + } else { + println!("please enable preemptive feature"); + Ok(()) + } + } +}