Skip to content

Commit

Permalink
Retry synced-flush on conflict in tests (#66968)
Browse files Browse the repository at this point in the history
Closes #66631
  • Loading branch information
dnhatn committed Jan 11, 2021
1 parent 150a734 commit 6ba5ae2
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -731,13 +731,7 @@ public void testRecovery() throws Exception {
assertOK(client().performRequest(flushRequest));

if (randomBoolean()) {
// We had a bug before where we failed to perform peer recovery with sync_id from 5.x to 6.x.
// We added this synced flush so we can exercise different paths of recovery code.
try {
performSyncedFlush(index);
} catch (ResponseException ignored) {
// synced flush is optional here
}
performSyncedFlush(index, randomBoolean());
}
if (shouldHaveTranslog) {
// Update a few documents so we are sure to have a translog
Expand Down Expand Up @@ -1451,7 +1445,7 @@ public void testRecoveryWithTranslogRetentionDisabled() throws Exception {
if (randomBoolean()) {
flush(index, randomBoolean());
} else if (randomBoolean()) {
performSyncedFlush(index);
performSyncedFlush(index, randomBoolean());
}
saveInfoDocument("doc_count", Integer.toString(numDocs));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,8 @@ public void testRelocationWithConcurrentIndexing() throws Exception {
throw new IllegalStateException("unknown type " + CLUSTER_TYPE);
}
if (randomBoolean()) {
syncedFlush(index);
performSyncedFlush(index, randomBoolean());
ensureGlobalCheckpointSynced(index);
}
}

Expand Down Expand Up @@ -587,22 +588,6 @@ private void assertClosedIndex(final String index, final boolean checkRoutingTab
}
}

/**
 * Repeats a synced flush of {@code index} until the response reports zero failed shards,
 * then waits for the global checkpoint of the index to be synced.
 *
 * @param index name of the index to synced-flush
 * @throws Exception if the flush never succeeds or the global checkpoint check fails
 */
private void syncedFlush(String index) throws Exception {
    // The last write operation triggers a global checkpoint sync, which holds a shard permit.
    // A synced-flush sees that sync as an ongoing operation, so the request has to be retried until it goes through.
    assertBusy(() -> {
        try {
            final Response flushResponse = performSyncedFlush(index);
            final Map<String, Object> shardStats = ObjectPath.createFromResponse(flushResponse).evaluate("_shards");
            assertThat(shardStats.get("failed"), equalTo(0));
        } catch (ResponseException e) {
            // rethrown as an assertion failure so that assertBusy tries again
            throw new AssertionError(e);
        }
    });
    // Without a synced global checkpoint the commit carrying the syncId might be trimmed.
    ensureGlobalCheckpointSynced(index);
}

@SuppressWarnings("unchecked")
private void assertPeerRecoveredFiles(String reason, String index, String targetNode, Matcher<Integer> sizeMatcher) throws IOException {
Map<?, ?> recoveryStats = entityAsMap(client().performRequest(new Request("GET", index + "/_recovery")));
Expand Down Expand Up @@ -668,7 +653,8 @@ public void testUpdateDoc() throws Exception {
assertThat(XContentMapValues.extractValue("_source.updated_field", doc), equalTo(updates.get(docId)));
}
if (randomBoolean()) {
syncedFlush(index);
performSyncedFlush(index, randomBoolean());
ensureGlobalCheckpointSynced(index);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void testRecoverReplica() throws Exception {
if (randomBoolean()) {
flush(index, randomBoolean());
} else if (randomBoolean()) {
performSyncedFlush(index);
performSyncedFlush(index, randomBoolean());
}
}
ensureGreen(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1464,7 +1464,7 @@ protected static Version minimumNodeVersion() throws IOException {
return minVersion;
}

protected static Response performSyncedFlush(String indexName) throws IOException {
protected static void performSyncedFlush(String indexName, boolean retryOnConflict) throws Exception {
final Request request = new Request("POST", indexName + "/_flush/synced");
final List<String> expectedWarnings = Collections.singletonList(SyncedFlushService.SYNCED_FLUSH_DEPRECATION_MESSAGE);
if (nodeVersions.stream().allMatch(version -> version.onOrAfter(Version.V_7_6_0))) {
Expand All @@ -1476,7 +1476,22 @@ protected static Response performSyncedFlush(String indexName) throws IOExceptio
options.setWarningsHandler(warnings -> warnings.isEmpty() == false && warnings.equals(expectedWarnings) == false);
request.setOptions(options);
}
return client().performRequest(request);
// We have to spin a synced-flush request because we fire the global checkpoint sync for the last write operation.
// A synced-flush request considers the global checkpoint sync as an ongoing operation because it acquires a shard permit.
// With retryOnConflict == false the flush is best-effort: shard failures are not checked and a 409 response is tolerated.
assertBusy(() -> {
    try {
        Response resp = client().performRequest(request);
        if (retryOnConflict) {
            Map<String, Object> result = ObjectPath.createFromResponse(resp).evaluate("_shards");
            assertThat(result.get("failed"), equalTo(0));
        }
    } catch (ResponseException ex) {
        // Compare the numeric status code: a StatusLine object is never equal to an int constant,
        // so asserting on the StatusLine itself would fail for every error response.
        assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(HttpStatus.SC_CONFLICT));
        if (retryOnConflict) {
            throw new AssertionError(ex); // cause assert busy to retry
        }
    }
});
}

/**
Expand Down

0 comments on commit 6ba5ae2

Please sign in to comment.