Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions examples/internals-timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ fn main() {
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();

println!("Current Time: {}", now);

// Listen to the Timer events
std::thread::spawn(move || {
#[allow(irrefutable_let_patterns)]
while let result = rx.recv() {
match result {
Ok(time) => println!("Thread 1 Notification: {}", time),
Err(err) => {
Err(_err) => {
println!("Error Thread 1");
break;
}
Expand All @@ -46,10 +46,11 @@ fn main() {
});

std::thread::spawn(move || {
#[allow(irrefutable_let_patterns)]
while let result = rx2.recv() {
match result {
Ok(time) => println!("Thread 2 Notification: {}", time),
Err(err) => {
Err(_err) => {
println!("Error Thread 2");
break;
}
Expand Down
3 changes: 1 addition & 2 deletions src/pyroscope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ impl PyroscopeAgent {
drop(cvar);
drop(running);

// TODO: move this channel to PyroscopeAgent
let (tx, rx): (Sender<u64>, Receiver<u64>) = channel();
self.timer.attach_listener(tx.clone())?;
self.tx = Some(tx.clone());
Expand Down Expand Up @@ -219,7 +218,7 @@ impl PyroscopeAgent {
// Wait for the Thread to finish
let pair = Arc::clone(&self.running);
let (lock, cvar) = &*pair;
cvar.wait_while(lock.lock()?, |running| *running)?;
let _guard = cvar.wait_while(lock.lock()?, |running| *running)?;

// Create a clone of Backend
let backend = Arc::clone(&self.backend);
Expand Down
69 changes: 65 additions & 4 deletions src/timer/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
// https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed
// except according to those terms.

use crate::utils::{epoll_create1, epoll_ctl, epoll_wait, read, timerfd_create, timerfd_settime};
use crate::utils::check_err;
use crate::Result;

use std::sync::{mpsc::Sender, Arc, Mutex};
use std::sync::{
mpsc::{channel, Receiver, Sender},
Arc, Mutex,
};
use std::{thread, thread::JoinHandle};

#[derive(Debug, Default)]
Expand All @@ -23,14 +26,21 @@ impl Timer {
pub fn initialize(self) -> Result<Self> {
let txs = Arc::clone(&self.txs);

// Add Default tx
let (tx, _rx): (Sender<u64>, Receiver<u64>) = channel();
txs.lock()?.push(tx);

let timer_fd = Timer::set_timerfd()?;
let epoll_fd = Timer::create_epollfd(timer_fd)?;

let handle = Some(thread::spawn(move || {
loop {
// Exit thread if there are no listeners
if txs.lock()?.len() == 0 {
// TODO: should close file descriptors?
// Close file descriptors
unsafe { libc::close(timer_fd) };
unsafe { libc::close(epoll_fd) };

return Ok(());
}

Expand All @@ -45,7 +55,10 @@ impl Timer {
// Iterate through Senders
txs.lock()?.iter().for_each(|tx| {
// Send event to attached Sender
tx.send(current).unwrap();
match tx.send(current) {
Ok(_) => {}
Err(_) => {}
}
});
}
}));
Expand Down Expand Up @@ -158,3 +171,51 @@ impl Timer {
Ok(())
}
}

/// Wrapper for libc functions.
///
/// Error wrapper for some libc functions used by the library. This only does
/// Error (-1 return) wrapping. Alternatively, the nix crate could be used
/// instead of expanding this wrappers (if more functions and types are used
/// from libc)

/// libc::timerfd wrapper
pub fn timerfd_create(clockid: libc::clockid_t, clock_flags: libc::c_int) -> Result<i32> {
check_err(unsafe { libc::timerfd_create(clockid, clock_flags) }).map(|timer_fd| timer_fd as i32)
}

/// libc::timerfd_settime wrapper
pub fn timerfd_settime(
timer_fd: i32, set_flags: libc::c_int, new_value: &mut libc::itimerspec,
old_value: &mut libc::itimerspec,
) -> Result<()> {
check_err(unsafe { libc::timerfd_settime(timer_fd, set_flags, new_value, old_value) })?;
Ok(())
}

/// libc::epoll_create1 wrapper
pub fn epoll_create1(epoll_flags: libc::c_int) -> Result<i32> {
check_err(unsafe { libc::epoll_create1(epoll_flags) }).map(|epoll_fd| epoll_fd as i32)
}

/// libc::epoll_ctl wrapper
pub fn epoll_ctl(
epoll_fd: i32, epoll_flags: libc::c_int, timer_fd: i32, event: &mut libc::epoll_event,
) -> Result<()> {
check_err(unsafe { libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, event) })?;
Ok(())
}

/// libc::epoll_wait wrapper
pub fn epoll_wait(
epoll_fd: i32, events: *mut libc::epoll_event, maxevents: libc::c_int, timeout: libc::c_int,
) -> Result<()> {
check_err(unsafe { libc::epoll_wait(epoll_fd, events, maxevents, timeout) })?;
Ok(())
}

/// libc::read wrapper
pub fn read(timer_fd: i32, bufptr: *mut libc::c_void, count: libc::size_t) -> Result<()> {
check_err(unsafe { libc::read(timer_fd, bufptr, count) })?;
Ok(())
}
121 changes: 119 additions & 2 deletions src/timer/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
// https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed
// except according to those terms.

use crate::utils::check_err;
use crate::Result;

use std::sync::{mpsc::Sender, Arc, Mutex};
use std::sync::{
mpsc::{channel, Receiver, Sender},
Arc, Mutex,
};
use std::{thread, thread::JoinHandle};

#[derive(Debug, Default)]
Expand All @@ -20,7 +24,50 @@ pub struct Timer {

impl Timer {
pub fn initialize(self) -> Result<Self> {
Ok(self)
let txs = Arc::clone(&self.txs);

// Add Default tx
let (tx, _rx): (Sender<u64>, Receiver<u64>) = channel();
txs.lock()?.push(tx);

let kqueue = kqueue()?;

let handle = Some(thread::spawn(move || {
// Wait for initial expiration
let initial_event = Timer::register_initial_expiration(kqueue)?;
Timer::wait_event(kqueue, [initial_event].as_mut_ptr())?;

// Register loop event
let loop_event = Timer::register_loop_expiration(kqueue)?;

// Loop 10s
loop {
// Exit thread if there are no listeners
if txs.lock()?.len() == 0 {
// TODO: should close file descriptors?
return Ok(());
}

// Get current time
let current = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs();

// Iterate through Senders
txs.lock()?.iter().for_each(|tx| {
// Send event to attached Sender
match tx.send(current) {
Ok(_) => {}
Err(_) => {}
}
});

// Wait 10s
Timer::wait_event(kqueue, [loop_event].as_mut_ptr())?;
}
}));

Ok(Self { handle, ..self })
}

/// Attach an mpsc::Sender to Timer
Expand All @@ -42,4 +89,74 @@ impl Timer {

Ok(())
}

fn wait_event(kqueue: i32, events: *mut libc::kevent) -> Result<()> {
kevent(kqueue, [].as_mut_ptr(), 0, events, 1, std::ptr::null())?;
Ok(())
}
fn register_initial_expiration(kqueue: i32) -> Result<libc::kevent> {
// Get the next event time
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs();
let rem = 10u64.checked_sub(now.checked_rem(10).unwrap()).unwrap();
let first_fire = now + rem;

let initial_event = libc::kevent {
ident: 1,
filter: libc::EVFILT_TIMER,
flags: libc::EV_ADD | libc::EV_ENABLE | libc::EV_ONESHOT,
fflags: libc::NOTE_ABSOLUTE | libc::NOTE_SECONDS,
data: first_fire as isize,
udata: 0 as *mut libc::c_void,
};

// add first event
kevent(
kqueue,
[initial_event].as_ptr() as *const libc::kevent,
1,
[].as_mut_ptr(),
0,
std::ptr::null(),
)?;

Ok(initial_event)
}
fn register_loop_expiration(kqueue: i32) -> Result<libc::kevent> {
let loop_event = libc::kevent {
ident: 1,
filter: libc::EVFILT_TIMER,
flags: libc::EV_ADD | libc::EV_ENABLE,
fflags: 0,
data: 10000,
udata: 0 as *mut libc::c_void,
};

// add loop event
let ke = kevent(
kqueue,
[loop_event].as_ptr() as *const libc::kevent,
1,
[].as_mut_ptr(),
0,
std::ptr::null(),
)?;

Ok(loop_event)
}
}

/// libc::kqueue wrapper
fn kqueue() -> Result<i32> {
check_err(unsafe { libc::kqueue() }).map(|kq| kq as i32)
}

/// libc::kevent wrapper
fn kevent(
kqueue: i32, change: *const libc::kevent, c_count: libc::c_int, events: *mut libc::kevent,
e_count: libc::c_int, timeout: *const libc::timespec,
) -> Result<()> {
check_err(unsafe { libc::kevent(kqueue, change, c_count, events, e_count, timeout) })?;
Ok(())
}
10 changes: 7 additions & 3 deletions src/timer/sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ impl Timer {
pub fn initialize(self) -> Result<Self> {
let txs = Arc::clone(&self.txs);

// Add tx
// txs.lock().unwrap().push(tx);
// Add Default tx
let (tx, _rx): (Sender<u64>, Receiver<u64>) = channel();
txs.lock()?.push(tx);

// Spawn a Thread
let handle = Some(thread::spawn(move || {
Expand Down Expand Up @@ -61,7 +62,10 @@ impl Timer {
// Iterate through Senders
txs.lock()?.iter().for_each(|tx| {
// Send event to attached Sender
tx.send(current).unwrap();
match tx.send(current) {
Ok(_) => {}
Err(_) => {}
}
});

// Sleep for 10s
Expand Down
54 changes: 2 additions & 52 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mod tests {
use crate::utils::merge_tags_with_app_name;

#[test]

fn merge_tags_with_app_name_with_tags() {
let mut tags = HashMap::new();
tags.insert("env".to_string(), "staging".to_string());
Expand All @@ -55,61 +56,10 @@ mod tests {
)
}
}

/// Wrapper for libc functions.
///
/// Error wrapper for some libc functions used by the library. This only does
/// Error (-1 return) wrapping. Alternatively, the nix crate could be used
/// instead of expanding this wrappers (if more functions and types are used
/// from libc)

/// Error Wrapper for libc return. Only check for errors.
fn check_err<T: Ord + Default>(num: T) -> Result<T> {
pub fn check_err<T: Ord + Default>(num: T) -> Result<T> {
if num < T::default() {
return Err(PyroscopeError::from(std::io::Error::last_os_error()));
}
Ok(num)
}

/// libc::timerfd wrapper
pub fn timerfd_create(clockid: libc::clockid_t, clock_flags: libc::c_int) -> Result<i32> {
check_err(unsafe { libc::timerfd_create(clockid, clock_flags) }).map(|timer_fd| timer_fd as i32)
}

/// libc::timerfd_settime wrapper
pub fn timerfd_settime(
timer_fd: i32, set_flags: libc::c_int, new_value: &mut libc::itimerspec,
old_value: &mut libc::itimerspec,
) -> Result<()> {
check_err(unsafe { libc::timerfd_settime(timer_fd, set_flags, new_value, old_value) })?;
Ok(())
}

/// libc::epoll_create1 wrapper
pub fn epoll_create1(epoll_flags: libc::c_int) -> Result<i32> {
check_err(unsafe { libc::epoll_create1(epoll_flags) }).map(|epoll_fd| epoll_fd as i32)
}

/// libc::epoll_ctl wrapper
pub fn epoll_ctl(epoll_fd: i32, epoll_flags: libc::c_int, timer_fd: i32, event: &mut libc::epoll_event) -> Result<()> {
check_err(unsafe {
libc::epoll_ctl(epoll_fd, epoll_flags, timer_fd, event)
})?;
Ok(())
}

/// libc::epoll_wait wrapper
pub fn epoll_wait(epoll_fd: i32, events: *mut libc::epoll_event, maxevents: libc::c_int, timeout: libc::c_int) -> Result<()> {
check_err(unsafe {
libc::epoll_wait(epoll_fd, events, maxevents, timeout)
})?;
Ok(())
}

/// libc::read wrapper
pub fn read(timer_fd: i32, bufptr: *mut libc::c_void, count: libc::size_t) -> Result<()> {
check_err(unsafe {
libc::read(timer_fd, bufptr, count)
})?;
Ok(())
}