diff --git a/Cargo.toml b/Cargo.toml index da2db602..4285108b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,12 +15,14 @@ readme = "README.md" [dependencies] thiserror ="1.0" +log = "0.4" reqwest = {version = "0.11", features = ["blocking"]} pprof = { version="0.6.2"} libc = "^0.2.66" [dev-dependencies] tokio = { version = "1.13", features = ["full"] } +pretty_env_logger = "0.4.0" [profile.dev] opt-level=0 diff --git a/examples/basic.rs b/examples/basic.rs index ca685565..b11db348 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -20,13 +20,36 @@ fn main() -> Result<()> { .tags(&[("TagA", "ValueA"), ("TagB", "ValueB")]) .build()?; + // Show start time + let start = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + println!("Start Time: {}", start); + // Start Agent agent.start()?; let _result = fibonacci(47); + // Show stop time + let stop = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + println!("Stop Time: {}", stop); + // Stop Agent agent.stop()?; + drop(agent); + + // Show program exit time + let exit = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + println!("Exit Time: {}", exit); + Ok(()) } diff --git a/examples/with-logger.rs b/examples/with-logger.rs new file mode 100644 index 00000000..a5406fc8 --- /dev/null +++ b/examples/with-logger.rs @@ -0,0 +1,41 @@ +// Copyright 2021 Developers of Pyroscope. + +// Licensed under the Apache License, Version 2.0 . This file may not be copied, modified, or distributed +// except according to those terms. + +extern crate pyroscope; + +use log::{debug, error, info, trace, warn}; + +use pyroscope::{PyroscopeAgent, Result}; + +fn fibonacci(n: u64) -> u64 { + match n { + 0 | 1 => 1, + n => fibonacci(n - 1) + fibonacci(n - 2), + } +} + +fn main() -> Result<()> { + // Force rustc to display the log messages in the console. + std::env::set_var("RUST_LOG", "trace"); + + // Initialize the logger. + pretty_env_logger::init_timed(); + + info!("With Logger example"); + + // Create a new agent. + let mut agent = PyroscopeAgent::builder("http://localhost:4040", "example.logger").build()?; + + // Start Agent + agent.start()?; + + let _result = fibonacci(47); + + // Stop Agent + agent.stop()?; + + Ok(()) +} diff --git a/src/pyroscope.rs b/src/pyroscope.rs index 8538d85d..06b6bb9d 100644 --- a/src/pyroscope.rs +++ b/src/pyroscope.rs @@ -15,6 +15,8 @@ use crate::backends::pprof::Pprof; use crate::backends::Backend; use crate::error::Result; use crate::session::Session; +use crate::session::SessionManager; +use crate::session::SessionSignal; use crate::timer::Timer; /// Represent PyroscopeAgent Configuration @@ -93,7 +95,9 @@ impl PyroscopeAgentBuilder { /// Set the agent backend. Default is pprof. pub fn backend(self, backend: T) -> Self - where T: Backend { + where + T: Backend, + { Self { backend: Arc::new(Mutex::new(backend)), ..self @@ -121,15 +125,22 @@ impl PyroscopeAgentBuilder { // Initiliaze the backend let backend = Arc::clone(&self.backend); backend.lock()?.initialize(self.config.sample_rate)?; + log::trace!("PyroscopeAgent - Backend initialized"); // Start Timer let timer = Timer::default().initialize()?; + log::trace!("PyroscopeAgent - Timer initialized"); + + // Start the SessionManager + let session_manager = SessionManager::new()?; + log::trace!("PyroscopeAgent - SessionManager initialized"); // Return PyroscopeAgent Ok(PyroscopeAgent { backend: self.backend, config: self.config, timer, + session_manager, tx: None, handle: None, running: Arc::new((Mutex::new(false), Condvar::new())), @@ -142,6 +153,7 @@ impl PyroscopeAgentBuilder { pub struct PyroscopeAgent { pub backend: Arc>, timer: Timer, + session_manager: SessionManager, tx: Option>, handle: Option>>, running: Arc<(Mutex, Condvar)>, @@ -153,9 +165,29 @@ pub struct PyroscopeAgent { impl Drop for PyroscopeAgent { /// Properly shutdown the agent. fn drop(&mut self) { + log::debug!("PyroscopeAgent::drop()"); + // Stop Timer self.timer.drop_listeners().unwrap(); // Drop listeners + log::trace!("PyroscopeAgent - Dropped timer listeners"); self.timer.handle.take().unwrap().join().unwrap().unwrap(); // Wait for the Timer thread to finish + log::trace!("PyroscopeAgent - Dropped timer thread"); + + // Stop the SessionManager + self.session_manager.push(SessionSignal::Kill).unwrap(); + log::trace!("PyroscopeAgent - Sent kill signal to SessionManager"); + self.session_manager + .handle + .take() + .unwrap() + .join() + .unwrap() + .unwrap(); + log::trace!("PyroscopeAgent - Dropped SessionManager thread"); + + // Wait for main thread to finish + self.handle.take().unwrap().join().unwrap().unwrap(); + log::trace!("PyroscopeAgent - Dropped main thread"); } } @@ -168,6 +200,8 @@ impl PyroscopeAgent { /// Start profiling and sending data. The agent will keep running until stopped. pub fn start(&mut self) -> Result<()> { + log::debug!("PyroscopeAgent - Starting"); + // Create a clone of Backend let backend = Arc::clone(&self.backend); // Call start() @@ -188,13 +222,27 @@ impl PyroscopeAgent { let config = self.config.clone(); + let stx = self.session_manager.tx.clone(); + self.handle = Some(std::thread::spawn(move || { + log::trace!("PyroscopeAgent - Main Thread started"); + while let Ok(time) = rx.recv() { + log::trace!("PyroscopeAgent - Sending session {}", time); + + // Generate report from backend let report = backend.lock()?.report()?; - // start a new session - Session::new(time, config.clone(), report)?.send()?; + + // Send new Session to SessionManager + stx.send(SessionSignal::Session(Session::new( + time, + config.clone(), + report, + )?))?; if time == 0 { + log::trace!("PyroscopeAgent - Session Killed"); + let (lock, cvar) = &*pair; let mut running = lock.lock()?; *running = false; @@ -212,6 +260,7 @@ impl PyroscopeAgent { /// Stop the agent. pub fn stop(&mut self) -> Result<()> { + log::debug!("PyroscopeAgent - Stopping"); // get tx and send termination signal self.tx.take().unwrap().send(0)?; @@ -230,6 +279,7 @@ impl PyroscopeAgent { /// Add tags. This will restart the agent. pub fn add_tags(&mut self, tags: &[(&str, &str)]) -> Result<()> { + log::debug!("PyroscopeAgent - Adding tags"); // Stop Agent self.stop()?; @@ -251,6 +301,7 @@ impl PyroscopeAgent { /// Remove tags. This will restart the agent. pub fn remove_tags(&mut self, tags: &[&str]) -> Result<()> { + log::debug!("PyroscopeAgent - Removing tags"); // Stop Agent self.stop()?; diff --git a/src/session.rs b/src/session.rs index 231d59ae..a4f8caa2 100644 --- a/src/session.rs +++ b/src/session.rs @@ -4,12 +4,72 @@ // https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed // except according to those terms. -use std::{thread, thread::JoinHandle}; +use std::{ + sync::mpsc::{sync_channel, Receiver, SyncSender}, + thread, + thread::JoinHandle, +}; use crate::pyroscope::PyroscopeConfig; use crate::utils::merge_tags_with_app_name; use crate::Result; +/// Session Signal +#[derive(Debug)] +pub enum SessionSignal { + Session(Session), + Kill, +} + +/// SessionManager +#[derive(Debug)] +pub struct SessionManager { + pub handle: Option>>, + pub tx: SyncSender, +} + +impl SessionManager { + /// Create a new SessionManager + pub fn new() -> Result { + log::info!("SessionManager - Creating SessionManager"); + + // Create a channel for sending and receiving sessions + let (tx, rx): (SyncSender, Receiver) = sync_channel(10); + + // Create a thread for the SessionManager + let handle = Some(thread::spawn(move || { + log::trace!("SessionManager - SessionManager thread started"); + while let Ok(signal) = rx.recv() { + match signal { + SessionSignal::Session(session) => { + // Send the session + session.send()?; + log::trace!("SessionManager - Session sent"); + } + SessionSignal::Kill => { + // Kill the session manager + return Ok(()); + log::trace!("SessionManager - Kill signal received"); + } + } + } + Ok(()) + })); + + Ok(SessionManager { handle, tx }) + } + + /// Push a new session into the SessionManager + pub fn push(&self, session: SessionSignal) -> Result<()> { + // Push the session into the SessionManager + self.tx.send(session)?; + + log::trace!("SessionManager - SessionSignal pushed"); + + Ok(()) + } +} + /// Pyroscope Session #[derive(Clone, Debug)] pub struct Session { @@ -21,6 +81,7 @@ pub struct Session { impl Session { pub fn new(mut until: u64, config: PyroscopeConfig, report: Vec) -> Result { + log::info!("Session - Creating Session"); // Session interrupted (0 signal), determine the time if until == 0 { let now = std::time::SystemTime::now() @@ -43,6 +104,8 @@ impl Session { } pub fn send(self) -> Result<()> { + log::info!("Session - Sending Session"); + let _handle: JoinHandle> = thread::spawn(move || { if self.report.is_empty() { return Ok(());