Watcher: Reload properly on remote shard change #33167

Merged
WatcherLifeCycleService.java

@@ -11,7 +11,6 @@
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.AllocationId;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -22,13 +21,16 @@
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.gateway.GatewayService;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
 import org.elasticsearch.xpack.core.watcher.WatcherState;
 import org.elasticsearch.xpack.core.watcher.watch.Watch;
 import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;

 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;

@@ -45,7 +47,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener {
         Setting.boolSetting("xpack.watcher.require_manual_start", false, Property.NodeScope);

     private final AtomicReference<WatcherState> state = new AtomicReference<>(WatcherState.STARTED);
-    private final AtomicReference<List<String>> previousAllocationIds = new AtomicReference<>(Collections.emptyList());
+    private final AtomicReference<List<ShardRouting>> previousShardRoutings = new AtomicReference<>(Collections.emptyList());
     private final boolean requireManualStart;
     private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.
     private volatile WatcherService watcherService;
@@ -144,15 +146,20 @@ public void clusterChanged(ClusterChangedEvent event) {
             return;
         }

-        List<String> currentAllocationIds = localShards.stream()
-            .map(ShardRouting::allocationId)
-            .map(AllocationId::getId)
-            .sorted()
+        // also check if non-local shards have changed, as losing a shard on a
+        // remote node or adding a replica on a remote node needs to trigger a reload too
+        Set<ShardId> localShardIds = localShards.stream().map(ShardRouting::shardId).collect(Collectors.toSet());
+        List<ShardRouting> allShards = event.state().routingTable().index(watchIndex).shardsWithState(STARTED);
+        allShards.addAll(event.state().routingTable().index(watchIndex).shardsWithState(RELOCATING));
+        List<ShardRouting> localAffectedShardRoutings = allShards.stream()
+            .filter(shardRouting -> localShardIds.contains(shardRouting.shardId()))
+            // ShardRouting is not Comparable, so we need some ordering mechanism
+            .sorted(Comparator.comparing(ShardRouting::hashCode))
             .collect(Collectors.toList());

-        if (previousAllocationIds.get().equals(currentAllocationIds) == false) {
+        if (previousShardRoutings.get().equals(localAffectedShardRoutings) == false) {
             if (watcherService.validate(event.state())) {
-                previousAllocationIds.set(Collections.unmodifiableList(currentAllocationIds));
+                previousShardRoutings.set(localAffectedShardRoutings);
                 if (state.get() == WatcherState.STARTED) {
                     watcherService.reload(event.state(), "new local watcher shard allocation ids");
                 } else if (state.get() == WatcherState.STOPPED) {
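
The hunk above is the heart of the fix: instead of comparing only the sorted allocation IDs of shards held locally, the service now snapshots every STARTED or RELOCATING routing entry for the watches index whose shard is also held locally, so a copy appearing or disappearing on a remote node changes the snapshot too. Here is a minimal, self-contained sketch of that comparison, using hypothetical stand-in record types rather than the real org.elasticsearch.cluster.routing classes:

import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

public class ReloadDeciderDemo {

    // Hypothetical stand-ins for the Elasticsearch routing classes, reduced to
    // the fields the comparison relies on; records give value-based equals/hashCode.
    record ShardId(String index, int id) {}
    record Routing(ShardId shardId, String nodeId, boolean primary) {}

    private static final AtomicReference<List<Routing>> previous = new AtomicReference<>(List.of());

    // Reload when any copy of a locally held shard changed anywhere in the
    // cluster, not only when the local allocations changed.
    static boolean needsReload(Set<ShardId> localShardIds, List<Routing> allActiveShards) {
        List<Routing> affected = allActiveShards.stream()
            .filter(r -> localShardIds.contains(r.shardId()))
            // Routing is not Comparable; ordering by hashCode is arbitrary but
            // stable, so equal snapshots always compare equal as lists
            .sorted(Comparator.comparing(Routing::hashCode))
            .collect(Collectors.toList());
        if (previous.get().equals(affected) == false) {
            previous.set(affected);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        ShardId shard0 = new ShardId(".watches", 0);
        Set<ShardId> local = Set.of(shard0);                  // the local node holds shard 0
        Routing primary = new Routing(shard0, "node_1", true);

        System.out.println(needsReload(local, List.of(primary))); // true: first snapshot
        System.out.println(needsReload(local, List.of(primary))); // false: nothing changed

        // a replica appears on a remote node: the local allocation ids are
        // unchanged, but the routing snapshot differs, so a reload is triggered
        Routing remoteReplica = new Routing(shard0, "node_2", false);
        System.out.println(needsReload(local, List.of(primary, remoteReplica))); // true
    }
}

Under the old allocation-id comparison, the last step would have returned false, since the local node's own allocation IDs never changed; this is exactly the case the PR title describes.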
@@ -187,13 +194,13 @@ private boolean isWatcherStoppedManually(ClusterState state) {
      * @return true, if existing allocation ids were cleaned out, false otherwise
      */
     private boolean clearAllocationIds() {
-        List<String> previousIds = previousAllocationIds.getAndSet(Collections.emptyList());
-        return previousIds.equals(Collections.emptyList()) == false;
+        List<ShardRouting> previousIds = previousShardRoutings.getAndSet(Collections.emptyList());
+        return previousIds.isEmpty() == false;
     }

     // for testing purposes only
-    List<String> allocationIds() {
-        return previousAllocationIds.get();
+    List<ShardRouting> shardRoutings() {
+        return previousShardRoutings.get();
     }

     public WatcherState getState() {
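
The clearAllocationIds() rewrite keeps the same getAndSet idiom but swaps equals(Collections.emptyList()) for the simpler isEmpty() check: the old list is swapped out and inspected in one atomic step, so two concurrent callers can never both observe a non-empty previous value. A minimal illustration with hypothetical values:

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class ClearDemo {
    public static void main(String[] args) {
        AtomicReference<List<String>> previous = new AtomicReference<>(List.of("some-routing"));

        // swap in the empty list and get the old value back in one atomic step
        List<String> old = previous.getAndSet(List.of());
        boolean cleared = old.isEmpty() == false;

        System.out.println(cleared);                  // true: there was state to clear
        System.out.println(previous.get().isEmpty()); // true: now empty

        // a second clear is a no-op and reports false
        System.out.println(previous.getAndSet(List.of()).isEmpty() == false);
    }
}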
WatcherLifeCycleServiceTests.java

@@ -254,9 +254,12 @@ public void testReplicaWasAddedOrRemoved() {
             .add(newNode("node_2"))
             .build();

+        ShardRouting firstShardOnSecondNode = TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED);
+        ShardRouting secondShardOnFirstNode = TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED);
+
         IndexRoutingTable previousWatchRoutingTable = IndexRoutingTable.builder(watchIndex)
-            .addShard(TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED))
-            .addShard(TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED))
+            .addShard(secondShardOnFirstNode)
+            .addShard(firstShardOnSecondNode)
             .build();

         IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX)
@@ -273,10 +276,19 @@
             .metaData(MetaData.builder().put(indexMetaData, false))
             .build();

+        // add a replica, randomly on the local or the remote node
+        boolean addShardOnLocalNode = randomBoolean();
+        final ShardRouting addedShardRouting;
+        if (addShardOnLocalNode) {
+            addedShardRouting = TestShardRouting.newShardRouting(shardId, "node_1", false, STARTED);
+        } else {
+            addedShardRouting = TestShardRouting.newShardRouting(secondShardId, "node_2", false, STARTED);
+        }
+
         IndexRoutingTable currentWatchRoutingTable = IndexRoutingTable.builder(watchIndex)
-            .addShard(TestShardRouting.newShardRouting(shardId, "node_1", false, STARTED))
-            .addShard(TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED))
-            .addShard(TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED))
+            .addShard(secondShardOnFirstNode)
+            .addShard(firstShardOnSecondNode)
+            .addShard(addedShardRouting)
             .build();

         ClusterState stateWithReplicaAdded = ClusterState.builder(new ClusterName("my-cluster"))
@@ -477,7 +489,67 @@ public void testDataNodeWithoutDataCanStart() {
         assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
     }

-    private ClusterState startWatcher() {
+    // this emulates a node outage somewhere in the cluster that carried a watcher shard
+    // the number of shards remains the same, but we need to ensure that watcher properly reloads
+    // previously we only checked the local shard allocations, but we also need to check if shards in the cluster have changed
+    public void testWatcherReloadsOnNodeOutageWithWatcherShard() {
+        Index watchIndex = new Index(Watch.INDEX, "foo");
+        ShardId shardId = new ShardId(watchIndex, 0);
+        String localNodeId = randomFrom("node_1", "node_2");
+        String outageNodeId = localNodeId.equals("node_1") ? "node_2" : "node_1";
+        DiscoveryNodes previousDiscoveryNodes = new DiscoveryNodes.Builder().masterNodeId(localNodeId).localNodeId(localNodeId)
+            .add(newNode(localNodeId))
+            .add(newNode(outageNodeId))
+            .build();
+
+        ShardRouting replicaShardRouting = TestShardRouting.newShardRouting(shardId, localNodeId, false, STARTED);
+        ShardRouting primaryShardRouting = TestShardRouting.newShardRouting(shardId, outageNodeId, true, STARTED);
+        IndexRoutingTable previousWatchRoutingTable = IndexRoutingTable.builder(watchIndex)
+            .addShard(replicaShardRouting)
+            .addShard(primaryShardRouting)
+            .build();
+
+        IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX)
+            .settings(Settings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+                .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)
+            ).build();
+
+        ClusterState previousState = ClusterState.builder(new ClusterName("my-cluster"))
+            .nodes(previousDiscoveryNodes)
+            .routingTable(RoutingTable.builder().add(previousWatchRoutingTable).build())
+            .metaData(MetaData.builder().put(indexMetaData, false))
+            .build();
+
+        ShardRouting nowPrimaryShardRouting = replicaShardRouting.moveActiveReplicaToPrimary();
+        IndexRoutingTable currentWatchRoutingTable = IndexRoutingTable.builder(watchIndex)
+            .addShard(nowPrimaryShardRouting)
+            .build();
+
+        DiscoveryNodes currentDiscoveryNodes = new DiscoveryNodes.Builder().masterNodeId(localNodeId).localNodeId(localNodeId)
+            .add(newNode(localNodeId))
+            .build();
+
+        ClusterState currentState = ClusterState.builder(new ClusterName("my-cluster"))
+            .nodes(currentDiscoveryNodes)
+            .routingTable(RoutingTable.builder().add(currentWatchRoutingTable).build())
+            .metaData(MetaData.builder().put(indexMetaData, false))
+            .build();
+
+        // initialize the previous state, so all the shard routings are loaded
+        when(watcherService.validate(anyObject())).thenReturn(true);
+        lifeCycleService.clusterChanged(new ClusterChangedEvent("whatever", previousState, currentState));
+
+        reset(watcherService);
+        when(watcherService.validate(anyObject())).thenReturn(true);
+        ClusterChangedEvent event = new ClusterChangedEvent("whatever", currentState, previousState);
+        lifeCycleService.clusterChanged(event);
+        verify(watcherService).reload(eq(event.state()), anyString());
+    }
+
+    private void startWatcher() {
         Index index = new Index(Watch.INDEX, "uuid");
         IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
         indexRoutingTableBuilder.addShard(
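
The new test follows the Mockito choreography used throughout this test class: stub validate, feed one event to prime the previous snapshot, then reset the mock, re-stub, and verify the next event triggers a reload. A self-contained sketch of that pattern, with hypothetical Service/Listener stand-ins (assumes Mockito 2+ on the classpath):

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class MockResetPatternDemo {

    // Hypothetical stand-ins for WatcherService and the lifecycle listener,
    // shrunk to the interaction the test exercises.
    interface Service {
        boolean validate(String state);
        void reload(String state, String reason);
    }

    static class Listener {
        private final Service service;
        private String previous = "";

        Listener(Service service) {
            this.service = service;
        }

        void onChange(String state) {
            if (previous.equals(state) == false && service.validate(state)) {
                previous = state;
                service.reload(state, "state changed");
            }
        }
    }

    public static void main(String[] args) {
        Service service = mock(Service.class);
        Listener listener = new Listener(service);

        // prime the listener's internal snapshot with the "previous" state
        when(service.validate(anyString())).thenReturn(true);
        listener.onChange("previousState");

        // reset() wipes recorded interactions AND stubbing, hence the re-stub
        reset(service);
        when(service.validate(anyString())).thenReturn(true);

        listener.onChange("currentState");
        verify(service).reload(eq("currentState"), anyString()); // fails if reload was not called
    }
}

Resetting the mock between the two events is what lets the final verify assert that the reload was caused by the second cluster-state change alone.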
@@ -506,12 +578,10 @@ private ClusterState startWatcher() {
         lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, emptyState));
         assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
         verify(watcherService, times(1)).reload(eq(state), anyString());
-        assertThat(lifeCycleService.allocationIds(), hasSize(1));
+        assertThat(lifeCycleService.shardRoutings(), hasSize(1));

         // reset the mock, the user has to mock everything themselves again
         reset(watcherService);
-
-        return state;
     }

     private List<String> randomIndexPatterns() {