Use index for peer recovery instead of translog #45136

Merged: 48 commits, Aug 2, 2019
2ec1483
Create peer-recovery retention leases (#43190)
DaveCTurner Jun 19, 2019
dfa22bc
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Jun 21, 2019
f68fac4
Treat UNASSIGNED_SEQUENCE_NUMBER as NO_OPS_PERFORMED
DaveCTurner Jun 24, 2019
cb39840
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Jun 26, 2019
f5fdb75
Add missing GCP update (#43632)
DaveCTurner Jun 26, 2019
00145cd
Merge branch 'master' into peer-recovery-retention-leases
dnhatn Jun 28, 2019
cb6b0a9
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Jul 1, 2019
b328478
Remove file committed in error
DaveCTurner Jul 1, 2019
7f7f84b
Less sync
DaveCTurner Jul 1, 2019
6bac16a
Relax condition, we may have renewed some other leases too
DaveCTurner Jul 1, 2019
9941eb6
Better test fix
DaveCTurner Jul 1, 2019
f3fbb33
Checkstyle
DaveCTurner Jul 1, 2019
fbc4477
Advance PRRLs to match GCP of tracked shards (#43751)
DaveCTurner Jul 1, 2019
b1be151
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Jul 3, 2019
291ff8d
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Jul 4, 2019
389c625
Remove PRRLs before performing file-based recovery (#43928)
DaveCTurner Jul 4, 2019
ac2da33
Update BWC version for PRRLs (#43958)
DaveCTurner Jul 5, 2019
d016e79
Return recovery to generic thread post-PRRL action (#44000)
DaveCTurner Jul 5, 2019
76ff6e8
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Jul 5, 2019
da3c901
Skip PRRL renewal on UNASSIGNED_SEQ_NO (#44019)
DaveCTurner Jul 5, 2019
c5ed201
Only call assertNotTransportThread if asserts on (#44028)
DaveCTurner Jul 8, 2019
9523445
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Jul 8, 2019
bea2627
Create missing PRRLs after primary activation (#44009)
DaveCTurner Jul 8, 2019
11e9880
Reduce number of replicas in cluster restart test
DaveCTurner Jul 8, 2019
d7f7ebc
Only create missing PRRLs when appropriate
DaveCTurner Jul 8, 2019
ba7c4be
Fix comment
DaveCTurner Jul 9, 2019
e12bde6
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Jul 11, 2019
b8bcc0b
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Jul 15, 2019
40ea029
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Jul 20, 2019
69c94f4
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Jul 23, 2019
d15684d
Use global checkpoint as starting seq in ops-based recovery (#43463)
dnhatn Jul 23, 2019
06d9be6
Adjust BWC for recovery translog stats
dnhatn Jul 23, 2019
6275cd7
Do not load global checkpoint to ReplicationTracker in local recovery…
dnhatn Jul 24, 2019
96dd543
AwaitsFix testCloseWhileRelocatingShards
DaveCTurner Jul 25, 2019
417a2ac
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Jul 29, 2019
b5c897c
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Jul 29, 2019
446ebf0
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Jul 29, 2019
7a247e5
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Jul 30, 2019
907bb55
Skip local recovery for closed or frozen indices (#44887)
dnhatn Jul 30, 2019
0b066d3
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Jul 31, 2019
2bae406
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Jul 31, 2019
6960cf7
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Aug 1, 2019
5322b00
Recover peers using history from Lucene (#44853)
DaveCTurner Aug 1, 2019
77720e8
Reset starting seqno if fail to read last commit (#45106)
dnhatn Aug 1, 2019
51778da
Merge branch 'master' into peer-recovery-retention-leases
DaveCTurner Aug 2, 2019
aea938b
Revert change to BWC testing target
DaveCTurner Aug 2, 2019
09ae1e6
Disable BWC tests until backported
DaveCTurner Aug 2, 2019
89b6a3b
Remove stray file
DaveCTurner Aug 2, 2019
4 changes: 2 additions & 2 deletions build.gradle
@@ -173,8 +173,8 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/

boolean bwc_tests_enabled = true
final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
boolean bwc_tests_enabled = false
final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/45137" /* place a PR link here when committing bwc changes */
if (bwc_tests_enabled == false) {
if (bwc_tests_disabled_issue.isEmpty()) {
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")
@@ -54,6 +54,7 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.junit.Before;

@@ -260,7 +261,9 @@ public void testForgetFollower() throws IOException {
final Map<?, ?> shardStatsAsMap = (Map<?, ?>) shardStats.get(0);
final Map<?, ?> retentionLeasesStats = (Map<?, ?>) shardStatsAsMap.get("retention_leases");
final List<?> leases = (List<?>) retentionLeasesStats.get("leases");
assertThat(leases, empty());
for (final Object lease : leases) {
assertThat(((Map<?, ?>) lease).get("source"), equalTo(ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE));
}
}
}

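The assertion change in the hunk above can be sketched on its own: after a forget-follower call the shard may no longer expect an empty lease list, because each copy legitimately keeps a peer-recovery retention lease; only leases from other sources must be gone. A minimal, hypothetical re-implementation of that check (the constant value is an assumption standing in for `ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE`):

```java
import java.util.List;
import java.util.Map;

public final class LeaseSourceCheck {
    // Assumed stand-in for ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE;
    // the exact string is an assumption, not taken from the PR.
    static final String PEER_RECOVERY_SOURCE = "peer recovery";

    /** True if every lease parsed from the retention_leases stats is a peer-recovery lease. */
    static boolean onlyPeerRecoveryLeases(List<? extends Map<?, ?>> leases) {
        return leases.stream()
            .allMatch(lease -> PEER_RECOVERY_SOURCE.equals(lease.get("source")));
    }
}
```

This mirrors the loop in the diff, which inspects the `source` field of each lease map parsed from the stats response.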
@@ -34,7 +34,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;

import org.elasticsearch.index.seqno.RetentionLeaseUtils;
import org.elasticsearch.test.NotEqualMessageBuilder;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;
@@ -80,7 +80,7 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
private String index;

@Before
public void setIndex() throws IOException {
public void setIndex() {
index = getTestName().toLowerCase(Locale.ROOT);
}

@@ -1230,4 +1230,26 @@ protected void ensureGreenLongWait(String index) throws IOException {
assertEquals("green", healthRsp.get("status"));
assertFalse((Boolean) healthRsp.get("timed_out"));
}

public void testPeerRecoveryRetentionLeases() throws IOException {
if (isRunningAgainstOldCluster()) {
XContentBuilder settings = jsonBuilder();
settings.startObject();
{
settings.startObject("settings");
settings.field("number_of_shards", between(1, 5));
settings.field("number_of_replicas", between(0, 1));
settings.endObject();
}
settings.endObject();

Request createIndex = new Request("PUT", "/" + index);
createIndex.setJsonEntity(Strings.toString(settings));
client().performRequest(createIndex);
ensureGreen(index);
} else {
ensureGreen(index);
RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index);
}
}
}
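`RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases` is a test utility introduced by this PR; its exact implementation is not shown here, but the invariant it is understood to verify can be sketched independently: each shard should carry exactly one peer-recovery retention lease per assigned copy, and no leases from other sources. A hedged sketch of that invariant (names and the source string are assumptions):

```java
import java.util.List;
import java.util.Map;

public final class PeerRecoveryLeaseInvariant {
    // Assumed lease source string for peer-recovery retention leases.
    static final String PEER_RECOVERY_SOURCE = "peer recovery";

    /**
     * True when every shard has one peer-recovery lease per assigned copy and
     * no leases from any other source.
     */
    static boolean holds(Map<String, Integer> copiesPerShard,
                         Map<String, List<String>> leaseSourcesPerShard) {
        for (Map.Entry<String, Integer> e : copiesPerShard.entrySet()) {
            List<String> sources = leaseSourcesPerShard.getOrDefault(e.getKey(), List.of());
            if (sources.size() != e.getValue()) {
                return false; // a copy is missing its lease, or there are extras
            }
            if (!sources.stream().allMatch(PEER_RECOVERY_SOURCE::equals)) {
                return false; // a lease from some other source is present
            }
        }
        return true;
    }
}
```

In the restart test above, the old cluster creates the index without such leases, and the assertion runs only after the upgrade, once the new code has had a chance to establish them.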
@@ -32,6 +32,7 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.RetentionLeaseUtils;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.hamcrest.Matcher;
@@ -382,6 +383,80 @@ public void testRecoveryWithSoftDeletes() throws Exception {
ensureGreen(index);
}

public void testRetentionLeasesEstablishedWhenPromotingPrimary() throws Exception {
final String index = "recover_and_create_leases_in_promotion";
if (CLUSTER_TYPE == ClusterType.OLD) {
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5))
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), between(1, 2)) // triggers nontrivial promotion
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
createIndex(index, settings.build());
int numDocs = randomInt(10);
indexDocs(index, 0, numDocs);
if (randomBoolean()) {
client().performRequest(new Request("POST", "/" + index + "/_flush"));
}
}
ensureGreen(index);
if (CLUSTER_TYPE == ClusterType.UPGRADED) {
assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index));
}
}

public void testRetentionLeasesEstablishedWhenRelocatingPrimary() throws Exception {
final String index = "recover_and_create_leases_in_relocation";
switch (CLUSTER_TYPE) {
case OLD:
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5))
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), between(0, 1))
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
createIndex(index, settings.build());
int numDocs = randomInt(10);
indexDocs(index, 0, numDocs);
if (randomBoolean()) {
client().performRequest(new Request("POST", "/" + index + "/_flush"));
}
ensureGreen(index);
break;

case MIXED:
// trigger a primary relocation by excluding the last old node with a shard filter
final Map<?, ?> nodesMap
= ObjectPath.createFromResponse(client().performRequest(new Request("GET", "/_nodes"))).evaluate("nodes");
final List<String> oldNodeNames = new ArrayList<>();
for (Object nodeDetails : nodesMap.values()) {
final Map<?, ?> nodeDetailsMap = (Map<?, ?>) nodeDetails;
final String versionString = (String) nodeDetailsMap.get("version");
if (versionString.equals(Version.CURRENT.toString()) == false) {
oldNodeNames.add((String) nodeDetailsMap.get("name"));
}
}

if (oldNodeNames.size() == 1) {
final String oldNodeName = oldNodeNames.get(0);
logger.info("--> excluding index [{}] from node [{}]", index, oldNodeName);
final Request putSettingsRequest = new Request("PUT", "/" + index + "/_settings");
putSettingsRequest.setJsonEntity("{\"index.routing.allocation.exclude._name\":\"" + oldNodeName + "\"}");
assertOK(client().performRequest(putSettingsRequest));
ensureGreen(index);
assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index));
} else {
ensureGreen(index);
}
break;

case UPGRADED:
ensureGreen(index);
assertBusy(() -> RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index));
break;
}
}
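The MIXED branch above finds the last not-yet-upgraded node by parsing the `/_nodes` response and comparing each node's version string to the current version. That selection step can be sketched as a small pure function (the map shape is a simplified stand-in for the parsed response):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public final class OldNodeFilter {
    /**
     * Collects the names of nodes still on an old version, as in the MIXED branch
     * above. When exactly one remains, the test excludes it via a shard allocation
     * filter so the primary must relocate onto an upgraded node.
     */
    static List<String> oldNodeNames(Map<String, Map<String, String>> nodes, String currentVersion) {
        List<String> names = new ArrayList<>();
        for (Map<String, String> node : nodes.values()) {
            if (!currentVersion.equals(node.get("version"))) {
                names.add(node.get("name"));
            }
        }
        return names;
    }
}
```

The relocation is what matters: moving the primary to a new-version node exercises the code path that creates peer-recovery retention leases during a primary hand-off.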

/**
* This test creates an index in the non upgraded cluster and closes it. It then checks that the index
* is effectively closed and potentially replicated (if the version the index was created on supports
@@ -747,7 +747,7 @@ public abstract int estimateNumberOfHistoryOperations(String source,
MapperService mapperService, long startingSeqNo) throws IOException;

/**
* Checks if this engine has every operation since {@code startingSeqNo} (inclusive) in its translog
* Checks if this engine has every operation since {@code startingSeqNo} (inclusive) in its history (either Lucene or translog)
*/
public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;

@@ -514,18 +514,30 @@ public void syncTranslog() throws IOException {
}

/**
* Creates a new history snapshot for reading operations since the provided seqno from the translog.
* Creates a new history snapshot for reading operations since the provided seqno.
* The returned snapshot can be retrieved from either the Lucene index or the translog files.
*/
@Override
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
}

return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
}

/**
* Returns the estimated number of history operations whose seq# is at least the provided seq# in this engine.
*/
@Override
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) {
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
try (Translog.Snapshot snapshot = newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo),
Long.MAX_VALUE, false)) {
return snapshot.totalOperations();
}
}

return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo);
}
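The two methods above share one dispatch: with soft deletes enabled, history is served from the Lucene index (via a changes snapshot), otherwise from the translog, and the starting seq# is clamped to 0 so that negative sentinel values such as `UNASSIGNED_SEQ_NO` request all retained operations. A minimal sketch of that decision logic, together with the soft-deletes branch of `hasCompleteOperationHistory` shown further down (names here are illustrative, not the engine's API):

```java
public final class HistorySourceSketch {
    enum Source { LUCENE_INDEX, TRANSLOG }

    /** With soft deletes the engine serves history from Lucene; otherwise from the translog. */
    static Source historySource(boolean softDeletesEnabled) {
        return softDeletesEnabled ? Source.LUCENE_INDEX : Source.TRANSLOG;
    }

    /** The Math.max(0, startingSeqNo) clamp: negative sentinels mean "everything retained". */
    static long clampStartingSeqNo(long startingSeqNo) {
        return Math.max(0, startingSeqNo);
    }

    /**
     * Soft-deletes check for complete history: the history is complete iff no
     * operation at or above the requested starting seq# has been merged away.
     */
    static boolean hasCompleteHistory(long minRetainedSeqNo, long startingSeqNo) {
        return minRetainedSeqNo <= startingSeqNo;
    }
}
```

This is the heart of the PR: once Lucene retains a complete, seq#-addressable history, peer recovery no longer needs the translog as its operation source.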

@@ -2571,6 +2583,10 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS

@Override
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
return getMinRetainedSeqNo() <= startingSeqNo;
}

final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
// avoid scanning translog if not necessary
if (startingSeqNo > currentLocalCheckpoint) {
@@ -2600,15 +2616,7 @@ public final long getMinRetainedSeqNo() {
@Override
public Closeable acquireRetentionLock() {
if (softDeleteEnabled) {
final Releasable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock();
final Closeable translogRetentionLock;
try {
translogRetentionLock = translog.acquireRetentionLock();
} catch (Exception e) {
softDeletesRetentionLock.close();
throw e;
}
return () -> IOUtils.close(translogRetentionLock, softDeletesRetentionLock);
return softDeletesPolicy.acquireRetentionLock();
} else {
return translog.acquireRetentionLock();
}
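The deleted branch above acquired the soft-deletes retention lock, then the translog retention lock, releasing the first if acquiring the second failed; with soft deletes as the sole history source, only one lock is needed. The acquire-both-or-neither pattern that was removed is general, and can be sketched as (a hedged sketch, not the engine's code):

```java
import java.util.function.Supplier;

public final class Locks {
    /**
     * Acquires two locks, releasing the first if the second cannot be acquired,
     * and returns a single handle that releases both in reverse order.
     */
    static AutoCloseable acquireBoth(Supplier<AutoCloseable> first,
                                     Supplier<AutoCloseable> second) throws Exception {
        AutoCloseable a = first.get();
        AutoCloseable b;
        try {
            b = second.get();
        } catch (RuntimeException e) {
            a.close(); // do not leak the first lock if the second fails
            throw e;
        }
        return () -> {
            // try-with-resources guarantees a is closed even if b.close() throws
            try (AutoCloseable ignored = a) {
                b.close();
            }
        };
    }
}
```

Dropping the translog lock here is safe precisely because, after this PR, retention of recovery history is governed by peer-recovery retention leases against the Lucene index rather than by translog retention.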