Skip to content

Commit

Permalink
Propagate original indices in NodeTermsEnumRequest (#77776) (#78743)
Browse files Browse the repository at this point in the history
This fix ensures that we provide the original list of indices
as part of the node-level terms enum request.

Closes #77508
  • Loading branch information
jimczi committed Oct 6, 2021
1 parent 5363372 commit 2fddda8
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
*/
package org.elasticsearch.xpack.core.termsenum.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -26,22 +28,26 @@
*/
public class NodeTermsEnumRequest extends TransportRequest implements IndicesRequest {

private String field;
private String string;
private String searchAfter;
private long taskStartedTimeMillis;
private long nodeStartedTimeMillis;
private boolean caseInsensitive;
private int size;
private long timeout;
private final String field;
private final String string;
private final String searchAfter;
private final long taskStartedTimeMillis;
private final boolean caseInsensitive;
private final int size;
private final long timeout;
private final QueryBuilder indexFilter;
private Set<ShardId> shardIds;
private String nodeId;
private final Set<ShardId> shardIds;
private final String nodeId;
private final OriginalIndices originalIndices;

private long nodeStartedTimeMillis;

public NodeTermsEnumRequest(final String nodeId,
public NodeTermsEnumRequest(OriginalIndices originalIndices,
final String nodeId,
final Set<ShardId> shardIds,
TermsEnumRequest request,
long taskStartTimeMillis) {
this.originalIndices = originalIndices;
this.field = request.field();
this.string = request.string();
this.searchAfter = request.searchAfter();
Expand Down Expand Up @@ -70,6 +76,15 @@ public NodeTermsEnumRequest(StreamInput in) throws IOException {
for (int i = 0; i < numShards; i++) {
shardIds.add(new ShardId(in));
}
if (in.getVersion().onOrAfter(Version.V_7_15_1)) {
originalIndices = OriginalIndices.readOriginalIndices(in);
} else {
String[] indicesNames = shardIds.stream()
.map(ShardId::getIndexName)
.distinct()
.toArray(String[]::new);
this.originalIndices = new OriginalIndices(indicesNames, null);
}
}

@Override
Expand All @@ -92,6 +107,9 @@ public void writeTo(StreamOutput out) throws IOException {
for (ShardId shardId : shardIds) {
shardId.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_7_15_1)) {
OriginalIndices.writeOriginalIndices(originalIndices, out);
}
}

public String field() {
Expand Down Expand Up @@ -152,16 +170,12 @@ public QueryBuilder indexFilter() {

@Override
public String[] indices() {
HashSet<String> indicesNames = new HashSet<>();
for (ShardId shardId : shardIds) {
indicesNames.add(shardId.getIndexName());
}
return indicesNames.toArray(new String[0]);
return originalIndices.indices();
}

@Override
public IndicesOptions indicesOptions() {
return null;
return originalIndices.indicesOptions();
}

public boolean remove(ShardId shardId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ protected void doExecute(Task task, TermsEnumRequest request, ActionListener<Ter
new AsyncBroadcastAction(task, request, listener).start();
}

protected NodeTermsEnumRequest newNodeRequest(final String nodeId,
protected NodeTermsEnumRequest newNodeRequest(final OriginalIndices originalIndices,
final String nodeId,
final Set<ShardId> shardIds,
TermsEnumRequest request,
long taskStartMillis) {
Expand All @@ -145,14 +146,14 @@ protected NodeTermsEnumRequest newNodeRequest(final String nodeId,
// final ClusterState clusterState = clusterService.state();
// final Set<String> indicesAndAliases = indexNameExpressionResolver.resolveExpressions(clusterState, request.indices());
// final AliasFilter aliasFilter = searchService.buildAliasFilter(clusterState, shard.getIndexName(), indicesAndAliases);
return new NodeTermsEnumRequest(nodeId, shardIds, request, taskStartMillis);
return new NodeTermsEnumRequest(originalIndices, nodeId, shardIds, request, taskStartMillis);
}

protected NodeTermsEnumResponse readShardResponse(StreamInput in) throws IOException {
return new NodeTermsEnumResponse(in);
}

protected Map<String, Set<ShardId>> getNodeBundles(ClusterState clusterState, TermsEnumRequest request, String[] concreteIndices) {
protected Map<String, Set<ShardId>> getNodeBundles(ClusterState clusterState, String[] concreteIndices) {
// Group targeted shards by nodeId
Map<String, Set<ShardId>> fastNodeBundles = new HashMap<>();
for (String indexName : concreteIndices) {
Expand All @@ -162,9 +163,7 @@ protected Map<String, Set<ShardId>> getNodeBundles(ClusterState clusterState, Te
GroupShardsIterator<ShardIterator> shards = clusterService.operationRouting()
.searchShards(clusterState, singleIndex, null, null);

Iterator<ShardIterator> shardsForIndex = shards.iterator();
while (shardsForIndex.hasNext()) {
ShardIterator copiesOfShard = shardsForIndex.next();
for (ShardIterator copiesOfShard : shards) {
ShardRouting selectedCopyOfShard = null;
for (ShardRouting copy : copiesOfShard) {
// Pick the first active node with a copy of the shard
Expand All @@ -177,7 +176,7 @@ protected Map<String, Set<ShardId>> getNodeBundles(ClusterState clusterState, Te
break;
}
String nodeId = selectedCopyOfShard.currentNodeId();
Set<ShardId> bundle = null;
final Set<ShardId> bundle;
if (fastNodeBundles.containsKey(nodeId)) {
bundle = fastNodeBundles.get(nodeId);
} else {
Expand Down Expand Up @@ -388,7 +387,7 @@ protected NodeTermsEnumResponse dataNodeOperation(NodeTermsEnumRequest request,
if (termsList.size() >= shard_size) {
break;
}
};
}

} catch (Exception e) {
error = ExceptionsHelper.stackTrace(e);
Expand All @@ -415,7 +414,7 @@ private boolean canAccess(

if (indexAccessControl != null) {
final boolean dls = indexAccessControl.getDocumentPermissions().hasDocumentLevelPermissions();
if ( dls && licenseChecker.get()) {
if (dls && licenseChecker.get()) {
// Check to see if any of the roles defined for the current user rewrite to match_all

SecurityContext securityContext = new SecurityContext(clusterService.getSettings(), threadContext);
Expand Down Expand Up @@ -467,20 +466,20 @@ protected class AsyncBroadcastAction {
private final Task task;
private final TermsEnumRequest request;
private ActionListener<TermsEnumResponse> listener;
private final ClusterState clusterState;
private final DiscoveryNodes nodes;
private final int expectedOps;
private final AtomicInteger counterOps = new AtomicInteger();
private final AtomicReferenceArray<Object> atomicResponses;
private final Map<String, Set<ShardId>> nodeBundles;
private final OriginalIndices localIndices;
private final Map<String, OriginalIndices> remoteClusterIndices;

protected AsyncBroadcastAction(Task task, TermsEnumRequest request, ActionListener<TermsEnumResponse> listener) {
this.task = task;
this.request = request;
this.listener = listener;

clusterState = clusterService.state();
ClusterState clusterState = clusterService.state();

ClusterBlockException blockException = checkGlobalBlock(clusterState, request);
if (blockException != null) {
Expand All @@ -489,7 +488,7 @@ protected AsyncBroadcastAction(Task task, TermsEnumRequest request, ActionListen

this.remoteClusterIndices = remoteClusterService.groupIndices(request.indicesOptions(), request.indices(),
idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState));
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
this.localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);

// update to concrete indices
String[] concreteIndices = localIndices == null ? new String[0] :
Expand All @@ -501,7 +500,7 @@ protected AsyncBroadcastAction(Task task, TermsEnumRequest request, ActionListen

nodes = clusterState.nodes();
logger.trace("resolving shards based on cluster state version [{}]", clusterState.version());
nodeBundles = getNodeBundles(clusterState, request, concreteIndices);
nodeBundles = getNodeBundles(clusterState, concreteIndices);
expectedOps = nodeBundles.size() + remoteClusterIndices.size();

atomicResponses = new AtomicReferenceArray<>(expectedOps);
Expand Down Expand Up @@ -556,7 +555,7 @@ protected void performOperation(final String nodeId, final Set<ShardId> shardIds
onNodeFailure(nodeId, opsIndex, null);
} else {
try {
final NodeTermsEnumRequest nodeRequest = newNodeRequest(nodeId, shardIds, request, task.getStartTime());
final NodeTermsEnumRequest nodeRequest = newNodeRequest(localIndices, nodeId, shardIds, request, task.getStartTime());
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
DiscoveryNode node = nodes.get(nodeId);
if (node == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ setup:
]
}
- do:
security.put_role:
name: "dls_alias_role"
body: >
{
"indices": [
{ "names": ["alias_security"], "privileges": ["read"], "query": "{\"term\": {\"ck\": \"const\"}}" }
]
}
- do:
security.put_role:
name: "dls_none_role"
Expand All @@ -57,6 +67,16 @@ setup:
"full_name" : "user with access to all docs in test_security index (using DLS)"
}
- do:
security.put_user:
username: "dls_alias_user"
body: >
{
"password" : "x-pack-test-password",
"roles" : [ "dls_alias_role" ],
"full_name" : "user with access to all docs in test_security index (using DLS)"
}
- do:
security.put_role:
name: "dls_some_role"
Expand Down Expand Up @@ -143,6 +163,8 @@ setup:
indices.create:
index: test_security
body:
aliases:
alias_security: {}
settings:
index:
number_of_shards: 1
Expand Down Expand Up @@ -198,6 +220,16 @@ teardown:
security.delete_role:
name: "dls_all_role"
ignore: 404

- do:
security.delete_user:
username: "dls_alias_user"
ignore: 404

- do:
security.delete_role:
name: "dls_alias_role"
ignore: 404
- do:
security.delete_role:
name: "dls_none_role"
Expand Down Expand Up @@ -285,6 +317,7 @@ teardown:
index: test_k
body: {"field": "foo"}
- length: {terms: 1}

---
"Test search after keyword field":
- do:
Expand Down Expand Up @@ -385,6 +418,7 @@ teardown:
terms_enum:
index: test_*
body: {"field": "foo", "string":"b", "timeout": "2m"}

---
"Test security":

Expand All @@ -402,6 +436,13 @@ teardown:
body: {"field": "foo", "string":"b"}
- length: {terms: 1}

- do:
headers: { Authorization: "Basic ZGxzX2FsaWFzX3VzZXI6eC1wYWNrLXRlc3QtcGFzc3dvcmQ=" } # dls_alias_user sees all docs through the alias
terms_enum:
index: alias_security
body: { "field": "foo", "string": "b" }
- length: { terms: 1 }

- do:
headers: { Authorization: "Basic ZGxzX3NvbWVfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" } # dls_some_user sees selected docs
terms_enum:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,20 @@ setup:
]
}
- do:
security.put_role:
name: "terms_enum_alias_role"
body: >
{
"cluster": ["all"],
"indices": [
{
"names": ["my_remote_cluster:terms_enum_alias"],
"privileges": ["read"]
}
]
}
- do:
security.put_user:
username: "joe_all"
Expand All @@ -30,6 +44,15 @@ setup:
"roles" : [ "terms_enum_all_role" ]
}
- do:
security.put_user:
username: "joe_alias"
body: >
{
"password": "s3krit-password",
"roles" : [ "terms_enum_alias_role" ]
}
- do:
security.put_role:
name: "terms_enum_none_role"
Expand Down Expand Up @@ -82,6 +105,10 @@ teardown:
security.delete_user:
username: "joe_all"
ignore: 404
- do:
security.delete_user:
username: "joe_alias"
ignore: 404
- do:
security.delete_user:
username: "joe_none"
Expand All @@ -94,6 +121,10 @@ teardown:
security.delete_role:
name: "terms_enum_all_role"
ignore: 404
- do:
security.delete_role:
name: "terms_enum_alias_role"
ignore: 404
- do:
security.delete_role:
name: "terms_enum_none_role"
Expand Down Expand Up @@ -123,6 +154,15 @@ teardown:
- match: { terms.0: "zar" }
- match: { complete: true }

- do:
headers: { Authorization: "Basic am9lX2FsaWFzOnMza3JpdC1wYXNzd29yZA==" } # joe_alias can see all docs through alias
terms_enum:
index: my_remote_cluster:terms_enum_alias
body: { "field": "foo", "search_after": "foobar" }
- length: { terms: 1 }
- match: { terms.0: "zar" }
- match: { complete: true }

- do:
headers: { Authorization: "Basic am9lX25vbmU6czNrcml0LXBhc3N3b3Jk" } # joe_none can't see docs
terms_enum:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,17 @@ setup:
]
}
- do:
security.put_role:
name: "terms_enum_alias_role"
body: >
{
"cluster": ["monitor"],
"indices": [
{ "names": ["terms_enum_alias"], "privileges": ["read"], "query": "{\"term\": {\"ck\": \"const\"}}" }
]
}
- do:
security.put_role:
name: "terms_enum_none_role"
Expand Down Expand Up @@ -373,6 +384,8 @@ setup:
indices.create:
index: terms_enum_index
body:
aliases:
terms_enum_alias: {}
settings:
index:
number_of_shards: 1
Expand Down

0 comments on commit 2fddda8

Please sign in to comment.