Skip to content

Fix flaky TestDataSampler/GetSamplesForPCollectionsTooManySamples#38736

Merged
shunping merged 1 commit into
apache:masterfrom
shunping:fix-test-data-sampler
May 29, 2026
Merged

Fix flaky TestDataSampler/GetSamplesForPCollectionsTooManySamples#38736
shunping merged 1 commit into
apache:masterfrom
shunping:fix-test-data-sampler

Conversation

@shunping
Copy link
Copy Markdown
Collaborator

Since GetSamples is a destructive operation that clears stored samples, polling it in a loop is flaky. If it is called before all asynchronous samples have finished processing, it consumes the early samples and clears them, causing verifySampledElements to fail.

In this PR, we wait for the sampler to finish before reading to ensure all elements are successfully captured.

Failed test:
https://github.com/apache/beam/actions/runs/26611540188/job/78418312179?pr=38713

--- FAIL: TestDataSampler (2.04s)
    --- FAIL: TestDataSampler/GetSamplesForPCollectionsTooManySamples (0.01s)
        datasampler_test.go:122: got an unexpected sampled elements: map[pid1:[0x30405c9c2b40]], want: map[pid1:[0x30405c9c29c0 0x30405c9c2a00 0x30405c9c27c0 0x30405c9c2800 0x30405c9c2840 0x30405c9c2880 0x30405c9c28c0 0x30405c9c2900 0x30405c9c2940 0x30405c9c2980]]
beam:coder:iterable:v1-singleChunk
beam:coder:iterable:v1-multiChunk
beam:coder:state_backed_iterable:v1
beam:coder:iterable:v1-singleChunk
beam:coder:iterable:v1-multiChunk
beam:coder:state_backed_iterable:v1
2026/05/29 01:09:19 INFO Unsuccessful split: failed to split DataSource (at index: 0, last index: 0) at fraction 0.0000 with requested splits (2 indices from 0 to 3)
2026/05/29 01:09:19 INFO Unsuccessful split: failed to split DataSource (at index: 0, last index: 9223372036854775807) at fraction 0.0000 with requested splits (1 indices from 0 to 0)
2026/05/29 01:09:19 INFO Unsuccessful split: failed to split DataSource (at index: 5, last index: 9223372036854775807) at fraction 0.0000 with requested splits (1 indices from 0 to 0)
2026/05/29 01:09:19 INFO Unsuccessful split: failed to split DataSource (at index: 5, last index: 0) at fraction 0.0000 with requested splits (1 indices from 0 to 0)
FAIL

Since GetSamples is a destructive operation that clears stored samples,
polling it in a loop is flaky. If it is called before all asynchronous
samples have finished processing, it consumes the early samples and clears
them, causing verifySampledElements to fail.

In this PR, we wait for the sampler to finish before reading to ensure
all elements are successfully captured.
@github-actions github-actions Bot added the go label May 29, 2026
@shunping shunping marked this pull request as ready for review May 29, 2026 02:40
@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a race condition in the DataSampler test suite where the destructive nature of the GetSamples operation caused tests to fail when polled prematurely. By removing the loop and waiting for the sampler to complete its work, the test stability is significantly improved.

Highlights

  • Test Flakiness Resolution: Removed the polling loop in the TestDataSampler test suite to prevent premature consumption of samples, which was causing intermittent failures.
  • Synchronization Improvement: Introduced a single wait period to ensure all asynchronous samples are fully processed before calling the destructive GetSamples method.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request simplifies the test logic in TestDataSampler by replacing a polling loop with a hardcoded sleep of 1 second. The reviewer correctly notes that a hardcoded sleep is a test anti-pattern that can lead to flakiness in CI environments and unnecessary delays. They recommend implementing a polling and accumulation loop with a much smaller sleep interval (10ms) to ensure both speed and reliability.

Comment on lines +107 to +109
time.Sleep(1 * time.Second)
samples := dataSampler.GetSamples(test.pids)
samplesCount := len(samples)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using a hardcoded time.Sleep(1 * time.Second) in unit tests is an anti-pattern. It introduces unnecessary delays when tests run quickly, and can still cause flakiness under heavy CI load if the asynchronous operations take longer than 1 second.

Since GetSamples is a destructive operation that clears the internal buffer, we can resolve this by polling and accumulating the samples in a loop with a small sleep interval (e.g., 10ms) until all expected samples are received. This makes the test both faster and highly resilient.

			samples := make(map[string][]*DataSample)
			for i := 0; i < 100; i++ {
				for pid, s := range dataSampler.GetSamples(test.pids) {
					samples[pid] = append(samples[pid], s...)
				}
				match := true
				for pid, wantSamples := range test.want {
					if len(samples[pid]) < len(wantSamples) {
						match = false
						break
					}
				}
				if match {
					break
				}
				time.Sleep(10 * time.Millisecond)
			}
			samplesCount := len(samples)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it really anti-pattern?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is. And 1 second is especially long.

These days one might try and use the synctest package which subverts time, but the nature of beam kinda prevents the necessary goroutine shenanigans.

I need to look at this on a real screen before i stamp it. Synctest might work if this isn't in a pipeline execution context...

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not in a pipeline execution context. Only a unit test for data sampler.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, we could probably use synctest, but outside of the academic value of doing so, I don't think there's a good point at this stage.

This is still better than it was before (one sleep instead of several lossy samples. The main issue with the previous test is the lossiness since I guess it assumed that it was all or nothing and didn't maintain the intermediate samples during it's five checks. That said this feels like it might be just as flaky as before...

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

The 1-second sleep acts as a hard timeout to ensure we don't miss anything. Gemini's proposal is only a more efficient implementation of that same timeout, where it allows to finish faster if all samples arrive early.

The core issue is that we don't have a deterministic signal for when all asynchronous samples have finished processing. And it remains unsolved here.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's see if this will reduce the flakiness. If not, then we will revisit the test and try synctest maybe.

@github-actions
Copy link
Copy Markdown
Contributor

Assigning reviewers:

R: @lostluck for label go.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Comment on lines +107 to +109
time.Sleep(1 * time.Second)
samples := dataSampler.GetSamples(test.pids)
samplesCount := len(samples)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, we could probably use synctest, but outside of the academic value of doing so, I don't think there's a good point at this stage.

This is still better than it was before (one sleep instead of several lossy samples. The main issue with the previous test is the lossiness since I guess it assumed that it was all or nothing and didn't maintain the intermediate samples during it's five checks. That said this feels like it might be just as flaky as before...

@shunping shunping merged commit cbc2c21 into apache:master May 29, 2026
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants