Skip to content

Commit

Permalink
Harden assertions when checking target indices (elastic#105001)
Browse files Browse the repository at this point in the history
We should have checked and failed if there is an inconsistent pair of 
data node plan and target indices. This PR strengthens these checks and
adds assertions to fail hard in tests.

Relates elastic#10480
  • Loading branch information
dnhatn authored and jedrazb committed Feb 2, 2024
1 parent 40608aa commit 455bd88
Showing 1 changed file with 16 additions and 2 deletions.
Expand Up @@ -149,13 +149,20 @@ public void execute(
PhysicalPlan coordinatorPlan = new OutputExec(coordinatorAndDataNodePlan.v1(), collectedPages::add);
PhysicalPlan dataNodePlan = coordinatorAndDataNodePlan.v2();
if (dataNodePlan != null && dataNodePlan instanceof ExchangeSinkExec == false) {
listener.onFailure(new IllegalStateException("expect data node plan starts with an ExchangeSink; got " + dataNodePlan));
assert false : "expected data node plan starts with an ExchangeSink; got " + dataNodePlan;
listener.onFailure(new IllegalStateException("expected data node plan starts with an ExchangeSink; got " + dataNodePlan));
return;
}
Map<String, OriginalIndices> clusterToConcreteIndices = transportService.getRemoteClusterService()
.groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planConcreteIndices(physicalPlan).toArray(String[]::new));
QueryPragmas queryPragmas = configuration.pragmas();
if (dataNodePlan == null || clusterToConcreteIndices.values().stream().allMatch(v -> v.indices().length == 0)) {
if (dataNodePlan == null) {
if (clusterToConcreteIndices.values().stream().allMatch(v -> v.indices().length == 0) == false) {
String error = "expected no concrete indices without data node plan; got " + clusterToConcreteIndices;
assert false : error;
listener.onFailure(new IllegalStateException(error));
return;
}
var computeContext = new ComputeContext(
sessionId,
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
Expand All @@ -171,6 +178,13 @@ public void execute(
listener.map(driverProfiles -> new Result(collectedPages, driverProfiles))
);
return;
} else {
if (clusterToConcreteIndices.values().stream().allMatch(v -> v.indices().length == 0)) {
var error = "expected concrete indices with data node plan but got empty; data node plan " + dataNodePlan;
assert false : error;
listener.onFailure(new IllegalStateException(error));
return;
}
}
Map<String, OriginalIndices> clusterToOriginalIndices = transportService.getRemoteClusterService()
.groupIndices(SearchRequest.DEFAULT_INDICES_OPTIONS, PlannerUtils.planOriginalIndices(physicalPlan));
Expand Down

0 comments on commit 455bd88

Please sign in to comment.