Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ license-file = "LICENSE.txt"

[dependencies]
libc = "0.2.119"
chrono = "0.4.19"
threadpool = "1.8.1"
num_cpus = "1.13.1"

[build-dependencies]
# 从c/c++头文件自动生成rust
Expand Down
19 changes: 9 additions & 10 deletions examples/02base_echo_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ struct Count {
outb: u64,
}

fn main() {
let address = "127.0.0.1:9898";
let package_length = 512;
let clients = 500;
let duration = 30;
const package_length: usize = 512;
const clients: i32 = 500;
const duration: u64 = 30;

fn main() {
let (tx, rx) = mpsc::channel();
let stop = Arc::new(AtomicBool::new(false));
let control = Arc::downgrade(&stop);
Expand All @@ -28,8 +27,8 @@ fn main() {
let mut out_buf: Vec<u8> = vec![0; package_length];
out_buf[package_length - 1] = b'\n';
let mut in_buf: Vec<u8> = vec![0; package_length];
match TcpStream::connect(&*address) {
Ok(mut stream)=>{
match TcpStream::connect("127.0.0.1:9898") {
Ok(mut stream) => {
loop {
if (*stop).load(Ordering::Relaxed) {
break;
Expand Down Expand Up @@ -58,8 +57,8 @@ fn main() {
};
sum.inb += 1;
}
},
Err(e)=>println!("connect failed !")
}
Err(e) => println!("connect failed !")
}
tx.send(sum).unwrap();
});
Expand All @@ -79,7 +78,7 @@ fn main() {
sum.inb += c.inb;
sum.outb += c.outb;
}
println!("Benchmarking: {}", address);
println!("Benchmarking: 127.0.0.1:9898");
println!(
"{} clients, running {} bytes, {} sec.",
clients, package_length, duration
Expand Down
86 changes: 86 additions & 0 deletions examples/02hooked_echo_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use std::ffi::c_void;
use std::io::Error;
use std::{mem, thread};
use std::os::raw::c_int;
use libc as c;
use threadpool::ThreadPool;
use libfiber::condition::Condition;
use libfiber::event::Event;
use libfiber::fiber::Fiber;
use libfiber::scheduler::{EventMode, Scheduler};

const package_length: usize = 512;
const clients: i32 = 500;
const duration: i64 = 30 * 1000;

fn fiber_request(fiber: &Fiber, arg: Option<*mut c_void>) {
unsafe {
let socket = c::socket(c::AF_INET, c::SOCK_STREAM, c::IPPROTO_TCP);
if socket < 0 {
panic!("last OS error: {:?}", Error::last_os_error());
}

let servaddr = c::sockaddr_in {
sin_len: 0,
sin_family: c::AF_INET as u8,
sin_port: 9898u16.to_be(),
sin_addr: c::in_addr {
s_addr: u32::from_be_bytes([127, 0, 0, 1]).to_be()
},
sin_zero: mem::zeroed(),
};

let result = c::connect(socket, &servaddr as *const c::sockaddr_in as *const c::sockaddr, mem::size_of_val(&servaddr) as u32);
if result < 0 {
println!("last OS error: {:?}", Error::last_os_error());
c::close(socket);
}
println!("fiber-{} connect ok !", fiber.get_id());
let msg = [0u8; package_length];
loop {
let n = c::write(socket, &msg as *const _ as *const c_void, package_length);
if n <= 0 {
println!("last OS error: {:?}", Error::last_os_error());
c::close(socket);
break;
}
println!("fiber-{} send {}", fiber.get_id(), String::from_utf8_lossy(&msg[0..n as usize]));
let mut buf = [0u8; package_length];
let n = c::read(socket, &mut buf as *mut _ as *mut c_void, package_length);
if n <= 0 {
println!("last OS error: {:?}", Error::last_os_error());
break;
}
}

c::close(socket);
}
}

fn fiber_main(main: &Fiber, arg: Option<*mut c_void>) {
// create clients
//todo 这里支持的不太好,另外需要添加统计信息
let fiber1 = Fiber::new(fiber_request, None, 128000);
let fiber2 =Fiber::new(fiber_request, None, 128000);
let fiber3 =Fiber::new(fiber_request, None, 128000);
main.delay(duration as u32);
fiber1.exit();
fiber2.exit();
fiber3.exit();
}

fn thread_main() {
Fiber::new(fiber_main, None, 327680);
let scheduler = Scheduler::new(EventMode::Kernel);
scheduler.start();
}

fn main() {
let num_cpus = num_cpus::get();
let pool = ThreadPool::new(num_cpus);
for i in 0..num_cpus {
pool.execute(|| thread_main());
}
pool.join();
println!("finished !");
}
7 changes: 4 additions & 3 deletions examples/02hooked_echo_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ fn echo_client(fiber: &Fiber, arg: Option<*mut c_void>) {
break;
}

let recv_str = String::from_utf8_lossy(&buf[0..n as usize]);
println!("fiber-{} receive {}", fiber.get_id(), recv_str);
let n = c::write(client_socket, &recv_str as *const _ as *const c_void, recv_str.len());
let n = n as usize;
let recv_str = String::from_utf8_lossy(&buf[0..n]);
print!("fiber-{} receive {}", fiber.get_id(), recv_str);
let n = c::write(client_socket, &buf as *const _ as *const c_void, n);
if n < 0 {
eprintln!("write failed !");
break;
Expand Down
41 changes: 41 additions & 0 deletions src/condition.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::os::raw::c_int;
use crate::event::Event;
use crate::libfiber::{ACL_FIBER_COND, acl_fiber_cond_create, acl_fiber_cond_free, acl_fiber_cond_signal, acl_fiber_cond_timedwait, acl_fiber_cond_wait};

pub struct Condition {
condition: *mut ACL_FIBER_COND,
}

impl Condition {
pub fn new() -> Self {
unsafe {
Condition {
condition: acl_fiber_cond_create(0)
}
}
}

pub fn delete(&self) {
unsafe {
acl_fiber_cond_free(self.condition);
}
}

pub fn wait(&self, event: &Event) {
unsafe {
acl_fiber_cond_wait(self.condition, event.event);
}
}

pub fn timed_wait(&self, event: &Event, delay_ms: c_int) {
unsafe {
acl_fiber_cond_timedwait(self.condition, event.event, delay_ms);
}
}

pub fn signal(&self) {
unsafe {
acl_fiber_cond_signal(self.condition);
}
}
}
39 changes: 39 additions & 0 deletions src/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use crate::libfiber::{ACL_FIBER_EVENT, acl_fiber_event_create, acl_fiber_event_free, acl_fiber_event_notify, acl_fiber_event_trywait, acl_fiber_event_wait};

pub struct Event {
pub(crate) event: *mut ACL_FIBER_EVENT,
}

impl Event {
pub fn new() -> Self {
unsafe {
Event {
event: acl_fiber_event_create(0)
}
}
}

pub fn delete(&self) {
unsafe {
acl_fiber_event_free(self.event)
}
}

pub fn wait(&self) {
unsafe {
acl_fiber_event_wait(self.event);
}
}

pub fn try_wait(&self) {
unsafe {
acl_fiber_event_trywait(self.event);
}
}

pub fn notify(&self) {
unsafe {
acl_fiber_event_notify(self.event);
}
}
}
8 changes: 7 additions & 1 deletion src/fiber.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::mem;
use std::os::raw::{c_int, c_uint, c_void};
use crate::libfiber::{ACL_FIBER, acl_fiber_create, acl_fiber_delay, acl_fiber_id, acl_fiber_kill, acl_fiber_killed, acl_fiber_running, acl_fiber_status, acl_fiber_yield, size_t};
use crate::libfiber::{ACL_FIBER, acl_fiber_create, acl_fiber_delay, acl_fiber_id, acl_fiber_kill, acl_fiber_killed, acl_fiber_running, acl_fiber_status, acl_fiber_switch, acl_fiber_yield, size_t};

pub struct Fiber {
fiber: Option<*mut ACL_FIBER>,
Expand Down Expand Up @@ -41,6 +41,12 @@ impl Fiber {
}
}

pub fn switch(&self) {
unsafe {
acl_fiber_switch();
}
}

///获取当前运行的纤程,如果没有正在运行的纤程将返回null
pub fn current_running_fiber() -> *mut ACL_FIBER {
unsafe {
Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod libfiber;
pub mod fiber;
pub mod scheduler;
pub mod scheduler;
pub mod event;
pub mod condition;