refactor: add Context parameter to Backoff.Wait method#50428

Open
andrzej-stencel wants to merge 13 commits into elastic:main from andrzej-stencel:add-context-to-backoff-wait
Conversation

@andrzej-stencel
Contributor

Proposed commit message

Changes the API of the Backoff interface, adding context.Context to its Wait method.

Previously, the Wait method relied on a done channel (standing in for a context) that was passed when the Backoff instance was created, in the NewExpBackoff and NewEqualJitterBackoff functions.
This design was flawed, because the appropriate context might not be available when the Backoff instance is created. This led to confusion; see, for example, this discussion.

The changes are split into separate commits for each affected package. See the individual commit messages for details.

I had the changes planned and executed by Claude. I then went through each package one by one, scrutinizing each change and modifying where needed.

Checklist

  • My code follows the style guidelines of this project
  • [ ] I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [ ] I have made corresponding changes to the default configuration files
  • [ ] I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the stresstest.sh script to run them under stress conditions and race detector to verify their stability.
  • [ ] I have added an entry in ./changelog/fragments using the changelog tool.

Disruptive User Impact

None expected; this is an internal API change.

The only change that could, in theory, affect behavior is in the Kafka input implementation, where connectDelay now uses the currently available context instead of context.Background().

How to test this PR locally

I ran unit tests for each modified package.

Removes `done` channel from backoff constructor functions.
Uses the `Context` instance passed as parameter to `Backoff.Wait` method.
Moves retryBackoff instantiation to `MakeOtelConsumer`.
Each `Backoff.Wait` call now uses the context from the current `Publish` call,
removing confusion.
The call to `Backoff.Wait` gets the `f.readerCtx`
which is functionally equivalent to the `canceler.Done()` channel used previously.
To create an instance of `context.Context` from `input.Context` (v1),
I applied the same approach that exists for `input.Context` (v2) in `filebeat/input/kafka/input.go`.
Note that in `makeESClient`, the `connectDelay` previously used `context.Background`.
This change uses the context passed to the `makeESClient` function.
@andrzej-stencel andrzej-stencel requested review from a team as code owners April 30, 2026 12:12
@andrzej-stencel andrzej-stencel added cleanup backport-skip Skip notification from the automated backport with mergify Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team labels Apr 30, 2026
@botelastic botelastic Bot added the needs_team Indicates that the issue/PR needs a Team:* label label Apr 30, 2026
@infra-vault-gh-plugin-prod

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@botelastic botelastic Bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Apr 30, 2026
@coderabbitai
Contributor

coderabbitai Bot commented Apr 30, 2026

No actionable comments were generated in the recent review. 🎉

📥 Commits

Reviewing files that changed from the base of the PR and between c42e1c4 and 1b75ee8.

📒 Files selected for processing (1)
  • heartbeat/beater/heartbeat.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • heartbeat/beater/heartbeat.go

Walkthrough

The backoff API was changed to make waits cancelable via contexts: Backoff.Wait(ctx context.Context) bool. Factory functions (NewExpBackoff, NewEqualJitterBackoff) no longer take a done channel. WaitOnError now accepts a context.Context. Call sites across Filebeat, Heartbeat, libbeat, outputs (including Redis), MQTT, Kafka, journald, AWS S3, Azure Event Hub, otelconsumer, and tests were updated to create backoffs without a done channel and to pass contexts into Wait(...) calls.

🚥 Pre-merge checks | ✅ 2
✅ Passed checks (2 passed)
  • Linked Issues check: ✅ Passed. Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check: ✅ Passed. Check skipped because no linked issues were found for this pull request.


For the forbidigo errors forbidding usage of global logger,
I just added nolint directives to minimize the amount of unrelated changes in this PR.
In my view, the loggers issue should be fixed separately and globally
(not only in the files touched by this PR).
@github-actions
Contributor

TL;DR

Both failed Buildkite jobs are failing on the same Filebeat integration test (TestDataAddedAfterCloseInactive) due to a timing-sensitive log assertion, not a failure in this PR’s changed code. The immediate fix is to relax or replace the "Harvester already running" log assertion with a state-based check.

Remediation

  • In filebeat/input/filestream/input_integration_test.go:1413, replace env.WaitLogsContains("Harvester already running", 2*time.Second) with a deterministic assertion based on behavior/state (for example: assert no duplicate harvester starts for the same source, or gate this assertion behind a stronger precondition that the old harvester is still active at write-event time).
  • If keeping the log assertion, increase the timing margin and add synchronization around the write-event/close-inactive boundary so the expected branch is guaranteed.
  • Re-run:
    • cd filebeat && mage goIntegTest
    • cd filebeat && mage goFIPSOnlyIntegTest
Investigation details

Root Cause

The two failing jobs report the identical failure:

  • filebeat/input/filestream/input_integration_test.go:1413 expects log line "Harvester already running" within 2*time.Second.
  • That log is emitted only in filebeat/input/filestream/internal/input-logfile/harvester.go:233 and filebeat/input/filestream/internal/input-logfile/harvester.go:271.
  • The test timing config (close.on_state_change.inactive: 1s, backoff.init/max: 3s) creates a race window around the inactive-close and subsequent write event (input_integration_test.go:1385-1389, 1407-1413). On slower/variant scheduling (including FIPS runs), the old harvester can be past that branch before the assertion window closes, so behavior can still be correct while this specific log line is absent.

This PR commit (1b75ee861f6dfbb1f11f75efc880e3a013965737) modifies heartbeat/beater/heartbeat.go and does not touch filebeat/input/filestream, which supports this being unrelated test flakiness.

Evidence

  • Build: https://buildkite.com/elastic/beats/builds/45255

  • Failed jobs:

    • :ubuntu: Filebeat: Go Integration Tests
    • :ubuntu: Filebeat: Go fips140=only Integration Tests
  • Key log excerpt (both jobs):

    === FAIL: filebeat/input/filestream TestDataAddedAfterCloseInactive (4.01s)

    input_integration_test.go:1413: did not find 'Harvester already running' in the logs

Verification

  • Ran locally: cd filebeat && go test -tags integration -run TestDataAddedAfterCloseInactive -count=5 ./input/filestream
  • Result: passed locally (non-reproducible here), consistent with a timing-sensitive/flaky assertion.

Follow-up

If you want, I can narrow this further to a concrete replacement assertion that validates single-harvester semantics without depending on debug log timing.

startedAt := time.Now()
assert.True(t, WaitOnError(b, errors.New("bad bad")))
assert.GreaterOrEqual(t, time.Now().Sub(startedAt), init)
assert.True(t, WaitOnError(context.Background(), b, errors.New("bad bad")))
Member
Suggested change
- assert.True(t, WaitOnError(context.Background(), b, errors.New("bad bad")))
+ assert.True(t, WaitOnError(t.Context(), b, errors.New("bad bad")))

b := f(c)
close(c)
assert.False(t, b.Wait())
ctx, cancel := context.WithCancel(context.Background())
Member
Can we use t.Context()?

Member

@mauri870 mauri870 left a comment

Codewise looks good, pending the CI issues.

Labels

backport-skip Skip notification from the automated backport with mergify cleanup skip-changelog Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team

2 participants