Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling timeout exceptions on watcher startup #90421

Merged
merged 4 commits into from Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/90421.yaml
@@ -0,0 +1,6 @@
pr: 90421
summary: Handling timeout exceptions on watcher startup
area: Watcher
type: bug
issues:
- 44981
Expand Up @@ -101,7 +101,11 @@ public void clusterChanged(ClusterChangedEvent event) {
// if this is not a data node, we need to start it ourselves possibly
if (event.state().nodes().getLocalNode().canContainData() == false && isWatcherStoppedManually == false && isStoppedOrStopping) {
this.state.set(WatcherState.STARTING);
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
watcherService.start(
event.state(),
() -> this.state.set(WatcherState.STARTED),
(exception -> this.state.set(WatcherState.STOPPED))
);
return;
}

Expand Down Expand Up @@ -165,7 +169,10 @@ public void clusterChanged(ClusterChangedEvent event) {
watcherService.reload(event.state(), "new local watcher shard allocation ids");
} else if (isStoppedOrStopping) {
this.state.set(WatcherState.STARTING);
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED), (exception) -> {
clearAllocationIds();
this.state.set(WatcherState.STOPPED);
});
}
} else {
clearAllocationIds();
Expand Down
Expand Up @@ -230,14 +230,17 @@ void reload(ClusterState state, String reason) {
* @param state the current cluster state
* @param postWatchesLoadedCallback the callback to be triggered, when watches where loaded successfully
*/
public void start(ClusterState state, Runnable postWatchesLoadedCallback) {
public void start(ClusterState state, Runnable postWatchesLoadedCallback, Consumer<Exception> exceptionConsumer) {
executionService.unPause();
processedClusterStateVersion.set(state.getVersion());
executor.execute(wrapWatcherService(() -> {
if (reloadInner(state, "starting", true)) {
postWatchesLoadedCallback.run();
}
}, e -> logger.error("error starting watcher", e)));
}, e -> {
logger.error("error starting watcher", e);
exceptionConsumer.accept(e);
}));
}

/**
Expand Down Expand Up @@ -311,13 +314,7 @@ private Collection<Watch> loadWatches(ClusterState clusterState) {
SearchResponse response = null;
List<Watch> watches = new ArrayList<>();
try {
RefreshResponse refreshResponse = client.admin()
.indices()
.refresh(new RefreshRequest(INDEX))
.actionGet(TimeValue.timeValueSeconds(5));
if (refreshResponse.getSuccessfulShards() < indexMetadata.getNumberOfShards()) {
throw illegalState("not all required shards have been refreshed");
}
refreshWatches(indexMetadata);

// find out local shards
String watchIndexName = indexMetadata.getIndex().getName();
Expand Down Expand Up @@ -403,6 +400,17 @@ private Collection<Watch> loadWatches(ClusterState clusterState) {
return watches;
}

// Non private for unit testing purposes
void refreshWatches(IndexMetadata indexMetadata) {
RefreshResponse refreshResponse = client.admin()
.indices()
.refresh(new RefreshRequest(INDEX))
.actionGet(TimeValue.timeValueSeconds(5));
if (refreshResponse.getSuccessfulShards() < indexMetadata.getNumberOfShards()) {
throw illegalState("not all required shards have been refreshed");
}
}

/**
* Find out if the watch with this id, should be parsed and triggered on this node
*
Expand Down
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.watcher;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand Down Expand Up @@ -41,13 +42,17 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.util.Arrays.asList;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.HISTORY_TEMPLATE_NAME;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -184,14 +189,79 @@ public void testManualStartStop() {
reset(watcherService);
when(watcherService.validate(clusterState)).thenReturn(true);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, stoppedClusterState));
verify(watcherService, times(1)).start(eq(clusterState), any());
verify(watcherService, times(1)).start(eq(clusterState), any(), any());

// no change, keep going
reset(watcherService);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
verifyNoMoreInteractions(watcherService);
}

@SuppressWarnings("unchecked")
public void testExceptionOnStart() {
/*
* This tests that if watcher fails to start because of some exception (for example a timeout while refreshing indices) that it
* will fail gracefully, and will start the next time there is a cluster change event if there is no exception that time.
*/
Index index = new Index(Watch.INDEX, "uuid");
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addShard(
TestShardRouting.newShardRouting(new ShardId(index, 0), "node_1", true, ShardRoutingState.STARTED)
);
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(Watch.INDEX)
.settings(settings(Version.CURRENT).put(IndexMetadata.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format, required
.numberOfShards(1)
.numberOfReplicas(0);
Metadata.Builder metadataBuilder = Metadata.builder()
.put(indexMetadataBuilder)
.put(IndexTemplateMetadata.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()));
if (randomBoolean()) {
metadataBuilder.putCustom(WatcherMetadata.TYPE, new WatcherMetadata(false));
}
Metadata metadata = metadataBuilder.build();
IndexRoutingTable indexRoutingTable = indexRoutingTableBuilder.build();
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")))
.routingTable(RoutingTable.builder().add(indexRoutingTable).build())
.metadata(metadata)
.build();

// mark watcher manually as stopped
ClusterState stoppedClusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")))
.routingTable(RoutingTable.builder().add(indexRoutingTable).build())
.metadata(Metadata.builder(metadata).putCustom(WatcherMetadata.TYPE, new WatcherMetadata(true)).build())
.build();

lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stoppedClusterState, clusterState));
assertThat(lifeCycleService.getState().get(), equalTo(WatcherState.STOPPING));

// Now attempt to start watcher with a simulated TimeoutException. Should be stopped
when(watcherService.validate(clusterState)).thenReturn(true);
AtomicBoolean exceptionHit = new AtomicBoolean(false);
doAnswer(invocation -> {
Consumer<Exception> exceptionConsumer = invocation.getArgument(2);
exceptionConsumer.accept(new ElasticsearchTimeoutException(new TimeoutException("Artificial timeout")));
exceptionHit.set(true);
return null;
}).when(watcherService).start(any(), any(), any(Consumer.class));
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
assertTrue("Expected simulated timeout not hit", exceptionHit.get());
assertThat(lifeCycleService.getState().get(), equalTo(WatcherState.STOPPED));

// And now attempt to start watcher with no exception. It should start up.
AtomicBoolean runnableCalled = new AtomicBoolean(false);
doAnswer(invocation -> {
Runnable runnable = invocation.getArgument(1);
runnable.run();
runnableCalled.set(true);
return null;
}).when(watcherService).start(any(), any(Runnable.class), any());
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
assertTrue("Runnable not called", runnableCalled.get());
assertThat(lifeCycleService.getState().get(), equalTo(WatcherState.STARTED));
}

public void testNoLocalShards() {
Index watchIndex = new Index(Watch.INDEX, "foo");
ShardId shardId = new ShardId(watchIndex, 0);
Expand Down Expand Up @@ -453,7 +523,7 @@ public void testWatcherServiceDoesNotStartIfIndexTemplatesAreMissing() throws Ex
when(watcherService.validate(eq(state))).thenReturn(true);

lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state));
verify(watcherService, times(0)).start(any(ClusterState.class), any());
verify(watcherService, times(0)).start(any(ClusterState.class), any(), any());
}

public void testWatcherStopsWhenMasterNodeIsMissing() {
Expand Down
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack.watcher;

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
Expand Down Expand Up @@ -64,7 +65,10 @@
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -244,7 +248,7 @@ void stopExecutor() {}
return null;
}).when(client).execute(eq(ClearScrollAction.INSTANCE), any(ClearScrollRequest.class), anyActionListener());

service.start(clusterState, () -> {});
service.start(clusterState, () -> {}, exception -> {});

ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
verify(triggerService).start(captor.capture());
Expand All @@ -253,6 +257,45 @@ void stopExecutor() {}
assertThat(watches, hasSize(activeWatchCount));
}

public void testExceptionHandling() {
/*
* This tests that if the WatcherService throws an exception while refreshing indices that the exception is handled by the
* exception consumer rather than being propagated higher in the stack.
*/
TriggerService triggerService = mock(TriggerService.class);
TriggeredWatchStore triggeredWatchStore = mock(TriggeredWatchStore.class);
ExecutionService executionService = mock(ExecutionService.class);
WatchParser parser = mock(WatchParser.class);
final ElasticsearchTimeoutException exception = new ElasticsearchTimeoutException(new TimeoutException("Artifical timeout"));
WatcherService service = new WatcherService(
Settings.EMPTY,
triggerService,
triggeredWatchStore,
executionService,
parser,
client,
EsExecutors.DIRECT_EXECUTOR_SERVICE
) {
@Override
void refreshWatches(IndexMetadata indexMetadata) {
throw exception;
}
};

ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
Metadata.Builder metadataBuilder = Metadata.builder();
Settings indexSettings = settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metadataBuilder.put(IndexMetadata.builder(Watch.INDEX).settings(indexSettings));
csBuilder.metadata(metadataBuilder);
ClusterState clusterState = csBuilder.build();

AtomicReference<Exception> exceptionReference = new AtomicReference<>();
service.start(clusterState, () -> { fail("Excepted an exception"); }, exceptionReference::set);
assertThat(exceptionReference.get(), equalTo(exception));
}

@SuppressWarnings({ "unchecked", "rawtypes" })
public void testPausingWatcherServiceAlsoPausesTriggerService() {
String engineType = "foo";
Expand Down