Skip to content

Commit

Permalink
Adjust BWC for peer recovery retention leases (#50351)
Browse files Browse the repository at this point in the history
Relates #50351
  • Loading branch information
dnhatn committed Dec 24, 2019
1 parent 99b3562 commit 5e0030e
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1278,7 +1278,7 @@ public void testOperationBasedRecovery() throws Exception {
}
}
flush(index, true);
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, false);
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
// less than 10% of the committed docs (see IndexSetting#FILE_BASED_RECOVERY_THRESHOLD_SETTING).
int uncommittedDocs = randomIntBetween(0, (int) (committedDocs * 0.1));
for (int i = 0; i < uncommittedDocs; i++) {
Expand All @@ -1288,7 +1288,7 @@ public void testOperationBasedRecovery() throws Exception {
} else {
ensureGreen(index);
assertNoFileBasedRecovery(index, n -> true);
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, true);
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
}
}

Expand All @@ -1313,7 +1313,7 @@ public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
ensureGreen(index);
flush(index, true);
assertEmptyTranslog(index);
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, true);
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -511,28 +511,6 @@ private static Version indexVersionCreated(final String indexName) throws IOExce
return Version.fromId(Integer.parseInt(ObjectPath.createFromResponse(response).evaluate(versionCreatedSetting)));
}

/**
* Returns the minimum node version among all nodes of the cluster
*/
private static Version minimumNodeVersion() throws IOException {
final Request request = new Request("GET", "_nodes");
request.addParameter("filter_path", "nodes.*.version");

final Response response = client().performRequest(request);
final Map<String, Object> nodes = ObjectPath.createFromResponse(response).evaluate("nodes");

Version minVersion = null;
for (Map.Entry<String, Object> node : nodes.entrySet()) {
@SuppressWarnings("unchecked")
Version nodeVersion = Version.fromString((String) ((Map<String, Object>) node.getValue()).get("version"));
if (minVersion == null || minVersion.after(nodeVersion)) {
minVersion = nodeVersion;
}
}
assertNotNull(minVersion);
return minVersion;
}

/**
* Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts
* that the index has started shards.
Expand Down Expand Up @@ -695,7 +673,7 @@ public void testOperationBasedRecovery() throws Exception {
ensureGreen(index);
indexDocs(index, 0, randomIntBetween(100, 200));
flush(index, randomBoolean());
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, false);
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
// uncommitted docs must be less than 10% of committed docs (see IndexSetting#FILE_BASED_RECOVERY_THRESHOLD_SETTING).
indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3));
} else {
Expand All @@ -705,9 +683,7 @@ public void testOperationBasedRecovery() throws Exception {
|| nodeName.startsWith(CLUSTER_NAME + "-0")
|| (nodeName.startsWith(CLUSTER_NAME + "-1") && Booleans.parseBoolean(System.getProperty("tests.first_round")) == false));
indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3));
if (CLUSTER_TYPE == ClusterType.UPGRADED) {
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index, true);
}
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -895,11 +895,10 @@ public ReplicationTracker(
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0)
this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0)
|| (indexSettings.isSoftDeleteEnabled() &&
(indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) ||
(indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) &&
indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN)));
indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) &&
indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN);

this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
this.safeCommitInfoSupplier = safeCommitInfoSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
Expand Down Expand Up @@ -1132,18 +1133,20 @@ public void assertEmptyTranslog(String index) throws Exception {
* Peer recovery retention leases are renewed and synced to replicas periodically (every 30 seconds). This ensures
* that we have renewed every PRRL to the global checkpoint of the corresponding copy and properly synced to all copies.
*/
public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index, boolean alwaysExists) throws Exception {
public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index) throws Exception {
boolean mustHavePRRLs = minimumNodeVersion().onOrAfter(Version.V_7_6_0);
assertBusy(() -> {
Map<String, Object> stats = entityAsMap(client().performRequest(new Request("GET", index + "/_stats?level=shards")));
@SuppressWarnings("unchecked") Map<String, List<Map<String, ?>>> shards =
(Map<String, List<Map<String, ?>>>) XContentMapValues.extractValue("indices." + index + ".shards", stats);
for (List<Map<String, ?>> shard : shards.values()) {
for (Map<String, ?> copy : shard) {
Integer globalCheckpoint = (Integer) XContentMapValues.extractValue("seq_no.global_checkpoint", copy);
assertThat(XContentMapValues.extractValue("seq_no.max_seq_no", copy), equalTo(globalCheckpoint));
assertNotNull(globalCheckpoint);
@SuppressWarnings("unchecked") List<Map<String, ?>> retentionLeases =
(List<Map<String, ?>>) XContentMapValues.extractValue("retention_leases.leases", copy);
if (alwaysExists == false && retentionLeases == null) {
if (mustHavePRRLs == false && retentionLeases == null) {
continue;
}
assertNotNull(retentionLeases);
Expand All @@ -1152,7 +1155,7 @@ public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index, bool
assertThat(retentionLease.get("retaining_seq_no"), equalTo(globalCheckpoint + 1));
}
}
if (alwaysExists) {
if (mustHavePRRLs) {
List<String> existingLeaseIds = retentionLeases.stream().map(lease -> (String) lease.get("id"))
.collect(Collectors.toList());
List<String> expectedLeaseIds = shard.stream()
Expand All @@ -1165,4 +1168,26 @@ public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index, bool
}
}, 60, TimeUnit.SECONDS);
}

/**
* Returns the minimum node version among all nodes of the cluster
*/
protected static Version minimumNodeVersion() throws IOException {
final Request request = new Request("GET", "_nodes");
request.addParameter("filter_path", "nodes.*.version");

final Response response = client().performRequest(request);
final Map<String, Object> nodes = ObjectPath.createFromResponse(response).evaluate("nodes");

Version minVersion = null;
for (Map.Entry<String, Object> node : nodes.entrySet()) {
@SuppressWarnings("unchecked")
Version nodeVersion = Version.fromString((String) ((Map<String, Object>) node.getValue()).get("version"));
if (minVersion == null || minVersion.after(nodeVersion)) {
minVersion = nodeVersion;
}
}
assertNotNull(minVersion);
return minVersion;
}
}

0 comments on commit 5e0030e

Please sign in to comment.