Skip to content

Commit

Permalink
Re-fetch shard info of primary when new node joins (#47035)
Browse files Browse the repository at this point in the history
Today, we don't clear the shard info of the primary shard when a new
node joins; then we might risk of making replica allocation decisions
based on the stale information of the primary. The serious problem is
that we can cancel the current recovery which is more advanced than the
copy on the new node due to the old info we have from the primary.

With this change, we ensure the shard info from the primary is not older
than any node when allocating replicas.

Relates #46959

This work was done by Henning in #42518.

Co-authored-by: Henning Andersen <henning.andersen@elastic.co>
  • Loading branch information
dnhatn and henningandersen committed Oct 2, 2019
1 parent ff15495 commit 5cfcd7c
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,13 @@ protected synchronized void processAsyncFetch(List<T> responses, List<FailedNode
*/
protected abstract void reroute(ShardId shardId, String reason);

/**
* Clear cache for node, ensuring next fetch will fetch a fresh copy.
*/
synchronized void clearCacheForNode(String nodeId) {
cache.remove(nodeId);
}

/**
* Fills the shard fetched data with new (data) nodes and a fresh NodeEntry, and removes from
* it nodes that are no longer part of the state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@

package org.elasticsearch.gateway;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand All @@ -35,11 +38,16 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class GatewayAllocator {

Expand All @@ -54,6 +62,7 @@ public class GatewayAllocator {
asyncFetchStarted = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData>>
asyncFetchStore = ConcurrentCollections.newConcurrentMap();
private Set<String> lastSeenEphemeralIds = Collections.emptySet();

@Inject
public GatewayAllocator(RerouteService rerouteService,
Expand Down Expand Up @@ -106,6 +115,7 @@ public void applyFailedShards(final RoutingAllocation allocation, final List<Fai
public void allocateUnassigned(final RoutingAllocation allocation) {
assert primaryShardAllocator != null;
assert replicaShardAllocator != null;
ensureAsyncFetchStorePrimaryRecency(allocation);
innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator);
}

Expand Down Expand Up @@ -138,6 +148,43 @@ public AllocateUnassignedDecision decideUnassignedShardAllocation(ShardRouting u
}
}

/**
* Clear the fetched data for the primary to ensure we do not cancel recoveries based on excessively stale data.
*/
private void ensureAsyncFetchStorePrimaryRecency(RoutingAllocation allocation) {
DiscoveryNodes nodes = allocation.nodes();
if (hasNewNodes(nodes)) {
final Set<String> newEphemeralIds = StreamSupport.stream(nodes.getDataNodes().spliterator(), false)
.map(node -> node.value.getEphemeralId()).collect(Collectors.toSet());
// Invalidate the cache if a data node has been added to the cluster. This ensures that we do not cancel a recovery if a node
// drops out, we fetch the shard data, then some indexing happens and then the node rejoins the cluster again. There are other
// ways we could decide to cancel a recovery based on stale data (e.g. changing allocation filters or a primary failure) but
// making the wrong decision here is not catastrophic so we only need to cover the common case.
logger.trace(() -> new ParameterizedMessage(
"new nodes {} found, clearing primary async-fetch-store cache", Sets.difference(newEphemeralIds, lastSeenEphemeralIds)));
asyncFetchStore.values().forEach(fetch -> clearCacheForPrimary(fetch, allocation));
// recalc to also (lazily) clear out old nodes.
this.lastSeenEphemeralIds = newEphemeralIds;
}
}

private static void clearCacheForPrimary(AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetch,
RoutingAllocation allocation) {
ShardRouting primary = allocation.routingNodes().activePrimary(fetch.shardId);
if (primary != null) {
fetch.clearCacheForNode(primary.currentNodeId());
}
}

private boolean hasNewNodes(DiscoveryNodes nodes) {
for (ObjectObjectCursor<String, DiscoveryNode> node : nodes.getDataNodes()) {
if (lastSeenEphemeralIds.contains(node.value.getEphemeralId()) == false) {
return true;
}
}
return false;
}

class InternalAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T> {

InternalAsyncFetch(Logger logger, String type, ShardId shardId, Lister<? extends BaseNodesResponse<T>, T> action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class AsyncShardFetchTests extends ESTestCase {
private final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Collections.emptyMap(),
Collections.singleton(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT);
private final Response response1 = new Response(node1);
private final Response response1_2 = new Response(node1);
private final Throwable failure1 = new Throwable("simulated failure 1");
private final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), Collections.emptyMap(),
Collections.singleton(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT);
Expand Down Expand Up @@ -274,6 +275,85 @@ public void testTwoNodesAddedInBetween() throws Exception {
assertThat(fetchData.getData().get(node2), sameInstance(response2));
}

public void testClearCache() throws Exception {
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).build();
test.addSimulation(node1.getId(), response1);

// must work also with no data
test.clearCacheForNode(node1.getId());

// no fetched data, request still on going
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));

test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(1));

// verify we get back right data from node
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));

// second fetch gets same data
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));

test.clearCacheForNode(node1.getId());

// prepare next request
test.addSimulation(node1.getId(), response1_2);

// no fetched data, new request on going
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));

test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(2));

// verify we get new data back
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1_2));
}

public void testConcurrentRequestAndClearCache() throws Exception {
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).build();
test.addSimulation(node1.getId(), response1);

// no fetched data, request still on going
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));

// clear cache while request is still on going, before it is processed
test.clearCacheForNode(node1.getId());

test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(1));

// prepare next request
test.addSimulation(node1.getId(), response1_2);

// verify still no fetched data, request still on going
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));

test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(2));

// verify we get new data back
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1_2));

}

static class TestFetch extends AsyncShardFetch<Response> {

static class Entry {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.gateway;

import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class ReplicaShardAllocatorIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class);
}

public void testRecentPrimaryInformation() throws Exception {
String indexName = "test";
String nodeWithPrimary = internalCluster().startNode();
assertAcked(
client().admin().indices().prepareCreate(indexName)
.setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 1.0f)
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "1ms")));
String nodeWithReplica = internalCluster().startDataOnlyNode();
Settings nodeWithReplicaSettings = internalCluster().dataPathSettings(nodeWithReplica);
ensureGreen(indexName);
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(10, 100))
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList()));
assertBusy(() -> {
SyncedFlushResponse syncedFlushResponse = client().admin().indices().prepareSyncedFlush(indexName).get();
assertThat(syncedFlushResponse.successfulShards(), equalTo(2));
});
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeWithReplica));
if (randomBoolean()) {
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(10, 100))
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList()));
}
CountDownLatch blockRecovery = new CountDownLatch(1);
CountDownLatch recoveryStarted = new CountDownLatch(1);
MockTransportService transportServiceOnPrimary
= (MockTransportService) internalCluster().getInstance(TransportService.class, nodeWithPrimary);
transportServiceOnPrimary.addSendBehavior((connection, requestId, action, request, options) -> {
if (PeerRecoveryTargetService.Actions.FILES_INFO.equals(action)) {
recoveryStarted.countDown();
try {
blockRecovery.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
connection.sendRequest(requestId, action, request, options);
});
String newNode = internalCluster().startDataOnlyNode();
recoveryStarted.await();
// destroy sync_id after the recovery on the new node has started
client().admin().indices().prepareFlush(indexName).setForce(true).get();
// AllocationService only calls GatewayAllocator if there are unassigned shards
assertAcked(client().admin().indices().prepareCreate("dummy-index").setWaitForActiveShards(0)
.setSettings(Settings.builder().put("index.routing.allocation.require.attr", "not-found")));
internalCluster().startDataOnlyNode(nodeWithReplicaSettings);
// need to wait for events to ensure the reroute has happened since we perform it async when a new node joins.
client().admin().cluster().prepareHealth(indexName).setWaitForYellowStatus().setWaitForEvents(Priority.LANGUID).get();
blockRecovery.countDown();
ensureGreen(indexName);
assertThat(internalCluster().nodesInclude(indexName), hasItem(newNode));
transportServiceOnPrimary.clearAllRules();
}
}

0 comments on commit 5cfcd7c

Please sign in to comment.