Skip to content

Commit

Permalink
Watcher: Ensure state is cleaned properly in watcher life cycle servi…
Browse files Browse the repository at this point in the history
…ce (elastic/x-pack-elasticsearch#3770)

The WatcherLifeCycleService is responsible for deciding if watches need
to be reloaded while running. In order to do this, the service stores
the currently local shard allocation ids in a List.

This data structure however was not properly updated all the time, when
it should have been - for example when a master node is not available.

This lead to unintended reloads, even though there was no change in the
allocated shards. This in turn lead to unwanted executions and unwanted
loading of triggered watches.

This commit should also fix one of the more nasty ongoing test failures,
where the test returns with an exception that only parts of watcher have
been started. The AbstractWatcherIntegrationTestCase now properly waits
until watcher is started before starting the test case itself.

Original commit: elastic/x-pack-elasticsearch@097f12a900b879d3a9fab5aeaf57f5b4be10def0
  • Loading branch information
spinscale committed Jan 30, 2018
1 parent da7560a commit c65528b
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 85 deletions.
Expand Up @@ -21,11 +21,11 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.upgrade.UpgradeField;
import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.core.upgrade.UpgradeField;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;

Expand All @@ -43,7 +43,6 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
private final WatcherService watcherService;
private final ExecutorService executor;
private AtomicReference<List<String>> previousAllocationIds = new AtomicReference<>(Collections.emptyList());
private volatile WatcherMetaData watcherMetaData;
private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.


Expand All @@ -61,7 +60,6 @@ public void beforeStop() {
shutDown();
}
});
watcherMetaData = new WatcherMetaData(!settings.getAsBoolean("xpack.watcher.start_immediately", true));
}

public synchronized void stop(String reason) {
Expand All @@ -73,7 +71,7 @@ synchronized void shutDown() {
stop("shutdown initiated");
}

private synchronized void start(ClusterState state, boolean manual) {
private synchronized void start(ClusterState state) {
if (shutDown) {
return;
}
Expand All @@ -86,7 +84,8 @@ private synchronized void start(ClusterState state, boolean manual) {

// If we start from a cluster state update we need to check if previously we stopped manually
// otherwise Watcher would start upon the next cluster state update while the user instructed Watcher to not run
if (!manual && watcherMetaData != null && watcherMetaData.manuallyStopped()) {
WatcherMetaData watcherMetaData = state.getMetaData().custom(WatcherMetaData.TYPE);
if (watcherMetaData != null && watcherMetaData.manuallyStopped()) {
logger.debug("not starting watcher. watcher was stopped manually and therefore cannot be auto-started");
return;
}
Expand All @@ -99,8 +98,10 @@ private synchronized void start(ClusterState state, boolean manual) {
}

if (watcherService.validate(state)) {
logger.trace("starting... (based on cluster state version [{}]) (manual [{}])", state.getVersion(), manual);
logger.trace("starting... (based on cluster state version [{}])", state.getVersion());
try {
// we need to populate the allocation ids before the next cluster state listener comes in
checkAndSetAllocationIds(state, false);
watcherService.start(state);
} catch (Exception e) {
logger.warn("failed to start watcher. please wait for the cluster to become ready or try to start Watcher manually", e);
Expand All @@ -120,68 +121,30 @@ private synchronized void start(ClusterState state, boolean manual) {
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) || shutDown) {
clearAllocationIds();
// wait until the gateway has recovered from disk, otherwise we think may not have .watches and
// a .triggered_watches index, but they may not have been restored from the cluster state on disk
return;
}

if (Strings.isNullOrEmpty(event.state().nodes().getMasterNodeId())) {
clearAllocationIds();
executor.execute(() -> this.stop("no master node"));
return;
}

if (event.state().getBlocks().hasGlobalBlock(ClusterBlockLevel.WRITE)) {
clearAllocationIds();
executor.execute(() -> this.stop("write level cluster block"));
return;
}

// find out if watcher was stopped or started manually due to this cluster state change
WatcherMetaData watcherMetaData = event.state().getMetaData().custom(WatcherMetaData.TYPE);

if (watcherMetaData != null) {
this.watcherMetaData = watcherMetaData;
}

boolean currentWatcherStopped = watcherMetaData != null && watcherMetaData.manuallyStopped() == true;
if (currentWatcherStopped) {
if (isWatcherStoppedManually(event.state())) {
clearAllocationIds();
executor.execute(() -> this.stop("watcher manually marked to shutdown by cluster state update"));
} else {
if (watcherService.state() == WatcherState.STARTED && event.state().nodes().getLocalNode().isDataNode()) {
DiscoveryNode localNode = event.state().nodes().getLocalNode();
RoutingNode routingNode = event.state().getRoutingNodes().node(localNode.getId());
IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());

// no watcher index, time to pause, as there are for sure no shards on this node
if (watcherIndexMetaData == null) {
if (previousAllocationIds.get().isEmpty() == false) {
previousAllocationIds.set(Collections.emptyList());
executor.execute(() -> watcherService.pauseExecution("no watcher index found"));
}
return;
}

String watchIndex = watcherIndexMetaData.getIndex().getName();
List<ShardRouting> localShards = routingNode.shardsWithState(watchIndex, RELOCATING, STARTED);

// no local shards, empty out watcher and not waste resources!
if (localShards.isEmpty()) {
if (previousAllocationIds.get().isEmpty() == false) {
executor.execute(() -> watcherService.pauseExecution("no local watcher shards"));
previousAllocationIds.set(Collections.emptyList());
}
return;
}

List<String> currentAllocationIds = localShards.stream()
.map(ShardRouting::allocationId)
.map(AllocationId::getId)
.collect(Collectors.toList());
Collections.sort(currentAllocationIds);

if (previousAllocationIds.get().equals(currentAllocationIds) == false) {
previousAllocationIds.set(currentAllocationIds);
executor.execute(() -> watcherService.reload(event.state(), "different shards allocated on this node"));
}
checkAndSetAllocationIds(event.state(), true);
} else if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) {
IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
IndexMetaData triggeredWatchesIndexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStoreField.INDEX_NAME,
Expand All @@ -191,7 +154,8 @@ public void clusterChanged(ClusterChangedEvent event) {
boolean isIndexInternalFormatTriggeredWatchIndex = triggeredWatchesIndexMetaData == null ||
UpgradeField.checkInternalIndexFormat(triggeredWatchesIndexMetaData);
if (isIndexInternalFormatTriggeredWatchIndex && isIndexInternalFormatWatchIndex) {
executor.execute(() -> start(event.state(), false));
checkAndSetAllocationIds(event.state(), false);
executor.execute(() -> start(event.state()));
} else {
logger.warn("not starting watcher, upgrade API run required: .watches[{}], .triggered_watches[{}]",
isIndexInternalFormatWatchIndex, isIndexInternalFormatTriggeredWatchIndex);
Expand All @@ -200,7 +164,75 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}

public WatcherMetaData watcherMetaData() {
return watcherMetaData;
/**
* check if watcher has been stopped manually via the stop API
*/
private boolean isWatcherStoppedManually(ClusterState state) {
WatcherMetaData watcherMetaData = state.getMetaData().custom(WatcherMetaData.TYPE);
return watcherMetaData != null && watcherMetaData.manuallyStopped();
}

/**
* check and optionally set the current allocation ids
*
* @param state the current cluster state
* @param callWatcherService should the watcher service be called for starting/stopping/reloading or should this be treated as a
* dryrun so that the caller is responsible for this
*/
private void checkAndSetAllocationIds(ClusterState state, boolean callWatcherService) {
IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, state.metaData());
if (watcherIndexMetaData == null) {
if (clearAllocationIds() && callWatcherService) {
executor.execute(() -> watcherService.pauseExecution("no watcher index found"));
}
return;
}

DiscoveryNode localNode = state.nodes().getLocalNode();
RoutingNode routingNode = state.getRoutingNodes().node(localNode.getId());
// this can happen if the node does not hold any data
if (routingNode == null) {
if (clearAllocationIds() && callWatcherService) {
executor.execute(() -> watcherService.pauseExecution("no routing node for local node found, network issue?"));
}
return;
}

String watchIndex = watcherIndexMetaData.getIndex().getName();
List<ShardRouting> localShards = routingNode.shardsWithState(watchIndex, RELOCATING, STARTED);
// no local shards, empty out watcher and dont waste resources!
if (localShards.isEmpty()) {
if (clearAllocationIds() && callWatcherService) {
executor.execute(() -> watcherService.pauseExecution("no local watcher shards found"));
}
return;
}

List<String> currentAllocationIds = localShards.stream()
.map(ShardRouting::allocationId)
.map(AllocationId::getId)
.collect(Collectors.toList());
Collections.sort(currentAllocationIds);

if (previousAllocationIds.get().equals(currentAllocationIds) == false) {
previousAllocationIds.set(Collections.unmodifiableList(currentAllocationIds));
if (callWatcherService) {
executor.execute(() -> watcherService.reload(state, "new local watcher shard allocation ids"));
}
}
}

/**
* clear out current allocation ids if not already happened
* @return true, if existing allocation ids were cleaned out, false otherwise
*/
private boolean clearAllocationIds() {
List<String> previousIds = previousAllocationIds.getAndSet(Collections.emptyList());
return previousIds.equals(Collections.emptyList()) == false;
}

// for testing purposes only
List<String> allocationIds() {
return previousAllocationIds.get();
}
}
Expand Up @@ -14,10 +14,10 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsRequest;
import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse;
import org.elasticsearch.xpack.watcher.WatcherLifeCycleService;
import org.elasticsearch.xpack.watcher.WatcherService;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
Expand All @@ -33,27 +33,24 @@ public class TransportWatcherStatsAction extends TransportNodesAction<WatcherSta
private final WatcherService watcherService;
private final ExecutionService executionService;
private final TriggerService triggerService;
private final WatcherLifeCycleService lifeCycleService;

@Inject
public TransportWatcherStatsAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, WatcherService watcherService,
ExecutionService executionService, TriggerService triggerService,
WatcherLifeCycleService lifeCycleService) {
ExecutionService executionService, TriggerService triggerService) {
super(settings, WatcherStatsAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
WatcherStatsRequest::new, WatcherStatsRequest.Node::new, ThreadPool.Names.MANAGEMENT,
WatcherStatsResponse.Node.class);
this.watcherService = watcherService;
this.executionService = executionService;
this.triggerService = triggerService;
this.lifeCycleService = lifeCycleService;
}

@Override
protected WatcherStatsResponse newResponse(WatcherStatsRequest request, List<WatcherStatsResponse.Node> nodes,
List<FailedNodeException> failures) {
return new WatcherStatsResponse(clusterService.getClusterName(), lifeCycleService.watcherMetaData(), nodes, failures);
return new WatcherStatsResponse(clusterService.getClusterName(), getWatcherMetaData(), nodes, failures);
}

@Override
Expand Down Expand Up @@ -83,4 +80,11 @@ protected WatcherStatsResponse.Node nodeOperation(WatcherStatsRequest.Node reque
return statsResponse;
}

private WatcherMetaData getWatcherMetaData() {
WatcherMetaData watcherMetaData = clusterService.state().getMetaData().custom(WatcherMetaData.TYPE);
if (watcherMetaData == null) {
watcherMetaData = new WatcherMetaData(false);
}
return watcherMetaData;
}
}
Expand Up @@ -20,6 +20,8 @@
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -50,6 +52,7 @@
import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.HISTORY_TEMPLATE_NAME;
import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.TRIGGERED_TEMPLATE_NAME;
import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.WATCHES_TEMPLATE_NAME;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
Expand Down Expand Up @@ -278,15 +281,15 @@ public void testNoLocalShards() throws Exception {

// set current allocation ids
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithLocalShards, clusterStateWithoutLocalShards));
verify(watcherService, times(0)).pauseExecution(eq("no local watcher shards"));
verify(watcherService, times(0)).pauseExecution(eq("no local watcher shards found"));

// no more local hards, lets pause execution
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithoutLocalShards, clusterStateWithLocalShards));
verify(watcherService, times(1)).pauseExecution(eq("no local watcher shards"));
verify(watcherService, times(1)).pauseExecution(eq("no local watcher shards found"));

// no further invocations should happen if the cluster state does not change in regard to local shards
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithoutLocalShards, clusterStateWithoutLocalShards));
verify(watcherService, times(1)).pauseExecution(eq("no local watcher shards"));
verify(watcherService, times(1)).pauseExecution(eq("no local watcher shards found"));
}

public void testReplicaWasAddedOrRemoved() throws Exception {
Expand Down Expand Up @@ -539,6 +542,41 @@ public void testWatcherStopsOnClusterLevelBlock() {
verify(watcherService, times(1)).stop(eq("write level cluster block"));
}

public void testStateIsSetImmediately() throws Exception {
Index index = new Index(Watch.INDEX, "foo");
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addShard(
TestShardRouting.newShardRouting(Watch.INDEX, 0, "node_1", true, ShardRoutingState.STARTED));
IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(Watch.INDEX).settings(settings(Version.CURRENT)
.put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format, required
.numberOfShards(1).numberOfReplicas(0);
ClusterState state = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1")
.add(newNode("node_1")))
.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build())
.metaData(MetaData.builder()
.put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns()))
.put(indexMetaDataBuilder)
.build())
.build();
when(watcherService.validate(state)).thenReturn(true);
when(watcherService.state()).thenReturn(WatcherState.STOPPED);

lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, state));
verify(watcherService, times(1)).start(eq(state));
assertThat(lifeCycleService.allocationIds(), hasSize(1));

// now do any cluster state upgrade, see that reload gets triggers, but should not
when(watcherService.state()).thenReturn(WatcherState.STARTED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, state));
verify(watcherService, never()).pauseExecution(anyString());

verify(watcherService, never()).reload(eq(state), anyString());
assertThat(lifeCycleService.allocationIds(), hasSize(1));
}

private List<String> randomIndexPatterns() {
return IntStream.range(0, between(1, 10))
.mapToObj(n -> randomAlphaOfLengthBetween(1, 100))
Expand Down

0 comments on commit c65528b

Please sign in to comment.