Skip to content

Commit

Permalink
[7.17] Handling exceptions on watcher reload (#105442) (#106209)
Browse files Browse the repository at this point in the history
  • Loading branch information
sakurai-youhei committed Mar 13, 2024
1 parent 2e97844 commit 057843e
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 11 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/105442.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 105442
summary: Handling exceptions on watcher reload
area: Watcher
type: bug
issues:
- 69842
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ public void clusterChanged(ClusterChangedEvent event) {
if (watcherService.validate(event.state())) {
previousShardRoutings.set(localAffectedShardRoutings);
if (state.get() == WatcherState.STARTED) {
watcherService.reload(event.state(), "new local watcher shard allocation ids");
watcherService.reload(event.state(), "new local watcher shard allocation ids", (exception) -> {
clearAllocationIds(); // will cause reload again
});
} else if (isStoppedOrStopping) {
this.state.set(WatcherState.STARTING);
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED), (exception) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ void stopExecutor() {
* Reload the watcher service, does not switch the state from stopped to started, just keep going
* @param state cluster state, which is needed to find out about local shards
*/
void reload(ClusterState state, String reason) {
void reload(ClusterState state, String reason, Consumer<Exception> exceptionConsumer) {
boolean hasValidWatcherTemplates = WatcherIndexTemplateRegistry.validate(state);
if (hasValidWatcherTemplates == false) {
logger.warn("missing watcher index templates");
Expand All @@ -222,7 +222,10 @@ void reload(ClusterState state, String reason) {
int cancelledTaskCount = executionService.clearExecutionsAndQueue(() -> {});
logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);

executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false), e -> logger.error("error reloading watcher", e)));
executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false), e -> {
logger.error("error reloading watcher", e);
exceptionConsumer.accept(e);
}));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,91 @@ public void testExceptionOnStart() {
assertThat(lifeCycleService.getState().get(), equalTo(WatcherState.STARTED));
}

public void testReloadWithIdenticalRoutingTable() {
    /*
     * Verifies that a routing table which does not change between two cluster
     * state updates triggers a reload only for the first of them.
     */
    startWatcher();

    final ClusterChangedEvent[] scenario = masterChangeScenario();
    assertThat(scenario[1].previousState(), equalTo(scenario[0].state()));
    assertFalse(scenario[1].routingTableChanged());

    for (int i = 0; i < scenario.length; i++) {
        when(watcherService.validate(scenario[i].state())).thenReturn(true);
        lifeCycleService.clusterChanged(scenario[i]);
    }
    // the first event carries a new local shard allocation, so it must reload
    verify(watcherService).reload(eq(scenario[0].state()), anyString(), any());
    // the second event has an identical routing table, so no second reload
    verify(watcherService, never()).reload(eq(scenario[1].state()), anyString(), any());
}

public void testReloadWithIdenticalRoutingTableAfterException() {
    /*
     * Verifies that an identical routing table still triggers a second reload
     * when the first reload attempt was interrupted by an exception (for
     * example a timeout while loading watches).
     */
    startWatcher();

    final ClusterChangedEvent[] scenario = masterChangeScenario();
    assertThat(scenario[1].previousState(), equalTo(scenario[0].state()));
    assertFalse(scenario[1].routingTableChanged());

    // make the first reload invoke its failure callback instead of succeeding
    doAnswer(invocation -> {
        final Consumer<Exception> onFailure = invocation.getArgument(2);
        onFailure.accept(new ElasticsearchTimeoutException(new TimeoutException("Artificial timeout")));
        return null;
    }).when(watcherService).reload(eq(scenario[0].state()), anyString(), any());

    for (final ClusterChangedEvent event : scenario) {
        when(watcherService.validate(event.state())).thenReturn(true);
        lifeCycleService.clusterChanged(event);
    }
    // the first event reloads, but that reload fails
    verify(watcherService).reload(eq(scenario[0].state()), anyString(), any());
    // the failure cleared the allocation ids, so the second event reloads again
    verify(watcherService).reload(eq(scenario[1].state()), anyString(), any());
}

private ClusterChangedEvent[] masterChangeScenario() {
    /*
     * Builds two consecutive cluster state updates that differ only in the
     * elected master node; nodes, metadata and routing table are shared, so
     * the second event's routing table is identical to the first's.
     */
    DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder().localNodeId("node_1")
        .add(newNode("node_1"))
        .add(newNode("node_2"))
        .build();

    // single started watch shard allocated to the local node
    Index watchIndex = new Index(Watch.INDEX, "uuid");
    RoutingTable routingTable = RoutingTable.builder()
        .add(
            IndexRoutingTable.builder(watchIndex)
                .addShard(TestShardRouting.newShardRouting(new ShardId(watchIndex, 0), "node_1", true, ShardRoutingState.STARTED))
                .build()
        )
        .build();

    Metadata metadata = Metadata.builder()
        .put(IndexTemplateMetadata.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()))
        .put(
            IndexMetadata.builder(Watch.INDEX)
                // the internal index format setting is required for validation
                .settings(settings(Version.CURRENT).put(IndexMetadata.INDEX_FORMAT_SETTING.getKey(), 6))
                .numberOfShards(1)
                .numberOfReplicas(0)
        )
        .build();

    ClusterState emptyState = ClusterState.builder(new ClusterName("my-cluster")).nodes(discoveryNodes).metadata(metadata).build();
    ClusterState masterIsNode1 = ClusterState.builder(new ClusterName("my-cluster"))
        .nodes(DiscoveryNodes.builder(discoveryNodes).masterNodeId("node_1"))
        .metadata(metadata)
        .routingTable(routingTable)
        .build();
    ClusterState masterIsNode2 = ClusterState.builder(new ClusterName("my-cluster"))
        .nodes(DiscoveryNodes.builder(discoveryNodes).masterNodeId("node_2"))
        .metadata(metadata)
        .routingTable(routingTable)
        .build();

    return new ClusterChangedEvent[] {
        new ClusterChangedEvent("any", masterIsNode1, emptyState),
        new ClusterChangedEvent("any", masterIsNode2, masterIsNode1) };
}

public void testNoLocalShards() {
Index watchIndex = new Index(Watch.INDEX, "foo");
ShardId shardId = new ShardId(watchIndex, 0);
Expand Down Expand Up @@ -301,7 +386,7 @@ public void testNoLocalShards() {
when(watcherService.validate(eq(clusterStateWithLocalShards))).thenReturn(true);
when(watcherService.validate(eq(clusterStateWithoutLocalShards))).thenReturn(false);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithLocalShards, clusterStateWithoutLocalShards));
verify(watcherService, times(1)).reload(eq(clusterStateWithLocalShards), eq("new local watcher shard allocation ids"));
verify(watcherService, times(1)).reload(eq(clusterStateWithLocalShards), eq("new local watcher shard allocation ids"), any());
verify(watcherService, times(1)).validate(eq(clusterStateWithLocalShards));
verifyNoMoreInteractions(watcherService);

Expand Down Expand Up @@ -386,12 +471,12 @@ public void testReplicaWasAddedOrRemoved() {

when(watcherService.validate(eq(firstEvent.state()))).thenReturn(true);
lifeCycleService.clusterChanged(firstEvent);
verify(watcherService).reload(eq(firstEvent.state()), anyString());
verify(watcherService).reload(eq(firstEvent.state()), anyString(), any());

reset(watcherService);
when(watcherService.validate(eq(secondEvent.state()))).thenReturn(true);
lifeCycleService.clusterChanged(secondEvent);
verify(watcherService).reload(eq(secondEvent.state()), anyString());
verify(watcherService).reload(eq(secondEvent.state()), anyString(), any());
}

// make sure that cluster state changes can be processed on nodes that do not hold data
Expand Down Expand Up @@ -457,7 +542,7 @@ public void testNonDataNode() {

lifeCycleService.clusterChanged(new ClusterChangedEvent("any", currentState, previousState));
verify(watcherService, times(0)).pauseExecution(any());
verify(watcherService, times(0)).reload(any(), any());
verify(watcherService, times(0)).reload(any(), any(), any());
}

public void testThatMissingWatcherIndexMetadataOnlyResetsOnce() {
Expand Down Expand Up @@ -490,7 +575,7 @@ public void testThatMissingWatcherIndexMetadataOnlyResetsOnce() {

// first add the shard allocation ids, by going from empty cs to CS with watcher index
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithWatcherIndex, clusterStateWithoutWatcherIndex));
verify(watcherService).reload(eq(clusterStateWithWatcherIndex), anyString());
verify(watcherService).reload(eq(clusterStateWithWatcherIndex), anyString(), any());

// now remove watches index, and ensure that pausing is only called once, no matter how often called (i.e. each CS update)
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithoutWatcherIndex, clusterStateWithWatcherIndex));
Expand Down Expand Up @@ -629,7 +714,7 @@ public void testWatcherReloadsOnNodeOutageWithWatcherShard() {
when(watcherService.validate(any())).thenReturn(true);
ClusterChangedEvent event = new ClusterChangedEvent("whatever", currentState, previousState);
lifeCycleService.clusterChanged(event);
verify(watcherService).reload(eq(event.state()), anyString());
verify(watcherService).reload(eq(event.state()), anyString(), any());
}

private void startWatcher() {
Expand Down Expand Up @@ -658,7 +743,7 @@ private void startWatcher() {

lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, emptyState));
assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED));
verify(watcherService, times(1)).reload(eq(state), anyString());
verify(watcherService, times(1)).reload(eq(state), anyString(), any());
assertThat(lifeCycleService.shardRoutings(), hasSize(1));

// reset the mock, the user has to mock everything themselves again
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -348,12 +349,38 @@ void stopExecutor() {}
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
csBuilder.metadata(Metadata.builder());

service.reload(csBuilder.build(), "whatever");
service.reload(csBuilder.build(), "whatever", exception -> {});
verify(executionService).clearExecutionsAndQueue(any());
verify(executionService, never()).pause(any());
verify(triggerService).pauseExecution();
}

// the trigger service should not start unless watches are loaded successfully
// watches must load successfully before the trigger service may be started
public void testReloadingWatcherDoesNotStartTriggerServiceIfFailingToLoadWatches() {
    final ExecutionService executionService = mock(ExecutionService.class);
    final TriggerService triggerService = mock(TriggerService.class);
    // anonymous subclass so the test does not shut down a real executor
    final WatcherService service = new WatcherService(
        Settings.EMPTY,
        triggerService,
        mock(TriggeredWatchStore.class),
        executionService,
        mock(WatchParser.class),
        client,
        EsExecutors.DIRECT_EXECUTOR_SERVICE
    ) {
        @Override
        void stopExecutor() {}
    };

    // force WatcherService's private loadWatches() to fail while reading metadata
    final Metadata failingMetadata = spy(Metadata.builder().build());
    when(failingMetadata.getIndicesLookup()).thenThrow(RuntimeException.class);

    final ClusterState.Builder stateBuilder = new ClusterState.Builder(new ClusterName("_name"));
    service.reload(stateBuilder.metadata(failingMetadata).build(), "whatever", exception -> {});

    verify(triggerService).pauseExecution();
    verify(triggerService, never()).start(any());
}

private static DiscoveryNode newNode() {
return new DiscoveryNode(
"node",
Expand Down

0 comments on commit 057843e

Please sign in to comment.