diff --git a/examples/internals-timer.rs b/examples/internals-timer.rs index b851a83e..fd075e9c 100644 --- a/examples/internals-timer.rs +++ b/examples/internals-timer.rs @@ -29,15 +29,15 @@ fn main() { .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(); - println!("Current Time: {}", now); // Listen to the Timer events std::thread::spawn(move || { + #[allow(irrefutable_let_patterns)] while let result = rx.recv() { match result { Ok(time) => println!("Thread 1 Notification: {}", time), - Err(err) => { + Err(_err) => { println!("Error Thread 1"); break; } @@ -46,10 +46,11 @@ fn main() { }); std::thread::spawn(move || { + #[allow(irrefutable_let_patterns)] while let result = rx2.recv() { match result { Ok(time) => println!("Thread 2 Notification: {}", time), - Err(err) => { + Err(_err) => { println!("Error Thread 2"); break; } diff --git a/src/pyroscope.rs b/src/pyroscope.rs index f9e25320..8538d85d 100644 --- a/src/pyroscope.rs +++ b/src/pyroscope.rs @@ -182,7 +182,6 @@ impl PyroscopeAgent { drop(cvar); drop(running); - // TODO: move this channel to PyroscopeAgent let (tx, rx): (Sender, Receiver) = channel(); self.timer.attach_listener(tx.clone())?; self.tx = Some(tx.clone()); @@ -219,7 +218,7 @@ impl PyroscopeAgent { // Wait for the Thread to finish let pair = Arc::clone(&self.running); let (lock, cvar) = &*pair; - cvar.wait_while(lock.lock()?, |running| *running)?; + let _guard = cvar.wait_while(lock.lock()?, |running| *running)?; // Create a clone of Backend let backend = Arc::clone(&self.backend); diff --git a/src/timer/epoll.rs b/src/timer/epoll.rs index 6a18c271..0c931c9d 100644 --- a/src/timer/epoll.rs +++ b/src/timer/epoll.rs @@ -4,10 +4,13 @@ // https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed // except according to those terms. -use crate::utils::{epoll_create1, epoll_ctl, epoll_wait, read, timerfd_create, timerfd_settime}; +use crate::utils::check_err; use crate::Result; -use std::sync::{mpsc::Sender, Arc, Mutex}; +use std::sync::{ + mpsc::{channel, Receiver, Sender}, + Arc, Mutex, +}; use std::{thread, thread::JoinHandle}; #[derive(Debug, Default)] @@ -23,6 +26,10 @@ impl Timer { pub fn initialize(self) -> Result { let txs = Arc::clone(&self.txs); + // Add Default tx + let (tx, _rx): (Sender, Receiver) = channel(); + txs.lock()?.push(tx); + let timer_fd = Timer::set_timerfd()?; let epoll_fd = Timer::create_epollfd(timer_fd)?; @@ -30,7 +37,10 @@ impl Timer { loop { // Exit thread if there are no listeners if txs.lock()?.len() == 0 { - // TODO: should close file descriptors? + // Close file descriptors + unsafe { libc::close(timer_fd) }; + unsafe { libc::close(epoll_fd) }; + return Ok(()); } @@ -45,7 +55,10 @@ impl Timer { // Iterate through Senders txs.lock()?.iter().for_each(|tx| { // Send event to attached Sender - tx.send(current).unwrap(); + match tx.send(current) { + Ok(_) => {} + Err(_) => {} + } }); } })); @@ -158,3 +171,51 @@ impl Timer { Ok(()) } } + +/// Wrapper for libc functions. +/// +/// Error wrapper for some libc functions used by the library. This only does +/// Error (-1 return) wrapping. Alternatively, the nix crate could be used +/// instead of expanding this wrappers (if more functions and types are used +/// from libc) + +/// libc::timerfd wrapper +pub fn timerfd_create(clockid: libc::clockid_t, clock_flags: libc::c_int) -> Result { + check_err(unsafe { libc::timerfd_create(clockid, clock_flags) }).map(|timer_fd| timer_fd as i32) +} + +/// libc::timerfd_settime wrapper +pub fn timerfd_settime( + timer_fd: i32, set_flags: libc::c_int, new_value: &mut libc::itimerspec, + old_value: &mut libc::itimerspec, +) -> Result<()> { + check_err(unsafe { libc::timerfd_settime(timer_fd, set_flags, new_value, old_value) })?; + Ok(()) +} + +/// libc::epoll_create1 wrapper +pub fn epoll_create1(epoll_flags: libc::c_int) -> Result { + check_err(unsafe { libc::epoll_create1(epoll_flags) }).map(|epoll_fd| epoll_fd as i32) +} + +/// libc::epoll_ctl wrapper +pub fn epoll_ctl( + epoll_fd: i32, epoll_flags: libc::c_int, timer_fd: i32, event: &mut libc::epoll_event, +) -> Result<()> { + check_err(unsafe { libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, event) })?; + Ok(()) +} + +/// libc::epoll_wait wrapper +pub fn epoll_wait( + epoll_fd: i32, events: *mut libc::epoll_event, maxevents: libc::c_int, timeout: libc::c_int, +) -> Result<()> { + check_err(unsafe { libc::epoll_wait(epoll_fd, events, maxevents, timeout) })?; + Ok(()) +} + +/// libc::read wrapper +pub fn read(timer_fd: i32, bufptr: *mut libc::c_void, count: libc::size_t) -> Result<()> { + check_err(unsafe { libc::read(timer_fd, bufptr, count) })?; + Ok(()) +} diff --git a/src/timer/kqueue.rs b/src/timer/kqueue.rs index 22f43715..410e0793 100644 --- a/src/timer/kqueue.rs +++ b/src/timer/kqueue.rs @@ -4,9 +4,13 @@ // https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed // except according to those terms. +use crate::utils::check_err; use crate::Result; -use std::sync::{mpsc::Sender, Arc, Mutex}; +use std::sync::{ + mpsc::{channel, Receiver, Sender}, + Arc, Mutex, +}; use std::{thread, thread::JoinHandle}; #[derive(Debug, Default)] @@ -20,7 +24,50 @@ pub struct Timer { impl Timer { pub fn initialize(self) -> Result { - Ok(self) + let txs = Arc::clone(&self.txs); + + // Add Default tx + let (tx, _rx): (Sender, Receiver) = channel(); + txs.lock()?.push(tx); + + let kqueue = kqueue()?; + + let handle = Some(thread::spawn(move || { + // Wait for initial expiration + let initial_event = Timer::register_initial_expiration(kqueue)?; + Timer::wait_event(kqueue, [initial_event].as_mut_ptr())?; + + // Register loop event + let loop_event = Timer::register_loop_expiration(kqueue)?; + + // Loop 10s + loop { + // Exit thread if there are no listeners + if txs.lock()?.len() == 0 { + // TODO: should close file descriptors? + return Ok(()); + } + + // Get current time + let current = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_secs(); + + // Iterate through Senders + txs.lock()?.iter().for_each(|tx| { + // Send event to attached Sender + match tx.send(current) { + Ok(_) => {} + Err(_) => {} + } + }); + + // Wait 10s + Timer::wait_event(kqueue, [loop_event].as_mut_ptr())?; + } + })); + + Ok(Self { handle, ..self }) } /// Attach an mpsc::Sender to Timer @@ -42,4 +89,74 @@ impl Timer { Ok(()) } + + fn wait_event(kqueue: i32, events: *mut libc::kevent) -> Result<()> { + kevent(kqueue, [].as_mut_ptr(), 0, events, 1, std::ptr::null())?; + Ok(()) + } + fn register_initial_expiration(kqueue: i32) -> Result { + // Get the next event time + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_secs(); + let rem = 10u64.checked_sub(now.checked_rem(10).unwrap()).unwrap(); + let first_fire = now + rem; + + let initial_event = libc::kevent { + ident: 1, + filter: libc::EVFILT_TIMER, + flags: libc::EV_ADD | libc::EV_ENABLE | libc::EV_ONESHOT, + fflags: libc::NOTE_ABSOLUTE | libc::NOTE_SECONDS, + data: first_fire as isize, + udata: 0 as *mut libc::c_void, + }; + + // add first event + kevent( + kqueue, + [initial_event].as_ptr() as *const libc::kevent, + 1, + [].as_mut_ptr(), + 0, + std::ptr::null(), + )?; + + Ok(initial_event) + } + fn register_loop_expiration(kqueue: i32) -> Result { + let loop_event = libc::kevent { + ident: 1, + filter: libc::EVFILT_TIMER, + flags: libc::EV_ADD | libc::EV_ENABLE, + fflags: 0, + data: 10000, + udata: 0 as *mut libc::c_void, + }; + + // add loop event + let ke = kevent( + kqueue, + [loop_event].as_ptr() as *const libc::kevent, + 1, + [].as_mut_ptr(), + 0, + std::ptr::null(), + )?; + + Ok(loop_event) + } +} + +/// libc::kqueue wrapper +fn kqueue() -> Result { + check_err(unsafe { libc::kqueue() }).map(|kq| kq as i32) +} + +/// libc::kevent wrapper +fn kevent( + kqueue: i32, change: *const libc::kevent, c_count: libc::c_int, events: *mut libc::kevent, + e_count: libc::c_int, timeout: *const libc::timespec, +) -> Result<()> { + check_err(unsafe { libc::kevent(kqueue, change, c_count, events, e_count, timeout) })?; + Ok(()) } diff --git a/src/timer/sleep.rs b/src/timer/sleep.rs index e4b9cb52..86227e9b 100644 --- a/src/timer/sleep.rs +++ b/src/timer/sleep.rs @@ -31,8 +31,9 @@ impl Timer { pub fn initialize(self) -> Result { let txs = Arc::clone(&self.txs); - // Add tx - // txs.lock().unwrap().push(tx); + // Add Default tx + let (tx, _rx): (Sender, Receiver) = channel(); + txs.lock()?.push(tx); // Spawn a Thread let handle = Some(thread::spawn(move || { @@ -61,7 +62,10 @@ impl Timer { // Iterate through Senders txs.lock()?.iter().for_each(|tx| { // Send event to attached Sender - tx.send(current).unwrap(); + match tx.send(current) { + Ok(_) => {} + Err(_) => {} + } }); // Sleep for 10s diff --git a/src/utils.rs b/src/utils.rs index 2ff1da71..5a182f4b 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -36,6 +36,7 @@ mod tests { use crate::utils::merge_tags_with_app_name; #[test] + fn merge_tags_with_app_name_with_tags() { let mut tags = HashMap::new(); tags.insert("env".to_string(), "staging".to_string()); @@ -55,61 +56,10 @@ mod tests { ) } } - -/// Wrapper for libc functions. -/// -/// Error wrapper for some libc functions used by the library. This only does -/// Error (-1 return) wrapping. Alternatively, the nix crate could be used -/// instead of expanding this wrappers (if more functions and types are used -/// from libc) - /// Error Wrapper for libc return. Only check for errors. -fn check_err(num: T) -> Result { +pub fn check_err(num: T) -> Result { if num < T::default() { return Err(PyroscopeError::from(std::io::Error::last_os_error())); } Ok(num) } - -/// libc::timerfd wrapper -pub fn timerfd_create(clockid: libc::clockid_t, clock_flags: libc::c_int) -> Result { - check_err(unsafe { libc::timerfd_create(clockid, clock_flags) }).map(|timer_fd| timer_fd as i32) -} - -/// libc::timerfd_settime wrapper -pub fn timerfd_settime( - timer_fd: i32, set_flags: libc::c_int, new_value: &mut libc::itimerspec, - old_value: &mut libc::itimerspec, -) -> Result<()> { - check_err(unsafe { libc::timerfd_settime(timer_fd, set_flags, new_value, old_value) })?; - Ok(()) -} - -/// libc::epoll_create1 wrapper -pub fn epoll_create1(epoll_flags: libc::c_int) -> Result { - check_err(unsafe { libc::epoll_create1(epoll_flags) }).map(|epoll_fd| epoll_fd as i32) -} - -/// libc::epoll_ctl wrapper -pub fn epoll_ctl(epoll_fd: i32, epoll_flags: libc::c_int, timer_fd: i32, event: &mut libc::epoll_event) -> Result<()> { - check_err(unsafe { - libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, event) - })?; - Ok(()) -} - -/// libc::epoll_wait wrapper -pub fn epoll_wait(epoll_fd: i32, events: *mut libc::epoll_event, maxevents: libc::c_int, timeout: libc::c_int) -> Result<()> { - check_err(unsafe { - libc::epoll_wait(epoll_fd, events, maxevents, timeout) - })?; - Ok(()) -} - -/// libc::read wrapper -pub fn read(timer_fd: i32, bufptr: *mut libc::c_void, count: libc::size_t) -> Result<()> { - check_err(unsafe { - libc::read(timer_fd, bufptr, count) - })?; - Ok(()) -}