Sequence number based replica allocation #46959

Merged: 14 commits, Oct 13, 2019
server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java
@@ -19,10 +19,7 @@

package org.elasticsearch.gateway;

import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@@ -41,6 +38,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData;
@@ -49,15 +47,15 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BooleanSupplier;

import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;

public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
/**
* Process existing recoveries of replicas and see if we need to cancel them if we find a better
* match. Today, a better match is one that has full sync id match compared to not having one in
* the previous recovery.
* match. Today, a better match is one that has a full sync id match or a peer recovery retention
* lease, compared to not having one in the previous recovery.
*/
public void processExistingRecoveries(RoutingAllocation allocation) {
MetaData metaData = allocation.metaData();
@@ -100,18 +98,24 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
if (matchingNodes.getNodeWithHighestMatch() != null) {
DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId());
DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch();
// current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider
final String currentSyncId;
if (shardStores.getData().containsKey(currentNode)) {
currentSyncId = shardStores.getData().get(currentNode).storeFilesMetaData().syncId();
} else {
currentSyncId = null;
}
BooleanSupplier currentNodeCanSkipPhase1 = () -> {
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeMetadata =
shardStores.getData().get(currentNode).storeFilesMetaData();
// current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider
if (storeMetadata == null) {
return false;
}
IndexMetaData indexMetadata = allocation.metaData().index(shard.index());
if (canPerformOperationBasedRecovery(indexMetadata, primaryStore, currentNode, storeMetadata)) {
return true;
}
final String currentSyncId = storeMetadata.syncId();
return currentSyncId != null && currentSyncId.equals(primaryStore.syncId());
};

if (currentNode.equals(nodeWithHighestMatch) == false
&& Objects.equals(currentSyncId, primaryStore.syncId()) == false
&& matchingNodes.isNodeMatchBySyncID(nodeWithHighestMatch)) {
// we found a better match that has a full sync id match, the existing allocation is not fully synced
// so we found a better one, cancel this one
&& matchingNodes.canSkipPhase1(nodeWithHighestMatch) && currentNodeCanSkipPhase1.getAsBoolean() == false) {
// we found a better match that can skip phase 1, cancel the existing allocation.
logger.debug("cancelling allocation of replica on [{}], sync id match found on node [{}]",
currentNode, nodeWithHighestMatch);
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
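
The change above replaces the sync-id-only comparison with a more general "can skip phase 1" test: an ongoing replica recovery is cancelled only when some other node could skip the file-copy phase (via a matching sync id or a peer recovery retention lease) while the node currently being recovered could not. The following standalone sketch (hypothetical names, not the actual Elasticsearch types) illustrates the decision:

import java.util.function.BooleanSupplier;

public class CancellationRuleSketch {

    static boolean shouldCancel(String currentNodeId, String bestNodeId,
                                boolean bestNodeCanSkipPhase1, BooleanSupplier currentNodeCanSkipPhase1) {
        // Keep the recovery if it already targets the best node, or if the current
        // node can itself skip phase 1. The BooleanSupplier defers the store lookup
        // for the current node until it is actually needed, as in the PR.
        return currentNodeId.equals(bestNodeId) == false
            && bestNodeCanSkipPhase1
            && currentNodeCanSkipPhase1.getAsBoolean() == false;
    }

    public static void main(String[] args) {
        // A better candidate exists and the current copy cannot skip phase 1: cancel.
        System.out.println(shouldCancel("node-a", "node-b", true, () -> false)); // true
        // The current copy can also skip phase 1: keep the ongoing recovery.
        System.out.println(shouldCancel("node-a", "node-b", true, () -> true));  // false
    }
}
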
@@ -313,7 +317,7 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data,
boolean explain) {
ObjectLongMap<DiscoveryNode> nodesToSize = new ObjectLongHashMap<>();
Map<DiscoveryNode, MatchingNode> matchingNodes = new HashMap<>();
Map<String, NodeAllocationResult> nodeDecisions = explain ? new HashMap<>() : null;
for (Map.Entry<DiscoveryNode, NodeStoreFilesMetaData> nodeStoreEntry : data.getData().entrySet()) {
DiscoveryNode discoNode = nodeStoreEntry.getKey();
@@ -332,33 +336,38 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
// then we will try and assign it next time
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
final IndexMetaData indexMetaData = allocation.metaData().index(primaryStore.shardId().getIndex());

long matchingBytes = -1;
MatchingNode matchingNode = null;
if (explain) {
matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData);
ShardStoreInfo shardStoreInfo = new ShardStoreInfo(matchingBytes);
matchingNode = new MatchingNode(computeMatchingBytes(primaryStore, storeFilesMetaData),
canPerformOperationBasedRecovery(indexMetaData, primaryStore, discoNode, storeFilesMetaData));
ShardStoreInfo shardStoreInfo = new ShardStoreInfo(matchingNode.matchingBytes);
nodeDecisions.put(node.nodeId(), new NodeAllocationResult(discoNode, shardStoreInfo, decision));
}

if (decision.type() == Decision.Type.NO) {
continue;
}

if (matchingBytes < 0) {
matchingBytes = computeMatchingBytes(primaryStore, storeFilesMetaData);
if (matchingNode == null) {
matchingNode = new MatchingNode(computeMatchingBytes(primaryStore, storeFilesMetaData),
canPerformOperationBasedRecovery(indexMetaData, primaryStore, discoNode, storeFilesMetaData));
}
nodesToSize.put(discoNode, matchingBytes);
matchingNodes.put(discoNode, matchingNode);
if (logger.isTraceEnabled()) {
if (matchingBytes == Long.MAX_VALUE) {
if (matchingNode.matchingBytes == Long.MAX_VALUE) {
logger.trace("{}: node [{}] has same sync id {} as primary", shard, discoNode.getName(), storeFilesMetaData.syncId());
} else if (matchingNode.operationBasedRecovery) {
logger.trace("{}: node [{}] can perform operation-based recovery", shard, discoNode.getName());
} else {
logger.trace("{}: node [{}] has [{}/{}] bytes of re-usable data",
shard, discoNode.getName(), new ByteSizeValue(matchingBytes), matchingBytes);
shard, discoNode.getName(), new ByteSizeValue(matchingNode.matchingBytes), matchingNode.matchingBytes);
}
}
}

return new MatchingNodes(nodesToSize, nodeDecisions);
return new MatchingNodes(matchingNodes, nodeDecisions);
}

private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
@@ -380,30 +389,63 @@ private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.St
}
}

private static boolean canPerformOperationBasedRecovery(
IndexMetaData indexMetaData, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
DiscoveryNode replicaNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData replicaStore) {
if (replicaStore.isEmpty()) {
// a corrupted store - it won't be able to perform operation-based recovery
return false;
}
// If an index is closed or frozen, we can perform an operation-based recovery only if the last commit on the replica is safe and
// has the same operations as the last commit on the primary. Here we must ignore the peer recovery retention lease as we don't
// have the persisted global checkpoint from the replica to determine whether the last commit is safe. However, the likelihood
// that the last commit is unsafe is very small. It's probably okay to use the retention lease if the sequence numbers of the
// last commits on the primary and replica are equal.
if (indexMetaData.getState() == IndexMetaData.State.CLOSE ||
    IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.get(indexMetaData.getSettings())) {
return false;
}
final String retentionLeaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(replicaNode.getId());
return primaryStore.peerRecoveryRetentionLeases().stream().anyMatch(lease -> lease.id().equals(retentionLeaseId));
}
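
A replica can thus perform an operation-based (sequence-number) recovery when the primary still holds a peer recovery retention lease for that replica's node, since the lease guarantees the primary retains the history the replica is missing. A minimal sketch of the lease-id check (hypothetical Lease type; the id format mirrors ReplicationTracker.getPeerRecoveryRetentionLeaseId):

import java.util.List;

public class RetentionLeaseCheckSketch {

    record Lease(String id) {}

    // Mirrors ReplicationTracker.getPeerRecoveryRetentionLeaseId(String nodeId).
    static String peerRecoveryLeaseId(String nodeId) {
        return "peer_recovery/" + nodeId;
    }

    // True if the primary holds a peer recovery retention lease for the replica's node.
    static boolean primaryRetainsHistoryFor(List<Lease> primaryLeases, String replicaNodeId) {
        final String expectedId = peerRecoveryLeaseId(replicaNodeId);
        return primaryLeases.stream().anyMatch(lease -> lease.id().equals(expectedId));
    }

    public static void main(String[] args) {
        List<Lease> leases = List.of(new Lease("peer_recovery/node-1"), new Lease("peer_recovery/node-2"));
        System.out.println(primaryRetainsHistoryFor(leases, "node-2")); // true
        System.out.println(primaryRetainsHistoryFor(leases, "node-3")); // false
    }
}
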


protected abstract AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation);

/**
* Returns a boolean indicating whether fetching shard data has been triggered at any point for the given shard.
*/
protected abstract boolean hasInitiatedFetching(ShardRouting shard);

private static class MatchingNode {
final long matchingBytes;
final boolean operationBasedRecovery;

MatchingNode(long matchingBytes, boolean operationBasedRecovery) {
this.matchingBytes = matchingBytes;
this.operationBasedRecovery = operationBasedRecovery;
}
}

static class MatchingNodes {
private final ObjectLongMap<DiscoveryNode> nodesToSize;
private final Map<DiscoveryNode, MatchingNode> matchingNodes;
private final DiscoveryNode nodeWithHighestMatch;
@Nullable
private final Map<String, NodeAllocationResult> nodeDecisions;

MatchingNodes(ObjectLongMap<DiscoveryNode> nodesToSize, @Nullable Map<String, NodeAllocationResult> nodeDecisions) {
this.nodesToSize = nodesToSize;
MatchingNodes(Map<DiscoveryNode, MatchingNode> matchingNodes, @Nullable Map<String, NodeAllocationResult> nodeDecisions) {
this.matchingNodes = matchingNodes;
this.nodeDecisions = nodeDecisions;

long highestMatchSize = 0;
MatchingNode highestMatchValue = new MatchingNode(0, false);
DiscoveryNode highestMatchNode = null;

for (ObjectLongCursor<DiscoveryNode> cursor : nodesToSize) {
if (cursor.value > highestMatchSize) {
highestMatchSize = cursor.value;
highestMatchNode = cursor.key;
for (Map.Entry<DiscoveryNode, MatchingNode> entry : matchingNodes.entrySet()) {
if (highestMatchValue.operationBasedRecovery == false && entry.getValue().operationBasedRecovery ||
(highestMatchValue.operationBasedRecovery == entry.getValue().operationBasedRecovery &&
highestMatchValue.matchingBytes < entry.getValue().matchingBytes)) {
highestMatchValue = entry.getValue();
highestMatchNode = entry.getKey();
}
}
this.nodeWithHighestMatch = highestMatchNode;
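
The loop above ranks candidates lexicographically: a node that can perform an operation-based recovery beats any byte count, and ties on that flag are broken by matching bytes; a candidate that matches nothing at all (no lease, zero bytes) never beats the initial baseline and is never selected. Equivalently, as a Comparator (a sketch with a hypothetical Candidate type, not the PR's code):

import java.util.Comparator;
import java.util.Map;
import java.util.Optional;

public class BestMatchSketch {

    record Candidate(boolean operationBasedRecovery, long matchingBytes) {}

    public static void main(String[] args) {
        Map<String, Candidate> nodes = Map.of(
            "node-a", new Candidate(false, 500L),
            "node-b", new Candidate(true, 100L),  // wins: operation-based recovery trumps bytes
            "node-c", new Candidate(false, 900L));

        Optional<String> best = nodes.entrySet().stream()
            // Nodes that match nothing at all must not become the "highest match".
            .filter(e -> e.getValue().operationBasedRecovery() || e.getValue().matchingBytes() > 0)
            .max(Comparator
                .comparing((Map.Entry<String, Candidate> e) -> e.getValue().operationBasedRecovery())
                .thenComparingLong(e -> e.getValue().matchingBytes()))
            .map(Map.Entry::getKey);

        System.out.println(best.orElse("none")); // node-b
    }
}
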
@@ -418,15 +460,16 @@ public DiscoveryNode getNodeWithHighestMatch() {
return this.nodeWithHighestMatch;
}

public boolean isNodeMatchBySyncID(DiscoveryNode node) {
return nodesToSize.get(node) == Long.MAX_VALUE;
boolean canSkipPhase1(DiscoveryNode node) {
final MatchingNode matchingNode = matchingNodes.get(node);
return matchingNode.matchingBytes == Long.MAX_VALUE || matchingNode.operationBasedRecovery;
}

/**
* Did we manage to find any data, regardless of how well it matched.
*/
public boolean hasAnyData() {
return nodesToSize.isEmpty() == false;
return matchingNodes.isEmpty() == false;
}
}
}
server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
@@ -529,7 +529,7 @@ public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<Repli
/**
* Id for a peer recovery retention lease for the given node. See {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
*/
static String getPeerRecoveryRetentionLeaseId(String nodeId) {
public static String getPeerRecoveryRetentionLeaseId(String nodeId) {
return "peer_recovery/" + nodeId;
}

@@ -541,6 +541,15 @@ public static String getPeerRecoveryRetentionLeaseId(ShardRouting shardRouting)
return getPeerRecoveryRetentionLeaseId(shardRouting.currentNodeId());
}

/**
* Returns a list of peer recovery retention leases installed in this replication group
*/
public List<RetentionLease> getPeerRecoveryRetentionLeases() {
return getRetentionLeases().leases().stream()
.filter(lease -> PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(lease.source()))
.collect(Collectors.toUnmodifiableList());
}
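
Only leases created for peer recovery are returned; other retention leases (for example, ones added through the retention-lease APIs) are filtered out by their source marker. A standalone sketch of that filtering (hypothetical Lease type; the exact source string is an assumption, not confirmed by the diff):

import java.util.List;
import java.util.stream.Collectors;

public class LeaseFilterSketch {

    record Lease(String id, String source) {}

    // Assumption: the source string ReplicationTracker tags peer recovery leases with.
    static final String PEER_RECOVERY_SOURCE = "peer recovery";

    static List<Lease> peerRecoveryLeases(List<Lease> allLeases) {
        return allLeases.stream()
            .filter(lease -> PEER_RECOVERY_SOURCE.equals(lease.source()))
            .collect(Collectors.toUnmodifiableList());
    }

    public static void main(String[] args) {
        List<Lease> leases = List.of(
            new Lease("peer_recovery/node-1", PEER_RECOVERY_SOURCE),
            new Lease("snapshot-lease", "repository"));
        System.out.println(peerRecoveryLeases(leases)); // only the peer_recovery/node-1 lease
    }
}
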

/**
* Advance the peer-recovery retention leases for all assigned shard copies to discard history below the corresponding global
* checkpoint, and renew any leases that are approaching expiry.
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -2633,6 +2633,13 @@ public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<Repli
replicationTracker.removePeerRecoveryRetentionLease(nodeId, listener);
}

/**
* Returns a list of retention leases for peer recovery installed in this shard copy.
*/
public List<RetentionLease> getPeerRecoveryRetentionLeases() {
return replicationTracker.getPeerRecoveryRetentionLeases();
}
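
IndexShard simply exposes the tracker's list so that the shard-store metadata machinery can ship the primary's leases to the master, where ReplicaShardAllocator reads them as primaryStore.peerRecoveryRetentionLeases() above. A rough sketch of such a carrier (hypothetical shape, not the actual TransportNodesListShardStoreMetaData types):

import java.util.List;

public class StoreFilesReportSketch {

    record FileInfo(String name, long length) {}
    record Lease(String id) {}

    // What a data node reports about one shard copy: its on-disk files plus the
    // peer recovery retention leases it currently holds.
    record StoreFilesReport(List<FileInfo> files, List<Lease> peerRecoveryRetentionLeases) {
        boolean isEmpty() {
            return files.isEmpty();
        }
    }

    public static void main(String[] args) {
        StoreFilesReport report = new StoreFilesReport(
            List.of(new FileInfo("_0.cfe", 405L)),
            List.of(new Lease("peer_recovery/node-1")));
        System.out.println(report.isEmpty()); // false
    }
}
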

private SafeCommitInfo getSafeCommitInfo() {
final Engine engine = getEngineOrNull();
return engine == null ? SafeCommitInfo.EMPTY : engine.getSafeCommitInfo();