Feature/otlp ksm node enrichment #2072
Conversation
…or AI agents (~5,100 tokens)

Adds a sparse tree of AGENTS.md files across the repository to help AI coding agents navigate the codebase effectively. Each file surfaces hidden context that isn't obvious from reading code alone: architectural invariants, registration traps, pitfall patterns, and cross-cutting contracts.

Key context captured:
- Dual runtime architecture (Telegraf + OTel) and the adapter bridge between them
- Silent failure traps: OTel component registration, Telegraf blank imports
- CloudWatch Logs concurrency trap (concurrency >1 introduces HOL blocking)
- fd_release + auto_removal log loss interaction
- Prometheus dual pipeline (completely different component chains for CW vs AMP)
- EntityStore service name resolution priority chain
- OTLP/HTTP validator as a security boundary
- SigV4 handler ordering constraint (compression before signing)
- Forked dependency warnings (go.mod replace directives)
- Partition-aware credential handling

All claims verified against current source code.
… enrichment

Adds an end-to-end pipeline for enriching KSM metrics with per-node IMDS host attributes (host.id, host.name, host.type, host.image.id, cloud.availability_zone) via Kubernetes Leases.
- Expand EC2Info struct with InstanceType, ImageID, AvailabilityZone, Hostname fields
- Rename setInstanceIDAccountID to setEC2Metadata to reflect expanded scope
- Add LeaseWriter that publishes IMDS metadata as Lease annotations per node
- Add nodemetadatacache extension that watches Leases via informer and caches metadata
- Add nodemetadataenricher processor that enriches KSM metrics from the cache
- Register nodemetadatacache and nodemetadataenricher in default components
- Extract getEnv and getK8sConfig as package-level vars for testability
- Add startLeaseWriter tests for missing env var and K8s config failure paths
```go
lw.logger.Info("Node metadata Lease already exists, adopting via update",
	zap.String("name", lw.leaseName()),
)
existing, getErr := lw.client.Leases(lw.namespace).Get(context.Background(), lw.leaseName(), metav1.GetOptions{})
```
When AlreadyExists, the code does Get then Update. If Get fails (any non-NotFound error), it logs and calls continue — which immediately retries Create, gets AlreadyExists again, tries Get again, fails again. There is no backoff and no done channel check in this path. The same applies when Update fails: continue goes back to Create with no sleep. Under K8s API throttling or transient errors, this becomes a tight loop hammering the API server until the agent is killed.
Good catch. The bare continue statements bypass the backoff entirely. Fixed by restructuring the AlreadyExists branch so that Get/Update failures fall through to the shared backoff+done-check block at the bottom of the loop instead of jumping back to the top.
I considered introducing wait.Backoff but opted against it: the function already has its own backoff state, and mixing two independent backoff mechanisms in the same retry loop would be harder to follow. The fall-through reuses the single existing backoff variable for all failure paths.
The trade-off is that after a Get/Update failure, the next iteration calls Create again (which will return AlreadyExists). This is intentional — the redundant Create is one extra call per backoff interval, and it handles the edge case where the Lease was deleted between retries (Create would succeed directly instead of looping on Get for a nonexistent object).
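For readers following along, here is a minimal, self-contained sketch of the loop shape being described; the `attemptCreateOrAdopt` callback and the field names are illustrative stand-ins, not the actual LeaseWriter code. Every failure path, including Get/Update failures after AlreadyExists, falls through to the single backoff + done-check block.

```go
// Sketch of the restructured retry loop: every failure falls through to the
// shared backoff + done check instead of jumping straight back to Create.
// Names (leaseWriterSketch, attemptCreateOrAdopt, done) are illustrative.
package main

import (
	"errors"
	"time"
)

type leaseWriterSketch struct {
	done chan struct{}
}

// attemptCreateOrAdopt stands in for Create -> (on AlreadyExists) Get + Update.
// Any error it returns is handled the same way: back off, then retry from Create.
func (lw *leaseWriterSketch) run(attemptCreateOrAdopt func() error) {
	backoff := 1 * time.Second
	const maxBackoff = 1 * time.Minute

	for {
		if err := attemptCreateOrAdopt(); err == nil {
			return // Lease created or adopted successfully.
		}

		// Shared failure path for Create, Get, and Update errors:
		// no bare `continue` that would hammer the API server.
		select {
		case <-lw.done:
			return // Shutdown requested while backing off.
		case <-time.After(backoff):
		}
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}

func main() {
	lw := &leaseWriterSketch{done: make(chan struct{})}
	calls := 0
	lw.run(func() error {
		calls++
		if calls < 3 {
			return errors.New("transient API error") // e.g. throttled Get after AlreadyExists
		}
		return nil
	})
}
```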
```go
annotationHostName = "cwagent.amazonaws.com/host.name"
annotationHostType = "cwagent.amazonaws.com/host.type"
annotationImageID  = "cwagent.amazonaws.com/host.image.id"
annotationAZ       = "cwagent.amazonaws.com/cloud.availability_zone"
```
can these consts be moved to a shared location? they are defined in both leasewriter and here
Created lease.go as the single source of truth — internal/k8sCommon/ is where the codebase already keeps shared K8s utilities (k8sclient, k8sutil, kubeletutil). Both the writer and reader now import from there. Kept it as a dedicated lease sub-package rather than mixing into k8sutil, since these constants define the Lease contract between two specific components.
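As a rough illustration of the shared contract file, something along these lines (the exported identifier names and the exact file layout are assumptions; the annotation values and lease name prefix come from this PR):

```go
// Hypothetical sketch of internal/k8sCommon/lease/lease.go: the single source of
// truth for the Lease contract shared by the writer (entitystore) and the reader
// (nodemetadatacache). Identifier names are assumptions, not the merged code.
package lease

// LeaseNamePrefix is the prefix for per-node metadata Leases,
// e.g. cwagent-node-metadata-<nodeName>.
const LeaseNamePrefix = "cwagent-node-metadata-"

// Annotation keys carrying IMDS-derived host attributes.
const (
	AnnotationHostID   = "cwagent.amazonaws.com/host.id" // key assumed; the others appear in the diff above
	AnnotationHostName = "cwagent.amazonaws.com/host.name"
	AnnotationHostType = "cwagent.amazonaws.com/host.type"
	AnnotationImageID  = "cwagent.amazonaws.com/host.image.id"
	AnnotationAZ       = "cwagent.amazonaws.com/cloud.availability_zone"
)
```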
```go
}
c.mutex.Lock()
defer c.mutex.Unlock()
c.cache = make(map[string]*NodeMetadata)
```
Shutdown closes stopCh (signaling the informer to stop) and then immediately acquires the write lock to clear the cache. But closing stopCh only signals the informer — it does not wait for in-flight event handler goroutines to exit. An onLeaseAdd or onLeaseUpdate call that was waiting for the write lock will acquire it after Shutdown releases it, re-populating the cache. After Shutdown returns, the cache may contain stale data. Downstream code that calls Get() after Shutdown could receive non-nil results.
Added an atomic.Bool shutdown flag. Shutdown() sets it before closing stopCh. Get() returns nil after shutdown, and handleLeaseEvent()/onLeaseDelete() bail out early — so in-flight handlers can't repopulate the cache after it's cleared. The check is a single atomic read, so the overhead on the event handler path is negligible.
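A minimal sketch of the pattern (type, field, and method names are illustrative, not the real NodeMetadataCache):

```go
// Sketch of the shutdown-flag pattern: Shutdown sets the flag before signaling,
// so in-flight event handlers cannot repopulate the cache after it is cleared,
// and Get returns nothing once shutdown has started.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type metadataCache struct {
	mu       sync.RWMutex
	cache    map[string]string
	shutdown atomic.Bool
	stopCh   chan struct{}
}

// handleLeaseEvent bails out early once shutdown is set.
func (c *metadataCache) handleLeaseEvent(node, value string) {
	if c.shutdown.Load() {
		return
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cache[node] = value
}

// Get returns nothing after shutdown, even if stale entries linger in the map.
func (c *metadataCache) Get(node string) (string, bool) {
	if c.shutdown.Load() {
		return "", false
	}
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.cache[node]
	return v, ok
}

// Shutdown sets the flag first, then signals the informer and clears the cache.
func (c *metadataCache) Shutdown() {
	c.shutdown.Store(true)
	close(c.stopCh)
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cache = make(map[string]string)
}

func main() {
	c := &metadataCache{cache: map[string]string{}, stopCh: make(chan struct{})}
	c.handleLeaseEvent("node-1", "i-abc")
	c.Shutdown()
	c.handleLeaseEvent("node-1", "i-abc") // no-op after shutdown
	_, ok := c.Get("node-1")
	fmt.Println(ok) // false
}
```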
```go
// Check staleness: renewTime + leaseDuration must be >= now
expiry := entry.RenewTime.Add(time.Duration(entry.LeaseDuration) * time.Second)
if time.Now().After(expiry) {
	return nil
```
this returns nil for stale entries but does not remove them from the map. If a node is decommissioned and its lease expires naturally (TTL) without a delete event (e.g., the informer missed the delete, or the lease was force-deleted), the entry stays in the map forever. In a large cluster with frequent node churn, this is unbounded memory growth. The informer's 5-minute resync period helps with missed deletes, but does not guarantee cleanup of TTL-expired entries.
Considered three approaches here: delete-on-read (write-lock upgrade in Get()), background eviction goroutine, and no change.
Going with no change. The correctness concern is already handled — Get() checks renewTime + leaseDuration and returns nil for stale entries, so no wrong data is ever served. The memory concern is the inert map entries for decommissioned nodes. Each entry is ~300 bytes (5 short strings + a timestamp + an int32). Even 1000 decommissioned nodes over the process lifetime is ~300KB, and the map resets on any pod restart (deployment rollout, Helm upgrade, OOM, etc.).
Adding eviction logic (either delete-on-read or a background sweep) would duplicate the TTL-based staleness handling that motivated choosing Leases over ConfigMaps in the first place. The delete-on-read approach also only helps for nodes that are still actively queried — truly decommissioned nodes (no KSM metrics) would never be cleaned up anyway.
Happy to add a background sweep if you feel the memory bound isn't tight enough, but I think the current design is the right trade-off.
```go
// IMDS metadata as a Kubernetes Lease. Must be called after ec2Info is
// initialized (the LeaseWriter's waitForEC2Info handles the race).
func (e *EntityStore) startLeaseWriter() {
	nodeName := getEnv("K8S_NODE_NAME")
```
should check if LW has already been initialized
Added the nil guard. The current call site only invokes it once, but it's a cheap defensive check against future refactors.
```go
annotations := lease.Annotations

// All five annotations must be present
hostID, ok1 := annotations[annotationHostID]
```
should there be a check for any empty or nil values?
The ok1–ok5 checks handle the nil/missing case (key not present in the annotations map). Added empty string checks as well — none of these IMDS fields can legitimately be empty, so rejecting empty values is strictly more correct. The LeaseWriter won't write empty annotations in practice, but it's good defensive validation on the reader side.
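A small sketch of what the combined presence + non-empty validation might look like on the reader side (the function name and the host.id key are assumptions; the other keys appear in the diff above):

```go
// Sketch of reader-side validation: every required annotation must be present
// and non-empty before the Lease is accepted into the cache.
package main

import "fmt"

const (
	annotationHostID   = "cwagent.amazonaws.com/host.id" // assumed key
	annotationHostName = "cwagent.amazonaws.com/host.name"
	annotationHostType = "cwagent.amazonaws.com/host.type"
	annotationImageID  = "cwagent.amazonaws.com/host.image.id"
	annotationAZ       = "cwagent.amazonaws.com/cloud.availability_zone"
)

// extractMetadata rejects a Lease whose annotations are missing or empty.
func extractMetadata(annotations map[string]string) (map[string]string, bool) {
	required := []string{annotationHostID, annotationHostName, annotationHostType, annotationImageID, annotationAZ}
	out := make(map[string]string, len(required))
	for _, key := range required {
		value, ok := annotations[key]
		if !ok || value == "" {
			return nil, false // missing or empty: reject the whole entry
		}
		out[key] = value
	}
	return out, true
}

func main() {
	_, ok := extractMetadata(map[string]string{annotationHostID: ""})
	fmt.Println(ok) // false: empty value rejected
}
```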
```go
cache := p.cache.Load()
if cache == nil {
	// Extension may not have been ready at creation time — retry.
	cache = nodemetadatacache.GetNodeMetadataCache()
```
could add logs here to help with debugging
Added a debug log on successful lazy init. Intentionally skipped logging the "not yet available" case — processMetrics is called per metric batch, so that message would be too noisy.
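A sketch of the lazy-init path with that debug log, using illustrative names for the processor field and the global accessor (not the actual enricher code):

```go
// Sketch of lazy cache lookup with a debug log only on successful init.
// getGlobalCache stands in for nodemetadatacache.GetNodeMetadataCache().
package main

import (
	"sync/atomic"

	"go.uber.org/zap"
)

type nodeCache struct{}

var getGlobalCache = func() *nodeCache { return nil }

type enricher struct {
	cache  atomic.Pointer[nodeCache]
	logger *zap.Logger
}

// lookupCache retries the global lookup if the extension was not ready when the
// processor was created. The "not yet available" case is intentionally silent
// because this runs once per metric batch.
func (e *enricher) lookupCache() *nodeCache {
	if c := e.cache.Load(); c != nil {
		return c
	}
	c := getGlobalCache()
	if c != nil {
		e.cache.Store(c)
		e.logger.Debug("node metadata cache initialized lazily")
	}
	return c
}

func main() {
	e := &enricher{logger: zap.NewNop()}
	_ = e.lookupCache() // nil: extension not registered in this sketch
}
```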
```go
// Stop stops the renewal goroutine, waits for it to exit, then performs a
// best-effort delete of the Lease.
func (lw *LeaseWriter) Stop() {
	close(lw.done)
```
If this gets called twice (e.g., from a test that calls Stop() directly and then EntityStore.Shutdown() also calls it), the second close panics. There is no sync.Once or closed-flag guard.
Wrapped in sync.Once
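For reference, the idempotent-Stop pattern looks roughly like this (field names assumed):

```go
// Sketch of guarding Stop with sync.Once: a second call is a no-op instead of
// panicking on a double close of the done channel.
package main

import "sync"

type leaseWriter struct {
	done     chan struct{}
	stopOnce sync.Once
}

func (lw *leaseWriter) Stop() {
	lw.stopOnce.Do(func() {
		close(lw.done)
		// best-effort Lease delete would follow here
	})
}

func main() {
	lw := &leaseWriter{done: make(chan struct{})}
	lw.Stop()
	lw.Stop() // safe: no panic on the second call
}
```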
```go
var _ extension.Extension = (*NodeMetadataCache)(nil)

// SetForTest populates the cache with test data. Exported for cross-package test use.
func (c *NodeMetadataCache) SetForTest(nodeName string, metadata *NodeMetadata) {
```
Can this be moved to a testutil package for export purposes?
Looked into this — a testutil package can't access the unexported cache map and mutex, so it would need reflect/unsafe or a new exported Set() method (which is worse for the public API). Moving to a _test.go file doesn't work either since processor_test.go in the enricher package needs cross-package access. SetNodeMetadataCacheForTest in factory.go already follows the same pattern. Happy to move them if you see a cleaner approach, but I think the ForTest suffix + doc comments is the least-bad option here.
```go
// leaseWriter creates and renews a Kubernetes Lease with IMDS metadata
// for KSM node metadata enrichment
leaseWriter *LeaseWriter
```
How are we going to configure this extension in the helm-chart yaml? Specifically fields such as kubernetes_mode. The helm-chart could be used in EKS / K8s on EC2 / K8s on Prem etc.
This is handled today in the CWA config translation, but that does not exist for OTel CI.
The entitystore config (including kubernetes_mode, mode, region) continues to come from the existing CWA config translator; this PR doesn't change that path. The LeaseWriter doesn't depend on the specific kubernetes_mode value; it only gates on mode == EC2 && kubernetesMode != "", so it works identically on EKS, K8s-on-EC2, or any EC2-backed K8s cluster. On-prem (no EC2) is excluded because mode != EC2 means no IMDS. The LeaseWriter doesn't use region, profile, or credential config: it reads IMDS via the existing EC2Info and uses in-cluster K8s config. No new entitystore config fields or Helm chart config are needed.
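In other words, the gate is a two-condition check, sketched here with illustrative names and string values (the real code uses the agent's mode constants):

```go
// Sketch of the gating condition: the LeaseWriter starts only on EC2-backed
// Kubernetes nodes, regardless of which kubernetes_mode value is set.
package main

func shouldStartLeaseWriter(mode, kubernetesMode string) bool {
	return mode == "EC2" && kubernetesMode != ""
}

func main() {
	_ = shouldStartLeaseWriter("EC2", "EKS")        // true: EKS
	_ = shouldStartLeaseWriter("EC2", "K8sOnEC2")   // true: K8s on EC2 (illustrative value)
	_ = shouldStartLeaseWriter("onPrem", "K8s")     // false: no IMDS on-prem
}
```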
We are then adding a hard dependency on the JSON config for OTel CI to work, so it'll no longer be a pure YAML experience.
Do you have any ideas on how to change this, or is it OK for now?
I think let's just copy the extension as-is into our YAML as well. As long as it is identical to what the agent JSON translation generates, the merge shouldn't complain.
@mitali-salvi is going to validate that.
Agree with Kaushik here; we need to ensure this stays a pure YAML experience.
I validated that copying the entitystore extension to the YAML won't cause merge conflicts during agent startup. The agent will merge/combine the 2 instances of the extension and instantiate it as a singleton.
Description of the issue
KSM (kube-state-metrics) metrics scraped by the cluster-scraper lack per-node
host attributes (instance ID, instance type, AMI ID, availability zone, hostname).
These attributes are only available via IMDS on each node, but the cluster-scraper
runs as a single Deployment and cannot access IMDS for every node.
Description of changes
Introduces a three-component pipeline to bridge IMDS data from DaemonSet nodes to the cluster-scraper:

1. LeaseWriter (`extension/entitystore/leasewriter.go`): Runs on each DaemonSet node. After EC2Info is populated from IMDS, creates a Kubernetes Lease (`cwagent-node-metadata-<nodeName>`) with host attributes as annotations. Includes jitter to prevent thundering herd, exponential backoff on failures, and best-effort cleanup on shutdown.

2. nodemetadatacache extension (`extension/nodemetadatacache/`): Runs on the cluster-scraper. Watches Leases via a K8s informer scoped to the addon namespace. Maintains an in-memory cache keyed by node name with staleness checks (renewTime + leaseDuration). Degrades gracefully if K8s client setup fails.

3. nodemetadataenricher processor (`plugins/processors/nodemetadataenricher/`): Runs in the cluster-scraper's KSM pipeline. For each ResourceMetrics with a `k8s.node.name` attribute, looks up the node in the cache and sets `host.id`, `host.name`, `host.type`, `host.image.id`, and `cloud.availability_zone` (see the sketch after this list). Uses `atomic.Pointer` for thread-safe lazy initialization of the cache reference.
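A compact sketch of that enrichment step, using the OTel pdata API with an assumed cache-lookup signature (the real processor uses the nodemetadatacache extension; the `nodeMetadata` type and `lookup` function here are illustrative):

```go
// Illustrative sketch of the enrichment step: for each ResourceMetrics carrying
// k8s.node.name, copy the cached IMDS attributes onto the resource.
package main

import (
	"go.opentelemetry.io/collector/pdata/pmetric"
)

type nodeMetadata struct {
	HostID, HostName, HostType, ImageID, AZ string
}

// lookup stands in for the cache's Get(nodeName); nil means cache miss,
// in which case the metrics pass through unchanged.
func enrich(md pmetric.Metrics, lookup func(string) *nodeMetadata) pmetric.Metrics {
	rms := md.ResourceMetrics()
	for i := 0; i < rms.Len(); i++ {
		attrs := rms.At(i).Resource().Attributes()
		nodeName, ok := attrs.Get("k8s.node.name")
		if !ok || nodeName.Str() == "" {
			continue
		}
		meta := lookup(nodeName.Str())
		if meta == nil {
			continue // cache miss: pass through untouched
		}
		attrs.PutStr("host.id", meta.HostID)
		attrs.PutStr("host.name", meta.HostName)
		attrs.PutStr("host.type", meta.HostType)
		attrs.PutStr("host.image.id", meta.ImageID)
		attrs.PutStr("cloud.availability_zone", meta.AZ)
	}
	return md
}

func main() {
	md := pmetric.NewMetrics()
	rm := md.ResourceMetrics().AppendEmpty()
	rm.Resource().Attributes().PutStr("k8s.node.name", "node-1")
	enrich(md, func(string) *nodeMetadata {
		return &nodeMetadata{HostID: "i-0123456789abcdef0", AZ: "us-west-2a"}
	})
}
```

Metrics whose node is not in the cache pass through unchanged, which matches the pass-through cases in the test list below.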
Supporting changes:
- `setInstanceIDAccountID` renamed to `setEC2Metadata`
- `startLeaseWriter` called in K8s mode with testable `getEnv`/`getK8sConfig` vars

License
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution,
under the terms of your choice.
Tests
- ec2Info_test.go: Updated `TestSetEC2Metadata` table-driven tests for all 6 fields, added `TestGettersReturnEmptyBeforeInit`, `TestHostnameFailureProceedsWithoutIt`
- extension_test.go: Added `TestStartLeaseWriter_MissingNodeName`, `TestStartLeaseWriter_K8sConfigFailure`, `TestStartLeaseWriter_DefaultNamespace`
- leasewriter_test.go: Tests for buildLease, create, renewal (calls actual `renewLeaseWithRetry`), stop/delete, leaseName, jitter bounds, default values
- extension_test.go (nodemetadatacache): Tests for cache hit/miss, stale lease, missing annotations, missing renewTime/leaseDuration, concurrent read/write, tombstone handling, update overwrite, prefix filtering
- processor_test.go: Tests for enrichment with cache hit, pass-through on cache miss, no node name, empty node name, metric count preservation, AZ overwrite, mixed metrics (enriched + pass-through)
Deployed to a 13-node EKS cluster with standard, GPU, Neuron, EFA, and attr-limit node groups. Verified all 13 DaemonSet agents created Leases with correct IMDS annotations (host.id, host.name, host.type, host.image.id, cloud.availability_zone) and leaseDurationSeconds=7200. Confirmed Lease adoption on pod restart (AlreadyExists handling) and renewal via agent logs. Verified the cluster-scraper's nodemetadatacache extension started and synced the Lease informer.
Ran the full integration test suite (2769 tests, 0 failures) which validates: