Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ readme = "README.md"

[dependencies]
thiserror ="1.0"
log = "0.4"
reqwest = {version = "0.11", features = ["blocking"]}
pprof = { version="0.6.2"}
libc = "^0.2.66"

[dev-dependencies]
tokio = { version = "1.13", features = ["full"] }
pretty_env_logger = "0.4.0"

[profile.dev]
opt-level=0
Expand Down
23 changes: 23 additions & 0 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,36 @@ fn main() -> Result<()> {
.tags(&[("TagA", "ValueA"), ("TagB", "ValueB")])
.build()?;

// Show start time
let start = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
println!("Start Time: {}", start);

// Start Agent
agent.start()?;

let _result = fibonacci(47);

// Show stop time
let stop = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
println!("Stop Time: {}", stop);

// Stop Agent
agent.stop()?;

drop(agent);

// Show program exit time
let exit = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
println!("Exit Time: {}", exit);

Ok(())
}
41 changes: 41 additions & 0 deletions examples/with-logger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2021 Developers of Pyroscope.

// Licensed under the Apache License, Version 2.0 <LICENSE or
// https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed
// except according to those terms.

extern crate pyroscope;

use log::{debug, error, info, trace, warn};

use pyroscope::{PyroscopeAgent, Result};

fn fibonacci(n: u64) -> u64 {
match n {
0 | 1 => 1,
n => fibonacci(n - 1) + fibonacci(n - 2),
}
}

fn main() -> Result<()> {
// Force rustc to display the log messages in the console.
std::env::set_var("RUST_LOG", "trace");

// Initialize the logger.
pretty_env_logger::init_timed();

info!("With Logger example");

// Create a new agent.
let mut agent = PyroscopeAgent::builder("http://localhost:4040", "example.logger").build()?;

// Start Agent
agent.start()?;

let _result = fibonacci(47);

// Stop Agent
agent.stop()?;

Ok(())
}
57 changes: 54 additions & 3 deletions src/pyroscope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use crate::backends::pprof::Pprof;
use crate::backends::Backend;
use crate::error::Result;
use crate::session::Session;
use crate::session::SessionManager;
use crate::session::SessionSignal;
use crate::timer::Timer;

/// Represent PyroscopeAgent Configuration
Expand Down Expand Up @@ -93,7 +95,9 @@ impl PyroscopeAgentBuilder {

/// Set the agent backend. Default is pprof.
pub fn backend<T: 'static>(self, backend: T) -> Self
where T: Backend {
where
T: Backend,
{
Self {
backend: Arc::new(Mutex::new(backend)),
..self
Expand Down Expand Up @@ -121,15 +125,22 @@ impl PyroscopeAgentBuilder {
// Initiliaze the backend
let backend = Arc::clone(&self.backend);
backend.lock()?.initialize(self.config.sample_rate)?;
log::trace!("PyroscopeAgent - Backend initialized");

// Start Timer
let timer = Timer::default().initialize()?;
log::trace!("PyroscopeAgent - Timer initialized");

// Start the SessionManager
let session_manager = SessionManager::new()?;
log::trace!("PyroscopeAgent - SessionManager initialized");

// Return PyroscopeAgent
Ok(PyroscopeAgent {
backend: self.backend,
config: self.config,
timer,
session_manager,
tx: None,
handle: None,
running: Arc::new((Mutex::new(false), Condvar::new())),
Expand All @@ -142,6 +153,7 @@ impl PyroscopeAgentBuilder {
pub struct PyroscopeAgent {
pub backend: Arc<Mutex<dyn Backend>>,
timer: Timer,
session_manager: SessionManager,
tx: Option<Sender<u64>>,
handle: Option<JoinHandle<Result<()>>>,
running: Arc<(Mutex<bool>, Condvar)>,
Expand All @@ -153,9 +165,29 @@ pub struct PyroscopeAgent {
impl Drop for PyroscopeAgent {
/// Properly shutdown the agent.
fn drop(&mut self) {
log::debug!("PyroscopeAgent::drop()");

// Stop Timer
self.timer.drop_listeners().unwrap(); // Drop listeners
log::trace!("PyroscopeAgent - Dropped timer listeners");
self.timer.handle.take().unwrap().join().unwrap().unwrap(); // Wait for the Timer thread to finish
log::trace!("PyroscopeAgent - Dropped timer thread");

// Stop the SessionManager
self.session_manager.push(SessionSignal::Kill).unwrap();
log::trace!("PyroscopeAgent - Sent kill signal to SessionManager");
self.session_manager
.handle
.take()
.unwrap()
.join()
.unwrap()
.unwrap();
log::trace!("PyroscopeAgent - Dropped SessionManager thread");

// Wait for main thread to finish
self.handle.take().unwrap().join().unwrap().unwrap();
log::trace!("PyroscopeAgent - Dropped main thread");
}
}

Expand All @@ -168,6 +200,8 @@ impl PyroscopeAgent {

/// Start profiling and sending data. The agent will keep running until stopped.
pub fn start(&mut self) -> Result<()> {
log::debug!("PyroscopeAgent - Starting");

// Create a clone of Backend
let backend = Arc::clone(&self.backend);
// Call start()
Expand All @@ -188,13 +222,27 @@ impl PyroscopeAgent {

let config = self.config.clone();

let stx = self.session_manager.tx.clone();

self.handle = Some(std::thread::spawn(move || {
log::trace!("PyroscopeAgent - Main Thread started");

while let Ok(time) = rx.recv() {
log::trace!("PyroscopeAgent - Sending session {}", time);

// Generate report from backend
let report = backend.lock()?.report()?;
// start a new session
Session::new(time, config.clone(), report)?.send()?;

// Send new Session to SessionManager
stx.send(SessionSignal::Session(Session::new(
time,
config.clone(),
report,
)?))?;

if time == 0 {
log::trace!("PyroscopeAgent - Session Killed");

let (lock, cvar) = &*pair;
let mut running = lock.lock()?;
*running = false;
Expand All @@ -212,6 +260,7 @@ impl PyroscopeAgent {

/// Stop the agent.
pub fn stop(&mut self) -> Result<()> {
log::debug!("PyroscopeAgent - Stopping");
// get tx and send termination signal
self.tx.take().unwrap().send(0)?;

Expand All @@ -230,6 +279,7 @@ impl PyroscopeAgent {

/// Add tags. This will restart the agent.
pub fn add_tags(&mut self, tags: &[(&str, &str)]) -> Result<()> {
log::debug!("PyroscopeAgent - Adding tags");
// Stop Agent
self.stop()?;

Expand All @@ -251,6 +301,7 @@ impl PyroscopeAgent {

/// Remove tags. This will restart the agent.
pub fn remove_tags(&mut self, tags: &[&str]) -> Result<()> {
log::debug!("PyroscopeAgent - Removing tags");
// Stop Agent
self.stop()?;

Expand Down
65 changes: 64 additions & 1 deletion src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,72 @@
// https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed
// except according to those terms.

use std::{thread, thread::JoinHandle};
use std::{
sync::mpsc::{sync_channel, Receiver, SyncSender},
thread,
thread::JoinHandle,
};

use crate::pyroscope::PyroscopeConfig;
use crate::utils::merge_tags_with_app_name;
use crate::Result;

/// Session Signal
#[derive(Debug)]
pub enum SessionSignal {
Session(Session),
Kill,
}

/// SessionManager
#[derive(Debug)]
pub struct SessionManager {
pub handle: Option<JoinHandle<Result<()>>>,
pub tx: SyncSender<SessionSignal>,
}

impl SessionManager {
/// Create a new SessionManager
pub fn new() -> Result<Self> {
log::info!("SessionManager - Creating SessionManager");

// Create a channel for sending and receiving sessions
let (tx, rx): (SyncSender<SessionSignal>, Receiver<SessionSignal>) = sync_channel(10);

// Create a thread for the SessionManager
let handle = Some(thread::spawn(move || {
log::trace!("SessionManager - SessionManager thread started");
while let Ok(signal) = rx.recv() {
match signal {
SessionSignal::Session(session) => {
// Send the session
session.send()?;
log::trace!("SessionManager - Session sent");
}
SessionSignal::Kill => {
// Kill the session manager
return Ok(());
log::trace!("SessionManager - Kill signal received");
}
}
}
Ok(())
}));

Ok(SessionManager { handle, tx })
}

/// Push a new session into the SessionManager
pub fn push(&self, session: SessionSignal) -> Result<()> {
// Push the session into the SessionManager
self.tx.send(session)?;

log::trace!("SessionManager - SessionSignal pushed");

Ok(())
}
}

/// Pyroscope Session
#[derive(Clone, Debug)]
pub struct Session {
Expand All @@ -21,6 +81,7 @@ pub struct Session {

impl Session {
pub fn new(mut until: u64, config: PyroscopeConfig, report: Vec<u8>) -> Result<Self> {
log::info!("Session - Creating Session");
// Session interrupted (0 signal), determine the time
if until == 0 {
let now = std::time::SystemTime::now()
Expand All @@ -43,6 +104,8 @@ impl Session {
}

pub fn send(self) -> Result<()> {
log::info!("Session - Sending Session");

let _handle: JoinHandle<Result<()>> = thread::spawn(move || {
if self.report.is_empty() {
return Ok(());
Expand Down