Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions sdks/go/pkg/beam/core/runtime/exec/datasampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,9 @@ func TestDataSampler(t *testing.T) {
for _, sample := range test.samples {
dataSampler.SendSample(sample.PCollectionID, sample.Element, sample.Timestamp)
}
var samplesCount = -1
var samples map[string][]*DataSample
for i := 0; i < 5; i++ {
samples = dataSampler.GetSamples(test.pids)
if len(samples) == len(test.want) {
samplesCount = len(samples)
break
}
time.Sleep(time.Second)
}
time.Sleep(1 * time.Second)
samples := dataSampler.GetSamples(test.pids)
samplesCount := len(samples)
Comment on lines +107 to +109
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using a hardcoded time.Sleep(1 * time.Second) in unit tests is an anti-pattern. It introduces unnecessary delays when tests run quickly, and can still cause flakiness under heavy CI load if the asynchronous operations take longer than 1 second.

Since GetSamples is a destructive operation that clears the internal buffer, we can resolve this by polling and accumulating the samples in a loop with a small sleep interval (e.g., 10ms) until all expected samples are received. This makes the test both faster and highly resilient.

			samples := make(map[string][]*DataSample)
			for i := 0; i < 100; i++ {
				for pid, s := range dataSampler.GetSamples(test.pids) {
					samples[pid] = append(samples[pid], s...)
				}
				match := true
				for pid, wantSamples := range test.want {
					if len(samples[pid]) < len(wantSamples) {
						match = false
						break
					}
				}
				if match {
					break
				}
				time.Sleep(10 * time.Millisecond)
			}
			samplesCount := len(samples)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it really anti-pattern?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is. And 1 second is especially long.

These days one might try and use the synctest package which subverts time, but the nature of beam kinda prevents the necessary goroutine shenanigans.

I need to look at this on a real screen before i stamp it. Synctest might work if this isn't in a pipeline execution context...

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not in a pipeline execution context. Only a unit test for data sampler.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, we could probably use synctest, but outside of the academic value of doing so, I don't think there's a good point at this stage.

This is still better than it was before (one sleep instead of several lossy samples. The main issue with the previous test is the lossiness since I guess it assumed that it was all or nothing and didn't maintain the intermediate samples during it's five checks. That said this feels like it might be just as flaky as before...

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

The 1-second sleep acts as a hard timeout to ensure we don't miss anything. Gemini's proposal is only a more efficient implementation of that same timeout, where it allows to finish faster if all samples arrive early.

The core issue is that we don't have a deterministic signal for when all asynchronous samples have finished processing. And it remains unsolved here.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's see if this will reduce the flakiness. If not, then we will revisit the test and try synctest maybe.

cancel()
if samplesCount != len(test.want) {
t.Errorf("got an unexpected number of sampled elements: %v, want: %v", samplesCount, len(test.want))
Expand Down
Loading