chore(agent-data-plane): update environment provider to be supervisor-driven#1636
Conversation
Binary Size Analysis (Agent Data Plane)Target: 55bca14 (baseline) vs 7b93e62 (comparison) diff
|
| Module | File Size | Symbols |
|---|---|---|
core |
+53.51 KiB | 16661 |
tracing |
-20.84 KiB | 174 |
tonic |
+19.20 KiB | 722 |
tokio |
-18.15 KiB | 4798 |
agent_data_plane::internal::env |
+13.25 KiB | 201 |
memory_accounting::allocator::Tracked |
+12.79 KiB | 40 |
anon.8e4511dca9ace64b4814a5690da94cb2.15.llvm.6671550231517592206 |
+12.09 KiB | 1 |
anon.a8a2978b932154393ec8fe9bd19f68c6.15.llvm.2818126597763627731 |
-12.09 KiB | 1 |
saluki_core::runtime::supervisor |
+11.01 KiB | 145 |
anyhow |
+9.54 KiB | 1660 |
saluki_env::features::containerd |
+7.59 KiB | 14 |
tracing_subscriber |
-7.35 KiB | 144 |
h2 |
-6.91 KiB | 2475 |
agent_data_plane::state::metrics |
-6.83 KiB | 24 |
saluki_core::state::reflector |
+6.57 KiB | 6 |
tower_layer |
-6.48 KiB | 22 |
alloc |
+6.46 KiB | 3006 |
agent_data_plane::cli::run |
-6.12 KiB | 134 |
saluki_env::features::find_first_available_unix_socket |
-5.99 KiB | 13 |
std |
-5.73 KiB | 746 |
Detailed Symbol Changes
FILE SIZE VM SIZE
-------------- --------------
[NEW] +142Ki [NEW] +142Ki agent_data_plane::cli::run::handle_run_command::_{{closure}}::hd83fd04a3d04258b
+0.4% +89.2Ki +0.4% +70.3Ki [55406 Others]
[NEW] +68.0Ki [NEW] +67.8Ki agent_data_plane::cli::run::create_topology::_{{closure}}::he61859445306a93d
[NEW] +65.0Ki [NEW] +64.8Ki saluki_core::topology::built::BuiltTopology::spawn::_{{closure}}::h025944b7eb226ac7
[NEW] +58.7Ki [NEW] +58.5Ki saluki_core::topology::blueprint::TopologyBlueprint::build::_{{closure}}::h82ca535fb9c54454
[NEW] +58.2Ki [NEW] +58.0Ki agent_data_plane::cli::dogstatsd::handle_dogstatsd_command::_{{closure}}::h516585f27dd77050
[NEW] +57.9Ki [NEW] +57.7Ki agent_data_plane::cli::debug::handle_debug_command::_{{closure}}::h6686f68cda6dc29d
[NEW] +54.1Ki [NEW] +53.7Ki agent_data_plane::main::_{{closure}}::h7ff0db362195a43e
[NEW] +43.8Ki [NEW] +43.7Ki core::ops::function::FnOnce::call_once::hfda257168ef3aae4
[NEW] +42.2Ki [NEW] +42.0Ki _<saluki_components::forwarders::otlp::OtlpForwarder as saluki_core::components::forwarders::Forwarder>::run::_{{closure}}::h4073a9a8d14f9740
[NEW] +42.2Ki [NEW] +42.1Ki saluki_components::common::datadog::io::run_endpoint_io_loop::_{{closure}}::hf91970e888d2b0a7
[DEL] -42.2Ki [DEL] -42.0Ki _<saluki_components::forwarders::otlp::OtlpForwarder as saluki_core::components::forwarders::Forwarder>::run::_{{closure}}::he8da937121408613
[DEL] -43.8Ki [DEL] -43.7Ki core::ops::function::FnOnce::call_once::hc103ec02647c8c0b
[DEL] -54.0Ki [DEL] -53.7Ki agent_data_plane::main::_{{closure}}::h86819b7d7b2935c2
[DEL] -55.8Ki [DEL] -55.6Ki agent_data_plane::cli::dogstatsd::handle_dogstatsd_command::_{{closure}}::h849013f75cde3f44
[DEL] -57.3Ki [DEL] -57.1Ki saluki_core::topology::blueprint::TopologyBlueprint::build::_{{closure}}::h3b0c80cba457ff0c
[DEL] -57.6Ki [DEL] -57.5Ki agent_data_plane::cli::debug::handle_debug_command::_{{closure}}::hc45833ca87bdd357
[DEL] -67.3Ki [DEL] -66.9Ki saluki_core::topology::built::BuiltTopology::spawn::_{{closure}}::h481c4e9256bc80aa
[DEL] -67.7Ki [DEL] -67.6Ki agent_data_plane::cli::run::create_topology::_{{closure}}::h783721517ee4e28b
[DEL] -87.4Ki [DEL] -87.2Ki agent_data_plane::internal::env::workload::RemoteAgentWorkloadProvider::from_configuration::_{{closure}}::hd3fcc3c20a4b82ff
[DEL] -151Ki [DEL] -151Ki agent_data_plane::cli::run::handle_run_command::_{{closure}}::h9eae97011b73ac41
+0.1% +37.3Ki +0.1% +18.7Ki TOTAL
Regression Detector (Agent Data Plane)Run ID: Optimization Goals: ✅ No significant changes detectedFine details of change detection per experiment (35)Experiments configured
Bounds Checks: ✅ Passed (5)
ExplanationA change is flagged as a regression when |Δ mean %| > 5.00% in the regressing direction for its optimization goal AND SMP marks the experiment as a regression ( |
3d3db4c to
8b1e85b
Compare
724d7b4 to
bdbb7f5
Compare
08c8463 to
cf5360f
Compare
bdbb7f5 to
d07d4d2
Compare
d07d4d2 to
26d0c84
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 26d0c84958
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| let result = collector.watch(operations_tx).await; | ||
| if let Err(e) = &result { | ||
| warn!( | ||
| error = %e, | ||
| collector_name = collector.name(), | ||
| "Failed to collect metadata. Sleeping 2s before retrying...", | ||
| ); | ||
| sleep(Duration::from_secs(2)).await; | ||
| } |
There was a problem hiding this comment.
Retry clean collector exits inside the worker
When a collector stream ends cleanly (watch returns Ok(())), this worker now returns Ok(()) to the supervisor instead of looping locally as the old run method did. The workload supervisor is configured with a finite restart budget (5 restarts in 30s in bin/agent-data-plane/src/internal/env/workload/mod.rs), so repeated clean EOFs from the remote tagger/workloadmeta streams can exhaust the budget and take down the internal supervision tree, shutting down the topology even though the previous behavior treated EOF as retriable. Handle Ok(()) like the previous loop (including backoff) or otherwise avoid counting normal stream termination against the supervisor limit.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
For this code in
lib/saluki-env/src/workload/collectors/cgroups.rs (line 80)
poller_handle = Some(tokio::task::spawn_blocking(move || cgroups_manager.poll(operations_tx)));
When the collector (now supervised) is dropped, I don't think it drops the poller it spawns?
| // If the supervisor already exited (i.e., the select! above matched its branch), both the send | ||
| // and await resolve immediately — the send is a no-op and the future is already complete. | ||
| // and await resolve immediately — the send is a no-op and the JoinHandle is already ready. | ||
| let _ = internal_shutdown_tx.send(()); |
There was a problem hiding this comment.
Does the early return on errors on line 221-222 prevent the shutdown from being run?
let built_topology = blueprint.build().await?;
let mut running_topology = built_topology.spawn(&health_registry, memory_limiter).await?;
There was a problem hiding this comment.
When we're at the point of waiting for everything to become healthy in the internal supervisor, we haven't spawned the primary topology yet so there's nothing to shutdown.
…e spawning topology (#1661) ## Summary This PR updates ADP to wait for all registered components in the health registry to become ready after spawning the internal supervisor _before_ spawning the primary topology. In #1636, the switch over for asynchronous tasks spawned by the environment provider to be supervisor-based led to a small issue where we spawned the topology before the actual workers in the supervisor were spawned, leading to metrics that weren't enriched properly. We fixed that by manually spawning the internal supervisor first, which worked... but that's not a permanent fix, and still has timing-related risks. This PR explicitly fixes the issue by actually waiting until all registered components (and most background-y tasks in the environment provider do register themselves in the health registry) are ready after spawning the internal supervisor, and only after that occurs do we proceed with spawning the primary topology. As part of this, we update `HealthRegistry::all_ready` to be async so that we can properly notify as soon as we detect all registered components are ready without having to do naive time-based polling. ## Change Type - [ ] Bug fix - [ ] New feature - [x] Non-functional (chore, refactoring, docs) - [ ] Performanc ## How did you test this PR? - [x] Updated the existing `readiness` unit test to handle the changes to `all_ready`. - [x] Existing unit, integration, and correctness tests all passing. ## References DADP-2 Co-authored-by: toby.lawrence <toby.lawrence@datadoghq.com>
…e spawning topology (#1661) ## Summary This PR updates ADP to wait for all registered components in the health registry to become ready after spawning the internal supervisor _before_ spawning the primary topology. In #1636, the switch over for asynchronous tasks spawned by the environment provider to be supervisor-based led to a small issue where we spawned the topology before the actual workers in the supervisor were spawned, leading to metrics that weren't enriched properly. We fixed that by manually spawning the internal supervisor first, which worked... but that's not a permanent fix, and still has timing-related risks. This PR explicitly fixes the issue by actually waiting until all registered components (and most background-y tasks in the environment provider do register themselves in the health registry) are ready after spawning the internal supervisor, and only after that occurs do we proceed with spawning the primary topology. As part of this, we update `HealthRegistry::all_ready` to be async so that we can properly notify as soon as we detect all registered components are ready without having to do naive time-based polling. ## Change Type - [ ] Bug fix - [ ] New feature - [x] Non-functional (chore, refactoring, docs) - [ ] Performanc ## How did you test this PR? - [x] Updated the existing `readiness` unit test to handle the changes to `all_ready`. - [x] Existing unit, integration, and correctness tests all passing. ## References DADP-2 Co-authored-by: toby.lawrence <toby.lawrence@datadoghq.com> 55119e3

Summary
This PR updates the ADP environment provider, as well as some of the underlying shared pieces from
saluki-env, to be supervisor-based.Environment providers are generally composed of data stores which are dynamically updated in some way, either by querying the system directly for information or receiving it from external processes/systems. These updates happen asynchronously, and so environment providers (really, their underlying subproviders like workload and autodiscovery) will naturally spawn asynchronous tasks to handle this work. We want to bring these tasks under supervision.
We've made a number of small changes to support this:
MetadataAggregatorandMetadataCollectorWorker-- can now be added to a supervision tree directly. They play nicely with restarts.RemoteAgentWorkloadProvidernow returns its own supervisor that drives the aforementioned metadata workers, and it runs a dedicated worker for dynamically publishing the API routes related to querying the workload metadata stores (tags, External Data, etc).RemoteAgentAutodiscoveryProviderwill also return its own supervisor that drives the underlying workers that handle forwarding AD events to subscribersADPEnvironmentProviderreturns an optional supervisor (depending on if standalone mode is enabled) that is a rollup of the new supervisors returned by the underlying workload and AD providersEssentially, we've turned all the spots where we previously spawned asynchronous tasks into supervised workers, and rolled them up into nested supervisors.
Change Type
How did you test this PR?
Existing unit, integration, and correctness tests.
References
DADP-2