Skip to content

Commit

Permalink
Watcher fix ActivateWatchTests (#82157) (#82208)
Browse files Browse the repository at this point in the history
This commit attempts to fix a set of Watcher tests that can
fail due to unexpected execution of Watches after Watcher has been stopped.

The theory here is that a Watch can be queued but not fully executed
then Watcher is shutdown, the test does some clean up, then the
queued Watch finishes execution and causes some some additional cleanup
to fail.

The change here ensures that when Watcher is stopped from AbstractWatcherIntegrationTestCase
that it will also wait until there are no more current Watches executing.

closes #66495
  • Loading branch information
jakelandis committed Jan 4, 2022
1 parent b280694 commit 9918ebd
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,8 @@ protected void ensureLicenseEnabled() throws Exception {

protected void stopWatcher() throws Exception {
assertBusy(() -> {
WatcherStatsResponse watcherStatsResponse = new WatcherStatsRequestBuilder(client()).get();

WatcherStatsResponse watcherStatsResponse = new WatcherStatsRequestBuilder(client()).setIncludeCurrentWatches(true).get();
assertThat(watcherStatsResponse.hasFailures(), is(false));
List<Tuple<String, WatcherState>> currentStatesFromStatsRequest = watcherStatsResponse.getNodes()
.stream()
Expand All @@ -580,7 +581,8 @@ protected void stopWatcher() throws Exception {
.collect(Collectors.toList());
List<WatcherState> states = currentStatesFromStatsRequest.stream().map(Tuple::v2).collect(Collectors.toList());

logger.info("waiting to stop watcher, current states {}", currentStatesFromStatsRequest);
long currentWatches = watcherStatsResponse.getNodes().stream().mapToLong(n -> n.getSnapshots().size()).sum();
logger.info("waiting to stop watcher, current states {}, current watches [{}]", currentStatesFromStatsRequest, currentWatches);

boolean isAllStateStarted = states.stream().allMatch(w -> w == WatcherState.STARTED);
if (isAllStateStarted) {
Expand All @@ -594,7 +596,7 @@ protected void stopWatcher() throws Exception {
}

boolean isAllStateStopped = states.stream().allMatch(w -> w == WatcherState.STOPPED);
if (isAllStateStopped) {
if (isAllStateStopped && currentWatches == 0) {
return;
}

Expand Down

0 comments on commit 9918ebd

Please sign in to comment.