diff --git a/examples/internals-timer.rs b/examples/internals-timer.rs index 76256f24..b851a83e 100644 --- a/examples/internals-timer.rs +++ b/examples/internals-timer.rs @@ -13,7 +13,7 @@ use pyroscope::timer::Timer; fn main() { // Initialize the Timer - let mut timer = Timer::default().initialize(); + let mut timer = Timer::default().initialize().unwrap(); // Create a streaming channel let (tx, rx): (Sender, Receiver) = channel(); @@ -24,11 +24,19 @@ fn main() { timer.attach_listener(tx).unwrap(); timer.attach_listener(tx2).unwrap(); + // Show current time + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + + println!("Current Time: {}", now); + // Listen to the Timer events std::thread::spawn(move || { while let result = rx.recv() { match result { - Ok(time) => println!("Thread 2 Notification: {}", time), + Ok(time) => println!("Thread 1 Notification: {}", time), Err(err) => { println!("Error Thread 1"); break; diff --git a/src/pyroscope.rs b/src/pyroscope.rs index 7b8736f8..f9e25320 100644 --- a/src/pyroscope.rs +++ b/src/pyroscope.rs @@ -123,7 +123,7 @@ impl PyroscopeAgentBuilder { backend.lock()?.initialize(self.config.sample_rate)?; // Start Timer - let timer = Timer::default().initialize(); + let timer = Timer::default().initialize()?; // Return PyroscopeAgent Ok(PyroscopeAgent { diff --git a/src/timer/epoll.rs b/src/timer/epoll.rs index d3e7dca4..6a18c271 100644 --- a/src/timer/epoll.rs +++ b/src/timer/epoll.rs @@ -4,6 +4,7 @@ // https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed // except according to those terms. +use crate::utils::{epoll_create1, epoll_ctl, epoll_wait, read, timerfd_create, timerfd_settime}; use crate::Result; use std::sync::{mpsc::Sender, Arc, Mutex}; @@ -19,8 +20,122 @@ pub struct Timer { } impl Timer { - pub fn initialize(self) -> Self { - self + pub fn initialize(self) -> Result { + let txs = Arc::clone(&self.txs); + + let timer_fd = Timer::set_timerfd()?; + let epoll_fd = Timer::create_epollfd(timer_fd)?; + + let handle = Some(thread::spawn(move || { + loop { + // Exit thread if there are no listeners + if txs.lock()?.len() == 0 { + // TODO: should close file descriptors? + return Ok(()); + } + + // Fire @ 10th sec + Timer::epoll_wait(timer_fd, epoll_fd)?; + + // Get current time + let current = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_secs(); + + // Iterate through Senders + txs.lock()?.iter().for_each(|tx| { + // Send event to attached Sender + tx.send(current).unwrap(); + }); + } + })); + + Ok(Self { handle, ..self }) + } + + /// create and set a timer file descriptor + fn set_timerfd() -> Result { + // Set the timer to use the system time. + let clockid: libc::clockid_t = libc::CLOCK_REALTIME; + // Non-blocking file descriptor + let clock_flags: libc::c_int = libc::TFD_NONBLOCK; + + // Create timer fd + let tfd = timerfd_create(clockid, clock_flags)?; + + // Get the next event time + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_secs(); + let rem = 10u64.checked_sub(now.checked_rem(10).unwrap()).unwrap(); + let first_fire = now + rem; + + // new_value sets the Timer + let mut new_value = libc::itimerspec { + it_interval: libc::timespec { + tv_sec: 10, + tv_nsec: 0, + }, + it_value: libc::timespec { + tv_sec: first_fire as i64, + tv_nsec: 0, + }, + }; + + // Empty itimerspec object + let mut old_value = libc::itimerspec { + it_interval: libc::timespec { + tv_sec: 0, + tv_nsec: 0, + }, + it_value: libc::timespec { + tv_sec: 0, + tv_nsec: 0, + }, + }; + + let set_flags = libc::TFD_TIMER_ABSTIME; + + // Set the timer + timerfd_settime(tfd, set_flags, &mut new_value, &mut old_value)?; + + // Return file descriptor + Ok(tfd) + } + + /// Create a new epoll file descriptor and add the timer to its interests + fn create_epollfd(timer_fd: libc::c_int) -> Result { + // create a new epoll fd + let epoll_fd = epoll_create1(0)?; + + // event to pull + let mut event = libc::epoll_event { + events: libc::EPOLLIN as u32, + u64: 1, + }; + + let epoll_flags = libc::EPOLL_CTL_ADD; + + // add event to the epoll + epoll_ctl(epoll_fd, epoll_flags, timer_fd, &mut event)?; + + // return epoll fd + Ok(epoll_fd) + } + + fn epoll_wait(timer_fd: libc::c_int, epoll_fd: libc::c_int) -> Result<()> { + // vector to store events + let mut events = Vec::with_capacity(1); + + // wait for the timer to fire an event. This is function will block. + epoll_wait(epoll_fd, events.as_mut_ptr(), 1, -1)?; + + // read the value from the timerfd. This is required to re-arm the timer. + let mut buffer: u64 = 0; + let bufptr: *mut _ = &mut buffer; + read(timer_fd, bufptr as *mut libc::c_void, 8)?; + + Ok(()) } /// Attach an mpsc::Sender to Timer diff --git a/src/timer/kqueue.rs b/src/timer/kqueue.rs index d3e7dca4..22f43715 100644 --- a/src/timer/kqueue.rs +++ b/src/timer/kqueue.rs @@ -19,8 +19,8 @@ pub struct Timer { } impl Timer { - pub fn initialize(self) -> Self { - self + pub fn initialize(self) -> Result { + Ok(self) } /// Attach an mpsc::Sender to Timer diff --git a/src/timer/sleep.rs b/src/timer/sleep.rs index 23598fc0..e4b9cb52 100644 --- a/src/timer/sleep.rs +++ b/src/timer/sleep.rs @@ -28,7 +28,7 @@ pub struct Timer { impl Timer { /// Initialize Timer and run a thread to send events to attached listeners - pub fn initialize(self) -> Self { + pub fn initialize(self) -> Result { let txs = Arc::clone(&self.txs); // Add tx @@ -69,7 +69,7 @@ impl Timer { } })); - Self { handle, ..self } + Ok(Self { handle, ..self }) } /// Attach an mpsc::Sender to Timer diff --git a/src/utils.rs b/src/utils.rs index 519cc749..2ff1da71 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -5,6 +5,7 @@ // except according to those terms. use crate::error::Result; +use crate::PyroscopeError; use std::collections::HashMap; @@ -54,3 +55,61 @@ mod tests { ) } } + +/// Wrapper for libc functions. +/// +/// Error wrapper for some libc functions used by the library. This only does +/// Error (-1 return) wrapping. Alternatively, the nix crate could be used +/// instead of expanding this wrappers (if more functions and types are used +/// from libc) + +/// Error Wrapper for libc return. Only check for errors. +fn check_err(num: T) -> Result { + if num < T::default() { + return Err(PyroscopeError::from(std::io::Error::last_os_error())); + } + Ok(num) +} + +/// libc::timerfd wrapper +pub fn timerfd_create(clockid: libc::clockid_t, clock_flags: libc::c_int) -> Result { + check_err(unsafe { libc::timerfd_create(clockid, clock_flags) }).map(|timer_fd| timer_fd as i32) +} + +/// libc::timerfd_settime wrapper +pub fn timerfd_settime( + timer_fd: i32, set_flags: libc::c_int, new_value: &mut libc::itimerspec, + old_value: &mut libc::itimerspec, +) -> Result<()> { + check_err(unsafe { libc::timerfd_settime(timer_fd, set_flags, new_value, old_value) })?; + Ok(()) +} + +/// libc::epoll_create1 wrapper +pub fn epoll_create1(epoll_flags: libc::c_int) -> Result { + check_err(unsafe { libc::epoll_create1(epoll_flags) }).map(|epoll_fd| epoll_fd as i32) +} + +/// libc::epoll_ctl wrapper +pub fn epoll_ctl(epoll_fd: i32, epoll_flags: libc::c_int, timer_fd: i32, event: &mut libc::epoll_event) -> Result<()> { + check_err(unsafe { + libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, event) + })?; + Ok(()) +} + +/// libc::epoll_wait wrapper +pub fn epoll_wait(epoll_fd: i32, events: *mut libc::epoll_event, maxevents: libc::c_int, timeout: libc::c_int) -> Result<()> { + check_err(unsafe { + libc::epoll_wait(epoll_fd, events, maxevents, timeout) + })?; + Ok(()) +} + +/// libc::read wrapper +pub fn read(timer_fd: i32, bufptr: *mut libc::c_void, count: libc::size_t) -> Result<()> { + check_err(unsafe { + libc::read(timer_fd, bufptr, count) + })?; + Ok(()) +}