From 8cde1ee9bb6a213a386539d9f48cec0eb7bee98d Mon Sep 17 00:00:00 2001 From: Abid Omar Date: Thu, 13 Jan 2022 23:29:23 +0100 Subject: [PATCH 1/3] imp(timer): initial epoll/timerfd implementation --- examples/internals-timer.rs | 2 +- src/timer/epoll.rs | 108 +++++++++++++++++++++++++++++++++++- 2 files changed, 108 insertions(+), 2 deletions(-) diff --git a/examples/internals-timer.rs b/examples/internals-timer.rs index 76256f24..309bd5c3 100644 --- a/examples/internals-timer.rs +++ b/examples/internals-timer.rs @@ -28,7 +28,7 @@ fn main() { std::thread::spawn(move || { while let result = rx.recv() { match result { - Ok(time) => println!("Thread 2 Notification: {}", time), + Ok(time) => println!("Thread 1 Notification: {}", time), Err(err) => { println!("Error Thread 1"); break; diff --git a/src/timer/epoll.rs b/src/timer/epoll.rs index d3e7dca4..d1e1fde7 100644 --- a/src/timer/epoll.rs +++ b/src/timer/epoll.rs @@ -20,7 +20,113 @@ pub struct Timer { impl Timer { pub fn initialize(self) -> Self { - self + let txs = Arc::clone(&self.txs); + + // TODO: use ? instead of unwrap, requires changing Timer initialize function + let timer_fd = Timer::set_timerfd().unwrap(); + let epoll_fd = Timer::create_epollfd(timer_fd).unwrap(); + + let handle = Some(thread::spawn(move || { + loop { + // Exit thread if there are no listeners + if txs.lock()?.len() == 0 { + // TODO: should close file descriptor + return Ok(()); + } + + // Fire @ 10th sec + Timer::epoll_wait(timer_fd, epoll_fd).unwrap(); + + // Get current time + let current = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_secs(); + + // Iterate through Senders + txs.lock()?.iter().for_each(|tx| { + // Send event to attached Sender + tx.send(current).unwrap(); + }); + } + })); + + Self { handle, ..self } + } + + fn set_timerfd() -> Result { + let clockid: libc::clockid_t = libc::CLOCK_REALTIME; + let clock_flags: libc::c_int = libc::TFD_NONBLOCK; + + // TODO: handler error (-1) + let tfd = unsafe { libc::timerfd_create(clockid, clock_flags) }; + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_secs(); + + let rem = 10u64.checked_sub(now.checked_rem(10).unwrap()).unwrap(); + + let first_fire = now + rem; + + let mut new_value = libc::itimerspec { + it_interval: libc::timespec { + tv_sec: 10, + tv_nsec: 0, + }, + it_value: libc::timespec { + tv_sec: first_fire as i64, + tv_nsec: 0, + }, + }; + + let mut old_value = libc::itimerspec { + it_interval: libc::timespec { + tv_sec: 0, + tv_nsec: 0, + }, + it_value: libc::timespec { + tv_sec: 0, + tv_nsec: 0, + }, + }; + + let set_flags = libc::TFD_TIMER_ABSTIME; + + // TODO: handler error (-1) + let sfd = unsafe { libc::timerfd_settime(tfd, set_flags, &mut new_value, &mut old_value) }; + + Ok(tfd) + } + + fn create_epollfd(timer_fd: libc::c_int) -> Result { + // TODO: handler error (-1) + let epoll_fd = unsafe { libc::epoll_create1(0) }; + + let mut event = libc::epoll_event { + events: libc::EPOLLIN as u32, + u64: 1, + }; + + let epoll_flags = libc::EPOLL_CTL_ADD; + + // TODO: handler error (-1) + let ctl_fd = unsafe { libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, &mut event) }; + + Ok(epoll_fd) + } + + fn epoll_wait(timer_fd: libc::c_int, epoll_fd: libc::c_int) -> Result<()> { + let mut events = Vec::with_capacity(1); + + // TODO: handler error (-1) + let wait = unsafe { libc::epoll_wait(epoll_fd, events.as_mut_ptr(), 1, -1) }; + let mut buffer: u64 = 0; + let bufptr: *mut _ = &mut buffer; + + // TODO: handler error (-1) + let read = unsafe { libc::read(timer_fd, bufptr as *mut libc::c_void, 8) }; + + Ok(()) } /// Attach an mpsc::Sender to Timer From e4afeed7d7b8c39149b0171252375b662505cdc0 Mon Sep 17 00:00:00 2001 From: Abid Omar Date: Fri, 14 Jan 2022 18:11:20 +0100 Subject: [PATCH 2/3] imp(wrapper): add wrapper functions for libc --- src/timer/epoll.rs | 20 +++++++--------- src/utils.rs | 59 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 12 deletions(-) diff --git a/src/timer/epoll.rs b/src/timer/epoll.rs index d1e1fde7..2823f699 100644 --- a/src/timer/epoll.rs +++ b/src/timer/epoll.rs @@ -4,6 +4,7 @@ // https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed // except according to those terms. +use crate::utils::{epoll_create1, epoll_ctl, epoll_wait, read, timerfd_create, timerfd_settime}; use crate::Result; use std::sync::{mpsc::Sender, Arc, Mutex}; @@ -57,8 +58,7 @@ impl Timer { let clockid: libc::clockid_t = libc::CLOCK_REALTIME; let clock_flags: libc::c_int = libc::TFD_NONBLOCK; - // TODO: handler error (-1) - let tfd = unsafe { libc::timerfd_create(clockid, clock_flags) }; + let tfd = timerfd_create(clockid, clock_flags).unwrap(); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? @@ -92,15 +92,13 @@ impl Timer { let set_flags = libc::TFD_TIMER_ABSTIME; - // TODO: handler error (-1) - let sfd = unsafe { libc::timerfd_settime(tfd, set_flags, &mut new_value, &mut old_value) }; + timerfd_settime(tfd, set_flags, &mut new_value, &mut old_value).unwrap(); Ok(tfd) } fn create_epollfd(timer_fd: libc::c_int) -> Result { - // TODO: handler error (-1) - let epoll_fd = unsafe { libc::epoll_create1(0) }; + let epoll_fd = epoll_create1(0).unwrap(); let mut event = libc::epoll_event { events: libc::EPOLLIN as u32, @@ -109,8 +107,7 @@ impl Timer { let epoll_flags = libc::EPOLL_CTL_ADD; - // TODO: handler error (-1) - let ctl_fd = unsafe { libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, &mut event) }; + epoll_ctl(epoll_fd, epoll_flags, timer_fd, &mut event).unwrap(); Ok(epoll_fd) } @@ -118,13 +115,12 @@ impl Timer { fn epoll_wait(timer_fd: libc::c_int, epoll_fd: libc::c_int) -> Result<()> { let mut events = Vec::with_capacity(1); - // TODO: handler error (-1) - let wait = unsafe { libc::epoll_wait(epoll_fd, events.as_mut_ptr(), 1, -1) }; + epoll_wait(epoll_fd, events.as_mut_ptr(), 1, -1).unwrap(); + let mut buffer: u64 = 0; let bufptr: *mut _ = &mut buffer; - // TODO: handler error (-1) - let read = unsafe { libc::read(timer_fd, bufptr as *mut libc::c_void, 8) }; + read(timer_fd, bufptr as *mut libc::c_void, 8).unwrap(); Ok(()) } diff --git a/src/utils.rs b/src/utils.rs index 519cc749..2ff1da71 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -5,6 +5,7 @@ // except according to those terms. use crate::error::Result; +use crate::PyroscopeError; use std::collections::HashMap; @@ -54,3 +55,61 @@ mod tests { ) } } + +/// Wrapper for libc functions. +/// +/// Error wrapper for some libc functions used by the library. This only does +/// Error (-1 return) wrapping. Alternatively, the nix crate could be used +/// instead of expanding this wrappers (if more functions and types are used +/// from libc) + +/// Error Wrapper for libc return. Only check for errors. +fn check_err(num: T) -> Result { + if num < T::default() { + return Err(PyroscopeError::from(std::io::Error::last_os_error())); + } + Ok(num) +} + +/// libc::timerfd wrapper +pub fn timerfd_create(clockid: libc::clockid_t, clock_flags: libc::c_int) -> Result { + check_err(unsafe { libc::timerfd_create(clockid, clock_flags) }).map(|timer_fd| timer_fd as i32) +} + +/// libc::timerfd_settime wrapper +pub fn timerfd_settime( + timer_fd: i32, set_flags: libc::c_int, new_value: &mut libc::itimerspec, + old_value: &mut libc::itimerspec, +) -> Result<()> { + check_err(unsafe { libc::timerfd_settime(timer_fd, set_flags, new_value, old_value) })?; + Ok(()) +} + +/// libc::epoll_create1 wrapper +pub fn epoll_create1(epoll_flags: libc::c_int) -> Result { + check_err(unsafe { libc::epoll_create1(epoll_flags) }).map(|epoll_fd| epoll_fd as i32) +} + +/// libc::epoll_ctl wrapper +pub fn epoll_ctl(epoll_fd: i32, epoll_flags: libc::c_int, timer_fd: i32, event: &mut libc::epoll_event) -> Result<()> { + check_err(unsafe { + libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, event) + })?; + Ok(()) +} + +/// libc::epoll_wait wrapper +pub fn epoll_wait(epoll_fd: i32, events: *mut libc::epoll_event, maxevents: libc::c_int, timeout: libc::c_int) -> Result<()> { + check_err(unsafe { + libc::epoll_wait(epoll_fd, events, maxevents, timeout) + })?; + Ok(()) +} + +/// libc::read wrapper +pub fn read(timer_fd: i32, bufptr: *mut libc::c_void, count: libc::size_t) -> Result<()> { + check_err(unsafe { + libc::read(timer_fd, bufptr, count) + })?; + Ok(()) +} From e84ac2dac6431ee1ad455edd208c8476b9249fbe Mon Sep 17 00:00:00 2001 From: Abid Omar Date: Fri, 14 Jan 2022 18:47:25 +0100 Subject: [PATCH 3/3] imp(timer): wrap return for initialize function --- examples/internals-timer.rs | 10 ++++++++- src/pyroscope.rs | 2 +- src/timer/epoll.rs | 45 ++++++++++++++++++++++++------------- src/timer/kqueue.rs | 4 ++-- src/timer/sleep.rs | 4 ++-- 5 files changed, 43 insertions(+), 22 deletions(-) diff --git a/examples/internals-timer.rs b/examples/internals-timer.rs index 309bd5c3..b851a83e 100644 --- a/examples/internals-timer.rs +++ b/examples/internals-timer.rs @@ -13,7 +13,7 @@ use pyroscope::timer::Timer; fn main() { // Initialize the Timer - let mut timer = Timer::default().initialize(); + let mut timer = Timer::default().initialize().unwrap(); // Create a streaming channel let (tx, rx): (Sender, Receiver) = channel(); @@ -24,6 +24,14 @@ fn main() { timer.attach_listener(tx).unwrap(); timer.attach_listener(tx2).unwrap(); + // Show current time + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + + println!("Current Time: {}", now); + // Listen to the Timer events std::thread::spawn(move || { while let result = rx.recv() { diff --git a/src/pyroscope.rs b/src/pyroscope.rs index 7b8736f8..f9e25320 100644 --- a/src/pyroscope.rs +++ b/src/pyroscope.rs @@ -123,7 +123,7 @@ impl PyroscopeAgentBuilder { backend.lock()?.initialize(self.config.sample_rate)?; // Start Timer - let timer = Timer::default().initialize(); + let timer = Timer::default().initialize()?; // Return PyroscopeAgent Ok(PyroscopeAgent { diff --git a/src/timer/epoll.rs b/src/timer/epoll.rs index 2823f699..6a18c271 100644 --- a/src/timer/epoll.rs +++ b/src/timer/epoll.rs @@ -20,23 +20,22 @@ pub struct Timer { } impl Timer { - pub fn initialize(self) -> Self { + pub fn initialize(self) -> Result { let txs = Arc::clone(&self.txs); - // TODO: use ? instead of unwrap, requires changing Timer initialize function - let timer_fd = Timer::set_timerfd().unwrap(); - let epoll_fd = Timer::create_epollfd(timer_fd).unwrap(); + let timer_fd = Timer::set_timerfd()?; + let epoll_fd = Timer::create_epollfd(timer_fd)?; let handle = Some(thread::spawn(move || { loop { // Exit thread if there are no listeners if txs.lock()?.len() == 0 { - // TODO: should close file descriptor + // TODO: should close file descriptors? return Ok(()); } // Fire @ 10th sec - Timer::epoll_wait(timer_fd, epoll_fd).unwrap(); + Timer::epoll_wait(timer_fd, epoll_fd)?; // Get current time let current = std::time::SystemTime::now() @@ -51,23 +50,27 @@ impl Timer { } })); - Self { handle, ..self } + Ok(Self { handle, ..self }) } + /// create and set a timer file descriptor fn set_timerfd() -> Result { + // Set the timer to use the system time. let clockid: libc::clockid_t = libc::CLOCK_REALTIME; + // Non-blocking file descriptor let clock_flags: libc::c_int = libc::TFD_NONBLOCK; - let tfd = timerfd_create(clockid, clock_flags).unwrap(); + // Create timer fd + let tfd = timerfd_create(clockid, clock_flags)?; + // Get the next event time let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)? .as_secs(); - let rem = 10u64.checked_sub(now.checked_rem(10).unwrap()).unwrap(); - let first_fire = now + rem; + // new_value sets the Timer let mut new_value = libc::itimerspec { it_interval: libc::timespec { tv_sec: 10, @@ -79,6 +82,7 @@ impl Timer { }, }; + // Empty itimerspec object let mut old_value = libc::itimerspec { it_interval: libc::timespec { tv_sec: 0, @@ -92,14 +96,19 @@ impl Timer { let set_flags = libc::TFD_TIMER_ABSTIME; - timerfd_settime(tfd, set_flags, &mut new_value, &mut old_value).unwrap(); + // Set the timer + timerfd_settime(tfd, set_flags, &mut new_value, &mut old_value)?; + // Return file descriptor Ok(tfd) } + /// Create a new epoll file descriptor and add the timer to its interests fn create_epollfd(timer_fd: libc::c_int) -> Result { - let epoll_fd = epoll_create1(0).unwrap(); + // create a new epoll fd + let epoll_fd = epoll_create1(0)?; + // event to pull let mut event = libc::epoll_event { events: libc::EPOLLIN as u32, u64: 1, @@ -107,20 +116,24 @@ impl Timer { let epoll_flags = libc::EPOLL_CTL_ADD; - epoll_ctl(epoll_fd, epoll_flags, timer_fd, &mut event).unwrap(); + // add event to the epoll + epoll_ctl(epoll_fd, epoll_flags, timer_fd, &mut event)?; + // return epoll fd Ok(epoll_fd) } fn epoll_wait(timer_fd: libc::c_int, epoll_fd: libc::c_int) -> Result<()> { + // vector to store events let mut events = Vec::with_capacity(1); - epoll_wait(epoll_fd, events.as_mut_ptr(), 1, -1).unwrap(); + // wait for the timer to fire an event. This is function will block. + epoll_wait(epoll_fd, events.as_mut_ptr(), 1, -1)?; + // read the value from the timerfd. This is required to re-arm the timer. let mut buffer: u64 = 0; let bufptr: *mut _ = &mut buffer; - - read(timer_fd, bufptr as *mut libc::c_void, 8).unwrap(); + read(timer_fd, bufptr as *mut libc::c_void, 8)?; Ok(()) } diff --git a/src/timer/kqueue.rs b/src/timer/kqueue.rs index d3e7dca4..22f43715 100644 --- a/src/timer/kqueue.rs +++ b/src/timer/kqueue.rs @@ -19,8 +19,8 @@ pub struct Timer { } impl Timer { - pub fn initialize(self) -> Self { - self + pub fn initialize(self) -> Result { + Ok(self) } /// Attach an mpsc::Sender to Timer diff --git a/src/timer/sleep.rs b/src/timer/sleep.rs index 23598fc0..e4b9cb52 100644 --- a/src/timer/sleep.rs +++ b/src/timer/sleep.rs @@ -28,7 +28,7 @@ pub struct Timer { impl Timer { /// Initialize Timer and run a thread to send events to attached listeners - pub fn initialize(self) -> Self { + pub fn initialize(self) -> Result { let txs = Arc::clone(&self.txs); // Add tx @@ -69,7 +69,7 @@ impl Timer { } })); - Self { handle, ..self } + Ok(Self { handle, ..self }) } /// Attach an mpsc::Sender to Timer