Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-fetch shard info of primary when new node joins #47035

Merged
merged 11 commits into from
Sep 28, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,13 @@ protected synchronized void processAsyncFetch(List<T> responses, List<FailedNode
*/
protected abstract void reroute(ShardId shardId, String reason);

/**
* Clear cache for node, ensuring next fetch will fetch a fresh copy.
*/
synchronized void clearCacheForNode(String nodeId) {
cache.remove(nodeId);
}

/**
* Fills the shard fetched data with new (data) nodes and a fresh NodeEntry, and removes from
* it nodes that are no longer part of the state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.gateway;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand All @@ -27,6 +28,7 @@
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand All @@ -37,14 +39,19 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.AsyncShardFetch.Lister;
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class GatewayAllocator {

Expand All @@ -59,6 +66,7 @@ public class GatewayAllocator {
asyncFetchStarted = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<ShardId, AsyncShardFetch<NodeStoreFilesMetaData>>
asyncFetchStore = ConcurrentCollections.newConcurrentMap();
private Set<String> lastSeenEphemeralIds = Collections.emptySet();

@Inject
public GatewayAllocator(RerouteService rerouteService, NodeClient client) {
Expand Down Expand Up @@ -109,6 +117,7 @@ public void applyFailedShards(final RoutingAllocation allocation, final List<Fai
public void allocateUnassigned(final RoutingAllocation allocation) {
assert primaryShardAllocator != null;
assert replicaShardAllocator != null;
ensureAsyncFetchStorePrimaryRecency(allocation);
innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator);
}

Expand Down Expand Up @@ -138,6 +147,43 @@ public AllocateUnassignedDecision decideUnassignedShardAllocation(ShardRouting u
}
}

/**
* Clear the fetched data for the primary to ensure we do not cancel recoveries based on excessively stale data.
*/
private void ensureAsyncFetchStorePrimaryRecency(RoutingAllocation allocation) {
DiscoveryNodes nodes = allocation.nodes();
if (hasNewNodes(nodes)) {
final Set<String> newEphemeralIds = StreamSupport.stream(nodes.getDataNodes().spliterator(), false)
.map(node -> node.value.getEphemeralId()).collect(Collectors.toSet());
// Invalidate the cache if a data node has been added to the cluster. This ensures that we do not cancel a recovery if a node
// drops out, we fetch the shard data, then some indexing happens and then the node rejoins the cluster again. There are other
// ways we could decide to cancel a recovery based on stale data (e.g. changing allocation filters or a primary failure) but
// making the wrong decision here is not catastrophic so we only need to cover the common case.
logger.trace(() -> new ParameterizedMessage(
"new nodes {} found, clearing primary async-fetch-store cache", Sets.difference(newEphemeralIds, lastSeenEphemeralIds)));
asyncFetchStore.values().forEach(fetch -> clearCacheForPrimary(fetch, allocation));
// recalc to also (lazily) clear out old nodes.
this.lastSeenEphemeralIds = newEphemeralIds;
}
}

private static void clearCacheForPrimary(AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetch,
RoutingAllocation allocation) {
ShardRouting primary = allocation.routingNodes().activePrimary(fetch.shardId);
if (primary != null) {
fetch.clearCacheForNode(primary.currentNodeId());
}
}

private boolean hasNewNodes(DiscoveryNodes nodes) {
for (ObjectObjectCursor<String, DiscoveryNode> node : nodes.getDataNodes()) {
if (lastSeenEphemeralIds.contains(node.value.getEphemeralId()) == false) {
return true;
}
}
return false;
}

class InternalAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T> {

InternalAsyncFetch(Logger logger, String type, ShardId shardId, Lister<? extends BaseNodesResponse<T>, T> action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class AsyncShardFetchTests extends ESTestCase {
private final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Collections.emptyMap(),
Collections.singleton(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT);
private final Response response1 = new Response(node1);
private final Response response1_2 = new Response(node1);
private final Throwable failure1 = new Throwable("simulated failure 1");
private final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), Collections.emptyMap(),
Collections.singleton(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT);
Expand Down Expand Up @@ -274,6 +275,85 @@ public void testTwoNodesAddedInBetween() throws Exception {
assertThat(fetchData.getData().get(node2), sameInstance(response2));
}

public void testClearCache() throws Exception {
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).build();
test.addSimulation(node1.getId(), response1);

// must work also with no data
test.clearCacheForNode(node1.getId());

// no fetched data, request still on going
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));

test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(1));

// verify we get back right data from node
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));

// second fetch gets same data
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));

test.clearCacheForNode(node1.getId());

// prepare next request
test.addSimulation(node1.getId(), response1_2);

// no fetched data, new request on going
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));

test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(2));

// verify we get new data back
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1_2));
}

public void testConcurrentRequestAndClearCache() throws Exception {
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).build();
test.addSimulation(node1.getId(), response1);

// no fetched data, request still on going
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));

// clear cache while request is still on going, before it is processed
test.clearCacheForNode(node1.getId());

test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(1));

// prepare next request
test.addSimulation(node1.getId(), response1_2);

// verify still no fetched data, request still on going
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));

test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(2));

// verify we get new data back
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1_2));

}

static class TestFetch extends AsyncShardFetch<Response> {

static class Entry {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.gateway;

import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class ReplicaShardAllocatorIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class);
}

public void testRecentPrimaryInformation() throws Exception {
String indexName = "test";
String nodeWithPrimary = internalCluster().startNode();
assertAcked(
client().admin().indices().prepareCreate(indexName)
.setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.getKey(), 1.0f)
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "1ms")));
String nodeWithReplica = internalCluster().startDataOnlyNode();
Settings nodeWithReplicaSettings = internalCluster().dataPathSettings(nodeWithReplica);
ensureGreen(indexName);
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(10, 100))
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList()));
assertBusy(() -> {
SyncedFlushResponse syncedFlushResponse = client().admin().indices().prepareSyncedFlush(indexName).get();
assertThat(syncedFlushResponse.successfulShards(), equalTo(2));
});
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeWithReplica));
if (randomBoolean()) {
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(10, 100))
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList()));
}
CountDownLatch blockRecovery = new CountDownLatch(1);
CountDownLatch recoveryStarted = new CountDownLatch(1);
MockTransportService transportServiceOnPrimary
= (MockTransportService) internalCluster().getInstance(TransportService.class, nodeWithPrimary);
transportServiceOnPrimary.addSendBehavior((connection, requestId, action, request, options) -> {
if (PeerRecoveryTargetService.Actions.FILES_INFO.equals(action)) {
recoveryStarted.countDown();
try {
blockRecovery.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
connection.sendRequest(requestId, action, request, options);
});
String newNode = internalCluster().startDataOnlyNode();
recoveryStarted.await();
// destroy sync_id after the recovery on the new node has started
client().admin().indices().prepareFlush(indexName).setForce(true).get();
// AllocationService only calls GatewayAllocator if there are unassigned shards
assertAcked(client().admin().indices().prepareCreate("dummy-index").setWaitForActiveShards(0)
.setSettings(Settings.builder().put("index.routing.allocation.require.attr", "not-found")));
internalCluster().startDataOnlyNode(nodeWithReplicaSettings);
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
// need to wait for events to ensure the reroute has happened since we perform it async when a new node joins.
client().admin().cluster().prepareHealth(indexName).setWaitForYellowStatus().setWaitForEvents(Priority.LANGUID).get();
blockRecovery.countDown();
ensureGreen(indexName);
assertThat(internalCluster().nodesInclude(indexName), hasItem(newNode));
transportServiceOnPrimary.clearAllRules();
}
}