From 185909821faf28de7be29317b52dd260d8fe4078 Mon Sep 17 00:00:00 2001 From: hailang Date: Fri, 3 Jun 2022 22:39:27 +0800 Subject: [PATCH] [ISSUE #7] support event and condition --- Cargo.toml | 3 ++ examples/02base_echo_client.rs | 19 ++++--- examples/02hooked_echo_client.rs | 86 ++++++++++++++++++++++++++++++++ examples/02hooked_echo_server.rs | 7 +-- src/condition.rs | 41 +++++++++++++++ src/event.rs | 39 +++++++++++++++ src/fiber.rs | 8 ++- src/lib.rs | 4 +- 8 files changed, 192 insertions(+), 15 deletions(-) create mode 100644 examples/02hooked_echo_client.rs create mode 100644 src/condition.rs create mode 100644 src/event.rs diff --git a/Cargo.toml b/Cargo.toml index 174ac788..70d324fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,9 @@ license-file = "LICENSE.txt" [dependencies] libc = "0.2.119" +chrono = "0.4.19" +threadpool = "1.8.1" +num_cpus = "1.13.1" [build-dependencies] # 从c/c++头文件自动生成rust diff --git a/examples/02base_echo_client.rs b/examples/02base_echo_client.rs index ff6a3e18..c00f184f 100644 --- a/examples/02base_echo_client.rs +++ b/examples/02base_echo_client.rs @@ -11,12 +11,11 @@ struct Count { outb: u64, } -fn main() { - let address = "127.0.0.1:9898"; - let package_length = 512; - let clients = 500; - let duration = 30; +const package_length: usize = 512; +const clients: i32 = 500; +const duration: u64 = 30; +fn main() { let (tx, rx) = mpsc::channel(); let stop = Arc::new(AtomicBool::new(false)); let control = Arc::downgrade(&stop); @@ -28,8 +27,8 @@ fn main() { let mut out_buf: Vec = vec![0; package_length]; out_buf[package_length - 1] = b'\n'; let mut in_buf: Vec = vec![0; package_length]; - match TcpStream::connect(&*address) { - Ok(mut stream)=>{ + match TcpStream::connect("127.0.0.1:9898") { + Ok(mut stream) => { loop { if (*stop).load(Ordering::Relaxed) { break; @@ -58,8 +57,8 @@ fn main() { }; sum.inb += 1; } - }, - Err(e)=>println!("connect failed !") + } + Err(e) => println!("connect failed !") } tx.send(sum).unwrap(); }); @@ -79,7 +78,7 @@ fn main() { sum.inb += c.inb; sum.outb += c.outb; } - println!("Benchmarking: {}", address); + println!("Benchmarking: 127.0.0.1:9898"); println!( "{} clients, running {} bytes, {} sec.", clients, package_length, duration diff --git a/examples/02hooked_echo_client.rs b/examples/02hooked_echo_client.rs new file mode 100644 index 00000000..c060dfe9 --- /dev/null +++ b/examples/02hooked_echo_client.rs @@ -0,0 +1,86 @@ +use std::ffi::c_void; +use std::io::Error; +use std::{mem, thread}; +use std::os::raw::c_int; +use libc as c; +use threadpool::ThreadPool; +use libfiber::condition::Condition; +use libfiber::event::Event; +use libfiber::fiber::Fiber; +use libfiber::scheduler::{EventMode, Scheduler}; + +const package_length: usize = 512; +const clients: i32 = 500; +const duration: i64 = 30 * 1000; + +fn fiber_request(fiber: &Fiber, arg: Option<*mut c_void>) { + unsafe { + let socket = c::socket(c::AF_INET, c::SOCK_STREAM, c::IPPROTO_TCP); + if socket < 0 { + panic!("last OS error: {:?}", Error::last_os_error()); + } + + let servaddr = c::sockaddr_in { + sin_len: 0, + sin_family: c::AF_INET as u8, + sin_port: 9898u16.to_be(), + sin_addr: c::in_addr { + s_addr: u32::from_be_bytes([127, 0, 0, 1]).to_be() + }, + sin_zero: mem::zeroed(), + }; + + let result = c::connect(socket, &servaddr as *const c::sockaddr_in as *const c::sockaddr, mem::size_of_val(&servaddr) as u32); + if result < 0 { + println!("last OS error: {:?}", Error::last_os_error()); + c::close(socket); + } + println!("fiber-{} connect ok !", fiber.get_id()); + let msg = [0u8; package_length]; + loop { + let n = c::write(socket, &msg as *const _ as *const c_void, package_length); + if n <= 0 { + println!("last OS error: {:?}", Error::last_os_error()); + c::close(socket); + break; + } + println!("fiber-{} send {}", fiber.get_id(), String::from_utf8_lossy(&msg[0..n as usize])); + let mut buf = [0u8; package_length]; + let n = c::read(socket, &mut buf as *mut _ as *mut c_void, package_length); + if n <= 0 { + println!("last OS error: {:?}", Error::last_os_error()); + break; + } + } + + c::close(socket); + } +} + +fn fiber_main(main: &Fiber, arg: Option<*mut c_void>) { + // create clients + //todo 这里支持的不太好,另外需要添加统计信息 + let fiber1 = Fiber::new(fiber_request, None, 128000); + let fiber2 =Fiber::new(fiber_request, None, 128000); + let fiber3 =Fiber::new(fiber_request, None, 128000); + main.delay(duration as u32); + fiber1.exit(); + fiber2.exit(); + fiber3.exit(); +} + +fn thread_main() { + Fiber::new(fiber_main, None, 327680); + let scheduler = Scheduler::new(EventMode::Kernel); + scheduler.start(); +} + +fn main() { + let num_cpus = num_cpus::get(); + let pool = ThreadPool::new(num_cpus); + for i in 0..num_cpus { + pool.execute(|| thread_main()); + } + pool.join(); + println!("finished !"); +} \ No newline at end of file diff --git a/examples/02hooked_echo_server.rs b/examples/02hooked_echo_server.rs index 9bfc6540..73810ba0 100644 --- a/examples/02hooked_echo_server.rs +++ b/examples/02hooked_echo_server.rs @@ -31,9 +31,10 @@ fn echo_client(fiber: &Fiber, arg: Option<*mut c_void>) { break; } - let recv_str = String::from_utf8_lossy(&buf[0..n as usize]); - println!("fiber-{} receive {}", fiber.get_id(), recv_str); - let n = c::write(client_socket, &recv_str as *const _ as *const c_void, recv_str.len()); + let n = n as usize; + let recv_str = String::from_utf8_lossy(&buf[0..n]); + print!("fiber-{} receive {}", fiber.get_id(), recv_str); + let n = c::write(client_socket, &buf as *const _ as *const c_void, n); if n < 0 { eprintln!("write failed !"); break; diff --git a/src/condition.rs b/src/condition.rs new file mode 100644 index 00000000..77a3e926 --- /dev/null +++ b/src/condition.rs @@ -0,0 +1,41 @@ +use std::os::raw::c_int; +use crate::event::Event; +use crate::libfiber::{ACL_FIBER_COND, acl_fiber_cond_create, acl_fiber_cond_free, acl_fiber_cond_signal, acl_fiber_cond_timedwait, acl_fiber_cond_wait}; + +pub struct Condition { + condition: *mut ACL_FIBER_COND, +} + +impl Condition { + pub fn new() -> Self { + unsafe { + Condition { + condition: acl_fiber_cond_create(0) + } + } + } + + pub fn delete(&self) { + unsafe { + acl_fiber_cond_free(self.condition); + } + } + + pub fn wait(&self, event: &Event) { + unsafe { + acl_fiber_cond_wait(self.condition, event.event); + } + } + + pub fn timed_wait(&self, event: &Event, delay_ms: c_int) { + unsafe { + acl_fiber_cond_timedwait(self.condition, event.event, delay_ms); + } + } + + pub fn signal(&self) { + unsafe { + acl_fiber_cond_signal(self.condition); + } + } +} \ No newline at end of file diff --git a/src/event.rs b/src/event.rs new file mode 100644 index 00000000..dcbe47fc --- /dev/null +++ b/src/event.rs @@ -0,0 +1,39 @@ +use crate::libfiber::{ACL_FIBER_EVENT, acl_fiber_event_create, acl_fiber_event_free, acl_fiber_event_notify, acl_fiber_event_trywait, acl_fiber_event_wait}; + +pub struct Event { + pub(crate) event: *mut ACL_FIBER_EVENT, +} + +impl Event { + pub fn new() -> Self { + unsafe { + Event { + event: acl_fiber_event_create(0) + } + } + } + + pub fn delete(&self) { + unsafe { + acl_fiber_event_free(self.event) + } + } + + pub fn wait(&self) { + unsafe { + acl_fiber_event_wait(self.event); + } + } + + pub fn try_wait(&self) { + unsafe { + acl_fiber_event_trywait(self.event); + } + } + + pub fn notify(&self) { + unsafe { + acl_fiber_event_notify(self.event); + } + } +} \ No newline at end of file diff --git a/src/fiber.rs b/src/fiber.rs index 2ddc4c98..37f51bc0 100644 --- a/src/fiber.rs +++ b/src/fiber.rs @@ -1,6 +1,6 @@ use std::mem; use std::os::raw::{c_int, c_uint, c_void}; -use crate::libfiber::{ACL_FIBER, acl_fiber_create, acl_fiber_delay, acl_fiber_id, acl_fiber_kill, acl_fiber_killed, acl_fiber_running, acl_fiber_status, acl_fiber_yield, size_t}; +use crate::libfiber::{ACL_FIBER, acl_fiber_create, acl_fiber_delay, acl_fiber_id, acl_fiber_kill, acl_fiber_killed, acl_fiber_running, acl_fiber_status, acl_fiber_switch, acl_fiber_yield, size_t}; pub struct Fiber { fiber: Option<*mut ACL_FIBER>, @@ -41,6 +41,12 @@ impl Fiber { } } + pub fn switch(&self) { + unsafe { + acl_fiber_switch(); + } + } + ///获取当前运行的纤程,如果没有正在运行的纤程将返回null pub fn current_running_fiber() -> *mut ACL_FIBER { unsafe { diff --git a/src/lib.rs b/src/lib.rs index bb7d3e09..26593a7e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,5 @@ pub mod libfiber; pub mod fiber; -pub mod scheduler; \ No newline at end of file +pub mod scheduler; +pub mod event; +pub mod condition; \ No newline at end of file