Create missing PRRLs after primary activation #44009

Merged
@@ -26,11 +26,16 @@
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.rest.action.document.RestUpdateAction;
@@ -40,10 +45,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
@@ -53,6 +60,7 @@
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isIn;
@@ -382,6 +390,113 @@ public void testRecoveryWithSoftDeletes() throws Exception {
ensureGreen(index);
}

public void testRetentionLeasesEstablishedWhenPromotingPrimary() throws Exception {
final String index = "recover_and_create_leases_in_promotion";
if (CLUSTER_TYPE == ClusterType.OLD) {
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5))
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), between(1, 2)) // triggers nontrivial promotion
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
createIndex(index, settings.build());
int numDocs = randomInt(10);
indexDocs(index, 0, numDocs);
if (randomBoolean()) {
client().performRequest(new Request("POST", "/" + index + "/_flush"));
}
}
ensureGreen(index);
if (CLUSTER_TYPE == ClusterType.UPGRADED) {
assertAllCopiesHaveRetentionLeases(index);
}
}

public void testRetentionLeasesEstablishedWhenRelocatingPrimary() throws Exception {
final String index = "recover_and_create_leases_in_relocation";
switch (CLUSTER_TYPE) {
case OLD:
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5))
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), between(0, 1))
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
createIndex(index, settings.build());
int numDocs = randomInt(10);
indexDocs(index, 0, numDocs);
if (randomBoolean()) {
client().performRequest(new Request("POST", "/" + index + "/_flush"));
}
ensureGreen(index);
break;

case MIXED:
// trigger a primary relocation by excluding the last old node with a shard filter
final Map<?, ?> nodesMap
= ObjectPath.createFromResponse(client().performRequest(new Request("GET", "/_nodes"))).evaluate("nodes");
final List<String> oldNodeNames = new ArrayList<>();
for (Object nodeDetails : nodesMap.values()) {
final Map<?, ?> nodeDetailsMap = (Map<?, ?>) nodeDetails;
final String versionString = (String) nodeDetailsMap.get("version");
if (versionString.equals(Version.CURRENT.toString()) == false) {
oldNodeNames.add((String) nodeDetailsMap.get("name"));
}
}

if (oldNodeNames.size() == 1) {
final String oldNodeName = oldNodeNames.get(0);
logger.info("--> excluding index [{}] from node [{}]", index, oldNodeName);
final Request putSettingsRequest = new Request("PUT", "/" + index + "/_settings");
putSettingsRequest.setJsonEntity("{\"index.routing.allocation.exclude._name\":\"" + oldNodeName + "\"}");
assertOK(client().performRequest(putSettingsRequest));
ensureGreen(index);
assertAllCopiesHaveRetentionLeases(index);
} else {
ensureGreen(index);
}
break;

case UPGRADED:
ensureGreen(index);
assertAllCopiesHaveRetentionLeases(index);
break;
}
}

private void assertAllCopiesHaveRetentionLeases(String index) throws Exception {
assertBusy(() -> {
final Request statsRequest = new Request("GET", "/" + index + "/_stats");
statsRequest.addParameter("level", "shards");
final Map<?, ?> shardsStats = ObjectPath.createFromResponse(client().performRequest(statsRequest))
.evaluate("indices." + index + ".shards");
for (Map.Entry<?, ?> shardCopiesEntry : shardsStats.entrySet()) {
final List<?> shardCopiesList = (List<?>) shardCopiesEntry.getValue();

final Set<String> expectedLeaseIds = new HashSet<>();
for (Object shardCopyStats : shardCopiesList) {
final String nodeId
= Objects.requireNonNull((String) ((Map<?, ?>) (((Map<?, ?>) shardCopyStats).get("routing"))).get("node"));
expectedLeaseIds.add(ReplicationTracker.getPeerRecoveryRetentionLeaseId(
ShardRouting.newUnassigned(new ShardId("_na_", "test", 0), false, RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test")).initialize(nodeId, null, 0L)));
}

final Set<String> actualLeaseIds = new HashSet<>();
for (Object shardCopyStats : shardCopiesList) {
final List<?> leases
= (List<?>) ((Map<?, ?>) (((Map<?, ?>) shardCopyStats).get("retention_leases"))).get("leases");
for (Object lease : leases) {
actualLeaseIds.add(Objects.requireNonNull((String) (((Map<?, ?>) lease).get("id"))));
}
}
assertThat("[" + index + "][" + shardCopiesEntry.getKey() + "] has leases " + actualLeaseIds
+ " but expected " + expectedLeaseIds,
actualLeaseIds, hasItems(expectedLeaseIds.toArray(new String[0])));
}
});
}
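
A note on the helper above: the expected lease ids are derived purely from the node id that holds each shard copy, which is why the test can build a throwaway unassigned ShardRouting and initialize it onto that node before calling ReplicationTracker.getPeerRecoveryRetentionLeaseId. A minimal sketch of that derivation, pulled out into a hypothetical helper (the method name and the placeholder ShardId/UnassignedInfo values are illustrative only, and it assumes the imports added at the top of this file):

    // Hypothetical helper, equivalent to the inline expectedLeaseIds computation above.
    private static String expectedLeaseIdFor(String nodeId) {
        final ShardRouting dummyRouting = ShardRouting.newUnassigned(
            new ShardId("_na_", "test", 0),                       // placeholder index name, uuid and shard number
            false,                                                // primary flag; the lease id depends only on the node
            RecoverySource.PeerRecoverySource.INSTANCE,
            new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test"))
            .initialize(nodeId, null, 0L);                        // assign the routing to the node of interest
        return ReplicationTracker.getPeerRecoveryRetentionLeaseId(dummyRouting);
    }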

/**
* This test creates an index in the non-upgraded cluster and closes it. It then checks that the index
* is effectively closed and potentially replicated (if the version the index was created on supports
@@ -23,6 +23,7 @@
import com.carrotsearch.hppc.ObjectLongMap;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
@@ -201,6 +202,14 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
private long persistedRetentionLeasesVersion;

/**
* Whether there should be a peer recovery retention lease (PRRL) for every tracked shard copy. Always true on indices created from
* {@link Version#V_7_4_0} onwards, because these versions create PRRLs properly. May be false on indices created in an earlier version
* if we recently did a rolling upgrade and {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)} has not
* yet completed. Is only permitted to change from false to true; can be removed once support for pre-PRRL indices is no longer needed.
*/
private boolean hasAllPeerRecoveryRetentionLeases;
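
The comment above describes a one-way latch: the flag starts true for indices created on or after 7.4.0 and otherwise only ever moves from false to true once the missing leases exist. A stand-alone model of that contract, purely for illustration (the class and method names are invented; the real field is guarded by the ReplicationTracker's monitor):

    final class PrrlCoverage {
        private boolean hasAllLeases;

        PrrlCoverage(boolean indexCreatedOnOrAfter7_4_0) {
            // Indices created on or after 7.4.0 create PRRLs for every copy from the start.
            this.hasAllLeases = indexCreatedOnOrAfter7_4_0;
        }

        synchronized void markAllLeasesPresent() {
            // Only ever flips false -> true, e.g. once createMissingPeerRecoveryRetentionLeases completes.
            this.hasAllLeases = true;
        }

        synchronized boolean hasAllLeases() {
            return hasAllLeases;
        }
    }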

/**
* Get all retention leases tracked on this shard.
*
@@ -486,10 +495,10 @@ public synchronized void renewPeerRecoveryRetentionLeases() {
if (retentionLease == null) {
/*
* If this shard copy is tracked then we got here via a rolling upgrade from an older version that doesn't
* create peer recovery retention leases for every shard copy. TODO create leases lazily in that situation.
* create peer recovery retention leases for every shard copy.
*/
assert checkpoints.get(shardRouting.allocationId().getId()).tracked == false
|| indexSettings.getIndexVersionCreated().before(Version.V_7_4_0);
|| hasAllPeerRecoveryRetentionLeases == false;
return false;
}
return retentionLease.timestamp() <= renewalTimeMillis
@@ -753,7 +762,7 @@ private boolean invariant() {
if (primaryMode
&& indexSettings.isSoftDeleteEnabled()
&& indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN
&& indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0)) {
&& hasAllPeerRecoveryRetentionLeases) {
// all tracked shard copies have a corresponding peer-recovery retention lease
for (final ShardRouting shardRouting : routingTable.assignedShards()) {
if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
@@ -820,6 +829,7 @@ public ReplicationTracker(
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0);
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
assert invariant();
}
@@ -914,30 +924,51 @@ public synchronized void activatePrimaryMode(final long localCheckpoint) {
updateGlobalCheckpointOnPrimary();

if (indexSettings.isSoftDeleteEnabled()) {
addPeerRecoveryRetentionLeaseForSolePrimary();
}

assert invariant();
}

/**
* Creates a peer recovery retention lease for this shard, if one does not already exist and this shard is the sole shard copy in the
* replication group. If one does not already exist and yet there are other shard copies in this group then we must have just done
* a rolling upgrade from a version before {@link Version#V_7_4_0}, in which case the missing leases should be created asynchronously
* by the caller using {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)}.
*/
private void addPeerRecoveryRetentionLeaseForSolePrimary() {
assert primaryMode;
assert Thread.holdsLock(this);

if (indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN) {
final ShardRouting primaryShard = routingTable.primaryShard();
final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard);
if (retentionLeases.get(leaseId) == null) {
/*
* We might have got here via a rolling upgrade from an older version that doesn't create peer recovery retention
* leases for every shard copy, but in this case we do not expect any leases to exist.
*/
if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0)) {
// We are starting up the whole replication group from scratch: if we were not (i.e. this is a replica promotion) then
// this copy must already be in-sync and active and therefore holds a retention lease for itself.
assert routingTable.activeShards().equals(Collections.singletonList(primaryShard)) : routingTable.activeShards();
if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) {
assert primaryShard.allocationId().getId().equals(shardAllocationId)
: routingTable.activeShards() + " vs " + shardAllocationId;
assert replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard));

: routingTable.assignedShards() + " vs " + shardAllocationId;
// Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
// group.
logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId);
innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1),
PEER_RECOVERY_RETENTION_LEASE_SOURCE);
hasAllPeerRecoveryRetentionLeases = true;
} else {
/*
* We got here via a rolling upgrade from an older version that doesn't create peer recovery retention
* leases for every shard copy, but in this case we do not expect any leases to exist.
*/
assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases;
logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases);
}
} else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting ->
retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
|| checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) {
// Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we
// don't need to do any more work.
hasAllPeerRecoveryRetentionLeases = true;
}
}

assert invariant();
}
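
In the sole-primary case above, the lease is added at Math.max(0L, globalCheckpoint + 1), i.e. it retains every operation above the current global checkpoint. A couple of worked values, assuming (as elsewhere in this codebase) that SequenceNumbers.NO_OPS_PERFORMED is -1:

    // Worked example only, not part of the change.
    long globalCheckpoint = -1L;                            // fresh primary, no operations yet
    long retainFrom = Math.max(0L, globalCheckpoint + 1);   // == 0, so the lease retains everything
    assert retainFrom == 0L;

    globalCheckpoint = 41L;                                 // operations up to seq# 41 are globally checkpointed
    retainFrom = Math.max(0L, globalCheckpoint + 1);        // == 42, so ops 0..41 may be discarded
    assert retainFrom == 42L;

By contrast, createMissingPeerRecoveryRetentionLeases below adds leases at Math.max(SequenceNumbers.NO_OPS_PERFORMED, globalCheckpoint), retaining from the global checkpoint itself, which appears slightly more conservative for copies discovered to be missing a lease after a rolling upgrade.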

/**
@@ -1240,9 +1271,54 @@ public synchronized void activateWithPrimaryContext(PrimaryContext primaryContext
// note that if there was no cluster state update between start of the engine of this shard and the call to
// initializeWithPrimaryContext, we might still have missed a cluster state update. This is best effort.
runAfter.run();

if (indexSettings.isSoftDeleteEnabled()) {
addPeerRecoveryRetentionLeaseForSolePrimary();
}

assert invariant();
}

private synchronized void setHasAllPeerRecoveryRetentionLeases() {
hasAllPeerRecoveryRetentionLeases = true;
assert invariant();
}

/**
* Create any required peer-recovery retention leases that do not currently exist because we just did a rolling upgrade from a version
* prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases.
*/
public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener<Void> listener) {
if (hasAllPeerRecoveryRetentionLeases == false) {
final List<ShardRouting> shardRoutings = routingTable.assignedShards();
final GroupedActionListener<ReplicationResponse> groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> {
setHasAllPeerRecoveryRetentionLeases();
listener.onResponse(null);
}, listener::onFailure), shardRoutings.size());
for (ShardRouting shardRouting : shardRoutings) {
if (retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))) {
groupedActionListener.onResponse(null);
} else {
final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
if (checkpointState.tracked == false) {
groupedActionListener.onResponse(null);
} else {
logger.trace("createMissingPeerRecoveryRetentionLeases: adding missing lease for {}", shardRouting);
try {
addPeerRecoveryRetentionLease(shardRouting.currentNodeId(),
Math.max(SequenceNumbers.NO_OPS_PERFORMED, checkpointState.globalCheckpoint), groupedActionListener);
} catch (Exception e) {
groupedActionListener.onFailure(e);
}
}
}
}
} else {
logger.trace("createMissingPeerRecoveryRetentionLeases: nothing to do");
listener.onResponse(null);
}
}
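
The GroupedActionListener above completes the outer listener only after it has heard back once per assigned shard (existing lease, untracked copy, or newly added lease), and propagates any failure to the caller. A hypothetical caller, not taken from this PR, might invoke it like the following sketch; the method name and log messages are assumptions for illustration:

    // Hypothetical usage: backfill PRRLs once this shard has been activated as primary.
    void ensurePeerRecoveryRetentionLeases(ReplicationTracker tracker) {
        tracker.createMissingPeerRecoveryRetentionLeases(ActionListener.wrap(
            ignored -> logger.trace("peer recovery retention leases are now present for all tracked copies"),
            e -> logger.debug("failed to create missing peer recovery retention leases", e)));
    }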

private Runnable getMasterUpdateOperationFromCurrentState() {
assert primaryMode == false;
final long lastAppliedClusterStateVersion = appliedClusterStateVersion;