Skip to content

Commit

Permalink
Update YAML Rest tests to check for product header on all responses (#…
Browse files Browse the repository at this point in the history
…83290) (#83994)

This PR adds assertions to YAML Rest tests to ensure that product headers are always
returned in rest responses. Additional work has been included to fix a number of misuses
of ThreadContext, mostly because of stashing listeners without their accompanying contexts.

BWC Rest tests have been disabled for a few cases while the fixes are backported.
# Conflicts:
#	x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherUsageTransportAction.java
  • Loading branch information
jbaiera committed Feb 16, 2022
1 parent 58e8490 commit d76ad17
Show file tree
Hide file tree
Showing 21 changed files with 162 additions and 73 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/83290.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 83290
summary: Update YAML Rest tests to check for product header on all responses
area: Infra/REST API
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ protected void finishHim(Exception failure) {
*/
protected void finishHim(Exception failure, List<Failure> indexingFailures, List<SearchFailure> searchFailures, boolean timedOut) {
logger.debug("[{}]: finishing without any catastrophic failures", task.getId());
scrollSource.close(() -> {
scrollSource.close(threadPool.getThreadContext().preserveContext(() -> {
if (failure == null) {
BulkByScrollResponse response = buildResponse(
timeValueNanos(System.nanoTime() - startTime.get()),
Expand All @@ -605,7 +605,7 @@ protected void finishHim(Exception failure, List<Failure> indexingFailures, List
} else {
listener.onFailure(failure);
}
});
}));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
$/
---
"Test cat snapshots output":
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
repository: test_cat_snapshots_1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
Expand Down Expand Up @@ -61,6 +64,7 @@ setup:

---
"Get snapshot info when verbose is false":

- do:
indices.create:
index: test_index
Expand Down Expand Up @@ -198,7 +202,6 @@ setup:
- skip:
version: " - 7.12.99"
reason: "Introduced in 7.13.0"

- do:
indices.create:
index: test_index
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"
---
"Get repository returns UUID":
- skip:
version: " - 7.12.99"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
---
setup:
- skip:
version: " - 8.1.99"
reason: "Pause BWC tests until #83290 is backported"

- do:
snapshot.create_repository:
repository: test_repo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;

import java.util.function.Supplier;

import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;

public class RestoreClusterStateListener implements ClusterStateListener {
Expand All @@ -29,43 +32,48 @@ public class RestoreClusterStateListener implements ClusterStateListener {
private final ClusterService clusterService;
private final String uuid;
private final ActionListener<RestoreSnapshotResponse> listener;
private final Supplier<ThreadContext.StoredContext> contextSupplier;

private RestoreClusterStateListener(
ClusterService clusterService,
RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreSnapshotResponse> listener
ActionListener<RestoreSnapshotResponse> listener,
Supplier<ThreadContext.StoredContext> contextSupplier
) {
this.clusterService = clusterService;
this.uuid = response.getUuid();
this.listener = listener;
this.contextSupplier = contextSupplier;
}

@Override
public void clusterChanged(ClusterChangedEvent changedEvent) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
// When there is a master failure after a restore has been started, this listener might not be registered
// on the current master and as such it might miss some intermediary cluster states due to batching.
// Clean up listener in that case and acknowledge completion of restore operation to client.
clusterService.removeListener(this);
listener.onResponse(new RestoreSnapshotResponse((RestoreInfo) null));
} else if (newEntry == null) {
clusterService.removeListener(this);
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
assert RestoreService.completed(shards) : "expected all restore entries to be completed";
RestoreInfo ri = new RestoreInfo(
prevEntry.snapshot().getSnapshotId().getName(),
prevEntry.indices(),
shards.size(),
shards.size() - RestoreService.failedShards(shards)
);
RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri);
logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId());
listener.onResponse(response);
} else {
// restore not completed yet, wait for next cluster state update
try (ThreadContext.StoredContext stored = contextSupplier.get()) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
// When there is a master failure after a restore has been started, this listener might not be registered
// on the current master and as such it might miss some intermediary cluster states due to batching.
// Clean up listener in that case and acknowledge completion of restore operation to client.
clusterService.removeListener(this);
listener.onResponse(new RestoreSnapshotResponse((RestoreInfo) null));
} else if (newEntry == null) {
clusterService.removeListener(this);
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
assert RestoreService.completed(shards) : "expected all restore entries to be completed";
RestoreInfo ri = new RestoreInfo(
prevEntry.snapshot().getSnapshotId().getName(),
prevEntry.indices(),
shards.size(),
shards.size() - RestoreService.failedShards(shards)
);
RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri);
logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId());
listener.onResponse(response);
} else {
// restore not completed yet, wait for next cluster state update
}
}
}

Expand All @@ -76,8 +84,11 @@ public void clusterChanged(ClusterChangedEvent changedEvent) {
public static void createAndRegisterListener(
ClusterService clusterService,
RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreSnapshotResponse> listener
ActionListener<RestoreSnapshotResponse> listener,
ThreadContext threadContext
) {
clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener));
clusterService.addListener(
new RestoreClusterStateListener(clusterService, response, listener, threadContext.newRestorableContext(true))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ protected void masterOperation(
) {
restoreService.restoreSnapshot(request, listener.delegateFailure((delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, delegatedListener);
RestoreClusterStateListener.createAndRegisterListener(
clusterService,
restoreCompletionResponse,
delegatedListener,
threadPool.getThreadContext()
);
} else {
delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void removeApplier(ClusterStateApplier applier) {
}

/**
* Add a listener for updated cluster states
* Add a listener for updated cluster states. Listeners are executed in the system thread context.
*/
public void addListener(ClusterStateListener listener) {
clusterStateListeners.add(listener);
Expand All @@ -222,7 +222,7 @@ public void addListener(ClusterStateListener listener) {
/**
* Removes a listener for updated cluster states.
*/
public void removeListener(ClusterStateListener listener) {
public void removeListener(final ClusterStateListener listener) {
clusterStateListeners.remove(listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand Down Expand Up @@ -2949,7 +2950,8 @@ static Map<String, DataStreamAlias> filterDataStreamAliases(
* @param listener listener
*/
private void addListener(Snapshot snapshot, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>()).add(listener);
snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>())
.add(ContextPreservingActionListener.wrapPreservingContext(listener, threadPool.getThreadContext()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,20 @@ public String getReasonPhrase() {
* Get a list of all of the values of all warning headers returned in the response.
*/
public List<String> getWarningHeaders() {
List<String> warningHeaders = new ArrayList<>();
return getHeaders("Warning");
}

/**
* Get a list of all the values of a given header returned in the response.
*/
public List<String> getHeaders(String name) {
List<String> headers = new ArrayList<>();
for (Header header : response.getHeaders()) {
if (header.getName().equals("Warning")) {
warningHeaders.add(header.getValue());
if (header.getName().equalsIgnoreCase(name)) {
headers.add(header.getValue());
}
}
return warningHeaders;
return headers;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ public void execute(ClientYamlTestExecutionContext executionContext) throws IOEx
final String testPath = executionContext.getClientYamlTestCandidate() != null
? executionContext.getClientYamlTestCandidate().getTestPath()
: null;
checkElasticProductHeader(response.getHeaders("X-elastic-product"));
checkWarningHeaders(response.getWarningHeaders(), testPath);
} catch (ClientYamlTestResponseException e) {
ClientYamlTestResponse restTestResponse = e.getRestTestResponse();
Expand All @@ -392,6 +393,31 @@ public void execute(ClientYamlTestExecutionContext executionContext) throws IOEx
}
}

void checkElasticProductHeader(final List<String> productHeaders) {
if (productHeaders.isEmpty()) {
fail("Response is missing required X-Elastic-Product response header");
}
boolean headerPresent = false;
final List<String> unexpected = new ArrayList<>();
for (String header : productHeaders) {
if (header.equals("Elasticsearch")) {
headerPresent = true;
break;
} else {
unexpected.add(header);
}
}
if (headerPresent == false) {
StringBuilder failureMessage = new StringBuilder();
appendBadHeaders(
failureMessage,
unexpected,
"did not get expected product header [Elasticsearch], found header" + (unexpected.size() > 1 ? "s" : "")
);
fail(failureMessage.toString());
}
}

void checkWarningHeaders(final List<String> warningHeaders) {
checkWarningHeaders(warningHeaders, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ public void testNodeSelectorByVersion() throws IOException {
doSection.getApiCallSection().getNodeSelector()
)
).thenReturn(mockResponse);
when(mockResponse.getHeaders("X-elastic-product")).thenReturn(List.of("Elasticsearch"));
doSection.execute(context);
verify(context).callApi(
"indices.get_field_mapping",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ public void onFailure(Exception e) {
assert restoreInfo.failedShards() > 0 : "Should have failed shards";
delegatedListener.onResponse(new PutFollowAction.Response(true, false, false));
}
})
}),
threadPool.getThreadContext()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
Expand Down Expand Up @@ -178,10 +179,16 @@ private void removeRetentionLeaseForShard(
) {
logger.trace("{} removing retention lease [{}] while unfollowing leader index", followerShardId, retentionLeaseId);
final ThreadContext threadContext = threadPool.getThreadContext();
// We're about to stash the thread context for this retention lease removal. The listener will be completed while the
// context is stashed. The context needs to be restored in the listener when it is completing or else it is simply wiped.
final ActionListener<ActionResponse.Empty> preservedListener = new ContextPreservingActionListener<>(
threadContext.newRestorableContext(true),
listener
);
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
// we have to execute under the system context so that if security is enabled the removal is authorized
threadContext.markAsSystemContext();
CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, listener);
CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, preservedListener);
}
}

Expand Down

0 comments on commit d76ad17

Please sign in to comment.