Skip to content

Commit

Permalink
Add point in time to HLRC (#72167)
Browse files Browse the repository at this point in the history
Closes #70593
  • Loading branch information
dnhatn committed May 12, 2021
1 parent ce41fd7 commit 44fc661
Show file tree
Hide file tree
Showing 21 changed files with 489 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.ActiveShardCount;
Expand Down Expand Up @@ -401,7 +403,9 @@ static void addSearchRequestParams(Params params, SearchRequest searchRequest) {
params.withPreference(searchRequest.preference());
params.withIndicesOptions(searchRequest.indicesOptions());
params.withSearchType(searchRequest.searchType().name().toLowerCase(Locale.ROOT));
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
if (searchRequest.isCcsMinimizeRoundtrips() != SearchRequest.defaultCcsMinimizeRoundtrips(searchRequest)) {
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
}
if (searchRequest.getPreFilterShardSize() != null) {
params.putParam("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize()));
}
Expand Down Expand Up @@ -430,6 +434,23 @@ static Request clearScroll(ClearScrollRequest clearScrollRequest) throws IOExcep
return request;
}

static Request openPointInTime(OpenPointInTimeRequest openRequest) {
Request request = new Request(HttpPost.METHOD_NAME, endpoint(openRequest.indices(), "_pit"));
Params params = new Params();
params.withIndicesOptions(openRequest.indicesOptions());
params.withRouting(openRequest.routing());
params.withPreference(openRequest.preference());
params.putParam("keep_alive", openRequest.keepAlive());
request.addParameters(params.asMap());
return request;
}

static Request closePointInTime(ClosePointInTimeRequest closeRequest) throws IOException {
Request request = new Request(HttpDelete.METHOD_NAME, "/_pit");
request.setEntity(createEntity(closeRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request multiSearch(MultiSearchRequest multiSearchRequest) throws IOException {
Request request = new Request(HttpPost.METHOD_NAME, "/_msearch");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.ClosePointInTimeResponse;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
Expand Down Expand Up @@ -1283,6 +1287,66 @@ public final Cancellable clearScrollAsync(ClearScrollRequest clearScrollRequest,
options, ClearScrollResponse::fromXContent, listener, emptySet());
}

/**
* Open a point in time before using it in search requests.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/point-in-time-api.html"> Point in time API </a>
* @param openRequest the open request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response containing the point in time id
*/
public final OpenPointInTimeResponse openPointInTime(OpenPointInTimeRequest openRequest,
RequestOptions options) throws IOException {
return performRequestAndParseEntity(openRequest, RequestConverters::openPointInTime,
options, OpenPointInTimeResponse::fromXContent, emptySet());
}

/**
* Asynchronously open a point in time before using it in search requests
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/point-in-time-api.html"> Point in time API </a>
* @param openRequest the open request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return a cancellable that may be used to cancel the request
*/
public final Cancellable openPointInTimeAsync(OpenPointInTimeRequest openRequest,
RequestOptions options,
ActionListener<OpenPointInTimeResponse> listener) {
return performRequestAsyncAndParseEntity(openRequest, RequestConverters::openPointInTime,
options, OpenPointInTimeResponse::fromXContent, listener, emptySet());
}

/**
* Close a point in time that is opened with {@link #openPointInTime(OpenPointInTimeRequest, RequestOptions)} or
* {@link #openPointInTimeAsync(OpenPointInTimeRequest, RequestOptions, ActionListener)}.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/point-in-time-api.html#close-point-in-time-api">
* Close point in time API</a>
* @param closeRequest the close request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final ClosePointInTimeResponse closePointInTime(ClosePointInTimeRequest closeRequest,
RequestOptions options) throws IOException {
return performRequestAndParseEntity(closeRequest, RequestConverters::closePointInTime, options,
ClosePointInTimeResponse::fromXContent, emptySet());
}

/**
* Asynchronously close a point in time that is opened with {@link #openPointInTime(OpenPointInTimeRequest, RequestOptions)} or
* {@link #openPointInTimeAsync(OpenPointInTimeRequest, RequestOptions, ActionListener)}.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/point-in-time-api.html#close-point-in-time-api">
* Close point in time API</a>
* @param closeRequest the close request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return a cancellable that may be used to cancel the request
*/
public final Cancellable closePointInTimeAsync(ClosePointInTimeRequest closeRequest,
RequestOptions options,
ActionListener<ClosePointInTimeResponse> listener) {
return performRequestAsyncAndParseEntity(closeRequest, RequestConverters::closePointInTime,
options, ClosePointInTimeResponse::fromXContent, listener, emptySet());
}

/**
* Executes a request using the Search Template API.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-template.html">Search Template API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.SearchType;
Expand Down Expand Up @@ -83,6 +85,7 @@
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
Expand Down Expand Up @@ -1028,7 +1031,12 @@ public void testSearch() throws Exception {
String[] indices = randomIndicesNames(0, 5);
Map<String, String> expectedParams = new HashMap<>();
SearchRequest searchRequest = createTestSearchRequest(indices, expectedParams);

if (searchRequest.source() != null && randomBoolean()) {
PointInTimeBuilder pit = new PointInTimeBuilder(randomAlphaOfLength(100));
if (randomBoolean()) {
pit.setKeepAlive(TimeValue.timeValueMinutes(between(1, 10)));
}
}
Request request = RequestConverters.search(searchRequest, searchEndpoint);
StringJoiner endpoint = new StringJoiner("/", "/", "");
String index = String.join(",", indices);
Expand Down Expand Up @@ -1291,7 +1299,7 @@ public void testSearchTemplate() throws Exception {

assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals(endpoint.toString(), request.getEndpoint());
assertEquals(expectedParams, request.getParameters());
assertThat(request.getParameters(), equalTo(expectedParams));
assertToXContentBody(searchTemplateRequest, request.getEntity());
}

Expand Down Expand Up @@ -1410,6 +1418,55 @@ public void testExplain() throws IOException {
assertToXContentBody(explainRequest, request.getEntity());
}

public void testPointInTime() throws Exception {
// Open point in time
{
Map<String, String> expectedParams = new HashMap<>();
String[] indices = randomIndicesNames(1, 5);
OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(indices);
String keepAlive = randomFrom("1ms", "2m", "1d");
openRequest.keepAlive(TimeValue.parseTimeValue(keepAlive, "keep_alive"));
expectedParams.put("keep_alive", keepAlive);
if (randomBoolean()) {
String routing = randomAlphaOfLengthBetween(1, 10);
openRequest.routing(routing);
expectedParams.put("routing", routing);
}
if (randomBoolean()) {
String preference = randomAlphaOfLengthBetween(1, 10);
openRequest.preference(preference);
expectedParams.put("preference", preference);
}
openRequest.indicesOptions(setRandomIndicesOptions(openRequest.indicesOptions(), expectedParams));
final Request request = RequestConverters.openPointInTime(openRequest);
assertThat(request.getParameters(), equalTo(expectedParams));
assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME));
final String expectedEndpoint = "/" + String.join(",", indices) + "/_pit";
assertThat(request.getEndpoint(), equalTo(expectedEndpoint));
}
// Search with point in time
{
SearchRequest searchRequest = new SearchRequest();
searchRequest.source(new SearchSourceBuilder());
String pitID = randomAlphaOfLength(10);
final PointInTimeBuilder pointInTimeBuilder = new PointInTimeBuilder(pitID);
if (randomBoolean()) {
pointInTimeBuilder.setKeepAlive(randomFrom(TimeValue.timeValueSeconds(1), TimeValue.timeValueMillis(10)));
}
searchRequest.source().pointInTimeBuilder(pointInTimeBuilder);
final Request request = RequestConverters.search(searchRequest, "/_search");
assertToXContentBody(searchRequest.source(), request.getEntity());
}
// close PIT
{
String id = randomAlphaOfLengthBetween(3, 10);
Request request = RequestConverters.closePointInTime(new ClosePointInTimeRequest(id));
assertThat(request.getMethod(), equalTo(HttpDelete.METHOD_NAME));
assertThat(request.getEndpoint(), equalTo("/_pit"));
assertThat(EntityUtils.toString(request.getEntity()), equalTo("{\"id\":" + "\"" + id + "\"}"));
}
}

public void testTermVectors() throws IOException {
String index = randomAlphaOfLengthBetween(3, 10);
String id = randomAlphaOfLengthBetween(3, 10);
Expand Down Expand Up @@ -1915,9 +1972,12 @@ private static void setRandomSearchParams(SearchRequest searchRequest,
expectedParams.put("scroll", searchRequest.scroll().keepAlive().getStringRep());
}
if (randomBoolean()) {
searchRequest.setCcsMinimizeRoundtrips(randomBoolean());
boolean ccsMinimizeRoundtrips = randomBoolean();
searchRequest.setCcsMinimizeRoundtrips(ccsMinimizeRoundtrips);
if (ccsMinimizeRoundtrips == false) {
expectedParams.put("ccs_minimize_roundtrips", "false");
}
}
expectedParams.put("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
if (randomBoolean()) {
searchRequest.setMaxConcurrentShardRequests(randomIntBetween(1, Integer.MAX_VALUE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,20 @@
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.explain.ExplainRequest;
import org.elasticsearch.action.explain.ExplainResponse;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.ClosePointInTimeResponse;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
Expand Down Expand Up @@ -66,6 +71,7 @@
import org.elasticsearch.search.aggregations.metrics.WeightedAvgAggregationBuilder;
import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
Expand Down Expand Up @@ -1370,6 +1376,43 @@ public void testSearchWithBasicLicensedQuery() throws IOException {
assertSecondHit(searchResponse, hasId("1"));
}

public void testPointInTime() throws Exception {
int numDocs = between(50, 100);
for (int i = 0; i < numDocs; i++) {
IndexRequest indexRequest = new IndexRequest("test-index").id(Integer.toString(i)).source("field", i);
highLevelClient().index(indexRequest, RequestOptions.DEFAULT);
}
highLevelClient().indices().refresh(new RefreshRequest("test-index"), RequestOptions.DEFAULT);

OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest("test-index").keepAlive(TimeValue.timeValueMinutes(between(1, 5)));
String pitID = execute(openRequest, highLevelClient()::openPointInTime, highLevelClient()::openPointInTimeAsync).getPointInTimeId();
try {
int totalHits = 0;
SearchResponse searchResponse = null;
do {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(between(5, 10)).sort("field");
PointInTimeBuilder pointInTimeBuilder = new PointInTimeBuilder(pitID);
if (randomBoolean()) {
pointInTimeBuilder.setKeepAlive(TimeValue.timeValueMinutes(between(1, 5)));
}
searchSourceBuilder.pointInTimeBuilder(pointInTimeBuilder);
if (searchResponse != null) {
SearchHit last = searchResponse.getHits().getHits()[searchResponse.getHits().getHits().length - 1];
searchSourceBuilder.searchAfter(last.getSortValues());
}
SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder);
searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync);
assertThat(searchResponse.pointInTimeId(), equalTo(pitID));
totalHits += searchResponse.getHits().getHits().length;
} while (searchResponse.getHits().getHits().length > 0);
assertThat(totalHits, equalTo(numDocs));
} finally {
ClosePointInTimeResponse closeResponse = execute(new ClosePointInTimeRequest(pitID),
highLevelClient()::closePointInTime, highLevelClient()::closePointInTimeAsync);
assertTrue(closeResponse.isSucceeded());
}
}

private static void assertCountHeader(CountResponse countResponse) {
assertEquals(0, countResponse.getSkippedShards());
assertEquals(0, countResponse.getFailedShards());
Expand Down

0 comments on commit 44fc661

Please sign in to comment.