From 26a7766026b56c2334d9a544c7f56cb53e7a7ede Mon Sep 17 00:00:00 2001 From: Abid Omar Date: Mon, 17 Jan 2022 11:54:12 +0100 Subject: [PATCH 1/6] refactor(epoll): move libc wrapper functions --- src/timer/epoll.rs | 58 +++++++++++++++++++++++++++++++++++++++++++++ src/utils.rs | 59 ---------------------------------------------- 2 files changed, 58 insertions(+), 59 deletions(-) diff --git a/src/timer/epoll.rs b/src/timer/epoll.rs index 6a18c271..da4647d9 100644 --- a/src/timer/epoll.rs +++ b/src/timer/epoll.rs @@ -158,3 +158,61 @@ impl Timer { Ok(()) } } + +/// Wrapper for libc functions. +/// +/// Error wrapper for some libc functions used by the library. This only does +/// Error (-1 return) wrapping. Alternatively, the nix crate could be used +/// instead of expanding this wrappers (if more functions and types are used +/// from libc) + +/// Error Wrapper for libc return. Only check for errors. +fn check_err(num: T) -> Result { + if num < T::default() { + return Err(PyroscopeError::from(std::io::Error::last_os_error())); + } + Ok(num) +} + +/// libc::timerfd wrapper +pub fn timerfd_create(clockid: libc::clockid_t, clock_flags: libc::c_int) -> Result { + check_err(unsafe { libc::timerfd_create(clockid, clock_flags) }).map(|timer_fd| timer_fd as i32) +} + +/// libc::timerfd_settime wrapper +pub fn timerfd_settime( + timer_fd: i32, set_flags: libc::c_int, new_value: &mut libc::itimerspec, + old_value: &mut libc::itimerspec, +) -> Result<()> { + check_err(unsafe { libc::timerfd_settime(timer_fd, set_flags, new_value, old_value) })?; + Ok(()) +} + +/// libc::epoll_create1 wrapper +pub fn epoll_create1(epoll_flags: libc::c_int) -> Result { + check_err(unsafe { libc::epoll_create1(epoll_flags) }).map(|epoll_fd| epoll_fd as i32) +} + +/// libc::epoll_ctl wrapper +pub fn epoll_ctl(epoll_fd: i32, epoll_flags: libc::c_int, timer_fd: i32, event: &mut libc::epoll_event) -> Result<()> { + check_err(unsafe { + libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, event) + })?; + Ok(()) +} + +/// libc::epoll_wait wrapper +pub fn epoll_wait(epoll_fd: i32, events: *mut libc::epoll_event, maxevents: libc::c_int, timeout: libc::c_int) -> Result<()> { + check_err(unsafe { + libc::epoll_wait(epoll_fd, events, maxevents, timeout) + })?; + Ok(()) +} + +/// libc::read wrapper +pub fn read(timer_fd: i32, bufptr: *mut libc::c_void, count: libc::size_t) -> Result<()> { + check_err(unsafe { + libc::read(timer_fd, bufptr, count) + })?; + Ok(()) +} diff --git a/src/utils.rs b/src/utils.rs index 2ff1da71..519cc749 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -5,7 +5,6 @@ // except according to those terms. use crate::error::Result; -use crate::PyroscopeError; use std::collections::HashMap; @@ -55,61 +54,3 @@ mod tests { ) } } - -/// Wrapper for libc functions. -/// -/// Error wrapper for some libc functions used by the library. This only does -/// Error (-1 return) wrapping. Alternatively, the nix crate could be used -/// instead of expanding this wrappers (if more functions and types are used -/// from libc) - -/// Error Wrapper for libc return. Only check for errors. -fn check_err(num: T) -> Result { - if num < T::default() { - return Err(PyroscopeError::from(std::io::Error::last_os_error())); - } - Ok(num) -} - -/// libc::timerfd wrapper -pub fn timerfd_create(clockid: libc::clockid_t, clock_flags: libc::c_int) -> Result { - check_err(unsafe { libc::timerfd_create(clockid, clock_flags) }).map(|timer_fd| timer_fd as i32) -} - -/// libc::timerfd_settime wrapper -pub fn timerfd_settime( - timer_fd: i32, set_flags: libc::c_int, new_value: &mut libc::itimerspec, - old_value: &mut libc::itimerspec, -) -> Result<()> { - check_err(unsafe { libc::timerfd_settime(timer_fd, set_flags, new_value, old_value) })?; - Ok(()) -} - -/// libc::epoll_create1 wrapper -pub fn epoll_create1(epoll_flags: libc::c_int) -> Result { - check_err(unsafe { libc::epoll_create1(epoll_flags) }).map(|epoll_fd| epoll_fd as i32) -} - -/// libc::epoll_ctl wrapper -pub fn epoll_ctl(epoll_fd: i32, epoll_flags: libc::c_int, timer_fd: i32, event: &mut libc::epoll_event) -> Result<()> { - check_err(unsafe { - libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, event) - })?; - Ok(()) -} - -/// libc::epoll_wait wrapper -pub fn epoll_wait(epoll_fd: i32, events: *mut libc::epoll_event, maxevents: libc::c_int, timeout: libc::c_int) -> Result<()> { - check_err(unsafe { - libc::epoll_wait(epoll_fd, events, maxevents, timeout) - })?; - Ok(()) -} - -/// libc::read wrapper -pub fn read(timer_fd: i32, bufptr: *mut libc::c_void, count: libc::size_t) -> Result<()> { - check_err(unsafe { - libc::read(timer_fd, bufptr, count) - })?; - Ok(()) -} From e5966368c256ef8af5c70ee90d0a8c68c4270726 Mon Sep 17 00:00:00 2001 From: Abid Omar Date: Tue, 18 Jan 2022 21:35:49 +0100 Subject: [PATCH 2/6] imp(timer): kqueue timer implementation --- src/timer/kqueue.rs | 107 +++++++++++++++++++++++++++++++++++++++++++- src/utils.rs | 9 ++++ 2 files changed, 115 insertions(+), 1 deletion(-) diff --git a/src/timer/kqueue.rs b/src/timer/kqueue.rs index 22f43715..b04985db 100644 --- a/src/timer/kqueue.rs +++ b/src/timer/kqueue.rs @@ -4,6 +4,7 @@ // https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed // except according to those terms. +use crate::utils::check_err; use crate::Result; use std::sync::{mpsc::Sender, Arc, Mutex}; @@ -20,7 +21,43 @@ pub struct Timer { impl Timer { pub fn initialize(self) -> Result { - Ok(self) + let txs = Arc::clone(&self.txs); + + let kqueue = kqueue()?; + + let handle = Some(thread::spawn(move || { + // Wait for initial expiration + let initial_event = Timer::register_initial_expiration(kqueue)?; + Timer::wait_event(kqueue, [initial_event].as_mut_ptr())?; + + // Register loop event + let loop_event = Timer::register_loop_expiration(kqueue)?; + + // Loop 10s + loop { + // Exit thread if there are no listeners + if txs.lock()?.len() == 0 { + // TODO: should close file descriptors? + return Ok(()); + } + + // Get current time + let current = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_secs(); + + // Iterate through Senders + txs.lock()?.iter().for_each(|tx| { + // Send event to attached Sender + tx.send(current).unwrap(); + }); + + // Wait 10s + Timer::wait_event(kqueue, [loop_event].as_mut_ptr())?; + } + })); + + Ok(Self { handle, ..self }) } /// Attach an mpsc::Sender to Timer @@ -42,4 +79,72 @@ impl Timer { Ok(()) } + + fn wait_event(kqueue: i32, events: *mut libc::kevent) -> Result<()> { + kevent(kqueue, [].as_mut_ptr(), 0, events, 1, std::ptr::null())?; + Ok(()) + } + fn register_initial_expiration(kqueue: i32) -> Result { + // Get the next event time + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_secs(); + let rem = 10u64.checked_sub(now.checked_rem(10).unwrap()).unwrap(); + let first_fire = now + rem; + + let initial_event = libc::kevent { + ident: 1, + filter: libc::EVFILT_TIMER, + flags: libc::EV_ADD | libc::EV_ENABLE | libc::EV_ONESHOT, + fflags: libc::NOTE_ABSOLUTE | libc::NOTE_SECONDS, + data: first_fire as isize, + udata: 0 as *mut libc::c_void, + }; + + // add first event + kevent( + kqueue, + [initial_event].as_ptr() as *const libc::kevent, + 1, + [].as_mut_ptr(), + 0, + std::ptr::null(), + )?; + + Ok(initial_event) + } + fn register_loop_expiration(kqueue: i32) -> Result { + let loop_event = libc::kevent { + ident: 1, + filter: libc::EVFILT_TIMER, + flags: libc::EV_ADD | libc::EV_ENABLE, + fflags: 0, + data: 10000, + udata: 0 as *mut libc::c_void, + }; + + // add loop event + let ke = kevent( + kqueue, + [loop_event].as_ptr() as *const libc::kevent, + 1, + [].as_mut_ptr(), + 0, + std::ptr::null(), + )?; + + Ok(loop_event) + } +} + +fn kqueue() -> Result { + check_err(unsafe { libc::kqueue() }).map(|kq| kq as i32) +} + +fn kevent( + kqueue: i32, change: *const libc::kevent, c_count: libc::c_int, events: *mut libc::kevent, + e_count: libc::c_int, timeout: *const libc::timespec, +) -> Result<()> { + check_err(unsafe { libc::kevent(kqueue, change, c_count, events, e_count, timeout) })?; + Ok(()) } diff --git a/src/utils.rs b/src/utils.rs index 519cc749..5a182f4b 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -5,6 +5,7 @@ // except according to those terms. use crate::error::Result; +use crate::PyroscopeError; use std::collections::HashMap; @@ -35,6 +36,7 @@ mod tests { use crate::utils::merge_tags_with_app_name; #[test] + fn merge_tags_with_app_name_with_tags() { let mut tags = HashMap::new(); tags.insert("env".to_string(), "staging".to_string()); @@ -54,3 +56,10 @@ mod tests { ) } } +/// Error Wrapper for libc return. Only check for errors. +pub fn check_err(num: T) -> Result { + if num < T::default() { + return Err(PyroscopeError::from(std::io::Error::last_os_error())); + } + Ok(num) +} From 4f85ac6afcc863943c1fb0e501a90678c7d6f3dd Mon Sep 17 00:00:00 2001 From: Abid Omar Date: Wed, 19 Jan 2022 13:28:41 +0100 Subject: [PATCH 3/6] fix(epoll): remove check_err function --- src/timer/epoll.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/timer/epoll.rs b/src/timer/epoll.rs index da4647d9..86c075be 100644 --- a/src/timer/epoll.rs +++ b/src/timer/epoll.rs @@ -4,8 +4,8 @@ // https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed // except according to those terms. -use crate::utils::{epoll_create1, epoll_ctl, epoll_wait, read, timerfd_create, timerfd_settime}; use crate::Result; +use crate::utils::check_err; use std::sync::{mpsc::Sender, Arc, Mutex}; use std::{thread, thread::JoinHandle}; @@ -166,14 +166,6 @@ impl Timer { /// instead of expanding this wrappers (if more functions and types are used /// from libc) -/// Error Wrapper for libc return. Only check for errors. -fn check_err(num: T) -> Result { - if num < T::default() { - return Err(PyroscopeError::from(std::io::Error::last_os_error())); - } - Ok(num) -} - /// libc::timerfd wrapper pub fn timerfd_create(clockid: libc::clockid_t, clock_flags: libc::c_int) -> Result { check_err(unsafe { libc::timerfd_create(clockid, clock_flags) }).map(|timer_fd| timer_fd as i32) From 0d1182c2aace6c965563d24b277b4843e832b7f6 Mon Sep 17 00:00:00 2001 From: Abid Omar Date: Wed, 19 Jan 2022 13:36:20 +0100 Subject: [PATCH 4/6] fix(warning): fix minor warnings --- examples/internals-timer.rs | 7 ++++--- src/pyroscope.rs | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/examples/internals-timer.rs b/examples/internals-timer.rs index b851a83e..fd075e9c 100644 --- a/examples/internals-timer.rs +++ b/examples/internals-timer.rs @@ -29,15 +29,15 @@ fn main() { .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs(); - println!("Current Time: {}", now); // Listen to the Timer events std::thread::spawn(move || { + #[allow(irrefutable_let_patterns)] while let result = rx.recv() { match result { Ok(time) => println!("Thread 1 Notification: {}", time), - Err(err) => { + Err(_err) => { println!("Error Thread 1"); break; } @@ -46,10 +46,11 @@ fn main() { }); std::thread::spawn(move || { + #[allow(irrefutable_let_patterns)] while let result = rx2.recv() { match result { Ok(time) => println!("Thread 2 Notification: {}", time), - Err(err) => { + Err(_err) => { println!("Error Thread 2"); break; } diff --git a/src/pyroscope.rs b/src/pyroscope.rs index f9e25320..badd1001 100644 --- a/src/pyroscope.rs +++ b/src/pyroscope.rs @@ -219,7 +219,7 @@ impl PyroscopeAgent { // Wait for the Thread to finish let pair = Arc::clone(&self.running); let (lock, cvar) = &*pair; - cvar.wait_while(lock.lock()?, |running| *running)?; + let _guard = cvar.wait_while(lock.lock()?, |running| *running)?; // Create a clone of Backend let backend = Arc::clone(&self.backend); From cf0abd18f32f91ab54aedb1fe3fe3f93b1247727 Mon Sep 17 00:00:00 2001 From: Abid Omar Date: Wed, 19 Jan 2022 19:00:57 +0100 Subject: [PATCH 5/6] fix(timer): add default channel to Timer --- src/timer/epoll.rs | 41 ++++++++++++++++++++++++++--------------- src/timer/kqueue.rs | 11 ++++++++++- src/timer/sleep.rs | 10 +++++++--- 3 files changed, 43 insertions(+), 19 deletions(-) diff --git a/src/timer/epoll.rs b/src/timer/epoll.rs index 86c075be..0c931c9d 100644 --- a/src/timer/epoll.rs +++ b/src/timer/epoll.rs @@ -4,10 +4,13 @@ // https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed // except according to those terms. -use crate::Result; use crate::utils::check_err; +use crate::Result; -use std::sync::{mpsc::Sender, Arc, Mutex}; +use std::sync::{ + mpsc::{channel, Receiver, Sender}, + Arc, Mutex, +}; use std::{thread, thread::JoinHandle}; #[derive(Debug, Default)] @@ -23,6 +26,10 @@ impl Timer { pub fn initialize(self) -> Result { let txs = Arc::clone(&self.txs); + // Add Default tx + let (tx, _rx): (Sender, Receiver) = channel(); + txs.lock()?.push(tx); + let timer_fd = Timer::set_timerfd()?; let epoll_fd = Timer::create_epollfd(timer_fd)?; @@ -30,7 +37,10 @@ impl Timer { loop { // Exit thread if there are no listeners if txs.lock()?.len() == 0 { - // TODO: should close file descriptors? + // Close file descriptors + unsafe { libc::close(timer_fd) }; + unsafe { libc::close(epoll_fd) }; + return Ok(()); } @@ -45,7 +55,10 @@ impl Timer { // Iterate through Senders txs.lock()?.iter().for_each(|tx| { // Send event to attached Sender - tx.send(current).unwrap(); + match tx.send(current) { + Ok(_) => {} + Err(_) => {} + } }); } })); @@ -186,25 +199,23 @@ pub fn epoll_create1(epoll_flags: libc::c_int) -> Result { } /// libc::epoll_ctl wrapper -pub fn epoll_ctl(epoll_fd: i32, epoll_flags: libc::c_int, timer_fd: i32, event: &mut libc::epoll_event) -> Result<()> { - check_err(unsafe { - libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, event) - })?; +pub fn epoll_ctl( + epoll_fd: i32, epoll_flags: libc::c_int, timer_fd: i32, event: &mut libc::epoll_event, +) -> Result<()> { + check_err(unsafe { libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, event) })?; Ok(()) } /// libc::epoll_wait wrapper -pub fn epoll_wait(epoll_fd: i32, events: *mut libc::epoll_event, maxevents: libc::c_int, timeout: libc::c_int) -> Result<()> { - check_err(unsafe { - libc::epoll_wait(epoll_fd, events, maxevents, timeout) - })?; +pub fn epoll_wait( + epoll_fd: i32, events: *mut libc::epoll_event, maxevents: libc::c_int, timeout: libc::c_int, +) -> Result<()> { + check_err(unsafe { libc::epoll_wait(epoll_fd, events, maxevents, timeout) })?; Ok(()) } /// libc::read wrapper pub fn read(timer_fd: i32, bufptr: *mut libc::c_void, count: libc::size_t) -> Result<()> { - check_err(unsafe { - libc::read(timer_fd, bufptr, count) - })?; + check_err(unsafe { libc::read(timer_fd, bufptr, count) })?; Ok(()) } diff --git a/src/timer/kqueue.rs b/src/timer/kqueue.rs index b04985db..046d7fef 100644 --- a/src/timer/kqueue.rs +++ b/src/timer/kqueue.rs @@ -23,6 +23,10 @@ impl Timer { pub fn initialize(self) -> Result { let txs = Arc::clone(&self.txs); + // Add Default tx + let (tx, _rx): (Sender, Receiver) = channel(); + txs.lock()?.push(tx); + let kqueue = kqueue()?; let handle = Some(thread::spawn(move || { @@ -49,7 +53,10 @@ impl Timer { // Iterate through Senders txs.lock()?.iter().for_each(|tx| { // Send event to attached Sender - tx.send(current).unwrap(); + match tx.send(current) { + Ok(_) => {}, + Err(_) => {}, + } }); // Wait 10s @@ -137,10 +144,12 @@ impl Timer { } } +/// libc::kqueue wrapper fn kqueue() -> Result { check_err(unsafe { libc::kqueue() }).map(|kq| kq as i32) } +/// libc::kevent wrapper fn kevent( kqueue: i32, change: *const libc::kevent, c_count: libc::c_int, events: *mut libc::kevent, e_count: libc::c_int, timeout: *const libc::timespec, diff --git a/src/timer/sleep.rs b/src/timer/sleep.rs index e4b9cb52..86227e9b 100644 --- a/src/timer/sleep.rs +++ b/src/timer/sleep.rs @@ -31,8 +31,9 @@ impl Timer { pub fn initialize(self) -> Result { let txs = Arc::clone(&self.txs); - // Add tx - // txs.lock().unwrap().push(tx); + // Add Default tx + let (tx, _rx): (Sender, Receiver) = channel(); + txs.lock()?.push(tx); // Spawn a Thread let handle = Some(thread::spawn(move || { @@ -61,7 +62,10 @@ impl Timer { // Iterate through Senders txs.lock()?.iter().for_each(|tx| { // Send event to attached Sender - tx.send(current).unwrap(); + match tx.send(current) { + Ok(_) => {} + Err(_) => {} + } }); // Sleep for 10s From 5d9fb73bd3216a42be492a81bf9e08f2be5e64ed Mon Sep 17 00:00:00 2001 From: Abid Omar Date: Wed, 19 Jan 2022 19:10:08 +0100 Subject: [PATCH 6/6] fix(macos): minor fix for macos build --- src/pyroscope.rs | 1 - src/timer/kqueue.rs | 9 ++++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/pyroscope.rs b/src/pyroscope.rs index badd1001..8538d85d 100644 --- a/src/pyroscope.rs +++ b/src/pyroscope.rs @@ -182,7 +182,6 @@ impl PyroscopeAgent { drop(cvar); drop(running); - // TODO: move this channel to PyroscopeAgent let (tx, rx): (Sender, Receiver) = channel(); self.timer.attach_listener(tx.clone())?; self.tx = Some(tx.clone()); diff --git a/src/timer/kqueue.rs b/src/timer/kqueue.rs index 046d7fef..410e0793 100644 --- a/src/timer/kqueue.rs +++ b/src/timer/kqueue.rs @@ -7,7 +7,10 @@ use crate::utils::check_err; use crate::Result; -use std::sync::{mpsc::Sender, Arc, Mutex}; +use std::sync::{ + mpsc::{channel, Receiver, Sender}, + Arc, Mutex, +}; use std::{thread, thread::JoinHandle}; #[derive(Debug, Default)] @@ -54,8 +57,8 @@ impl Timer { txs.lock()?.iter().for_each(|tx| { // Send event to attached Sender match tx.send(current) { - Ok(_) => {}, - Err(_) => {}, + Ok(_) => {} + Err(_) => {} } });