Skip to content

Commit

Permalink
Add cross cluster support to _terms_enum (#73975)
Browse files Browse the repository at this point in the history
This commit adds the support to search cross cluster indices (with the cross cluster syntax)
 in the _terms_enum API.

Relates #71550
  • Loading branch information
jimczi committed Jun 10, 2021
1 parent 3e8706e commit 7ce2d42
Show file tree
Hide file tree
Showing 11 changed files with 635 additions and 159 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.termsenum;

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.termsenum.action.TermsEnumAction;
import org.elasticsearch.xpack.core.termsenum.action.TermsEnumRequest;
import org.elasticsearch.xpack.core.termsenum.action.TermsEnumResponse;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class CCSTermsEnumIT extends AbstractMultiClustersTestCase {

@Override
protected Collection<String> remoteClusterAlias() {
return Arrays.asList("remote_cluster");
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
final List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
plugins.add(LocalStateCompositeXPackPlugin.class);
return plugins;
}

public void testBasic() {
Settings indexSettings = Settings.builder().put("index.number_of_replicas", 0).build();
final Client localClient = client(LOCAL_CLUSTER);
final Client remoteClient = client("remote_cluster");
String localIndex = "local_test";
assertAcked(localClient.admin().indices().prepareCreate(localIndex).setSettings(indexSettings));
localClient.prepareIndex(localIndex, "_doc").setSource("foo", "foo").get();
localClient.prepareIndex(localIndex, "_doc").setSource("foo", "foobar").get();
localClient.admin().indices().prepareRefresh(localIndex).get();

String remoteIndex = "remote_test";
assertAcked(remoteClient.admin().indices().prepareCreate(remoteIndex).setSettings(indexSettings));
remoteClient.prepareIndex(remoteIndex, "_doc").setSource("foo", "bar").get();
remoteClient.prepareIndex(remoteIndex, "_doc").setSource("foo", "foobar").get();
remoteClient.prepareIndex(remoteIndex, "_doc").setSource("foo", "zar").get();
remoteClient.admin().indices().prepareRefresh(remoteIndex).get();

// _terms_enum on a remote cluster
TermsEnumRequest req = new TermsEnumRequest("remote_cluster:remote_test")
.field("foo.keyword");
TermsEnumResponse response = client().execute(TermsEnumAction.INSTANCE, req).actionGet();
assertTrue(response.isComplete());
assertThat(response.getTerms().size(), equalTo(3));
assertThat(response.getTerms().get(0), equalTo("bar"));
assertThat(response.getTerms().get(1), equalTo("foobar"));
assertThat(response.getTerms().get(2), equalTo("zar"));

// _terms_enum on mixed clusters (local + remote)
req = new TermsEnumRequest("remote_cluster:remote_test", "local_test")
.field("foo.keyword");
response = client().execute(TermsEnumAction.INSTANCE, req).actionGet();
assertTrue(response.isComplete());
assertThat(response.getTerms().size(), equalTo(4));
assertThat(response.getTerms().get(0), equalTo("bar"));
assertThat(response.getTerms().get(1), equalTo("foo"));
assertThat(response.getTerms().get(2), equalTo("foobar"));
assertThat(response.getTerms().get(3), equalTo("zar"));

req = new TermsEnumRequest("remote_cluster:remote_test", "local_test")
.field("foo.keyword")
.searchAfter("foobar");
response = client().execute(TermsEnumAction.INSTANCE, req).actionGet();
assertTrue(response.isComplete());
assertThat(response.getTerms().size(), equalTo(1));
assertThat(response.getTerms().get(0), equalTo("zar"));

req = new TermsEnumRequest("remote_cluster:remote_test", "local_test")
.field("foo.keyword")
.searchAfter("bar");
response = client().execute(TermsEnumAction.INSTANCE, req).actionGet();
assertTrue(response.isComplete());
assertThat(response.getTerms().size(), equalTo(3));
assertThat(response.getTerms().get(0), equalTo("foo"));
assertThat(response.getTerms().get(1), equalTo("foobar"));
assertThat(response.getTerms().get(2), equalTo("zar"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;
Expand All @@ -20,7 +21,7 @@
import java.util.Set;

/**
* Internal terms enum request executed directly against a specific node, querying potentially many
* Internal terms enum request executed directly against a specific node, querying potentially many
* shards in one request
*/
public class NodeTermsEnumRequest extends TransportRequest implements IndicesRequest {
Expand All @@ -36,12 +37,27 @@ public class NodeTermsEnumRequest extends TransportRequest implements IndicesReq
private final QueryBuilder indexFilter;
private Set<ShardId> shardIds;
private String nodeId;


public NodeTermsEnumRequest(final String nodeId,
final Set<ShardId> shardIds,
TermsEnumRequest request,
long taskStartTimeMillis) {
this.field = request.field();
this.string = request.string();
this.searchAfter = request.searchAfter();
this.caseInsensitive = request.caseInsensitive();
this.size = request.size();
this.timeout = request.timeout().getMillis();
this.taskStartedTimeMillis = taskStartTimeMillis;
this.indexFilter = request.indexFilter();
this.nodeId = nodeId;
this.shardIds = shardIds;
}

public NodeTermsEnumRequest(StreamInput in) throws IOException {
super(in);
field = in.readString();
string = in.readString();
string = in.readOptionalString();
searchAfter = in.readOptionalString();
caseInsensitive = in.readBoolean();
size = in.readVInt();
Expand All @@ -56,36 +72,47 @@ public NodeTermsEnumRequest(StreamInput in) throws IOException {
}
}

public NodeTermsEnumRequest(final String nodeId, final Set<ShardId> shardIds, TermsEnumRequest request) {
this.field = request.field();
this.string = request.string();
this.searchAfter = request.searchAfter();
this.caseInsensitive = request.caseInsensitive();
this.size = request.size();
this.timeout = request.timeout().getMillis();
this.taskStartedTimeMillis = request.taskStartTimeMillis;
this.indexFilter = request.indexFilter();
this.nodeId = nodeId;
this.shardIds = shardIds;
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(field);
out.writeOptionalString(string);
out.writeOptionalString(searchAfter);
out.writeBoolean(caseInsensitive);
out.writeVInt(size);
// Adjust the amount of permitted time the shard has remaining to gather terms.
long timeSpentSoFarInCoordinatingNode = System.currentTimeMillis() - taskStartedTimeMillis;
long remainingTimeForShardToUse = (timeout - timeSpentSoFarInCoordinatingNode);
// TODO - if already timed out can we shortcut the trip somehow? Throw exception if remaining time < 0?
out.writeVLong(remainingTimeForShardToUse);
out.writeVLong(taskStartedTimeMillis);
out.writeOptionalNamedWriteable(indexFilter);
out.writeString(nodeId);
out.writeVInt(shardIds.size());
for (ShardId shardId : shardIds) {
shardId.writeTo(out);
}
}

public String field() {
return field;
}

@Nullable
public String string() {
return string;
}

@Nullable
public String searchAfter() {
return searchAfter;
}

public long taskStartedTimeMillis() {
return this.taskStartedTimeMillis;
}
/**

/**
* The time this request was materialized on a node
*/
long nodeStartedTimeMillis() {
Expand All @@ -94,12 +121,12 @@ long nodeStartedTimeMillis() {
nodeStartedTimeMillis = System.currentTimeMillis();
}
return this.nodeStartedTimeMillis;
}
}

public void startTimerOnDataNode() {
nodeStartedTimeMillis = System.currentTimeMillis();
}

public Set<ShardId> shardIds() {
return Collections.unmodifiableSet(shardIds);
}
Expand All @@ -119,28 +146,6 @@ public String nodeId() {
return nodeId;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(field);
out.writeString(string);
out.writeOptionalString(searchAfter);
out.writeBoolean(caseInsensitive);
out.writeVInt(size);
// Adjust the amount of permitted time the shard has remaining to gather terms.
long timeSpentSoFarInCoordinatingNode = System.currentTimeMillis() - taskStartedTimeMillis;
long remainingTimeForShardToUse = (timeout - timeSpentSoFarInCoordinatingNode);
// TODO - if already timed out can we shortcut the trip somehow? Throw exception if remaining time < 0?
out.writeVLong(remainingTimeForShardToUse);
out.writeVLong(taskStartedTimeMillis);
out.writeOptionalNamedWriteable(indexFilter);
out.writeString(nodeId);
out.writeVInt(shardIds.size());
for (ShardId shardId : shardIds) {
shardId.writeTo(out);
}
}

public QueryBuilder indexFilter() {
return indexFilter;
}
Expand All @@ -162,5 +167,4 @@ public IndicesOptions indicesOptions() {
public boolean remove(ShardId shardId) {
return shardIds.remove(shardId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ class NodeTermsEnumResponse extends TransportResponse {
this.complete = complete;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(terms);
out.writeOptionalString(error);
out.writeBoolean(complete);
out.writeString(nodeId);
}

public List<TermCount> terms() {
return this.terms;
}
Expand All @@ -52,17 +60,8 @@ public String getError() {
public String getNodeId() {
return nodeId;
}

public boolean getComplete() {
return complete;
}


@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(terms);
out.writeOptionalString(error);
out.writeBoolean(complete);
out.writeString(nodeId);
public boolean isComplete() {
return complete;
}
}

0 comments on commit 7ce2d42

Please sign in to comment.