Sequence number based replica allocation (#46959)
With this change, shard allocation prefers allocating a replica on a
node that already has a copy of the shard that is as close as possible
to the primary, so that bringing the new replica in sync with the
primary is as cheap as possible. Furthermore, if we find a copy that is
identical to the primary, we cancel an ongoing recovery in favour of
that copy, since it needs no work to recover as a replica.

With this improvement, we no longer need to perform a synced flush
before a rolling upgrade or a full cluster restart.

Closes #46318
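The allocation decision itself is not part of the diffs rendered below. As a rough, self-contained sketch of the idea only — the class and method names here (SeqNoBasedAllocationSketch, NodeCopy, cheapestCopy, identicalToPrimary) and the simplified decision rules are hypothetical, not code from this commit — ranking candidate copies by the retaining sequence number of their peer recovery retention leases might look like this:

import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical sketch of sequence-number based replica allocation, not the
 * implementation in this commit. A copy whose peer recovery retention lease
 * retains from a higher sequence number is missing fewer operations, so it is
 * cheaper to bring in sync with the primary.
 */
public class SeqNoBasedAllocationSketch {

    /** A candidate shard copy: the node that holds it and the retaining seq-no of its lease. */
    static final class NodeCopy {
        final String nodeId;
        final long retainingSeqNo; // -1 when no peer recovery retention lease is known for this node

        NodeCopy(String nodeId, long retainingSeqNo) {
            this.nodeId = nodeId;
            this.retainingSeqNo = retainingSeqNo;
        }
    }

    /** Prefer the copy that needs the fewest operations replayed to catch up with the primary. */
    static Optional<NodeCopy> cheapestCopy(List<NodeCopy> candidates) {
        return candidates.stream()
            .filter(copy -> copy.retainingSeqNo >= 0)
            .max(Comparator.comparingLong((NodeCopy copy) -> copy.retainingSeqNo));
    }

    /**
     * A copy whose lease has caught up with the primary's own lease needs no operations
     * replayed at all; finding one justifies cancelling an ongoing recovery onto a worse
     * node. (Comparing against the primary's own retaining seq-no is an assumption of
     * this sketch.)
     */
    static boolean identicalToPrimary(NodeCopy copy, long primaryRetainingSeqNo) {
        return primaryRetainingSeqNo >= 0 && copy.retainingSeqNo >= primaryRetainingSeqNo;
    }

    public static void main(String[] args) {
        List<NodeCopy> candidates = List.of(
            new NodeCopy("node-1", 42L),
            new NodeCopy("node-2", -1L),
            new NodeCopy("node-3", 100L));
        long primaryRetainingSeqNo = 100L;
        cheapestCopy(candidates).ifPresent(best -> System.out.println(
            best.nodeId + " is identical to the primary: " + identicalToPrimary(best, primaryRetainingSeqNo)));
    }
}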
dnhatn committed Oct 13, 2019
1 parent d8f5a3d commit e628f35
Showing 7 changed files with 508 additions and 108 deletions.

Large diffs are not rendered by default.

@@ -529,7 +529,7 @@ public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<Repli
/**
* Id for a peer recovery retention lease for the given node. See {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
*/
- static String getPeerRecoveryRetentionLeaseId(String nodeId) {
+ public static String getPeerRecoveryRetentionLeaseId(String nodeId) {
return "peer_recovery/" + nodeId;
}

@@ -541,6 +541,15 @@ public static String getPeerRecoveryRetentionLeaseId(ShardRouting shardRouting)
return getPeerRecoveryRetentionLeaseId(shardRouting.currentNodeId());
}

+ /**
+ * Returns a list of peer recovery retention leases installed in this replication group
+ */
+ public List<RetentionLease> getPeerRecoveryRetentionLeases() {
+ return getRetentionLeases().leases().stream()
+ .filter(lease -> PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(lease.source()))
+ .collect(Collectors.toUnmodifiableList());
+ }

/**
* Advance the peer-recovery retention leases for all assigned shard copies to discard history below the corresponding global
* checkpoint, and renew any leases that are approaching expiry.
@@ -2619,6 +2619,13 @@ public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<Repli
replicationTracker.removePeerRecoveryRetentionLease(nodeId, listener);
}

+ /**
+ * Returns a list of retention leases for peer recovery installed in this shard copy.
+ */
+ public List<RetentionLease> getPeerRecoveryRetentionLeases() {
+ return replicationTracker.getPeerRecoveryRetentionLeases();
+ }

private SafeCommitInfo getSafeCommitInfo() {
final Engine engine = getEngineOrNull();
return engine == null ? SafeCommitInfo.EMPTY : engine.getSafeCommitInfo();
@@ -21,6 +21,7 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
+ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
@@ -43,6 +44,8 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
+ import org.elasticsearch.index.seqno.ReplicationTracker;
+ import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
@@ -54,6 +57,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
+ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -119,15 +123,16 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException
IndexShard indexShard = indexService.getShardOrNull(shardId.id());
if (indexShard != null) {
try {
- final StoreFilesMetaData storeFilesMetaData = new StoreFilesMetaData(shardId, indexShard.snapshotStoreMetadata());
+ final StoreFilesMetaData storeFilesMetaData = new StoreFilesMetaData(shardId,
+ indexShard.snapshotStoreMetadata(), indexShard.getPeerRecoveryRetentionLeases());
exists = true;
return storeFilesMetaData;
} catch (org.apache.lucene.index.IndexNotFoundException e) {
logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e);
- return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY);
+ return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
} catch (IOException e) {
logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e);
- return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY);
+ return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
}
}
}
@@ -142,20 +147,23 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException
}
if (metaData == null) {
logger.trace("{} node doesn't have meta data for the requests index, responding with empty", shardId);
- return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY);
+ return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
}
final IndexSettings indexSettings = indexService != null ? indexService.getIndexSettings() :
new IndexSettings(metaData, settings);
final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
if (shardPath == null) {
- return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY);
+ return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
}
// note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means:
// 1) a shard is being constructed, which means the master will not use a copy of this replica
// 2) A shard is shutting down and has not cleared it's content within lock timeout. In this case the master may not
// reuse local resources.
- return new StoreFilesMetaData(shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), shardId,
- nodeEnv::shardLock, logger));
+ final Store.MetadataSnapshot metadataSnapshot =
+ Store.readMetadataSnapshot(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger);
+ // We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when
+ // we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard.
+ return new StoreFilesMetaData(shardId, metadataSnapshot, Collections.emptyList());
} finally {
TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);
if (exists) {
@@ -167,17 +175,34 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException
}

public static class StoreFilesMetaData implements Iterable<StoreFileMetaData>, Writeable {
- private ShardId shardId;
- Store.MetadataSnapshot metadataSnapshot;
+ private final ShardId shardId;
+ private final Store.MetadataSnapshot metadataSnapshot;
+ private final List<RetentionLease> peerRecoveryRetentionLeases;

+ public StoreFilesMetaData(ShardId shardId, Store.MetadataSnapshot metadataSnapshot,
+ List<RetentionLease> peerRecoveryRetentionLeases) {
+ this.shardId = shardId;
+ this.metadataSnapshot = metadataSnapshot;
+ this.peerRecoveryRetentionLeases = peerRecoveryRetentionLeases;
+ }

public StoreFilesMetaData(StreamInput in) throws IOException {
this.shardId = new ShardId(in);
this.metadataSnapshot = new Store.MetadataSnapshot(in);
+ if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+ this.peerRecoveryRetentionLeases = in.readList(RetentionLease::new);
+ } else {
+ this.peerRecoveryRetentionLeases = Collections.emptyList();
+ }
}

- public StoreFilesMetaData(ShardId shardId, Store.MetadataSnapshot metadataSnapshot) {
- this.shardId = shardId;
- this.metadataSnapshot = metadataSnapshot;
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ shardId.writeTo(out);
+ metadataSnapshot.writeTo(out);
+ if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+ out.writeList(peerRecoveryRetentionLeases);
+ }
}

public ShardId shardId() {
@@ -201,10 +226,18 @@ public StoreFileMetaData file(String name) {
return metadataSnapshot.asMap().get(name);
}

- @Override
- public void writeTo(StreamOutput out) throws IOException {
- shardId.writeTo(out);
- metadataSnapshot.writeTo(out);
+ /**
+ * Returns the retaining sequence number of the peer recovery retention lease for a given node if exists; otherwise, returns -1.
+ */
+ public long getPeerRecoveryRetentionLeaseRetainingSeqNo(DiscoveryNode node) {
+ assert node != null;
+ final String retentionLeaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId());
+ return peerRecoveryRetentionLeases.stream().filter(lease -> lease.id().equals(retentionLeaseId))
+ .mapToLong(RetentionLease::retainingSequenceNumber).findFirst().orElse(-1L);
+ }

+ public List<RetentionLease> peerRecoveryRetentionLeases() {
+ return peerRecoveryRetentionLeases;
+ }

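The StreamInput/StreamOutput changes above follow the usual wire-compatibility pattern: the new field is written and read only when the stream version is recent enough to carry it, and readers of the old format fall back to an empty list. Below is a minimal, self-contained sketch of that pattern; the plain DataOutput/DataInput classes, the class name, and the invented version constants stand in for Elasticsearch's StreamOutput/StreamInput and Version, so this is an illustration rather than the code from this commit.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Minimal sketch of version-gated wire serialization, mirroring the V_8_0_0 checks above. */
public class VersionGatedSerializationSketch {

    static final int NEW_FIELD_VERSION = 8_00_00_00; // invented stand-in for Version.V_8_0_0

    /** Write the always-present field, then the new list only if the peer understands it. */
    static byte[] write(int streamVersion, long seqNo, List<String> leaseIds) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(seqNo);                          // field that exists on every version
            if (streamVersion >= NEW_FIELD_VERSION) {      // new field: only for new-enough peers
                out.writeInt(leaseIds.size());
                for (String leaseId : leaseIds) {
                    out.writeUTF(leaseId);
                }
            }
        }
        return bytes.toByteArray();
    }

    /** Read the new list only if the sender could have written it; otherwise fall back to empty. */
    static List<String> readLeaseIds(int streamVersion, byte[] payload) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            in.readLong();                                 // skip the always-present field
            if (streamVersion < NEW_FIELD_VERSION) {
                return Collections.emptyList();            // old peer: the field was never written
            }
            int size = in.readInt();
            List<String> leaseIds = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                leaseIds.add(in.readUTF());
            }
            return leaseIds;
        }
    }

    public static void main(String[] args) throws IOException {
        int oldVersion = 7_05_00_00;                       // invented pre-change version id
        byte[] oldFormat = write(oldVersion, 42L, List.of("peer_recovery/node-1"));
        byte[] newFormat = write(NEW_FIELD_VERSION, 42L, List.of("peer_recovery/node-1"));
        System.out.println(readLeaseIds(oldVersion, oldFormat));          // []
        System.out.println(readLeaseIds(NEW_FIELD_VERSION, newFormat));   // [peer_recovery/node-1]
    }
}

The key invariant is that writer and reader use the same stream version for the check; otherwise the two sides disagree on whether the optional section is present and the remaining bytes go out of sync.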
