diff --git a/.github/actions/rust-build/action.yml b/.github/actions/rust-build/action.yml index ed17e72..51f4633 100644 --- a/.github/actions/rust-build/action.yml +++ b/.github/actions/rust-build/action.yml @@ -24,4 +24,8 @@ runs: - name: Run tests shell: bash run: | - cargo test --verbose ${{ inputs.flags }} \ No newline at end of file + cargo test --verbose ${{ inputs.flags }} + if [ "${{ inputs.toolchain }}" == nightly -a "${{ inputs.flags }}" == "--all-features" ]; then + # docs use unstable features, run them on nightly + RUSTDOCFLAGS="-D warnings --cfg docsrs" cargo doc --verbose ${{ inputs.flags }} + fi \ No newline at end of file diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 7a08283..feb3df6 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -10,6 +10,7 @@ jobs: toolchain: - "1.86" # Current MSRV due to 1.85 having problems with the AWS SDK - stable + - nightly flags: - "--all-features" - "--no-default-features" @@ -92,4 +93,4 @@ jobs: - name: Run integration test shell: bash working-directory: tests - run: chmod +x simple pollcatch-decoder && LD_LIBRARY_PATH=$PWD ./integration.sh + run: chmod +x simple pollcatch-decoder && LD_LIBRARY_PATH=$PWD ./integration.sh && LD_LIBRARY_PATH=$PWD ./separate_runtime_integration.sh diff --git a/.github/workflows/format.yml b/.github/workflows/format.yml index 87a1d5c..66c6270 100644 --- a/.github/workflows/format.yml +++ b/.github/workflows/format.yml @@ -27,6 +27,9 @@ jobs: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable - uses: Swatinem/rust-cache@v2 + - name: Install clippy + shell: bash + run: rustup component add clippy - name: Run clippy check id: cargoClippy shell: bash @@ -40,6 +43,9 @@ jobs: - uses: Swatinem/rust-cache@v2 with: cache-directories: decoder + - name: Install clippy + shell: bash + run: rustup component add clippy - name: Run clippy check - decoder id: cargoClippyDecoder shell: bash diff --git a/Cargo.toml b/Cargo.toml index 106fecf..9403322 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,3 +47,8 @@ s3-no-defaults = ["dep:aws-config", "dep:aws-sdk-s3"] aws-metadata = ["aws-metadata-no-defaults", "aws-config/default", "reqwest/rustls-tls"] # A version of the aws-metadata feature that does not enable AWS default features aws-metadata-no-defaults = ["dep:reqwest", "dep:aws-config", "dep:aws-arn"] + +[package.metadata.docs.rs] +all-features = true +targets = ["x86_64-unknown-linux-gnu"] +rustdoc-args = ["--cfg", "docsrs"] \ No newline at end of file diff --git a/README.md b/README.md index 3aca65a..b964f53 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,35 @@ The metadata is not used by the agent directly, and only provided to the reporte [Fargate]: https://aws.amazon.com/fargate [IMDS]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html +### What information the profiler gathers + +Memory samples (JFR `profiler.Malloc`) sample allocated memory every +so many bytes of allocated memory, and are matched by `profiler.Free` +to allow detecting if that memory is not free'd. + +CPU-time samples (JFR `jdk.ExecutionSample`) sample only threads that +are currently running on a CPU, not threads that are sleeping. + +Wall-clock samples (JFR `profiler.WallClockSample`) sample threads +whether they are sleeping or running, and can therefore be +very useful for finding threads that are blocked, for example +on a synchronous lock or a slow system call. 
+ +When using Tokio, since tasks are not threads, tasks that are not +currently running will not be sampled by a wall-clock sample. However, +a wall-clock sample is still very useful in Tokio, since it is what +you want to use to catch tasks that are blocking a thread by waiting on +synchronous operations. + +The default is to do a wall-clock sample every second, and a CPU-time +sample every 100 CPU milliseconds. This can be configured via +[`ProfilerOptionsBuilder`]. + +Memory samples are not enabled by default, but can be enabled by [`with_native_mem_bytes`]. + +[`ProfilerOptionsBuilder`]: https://docs.rs/async-profiler-agent/0.1/async_profiler_agent/profiler/struct.ProfilerOptionsBuilder.html +[`with_native_mem_bytes`]: https://docs.rs/async-profiler-agent/0.1/async_profiler_agent/profiler/struct.ProfilerOptionsBuilder.html#method.with_native_mem_bytes + ### PollCatch If you want to find long poll times, and you have `RUSTFLAGS="--cfg tokio_unstable"`, you can diff --git a/examples/simple/main.rs b/examples/simple/main.rs index 4d6f063..7ba7b52 100644 --- a/examples/simple/main.rs +++ b/examples/simple/main.rs @@ -40,6 +40,8 @@ struct S3BucketArgs { } /// Simple program to test the profiler agent +/// +/// This program is intended for test purposes ONLY. #[derive(Debug, Parser)] #[command(group( ArgGroup::new("options") @@ -62,6 +64,9 @@ struct Args { worker_threads: Option, #[arg(long)] native_mem: Option, + /// Use the spawn_thread API instead of the Tokio API (does not demonstrate stopping) + #[arg(long)] + spawn_into_thread: bool, } impl Args { @@ -95,6 +100,16 @@ pub fn main() -> anyhow::Result<()> { rt.block_on(main_internal(args)) } +async fn run_slow(args: &Args) { + if let Some(timeout) = args.duration { + tokio::time::timeout(timeout, slow::run()) + .await + .unwrap_err(); + } else { + slow::run().await; + } +} + async fn main_internal(args: Args) -> Result<(), anyhow::Error> { set_up_tracing(); tracing::info!("main started"); @@ -104,7 +119,7 @@ async fn main_internal(args: Args) -> Result<(), anyhow::Error> { let profiler = match (&args.local, args.s3_bucket_args()) { (Some(local), S3BucketArgs { ..
}) => profiler .with_reporter(LocalReporter::new(local)) - .with_custom_agent_metadata(AgentMetadata::Other), + .with_custom_agent_metadata(AgentMetadata::NoMetadata), #[cfg(feature = "s3-no-defaults")] ( _, @@ -133,19 +148,21 @@ async fn main_internal(args: Args) -> Result<(), anyhow::Error> { .with_profiler_options(profiler_options) .build(); - tracing::info!("starting profiler"); - let handle = profiler.spawn_controllable()?; - tracing::info!("profiler started"); - - if let Some(timeout) = args.duration { - tokio::time::timeout(timeout, slow::run()) - .await - .unwrap_err(); + if args.spawn_into_thread { + tracing::info!("starting profiler"); + std::thread::spawn(move || { + profiler.spawn_thread().unwrap(); + }); + run_slow(&args).await; } else { - slow::run().await; - } + tracing::info!("starting profiler"); + let handle = profiler.spawn_controllable()?; + tracing::info!("profiler started"); - handle.stop().await; + run_slow(&args).await; + + handle.stop().await; + } Ok(()) } diff --git a/src/asprof/mod.rs b/src/asprof/mod.rs index 2e7686c..fb145ca 100644 --- a/src/asprof/mod.rs +++ b/src/asprof/mod.rs @@ -108,18 +108,18 @@ impl super::profiler::ProfilerEngine for AsProf { jfr_file_path: &Path, options: &ProfilerOptions, ) -> Result<(), self::AsProfError> { - tracing::debug!("starting the async-profiler and giving JFR file path: {jfr_file_path:?}"); + tracing::debug!("starting profiling session and giving JFR file path: {jfr_file_path:?}"); let args = options.to_args_string(jfr_file_path); Self::asprof_execute(&args)?; - tracing::debug!("async-profiler started successfully"); + tracing::debug!("starting profiling session - success"); Ok(()) } fn stop_async_profiler() -> Result<(), self::AsProfError> { Self::asprof_execute("stop")?; - tracing::debug!("async-profiler stopped successfully"); + tracing::debug!("stopping profiling session - success"); Ok(()) } } diff --git a/src/lib.rs b/src/lib.rs index 05d1095..64e1234 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 #![deny(missing_docs)] +#![cfg_attr(docsrs, feature(doc_auto_cfg))] //! ## async-profiler Rust agent //! An in-process Rust agent for profiling an application using [async-profiler] and uploading the resulting profiles. diff --git a/src/metadata/mod.rs b/src/metadata/mod.rs index a2e0692..7941ceb 100644 --- a/src/metadata/mod.rs +++ b/src/metadata/mod.rs @@ -6,7 +6,7 @@ pub use std::time::Duration; /// Host Metadata, which describes a host that runs a profiling agent. The current set of supported agent metadata is -/// AWS-specific. If you are not running on AWS, you can use [AgentMetadata::Other]. +/// AWS-specific. If you are not running on AWS, you can use [AgentMetadata::NoMetadata]. #[derive(Debug, Clone, PartialEq, Eq)] #[non_exhaustive] pub enum AgentMetadata { @@ -43,7 +43,11 @@ pub enum AgentMetadata { ecs_cluster_arn: String, }, /// Metadata for a host that is neither an EC2 nor a Fargate + #[deprecated = "Use AgentMetadata::NoMetadata"] Other, + /// A placeholder when a host has no metadata, or when a reporter does not + /// use metadata. 
+ NoMetadata, } /// Metadata associated with a specific individual profiling report @@ -66,7 +70,7 @@ pub mod aws; /// [private] dummy metadata to make testing easier #[cfg(test)] pub(crate) const DUMMY_METADATA: ReportMetadata<'static> = ReportMetadata { - instance: &AgentMetadata::Other, + instance: &AgentMetadata::NoMetadata, start: Duration::from_secs(1), end: Duration::from_secs(2), reporting_interval: Duration::from_secs(1), diff --git a/src/profiler.rs b/src/profiler.rs index b03788c..c3883a2 100644 --- a/src/profiler.rs +++ b/src/profiler.rs @@ -6,7 +6,7 @@ use crate::{ asprof::{self, AsProfError}, metadata::{AgentMetadata, ReportMetadata}, - reporter::Reporter, + reporter::{local::LocalReporter, Reporter}, }; use std::{ fs::File, @@ -75,6 +75,7 @@ impl JfrFile { /// Currently supports: /// - Native memory allocation tracking #[derive(Debug, Default)] +#[non_exhaustive] pub struct ProfilerOptions { /// If set, the profiler will collect information about /// native memory allocations. @@ -89,13 +90,21 @@ pub struct ProfilerOptions { /// /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks pub native_mem: Option, + cpu_interval: Option, + wall_clock_millis: Option, } +const DEFAULT_CPU_INTERVAL_NANOS: u128 = 100_000_000; +const DEFAULT_WALL_CLOCK_INTERVAL_MILLIS: u128 = 1_000; + impl ProfilerOptions { /// Convert the profiler options to a string of arguments for the async-profiler. pub fn to_args_string(&self, jfr_file_path: &std::path::Path) -> String { let mut args = format!( - "start,event=cpu,interval=100000000,wall=1000ms,jfr,cstack=dwarf,file={}", + "start,event=cpu,interval={},wall={}ms,jfr,cstack=dwarf,file={}", + self.cpu_interval.unwrap_or(DEFAULT_CPU_INTERVAL_NANOS), + self.wall_clock_millis + .unwrap_or(DEFAULT_WALL_CLOCK_INTERVAL_MILLIS), jfr_file_path.display() ); if let Some(ref native_mem) = self.native_mem { @@ -109,15 +118,50 @@ impl ProfilerOptions { #[derive(Debug, Default)] pub struct ProfilerOptionsBuilder { native_mem: Option, + cpu_interval: Option, + wall_clock_millis: Option, } impl ProfilerOptionsBuilder { - /// If set, the profiler will collect information about - /// native memory allocations. + /// Same as [ProfilerOptionsBuilder::with_native_mem_bytes], but pass + /// the string input directly to async_profiler. /// /// The value is the interval in bytes or in other units, /// if followed by k (kilobytes), m (megabytes), or g (gigabytes). /// + /// Prefer using [ProfilerOptionsBuilder::with_native_mem_bytes], since it's + /// type-checked. + /// + /// ### Examples + /// + /// This will sample allocations for every 10 megabytes allocated: + /// + /// ``` + /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder}; + /// # use async_profiler_agent::profiler::SpawnError; + /// # fn main() -> Result<(), SpawnError> { + /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build(); + /// let profiler = ProfilerBuilder::default() + /// .with_profiler_options(opts) + /// .with_local_reporter("/tmp/profiles") + /// .build(); + /// # if false { // don't spawn the profiler in doctests + /// profiler.spawn()?; + /// # } + /// # Ok(()) + /// # } + /// ``` + pub fn with_native_mem(mut self, native_mem_interval: String) -> Self { + self.native_mem = Some(native_mem_interval); + self + } + + /// If set, the profiler will collect information about + /// native memory allocations. 
+ /// + /// The argument passed is the profiling interval - the profiler will + /// sample allocations about once every that many allocated bytes. + /// + /// See [ProfilingModes in the async-profiler docs] for more details. + /// + /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks @@ -129,12 +173,11 @@ impl ProfilerOptionsBuilder { /// ``` /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder}; /// # use async_profiler_agent::profiler::SpawnError; - /// # use async_profiler_agent::reporter::local::LocalReporter; /// # fn main() -> Result<(), SpawnError> { - /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build(); + /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(10_000_000).build(); /// let profiler = ProfilerBuilder::default() /// .with_profiler_options(opts) - /// .with_reporter(LocalReporter::new("/tmp/profiles")) + /// .with_local_reporter("/tmp/profiles") /// .build(); /// # if false { // don't spawn the profiler in doctests /// profiler.spawn()?; @@ -147,12 +190,11 @@ impl ProfilerOptionsBuilder { /// ``` /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder}; /// # use async_profiler_agent::profiler::SpawnError; - /// # use async_profiler_agent::reporter::local::LocalReporter; /// # fn main() -> Result<(), SpawnError> { - /// let opts = ProfilerOptionsBuilder::default().with_native_mem("0".into()).build(); + /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(0).build(); /// let profiler = ProfilerBuilder::default() /// .with_profiler_options(opts) - /// .with_reporter(LocalReporter::new("/tmp/profiles")) + /// .with_local_reporter("/tmp/profiles") /// .build(); /// # if false { // don't spawn the profiler in doctests /// profiler.spawn()?; /// # } /// # Ok(()) /// # } /// ``` - pub fn with_native_mem(mut self, native_mem_interval: String) -> Self { - self.native_mem = Some(native_mem_interval); + pub fn with_native_mem_bytes(mut self, native_mem_interval: usize) -> Self { + self.native_mem = Some(native_mem_interval.to_string()); self + } + + /// Sets the interval at which the profiler will collect + /// CPU-time samples, via the [async-profiler `interval` option]. + /// + /// CPU-time samples (JFR `jdk.ExecutionSample`) sample only threads that + /// are currently running on a CPU, not threads that are sleeping. + /// + /// It can use a higher frequency than wall-clock sampling since the + /// number of threads that are running on a CPU at a given time is + /// naturally limited by the number of CPUs, while the number of sleeping + /// threads can be much larger. + /// + /// The default is to do a CPU-time sample every 100 CPU milliseconds. + /// + /// The async-profiler agent collects both CPU time and wall-clock time + /// samples, so this function should normally be used along with + /// [ProfilerOptionsBuilder::with_wall_clock_interval].
+ /// + /// [async-profiler `interval` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format + /// + /// ### Examples + /// + /// This will sample threads every 10 CPU milliseconds (when running) + /// and every 100 wall-clock milliseconds (running or sleeping): + /// + /// ``` + /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder}; + /// # use async_profiler_agent::profiler::SpawnError; + /// # use std::time::Duration; + /// # fn main() -> Result<(), SpawnError> { + /// let opts = ProfilerOptionsBuilder::default() + /// .with_cpu_interval(Duration::from_millis(10)) + /// .with_wall_clock_interval(Duration::from_millis(100)) + /// .build(); + /// let profiler = ProfilerBuilder::default() + /// .with_profiler_options(opts) + /// .with_local_reporter("/tmp/profiles") + /// .build(); + /// # if false { // don't spawn the profiler in doctests + /// profiler.spawn()?; + /// # } + /// # Ok(()) + /// # } + /// ``` + pub fn with_cpu_interval(mut self, cpu_interval: Duration) -> Self { + self.cpu_interval = Some(cpu_interval.as_nanos()); + self + } + + /// Sets the interval, with millisecond granularity, at which the profiler will collect + /// wall-clock samples, via the [async-profiler `wall` option]. + /// + /// Wall-clock samples (JFR `profiler.WallClockSample`) sample threads + /// whether they are sleeping or running, and can therefore be + /// very useful for finding threads that are blocked, for example + /// on a synchronous lock or a slow system call. + /// + /// When using Tokio, since tasks are not threads, tasks that are not + /// currently running will not be sampled by a wall-clock sample. However, + /// a wall-clock sample is still very useful in Tokio, since it is what + /// you want to use to catch tasks that are blocking a thread by waiting on + /// synchronous operations. + /// + /// The default is to do a wall-clock sample every second. + /// + /// The async-profiler agent collects both CPU time and wall-clock time + /// samples, so this function should normally be used along with + /// [ProfilerOptionsBuilder::with_cpu_interval].
+ /// + /// [async-profiler `wall` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format + /// + /// ### Examples + /// + /// This will sample threads every 10 CPU milliseconds (when running) + /// and every 10 wall-clock milliseconds (running or sleeping): + /// + /// ``` + /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder}; + /// # use async_profiler_agent::profiler::SpawnError; + /// # use std::time::Duration; + /// # fn main() -> Result<(), SpawnError> { + /// let opts = ProfilerOptionsBuilder::default() + /// .with_cpu_interval(Duration::from_millis(10)) + /// .with_wall_clock_interval(Duration::from_millis(10)) + /// .build(); + /// let profiler = ProfilerBuilder::default() + /// .with_profiler_options(opts) + /// .with_local_reporter("/tmp/profiles") + /// .build(); + /// # if false { // don't spawn the profiler in doctests + /// profiler.spawn()?; + /// # } + /// # Ok(()) + /// # } + /// ``` + pub fn with_wall_clock_interval(mut self, wall_clock: Duration) -> Self { + self.wall_clock_millis = Some(wall_clock.as_millis()); self } @@ -169,6 +310,8 @@ impl ProfilerOptionsBuilder { pub fn build(self) -> ProfilerOptions { ProfilerOptions { native_mem: self.native_mem, + wall_clock_millis: self.wall_clock_millis, + cpu_interval: self.cpu_interval, } } } @@ -184,25 +327,144 @@ pub struct ProfilerBuilder { } impl ProfilerBuilder { - /// Sets the reporting interval. + /// Sets the reporting interval (default: 30 seconds). + /// + /// This is the interval at which samples are *reported* to the backend, + /// and is unrelated to the interval at which the application + /// is *sampled* by async-profiler, which is controlled by + /// [ProfilerOptionsBuilder::with_cpu_interval] and + /// [ProfilerOptionsBuilder::with_wall_clock_interval]. + /// + /// Most users should not change this setting. + /// + /// ## Example + /// + /// ```no_run + /// # use std::path::PathBuf; + /// # use std::time::Duration; + /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError}; + /// # let path = PathBuf::from("."); + /// let agent = ProfilerBuilder::default() + /// .with_local_reporter(path) + /// .with_reporting_interval(Duration::from_secs(15)) + /// .build() + /// .spawn()?; + /// # Ok::<_, SpawnError>(()) + /// ``` pub fn with_reporting_interval(mut self, i: Duration) -> ProfilerBuilder { self.reporting_interval = Some(i); self } - /// Sets the reporter. + /// Sets the [`Reporter`], which is used to upload the collected profiling + /// data. Common reporters are [`LocalReporter`], and, with the `s3-no-defaults` + /// feature enabled, + #[cfg_attr(not(feature = "s3-no-defaults"), doc = "`S3Reporter`.")] + #[cfg_attr(feature = "s3-no-defaults", doc = "[`S3Reporter`].")] + /// It is also possible to write your own [`Reporter`]. + /// + /// It's normally easier to use [`LocalReporter`] directly via + /// [`ProfilerBuilder::with_local_reporter`]. + /// + /// If you want to output to multiple reporters, you can use + /// [`MultiReporter`].
+ /// + /// [`LocalReporter`]: crate::reporter::local::LocalReporter + /// [`MultiReporter`]: crate::reporter::multi::MultiReporter + #[cfg_attr( + feature = "s3-no-defaults", + doc = "[`S3Reporter`]: crate::reporter::s3::S3Reporter" + )] + /// + #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example.md"))] pub fn with_reporter(mut self, r: impl Reporter + Send + Sync + 'static) -> ProfilerBuilder { self.reporter = Some(Box::new(r)); self } + /// Sets the profiler to use [LocalReporter], which will write `.jfr` files to `path`, + /// and disables metadata auto-detection (see [`ProfilerBuilder::with_custom_agent_metadata`]) + /// since the [LocalReporter] does not need that. + /// + /// This is useful for testing, since metadata auto-detection currently only works + /// on [Amazon EC2] or [Amazon Fargate] instances. + /// + /// The local reporter should normally not be used in production, since it will + /// not clean up JFR files. Instead, you can use a pre-existing [`Reporter`] + /// or write your own (see [`ProfilerBuilder::with_reporter`]). + /// + /// [Amazon EC2]: https://aws.amazon.com/ec2 + /// [Amazon Fargate]: https://aws.amazon.com/fargate + /// + /// ## Example + /// + /// This will write profiles as `.jfr` files to `./path-to-profiles`: + /// + /// ```no_run + /// # use std::path::PathBuf; + /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError}; + /// # use async_profiler_agent::reporter::local::LocalReporter; + /// # use async_profiler_agent::metadata::AgentMetadata; + /// let path = PathBuf::from("./path-to-profiles"); + /// let agent = ProfilerBuilder::default() + /// .with_local_reporter(path) + /// .build() + /// .spawn()?; + /// # Ok::<_, SpawnError>(()) + /// ``` + pub fn with_local_reporter(mut self, path: impl Into<PathBuf>) -> ProfilerBuilder { + self.reporter = Some(Box::new(LocalReporter::new(path.into()))); + self.with_custom_agent_metadata(AgentMetadata::NoMetadata) + } + /// Provide custom agent metadata. + /// + /// The async-profiler Rust agent sends metadata to the [Reporter] with + /// the identity of the current host and process, which is normally + /// transmitted as `metadata.json` within the generated `.zip` file, + /// using the schema format [`reporter::s3::MetadataJson`]. + /// + /// That metadata can later be used by tooling to sort + /// profiling reports by host. + /// + /// The async-profiler Rust agent will by default try to fetch the metadata + /// using [IMDS] when running on [Amazon EC2] or [Amazon Fargate], and + /// will error if it's unable to find it. If you are running the + /// async-profiler agent on any other form of compute, + /// you will need to create and attach your own metadata + /// by calling this function. + /// + #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example-custom-metadata.md"))] + /// [`reporter::s3::MetadataJson`]: crate::reporter::s3::MetadataJson + /// [Amazon EC2]: https://aws.amazon.com/ec2 + /// [Amazon Fargate]: https://aws.amazon.com/fargate + /// [IMDS]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html pub fn with_custom_agent_metadata(mut self, j: AgentMetadata) -> ProfilerBuilder { self.agent_metadata = Some(j); self } /// Provide custom profiler options.
+ /// + /// ### Example + /// + /// This will sample allocations for every 10 megabytes allocated: + /// + /// ``` + /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder}; + /// # use async_profiler_agent::profiler::SpawnError; + /// # fn main() -> Result<(), SpawnError> { + /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build(); + /// let profiler = ProfilerBuilder::default() + /// .with_profiler_options(opts) + /// .with_local_reporter("/tmp/profiles") + /// .build(); + /// # if false { // don't spawn the profiler in doctests + /// profiler.spawn()?; + /// # } + /// # Ok(()) + /// # } + /// ``` pub fn with_profiler_options(mut self, c: ProfilerOptions) -> ProfilerBuilder { self.profiler_options = Some(c); self @@ -327,12 +589,24 @@ enum TickError { #[non_exhaustive] /// An error that happened spawning a profiler pub enum SpawnError { - /// Error interactive with async-profiler + /// Error from async-profiler #[error(transparent)] AsProf(#[from] asprof::AsProfError), /// Error writing to a tempfile - #[error("tempfile error: {0}")] - TempFile(io::Error), + #[error("tempfile error")] + TempFile(#[source] io::Error), +} + +#[derive(Debug, Error)] +#[non_exhaustive] +/// An error from [`Profiler::spawn_thread`] +pub enum SpawnThreadError { + /// Error from async-profiler + #[error(transparent)] + AsProf(#[from] SpawnError), + /// Error constructing Tokio runtime + #[error("constructing Tokio runtime")] + ConstructRt(#[source] io::Error), } // no control messages currently @@ -382,10 +656,93 @@ impl RunningProfiler { pub fn detach(self) { self.detach_inner(); } + + /// Spawns this [RunningProfiler] into a separate thread within a new Tokio runtime, + /// and returns a [RunningProfilerThread] attached to it. + fn spawn_attached( + self, + runtime: tokio::runtime::Runtime, + spawn_fn: impl FnOnce(Box) -> std::thread::JoinHandle<()>, + ) -> RunningProfilerThread { + RunningProfilerThread { + stop_channel: self.stop_channel, + join_handle: spawn_fn(Box::new(move || { + let _ = runtime.block_on(self.join_handle); + })), + } + } + + /// Spawns this [RunningProfiler] into a separate thread within a new Tokio runtime, + /// and detaches it. + fn spawn_detached( + self, + runtime: tokio::runtime::Runtime, + spawn_fn: impl FnOnce(Box) -> std::thread::JoinHandle<()>, + ) { + spawn_fn(Box::new(move || { + let _stop_channel = self.stop_channel; + let _ = runtime.block_on(self.join_handle); + })); + } +} + +/// A handle to a running profiler, running on a separate thread. +/// +/// Currently just allows for stopping the profiler. +/// +/// Dropping this handle will request that the profiler will stop. +#[must_use = "dropping this stops the profiler, call .detach() to detach"] +pub struct RunningProfilerThread { + stop_channel: tokio::sync::oneshot::Sender, + join_handle: std::thread::JoinHandle<()>, +} + +impl RunningProfilerThread { + /// Request that the current profiler stops and wait until it exits. + /// + /// This will cause the currently-pending profile information to be flushed. + /// + /// After this function returns, it is correct and safe to [spawn] a new + /// [Profiler], possibly with a different configuration. Therefore, + /// this function can be used to "reconfigure" a profiler by stopping + /// it and then starting a new one with a different configuration. 
+ /// + /// [spawn]: Profiler::spawn_controllable + pub fn stop(self) { + drop(self.stop_channel); + let _ = self.join_handle.join(); + } } /// Rust profiler based on [async-profiler]. /// +/// Spawning a profiler can be done either in an attached (controllable) +/// mode, which allows for stopping the profiler (and, in fact, stops +/// it when the relevant handle is dropped), or in detached mode, +/// in which the profiler keeps running forever. Applications that can +/// shut down the profiler at run-time, for example applications that +/// support reconfiguration of a running profiler, generally want to use +/// controllable mode. Other applications (most of them) should use +/// detached mode. +/// +/// In addition, the profiler can either be spawned into the current Tokio +/// runtime, or into a new one. Normally, applications should spawn +/// the profiler into their own Tokio runtime, but applications that +/// don't have a default Tokio runtime should spawn it into a +/// different one +/// +/// This leaves 4 functions: +/// 1. [Self::spawn] - detached, same runtime +/// 2. [Self::spawn_thread_to_runtime] - detached, different runtime +/// 3. [Self::spawn_controllable] - controllable, same runtime +/// 4. [Self::spawn_controllable_thread_to_runtime] - controllable, different runtime +/// +/// In addition, there's a helper function that just spawns the profiler +/// to a new runtime in a new thread, for applications that don't have +/// a Tokio runtime and don't need complex control: +/// +/// 5. [Self::spawn_thread] - detached, new runtime in a new thread +/// /// [async-profiler]: https://github.com/async-profiler/async-profiler pub struct Profiler { reporting_interval: Duration, @@ -408,20 +765,23 @@ impl Profiler { /// /// [JoinHandle]: tokio::task::JoinHandle /// - /// ### Example + /// ### Tokio Runtime /// - /// This example uses a [LocalReporter] which reports the profiles to - /// a directory. It works with any other [Reporter]. + /// This function must be run within a Tokio runtime, otherwise it will panic. If + /// your application does not have a `main` Tokio runtime, see + /// [Profiler::spawn_thread]. /// - /// [LocalReporter]: crate::reporter::local::LocalReporter + /// ### Example + /// + /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to + /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter]. /// /// ``` /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError}; - /// # use async_profiler_agent::reporter::local::LocalReporter; /// # #[tokio::main] /// # async fn main() -> Result<(), SpawnError> { /// let profiler = ProfilerBuilder::default() - /// .with_reporter(LocalReporter::new("/tmp/profiles")) + /// .with_local_reporter("/tmp/profiles") /// .build(); /// # if false { // don't spawn the profiler in doctests /// profiler.spawn()?; @@ -433,6 +793,108 @@ impl Profiler { self.spawn_controllable().map(RunningProfiler::detach_inner) } + /// Like [Self::spawn], but instead of spawning within the current Tokio + /// runtime, spawns within a set Tokio runtime and then runs a thread that calls + /// [block_on](tokio::runtime::Runtime::block_on) on that runtime. + /// + /// If your configuration is standard, use [Profiler::spawn_thread]. + /// + /// If you want to be able to stop the resulting profiler, use + /// [Profiler::spawn_controllable_thread_to_runtime]. 
+ /// + /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to + /// allow for configuring thread properties, for example thread names). + /// + /// This is to be used when your program does not have a "main" Tokio runtime already set up. + /// + /// ### Example + /// + /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to + /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter]. + /// + /// ```no_run + /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError}; + /// let rt = tokio::runtime::Builder::new_current_thread() + /// .enable_all() + /// .build()?; + /// let profiler = ProfilerBuilder::default() + /// .with_local_reporter("/tmp/profiles") + /// .build(); + /// + /// profiler.spawn_thread_to_runtime( + /// rt, + /// |t| { + /// std::thread::Builder::new() + /// .name("asprof-agent".to_owned()) + /// .spawn(t) + /// .expect("thread name contains nuls") + /// } + /// )?; + /// # Ok::<_, anyhow::Error>(()) + /// ``` + pub fn spawn_thread_to_runtime( + self, + runtime: tokio::runtime::Runtime, + spawn_fn: impl FnOnce(Box) -> std::thread::JoinHandle<()>, + ) -> Result<(), SpawnError> { + self.spawn_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn) + } + + /// Like [Self::spawn], but instead of spawning within the current Tokio + /// runtime, spawns within a new Tokio runtime and then runs a thread that calls + /// [block_on](tokio::runtime::Runtime::block_on) on that runtime, setting up the runtime + /// by itself. + /// + /// If your configuration is less standard, use [Profiler::spawn_thread_to_runtime]. Calling + /// [Profiler::spawn_thread] is equivalent to calling [Profiler::spawn_thread_to_runtime] + /// with the following: + /// 1. a current thread runtime with background worker threads (these exist + /// for blocking IO) named "asprof-worker" + /// 2. a controller thread (the "main" thread of the runtime) named "asprof-agent" + /// + /// If you want to be able to stop the resulting profiler, use + /// [Profiler::spawn_controllable_thread_to_runtime]. + /// + /// This is to be used when your program does not have a "main" Tokio runtime already set up. + /// + /// ### Example + /// + /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to + /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter]. 
+ /// + /// ```no_run + /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError}; + /// # use async_profiler_agent::reporter::local::LocalReporter; + /// let profiler = ProfilerBuilder::default() + /// .with_local_reporter("/tmp/profiles") + /// .build(); + /// + /// profiler.spawn_thread()?; + /// # Ok::<_, anyhow::Error>(()) + /// ``` + pub fn spawn_thread(self) -> Result<(), SpawnThreadError> { + // using "asprof" in thread name to deal with 15 character + \0 length limit + let rt = tokio::runtime::Builder::new_current_thread() + .thread_name("asprof-worker".to_owned()) + .enable_all() + .build() + .map_err(SpawnThreadError::ConstructRt)?; + let builder = std::thread::Builder::new().name("asprof-agent".to_owned()); + self.spawn_thread_to_runtime(rt, |t| builder.spawn(t).expect("thread name contains nuls")) + .map_err(SpawnThreadError::AsProf) + } + + fn spawn_thread_inner( + self, + asprof: E, + runtime: tokio::runtime::Runtime, + spawn_fn: impl FnOnce(Box) -> std::thread::JoinHandle<()>, + ) -> Result<(), SpawnError> { + let handle: RunningProfiler = runtime.block_on(async move { self.spawn_inner(asprof) })?; + handle.spawn_detached(runtime, spawn_fn); + Ok(()) + } + /// Like [Self::spawn], but returns a [RunningProfiler] that allows for controlling /// (currently only stopping) the profiler. /// @@ -448,20 +910,23 @@ impl Profiler { /// This function will fail if it is unable to start async-profiler, for example /// if it can't find or load `libasyncProfiler.so`. /// - /// ### Example + /// ### Tokio Runtime /// - /// This example uses a [LocalReporter] which reports the profiles to - /// a directory. It works with any other [Reporter]. + /// This function must be run within a Tokio runtime, otherwise it will panic. If + /// your application does not have a `main` Tokio runtime, see + /// [Profiler::spawn_controllable_thread_to_runtime]. + /// + /// ### Example /// - /// [LocalReporter]: crate::reporter::local::LocalReporter + /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to + /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter]. /// /// ```no_run /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError}; - /// # use async_profiler_agent::reporter::local::LocalReporter; /// # #[tokio::main] /// # async fn main() -> Result<(), SpawnError> { /// let profiler = ProfilerBuilder::default() - /// .with_reporter(LocalReporter::new("/tmp/profiles")) + /// .with_local_reporter("/tmp/profiles") /// .build(); /// /// let profiler = profiler.spawn_controllable()?; @@ -485,6 +950,66 @@ impl Profiler { self.spawn_inner(asprof::AsProf::builder().build()) } + /// Like [Self::spawn_controllable], but instead of spawning within the current Tokio + /// runtime, spawns within a set Tokio runtime and then runs a thread that calls + /// [block_on](tokio::runtime::Runtime::block_on) on that runtime. + /// + /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to + /// allow for configuring thread properties, for example thread names). + /// + /// This is to be used when your program does not have a "main" Tokio runtime already set up. + /// + /// ### Example + /// + /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to + /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter]. 
+ /// + /// ```no_run + /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError}; + /// let rt = tokio::runtime::Builder::new_current_thread() + /// .enable_all() + /// .build()?; + /// let profiler = ProfilerBuilder::default() + /// .with_local_reporter("/tmp/profiles") + /// .build(); + /// + /// let profiler = profiler.spawn_controllable_thread_to_runtime( + /// rt, + /// |t| { + /// std::thread::Builder::new() + /// .name("asprof-agent".to_owned()) + /// .spawn(t) + /// .expect("thread name contains nuls") + /// } + /// )?; + /// + /// # fn got_request_to_disable_profiling() -> bool { false } + /// // spawn a task that will disable profiling if requested + /// std::thread::spawn(move || { + /// if got_request_to_disable_profiling() { + /// profiler.stop(); + /// } + /// }); + /// # Ok::<_, anyhow::Error>(()) + /// ``` + pub fn spawn_controllable_thread_to_runtime( + self, + runtime: tokio::runtime::Runtime, + spawn_fn: impl FnOnce(Box) -> std::thread::JoinHandle<()>, + ) -> Result { + self.spawn_controllable_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn) + } + + fn spawn_controllable_thread_inner( + self, + asprof: E, + runtime: tokio::runtime::Runtime, + spawn_fn: impl FnOnce(Box) -> std::thread::JoinHandle<()>, + ) -> Result { + let handle = runtime.block_on(async move { self.spawn_inner(asprof) })?; + Ok(handle.spawn_attached(runtime, spawn_fn)) + } + fn spawn_inner(self, asprof: E) -> Result { // Initialize async profiler - needs to be done once. E::init_async_profiler()?; @@ -590,7 +1115,7 @@ async fn profiler_tick( #[cfg(feature = "aws-metadata-no-defaults")] let md = crate::metadata::aws::load_agent_metadata().await?; #[cfg(not(feature = "aws-metadata-no-defaults"))] - let md = crate::metadata::AgentMetadata::Other; + let md = crate::metadata::AgentMetadata::NoMetadata; tracing::debug!("loaded metadata"); agent_metadata.replace(md); } @@ -720,6 +1245,64 @@ mod tests { assert_eq!(e_md, md); } + #[test_case(false; "uncontrollable")] + #[test_case(true; "controllable")] + fn test_profiler_local_rt(controllable: bool) { + let e_md = AgentMetadata::Ec2AgentMetadata { + aws_account_id: "0".into(), + aws_region_id: "us-east-1".into(), + ec2_instance_id: "i-fake".into(), + }; + let (agent, mut rx) = make_mock_profiler(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .start_paused(true) + .build() + .unwrap(); + // spawn the profiler, doing this before spawning a thread to allow + // capturing errors from `spawn` + let handle = if controllable { + Some( + agent + .spawn_controllable_thread_inner::( + MockProfilerEngine { + counter: AtomicU32::new(0), + }, + rt, + std::thread::spawn, + ) + .unwrap(), + ) + } else { + agent + .spawn_thread_inner::( + MockProfilerEngine { + counter: AtomicU32::new(0), + }, + rt, + std::thread::spawn, + ) + .unwrap(); + None + }; + + let (jfr, md) = rx.blocking_recv().unwrap(); + assert_eq!(jfr, "JFR0"); + assert_eq!(e_md, md); + let (jfr, md) = rx.blocking_recv().unwrap(); + assert_eq!(jfr, "JFR1"); + assert_eq!(e_md, md); + + if let Some(handle) = handle { + let drain_thread = + std::thread::spawn(move || while let Some(_) = rx.blocking_recv() {}); + // request a stop + handle.stop(); + // the drain thread should be done + drain_thread.join().unwrap(); + } + } + enum StopKind { Delibrate, Drop, @@ -847,6 +1430,8 @@ mod tests { fn test_profiler_options_to_args_string_with_native_mem() { let opts = ProfilerOptions { native_mem: Some("10m".to_string()), + wall_clock_millis: None, + cpu_interval: 
None, }; let dummy_path = Path::new("/tmp/test.jfr"); let args = opts.to_args_string(dummy_path); @@ -856,9 +1441,36 @@ mod tests { #[test] fn test_profiler_options_builder() { let opts = ProfilerOptionsBuilder::default() - .with_native_mem("5m".to_string()) + .with_native_mem_bytes(5000000) + .build(); + + assert_eq!(opts.native_mem, Some("5000000".to_string())); + } + + #[test] + fn test_profiler_options_builder_all_options() { + let opts = ProfilerOptionsBuilder::default() + .with_native_mem_bytes(5000000) + .with_cpu_interval(Duration::from_secs(1)) + .with_wall_clock_interval(Duration::from_secs(10)) .build(); - assert_eq!(opts.native_mem, Some("5m".to_string())); + let dummy_path = Path::new("/tmp/test.jfr"); + let args = opts.to_args_string(dummy_path); + assert_eq!(args, "start,event=cpu,interval=1000000000,wall=10000ms,jfr,cstack=dwarf,file=/tmp/test.jfr,nativemem=5000000"); + } + + #[test] + fn test_local_reporter_has_no_metadata() { + // Check that with_local_reporter sets some configuration + let reporter = ProfilerBuilder::default().with_local_reporter("."); + assert_eq!( + format!("{:?}", reporter.reporter), + r#"Some(LocalReporter { directory: "." })"# + ); + match reporter.agent_metadata { + Some(AgentMetadata::NoMetadata) => {} + bad => panic!("{bad:?}"), + }; } } diff --git a/src/reporter/local.rs b/src/reporter/local.rs index c1831b4..3a53f6c 100644 --- a/src/reporter/local.rs +++ b/src/reporter/local.rs @@ -22,6 +22,30 @@ enum LocalReporterError { /// A reporter that reports into a directory. /// /// The files are reported with the filename `yyyy-mm-ddTHH-MM-SSZ.jfr` +/// +/// It does not currently use the metadata, so if you are using +/// [LocalReporter] alone, rather than inside a [MultiReporter], you +/// can just use [AgentMetadata::NoMetadata] as metadata. +/// +/// [AgentMetadata::NoMetadata]: crate::metadata::AgentMetadata::NoMetadata +/// [MultiReporter]: crate::reporter::multi::MultiReporter +/// +/// ### Example +/// +/// ``` +/// # use async_profiler_agent::metadata::AgentMetadata; +/// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError}; +/// # #[tokio::main] +/// # async fn main() -> Result<(), SpawnError> { +/// let profiler = ProfilerBuilder::default() +/// .with_local_reporter("/tmp/profiles") +/// .build(); +/// # if false { // don't spawn the profiler in doctests +/// profiler.spawn()?; +/// # } +/// # Ok(()) +/// # } +/// ``` #[derive(Debug)] pub struct LocalReporter { directory: PathBuf, diff --git a/src/reporter/multi.rs b/src/reporter/multi.rs index ef72f77..66f1ff4 100644 --- a/src/reporter/multi.rs +++ b/src/reporter/multi.rs @@ -34,6 +34,47 @@ impl fmt::Display for MultiError { /// A reporter that reports profiling results to several destinations. /// /// If one of the destinations errors, it will continue reporting to the other ones. 
+/// +/// ## Example +/// +/// Output to both S3 and a local directory: +/// +#[cfg_attr(feature = "s3-no-defaults", doc = "```no_run")] +#[cfg_attr(not(feature = "s3-no-defaults"), doc = "```compile_fail")] +/// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError}; +/// # use async_profiler_agent::reporter::Reporter; +/// # use async_profiler_agent::reporter::local::LocalReporter; +/// # use async_profiler_agent::reporter::multi::MultiReporter; +/// # use async_profiler_agent::reporter::s3::{S3Reporter, S3ReporterConfig}; +/// # use aws_config::BehaviorVersion; +/// # use std::path::PathBuf; +/// # +/// # #[tokio::main] +/// # async fn main() -> Result<(), SpawnError> { +/// let bucket_owner = ""; +/// let bucket_name = ""; +/// let profiling_group = "a-name-to-give-the-uploaded-data"; +/// let path = PathBuf::from("path/to/write/jfrs"); +/// +/// let sdk_config = aws_config::defaults(BehaviorVersion::latest()).load().await; +/// +/// let reporter = MultiReporter::new(vec![ +/// Box::new(LocalReporter::new(path)), +/// Box::new(S3Reporter::new(S3ReporterConfig { +/// sdk_config: &sdk_config, +/// bucket_owner: bucket_owner.into(), +/// bucket_name: bucket_name.into(), +/// profiling_group_name: profiling_group.into(), +/// })), +/// ]); +/// let profiler = ProfilerBuilder::default() +/// .with_reporter(reporter) +/// .build(); +/// +/// profiler.spawn()?; +/// # Ok(()) +/// # } +/// ``` pub struct MultiReporter { reporters: Vec>, } diff --git a/src/reporter/s3.rs b/src/reporter/s3.rs index 6bb208f..e00f78d 100644 --- a/src/reporter/s3.rs +++ b/src/reporter/s3.rs @@ -145,7 +145,9 @@ fn make_s3_file_name( let task_arn = ecs_task_arn.replace("/", "-").replace("_", "-"); format!("ecs_{task_arn}_") } + #[allow(deprecated)] AgentMetadata::Other => "onprem__".to_string(), + AgentMetadata::NoMetadata => "unknown__".to_string(), }; let time: chrono::DateTime = time.into(); let time = time @@ -252,7 +254,8 @@ mod test { ); } - #[test_case(AgentMetadata::Other, "profile_pg_onprem____